diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 0071ef418..1f0615f21 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1170,6 +1170,10 @@ class FeishuChannel(BaseChannel): logger.error("Error replying to Feishu message {}: {}", parent_message_id, e) return False + def _should_use_reply_in_thread(self, metadata: dict[str, Any]) -> bool: + """Return whether a group reply should create a Feishu thread/topic.""" + return metadata.get("chat_type", "group") == "group" and self.config.reply_to_message + def _send_message_sync( self, receive_id_type: str, receive_id: str, msg_type: str, content: str ) -> str | None: @@ -1211,13 +1215,15 @@ class FeishuChannel(BaseChannel): receive_id_type: str, chat_id: str, reply_message_id: str | None = None, + *, + reply_in_thread: bool = False, ) -> str | None: """Create a CardKit streaming card, send it to chat, return card_id. When *reply_message_id* is provided the card is delivered via the - reply API (with reply_in_thread=True) so it lands inside the - originating thread / topic. Otherwise the plain create-message - API is used. + reply API. *reply_in_thread* controls whether Feishu creates a + thread/topic for that reply. Otherwise the plain create-message API is + used. """ from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody @@ -1253,7 +1259,7 @@ class FeishuChannel(BaseChannel): if reply_message_id: sent = self._reply_message_sync( reply_message_id, "interactive", card_content, - reply_in_thread=True, + reply_in_thread=reply_in_thread, ) else: sent = self._send_message_sync( @@ -1409,11 +1415,10 @@ class FeishuChannel(BaseChannel): {"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False, ) - # Fallback: reply via the Reply API for group chats. - # Target message_id — the Feishu API keeps the reply in - # the same topic automatically. + # Fallback: only create a group thread/topic when reply-to-message + # is enabled. Otherwise use the plain create-message API. _f_msg = meta.get("message_id") - fallback_msg_id = _f_msg if meta.get("chat_type", "group") == "group" else None + fallback_msg_id = _f_msg if self._should_use_reply_in_thread(meta) else None if fallback_msg_id: await loop.run_in_executor( None, lambda: self._reply_message_sync( @@ -1438,16 +1443,18 @@ class FeishuChannel(BaseChannel): now = time.monotonic() if buf.card_id is None: - # Send the streaming card as a reply for group chats so it - # lands inside the originating topic/thread. Always target - # message_id (the actual inbound message) — the Feishu Reply - # API keeps the response in the same topic automatically. - is_group = meta.get("chat_type", "group") == "group" - reply_msg_id = meta.get("message_id") if is_group else None + # Send the streaming card as a group thread/topic reply only when + # reply-to-message is enabled. + use_reply_in_thread = self._should_use_reply_in_thread(meta) + reply_msg_id = meta.get("message_id") if use_reply_in_thread else None card_id = await loop.run_in_executor( None, - self._create_streaming_card_sync, - rid_type, chat_id, reply_msg_id, + lambda: self._create_streaming_card_sync( + rid_type, + chat_id, + reply_msg_id, + reply_in_thread=use_reply_in_thread, + ), ) if card_id: buf.card_id = card_id @@ -1489,9 +1496,9 @@ class FeishuChannel(BaseChannel): "\n\n" + self._format_tool_hint_delta(hint) + "\n\n", ) return - # No active streaming card — send as a regular - # interactive card with the same 🔧 prefix style. - # Use reply API for group chats so the hint stays in topic. + # No active streaming card — send as a regular interactive card + # with the same 🔧 prefix style. Only create a group thread/topic + # when reply-to-message is enabled. card = json.dumps( {"config": {"wide_screen_mode": True}, "elements": [ {"tag": "markdown", "content": self._format_tool_hint_delta(hint)}, @@ -1499,8 +1506,7 @@ class FeishuChannel(BaseChannel): ensure_ascii=False, ) _th_msg_id = msg.metadata.get("message_id") - _th_chat_type = msg.metadata.get("chat_type", "group") - if _th_msg_id and _th_chat_type == "group": + if _th_msg_id and self._should_use_reply_in_thread(msg.metadata): await loop.run_in_executor( None, lambda: self._reply_message_sync( _th_msg_id, "interactive", card, @@ -1531,18 +1537,16 @@ class FeishuChannel(BaseChannel): def _do_send(m_type: str, content: str) -> None: """Send via reply (first message) or create (subsequent). - For group chats the reply API always uses reply_in_thread=True. - The Feishu API automatically keeps replies inside existing - topics — reply_in_thread only creates a *new* topic when the - target message is a plain (non-topic) message. + Group chats only set reply_in_thread=True when + reply_to_message is enabled; otherwise a Reply API call for an + existing topic must not create a new topic. """ nonlocal first_send if reply_message_id and first_send: first_send = False - chat_type = msg.metadata.get("chat_type", "group") ok = self._reply_message_sync( reply_message_id, m_type, content, - reply_in_thread=chat_type == "group", + reply_in_thread=self._should_use_reply_in_thread(msg.metadata), ) if ok: return diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index f7dc39e5d..430e5abea 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -580,6 +580,32 @@ async def test_reply_without_reply_in_thread_when_disabled() -> None: channel._client.im.v1.message.create.assert_called_once() +@pytest.mark.asyncio +async def test_topic_reply_does_not_force_reply_in_thread_when_disabled() -> None: + """Topic replies must not create new Feishu topics when reply_to_message is False.""" + channel = _make_feishu_channel(reply_to_message=False) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={ + "message_id": "om_child456", + "chat_type": "group", + "thread_id": "om_root123", + }, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is not True + + @pytest.mark.asyncio async def test_reply_keeps_fallback_when_reply_fails() -> None: """Even with reply_to_message=True, fallback to create on reply failure.""" diff --git a/tests/channels/test_feishu_streaming.py b/tests/channels/test_feishu_streaming.py index 4bef83548..6a137aa2f 100644 --- a/tests/channels/test_feishu_streaming.py +++ b/tests/channels/test_feishu_streaming.py @@ -10,13 +10,14 @@ from nanobot.bus.queue import MessageBus from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf -def _make_channel(streaming: bool = True) -> FeishuChannel: +def _make_channel(streaming: bool = True, reply_to_message: bool = False) -> FeishuChannel: config = FeishuConfig( enabled=True, app_id="cli_test", app_secret="secret", allow_from=["*"], streaming=streaming, + reply_to_message=reply_to_message, ) ch = FeishuChannel(config, MessageBus()) ch._client = MagicMock() @@ -148,6 +149,42 @@ class TestSendDelta: ch._client.im.v1.message.create.assert_called_once() ch._client.cardkit.v1.card_element.content.assert_called_once() + @pytest.mark.asyncio + async def test_group_delta_uses_create_when_reply_disabled(self): + ch = _make_channel(reply_to_message=False) + ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_new") + ch._client.im.v1.message.create.return_value = _mock_send_response("om_new") + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + await ch.send_delta( + "oc_chat1", + "Hello ", + metadata={"message_id": "om_001", "chat_type": "group"}, + ) + + ch._client.im.v1.message.create.assert_called_once() + ch._client.im.v1.message.reply.assert_not_called() + + @pytest.mark.asyncio + async def test_group_delta_replies_in_thread_when_reply_enabled(self): + ch = _make_channel(reply_to_message=True) + ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_new") + reply_resp = MagicMock() + reply_resp.success.return_value = True + ch._client.im.v1.message.reply.return_value = reply_resp + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + await ch.send_delta( + "oc_chat1", + "Hello ", + metadata={"message_id": "om_001", "chat_type": "group"}, + ) + + ch._client.im.v1.message.reply.assert_called_once() + ch._client.im.v1.message.create.assert_not_called() + request = ch._client.im.v1.message.reply.call_args[0][0] + assert request.request_body.reply_in_thread is True + @pytest.mark.asyncio async def test_second_delta_within_interval_skips_update(self): ch = _make_channel() @@ -204,6 +241,44 @@ class TestSendDelta: ch._client.cardkit.v1.card_element.content.assert_not_called() ch._client.im.v1.message.create.assert_called_once() + @pytest.mark.asyncio + async def test_stream_end_fallback_group_uses_create_when_reply_disabled(self): + ch = _make_channel(reply_to_message=False) + ch._stream_bufs["om_001"] = _FeishuStreamBuf( + text="Fallback content", card_id=None, sequence=0, last_edit=0.0, + ) + ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb") + + await ch.send_delta( + "oc_chat1", + "", + metadata={"_stream_end": True, "message_id": "om_001", "chat_type": "group"}, + ) + + ch._client.im.v1.message.create.assert_called_once() + ch._client.im.v1.message.reply.assert_not_called() + + @pytest.mark.asyncio + async def test_stream_end_fallback_group_replies_when_reply_enabled(self): + ch = _make_channel(reply_to_message=True) + ch._stream_bufs["om_001"] = _FeishuStreamBuf( + text="Fallback content", card_id=None, sequence=0, last_edit=0.0, + ) + reply_resp = MagicMock() + reply_resp.success.return_value = True + ch._client.im.v1.message.reply.return_value = reply_resp + + await ch.send_delta( + "oc_chat1", + "", + metadata={"_stream_end": True, "message_id": "om_001", "chat_type": "group"}, + ) + + ch._client.im.v1.message.reply.assert_called_once() + ch._client.im.v1.message.create.assert_not_called() + request = ch._client.im.v1.message.reply.call_args[0][0] + assert request.request_body.reply_in_thread is True + @pytest.mark.asyncio async def test_stream_end_fallback_when_final_update_fails(self): """If streaming mode was closed (e.g. Feishu timeout), fall back to a regular card.""" @@ -316,6 +391,40 @@ class TestToolHintInlineStreaming: assert "oc_chat1" not in ch._stream_bufs ch._client.im.v1.message.create.assert_called_once() + @pytest.mark.asyncio + async def test_tool_hint_group_uses_create_when_reply_disabled(self): + ch = _make_channel(reply_to_message=False) + ch._client.im.v1.message.create.return_value = _mock_send_response("om_hint") + + msg = OutboundMessage( + channel="feishu", chat_id="oc_chat1", + content='read_file("path")', + metadata={"_tool_hint": True, "message_id": "om_001", "chat_type": "group"}, + ) + await ch.send(msg) + + ch._client.im.v1.message.create.assert_called_once() + ch._client.im.v1.message.reply.assert_not_called() + + @pytest.mark.asyncio + async def test_tool_hint_group_replies_when_reply_enabled(self): + ch = _make_channel(reply_to_message=True) + reply_resp = MagicMock() + reply_resp.success.return_value = True + ch._client.im.v1.message.reply.return_value = reply_resp + + msg = OutboundMessage( + channel="feishu", chat_id="oc_chat1", + content='read_file("path")', + metadata={"_tool_hint": True, "message_id": "om_001", "chat_type": "group"}, + ) + await ch.send(msg) + + ch._client.im.v1.message.reply.assert_called_once() + ch._client.im.v1.message.create.assert_not_called() + request = ch._client.im.v1.message.reply.call_args[0][0] + assert request.request_body.reply_in_thread is True + @pytest.mark.asyncio async def test_consecutive_tool_hints_append(self): """When multiple tool hints arrive consecutively, each appends to the card."""