diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 56e6df1c3..6f3503236 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -308,6 +308,8 @@ class FeishuChannel(BaseChannel): self._loop: asyncio.AbstractEventLoop | None = None self._stream_bufs: dict[str, _FeishuStreamBuf] = {} self._bot_open_id: str | None = None + self._background_tasks: set[asyncio.Task] = set() + self._reaction_ids: dict[str, str] = {} # message_id → reaction_id @staticmethod def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any: @@ -549,8 +551,11 @@ class FeishuChannel(BaseChannel): return None async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> str | None: - """ - Add a reaction emoji to a message (non-blocking). + """Add a reaction emoji to a message. + + Returns the reaction_id on success, None on failure. + When called via a tracked background task, the returned reaction_id + is stored in ``_reaction_ids`` for later cleanup by ``send_delta``. Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART """ @@ -594,6 +599,30 @@ class FeishuChannel(BaseChannel): loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._remove_reaction_sync, message_id, reaction_id) + def _on_background_task_done(self, task: asyncio.Task) -> None: + """Callback: remove from tracking set and log unhandled exceptions.""" + self._background_tasks.discard(task) + if task.cancelled(): + return + try: + task.result() + except Exception as exc: + logger.warning("Background task failed: {}", exc) + + def _on_reaction_added(self, message_id: str, task: asyncio.Task) -> None: + """Callback: store reaction_id after background add-reaction completes.""" + if task.cancelled(): + return + try: + reaction_id = task.result() + if reaction_id: + self._reaction_ids[message_id] = reaction_id + except Exception: + pass # already logged by _on_background_task_done + # Trim cache to prevent unbounded growth + if len(self._reaction_ids) > 500: + self._reaction_ids.pop(next(iter(self._reaction_ids))) + # Regex to match markdown tables (header + separator + data rows) _TABLE_RE = re.compile( r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)", @@ -1172,8 +1201,19 @@ class FeishuChannel(BaseChannel): logger.error("Error sending Feishu {} message: {}", msg_type, e) return None - def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None: - """Create a CardKit streaming card, send it to chat, return card_id.""" + def _create_streaming_card_sync( + self, + receive_id_type: str, + chat_id: str, + reply_message_id: str | None = None, + ) -> str | None: + """Create a CardKit streaming card, send it to chat, return card_id. + + When *reply_message_id* is provided the card is delivered via the + reply API (with reply_in_thread=True) so it lands inside the + originating thread / topic. Otherwise the plain create-message + API is used. + """ from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody card_json = { @@ -1202,13 +1242,19 @@ class FeishuChannel(BaseChannel): return None card_id = getattr(response.data, "card_id", None) if card_id: - message_id = self._send_message_sync( - receive_id_type, - chat_id, - "interactive", - json.dumps({"type": "card", "data": {"card_id": card_id}}), + card_content = json.dumps( + {"type": "card", "data": {"card_id": card_id}}, ensure_ascii=False ) - if message_id: + if reply_message_id: + sent = self._reply_message_sync( + reply_message_id, "interactive", card_content, + reply_in_thread=True, + ) + else: + sent = self._send_message_sync( + receive_id_type, chat_id, "interactive", card_content, + ) is not None + if sent: return card_id logger.warning( "Created streaming card {} but failed to send it to {}", card_id, chat_id @@ -1298,7 +1344,7 @@ class FeishuChannel(BaseChannel): _stream_end: Finalize the streaming card. _tool_hint: Delta is a formatted tool hint (for display only). message_id: Original message id (used with _stream_end for reaction cleanup). - reaction_id: Reaction id to remove on stream end. + chat_type: "group" or "p2p" — controls reply-in-thread for streaming cards. """ if not self._client: return @@ -1308,10 +1354,13 @@ class FeishuChannel(BaseChannel): # --- stream end: final update or fallback --- if meta.get("_stream_end"): - if (message_id := meta.get("message_id")) and (reaction_id := meta.get("reaction_id")): - await self._remove_reaction(message_id, reaction_id) + message_id = meta.get("message_id") + if message_id: + reaction_id = self._reaction_ids.pop(message_id, None) + if reaction_id: + await self._remove_reaction(message_id, reaction_id) # Add completion emoji if configured - if self.config.done_emoji and message_id: + if self.config.done_emoji: await self._add_reaction(message_id, self.config.done_emoji) buf = self._stream_bufs.pop(chat_id, None) @@ -1349,9 +1398,22 @@ class FeishuChannel(BaseChannel): {"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False, ) - await loop.run_in_executor( - None, self._send_message_sync, rid_type, chat_id, "interactive", card - ) + # Fallback: reply via the Reply API for group chats. + # Target message_id — the Feishu API keeps the reply in + # the same topic automatically. + _f_msg = meta.get("message_id") + fallback_msg_id = _f_msg if meta.get("chat_type", "group") == "group" else None + if fallback_msg_id: + await loop.run_in_executor( + None, lambda: self._reply_message_sync( + fallback_msg_id, "interactive", card, + reply_in_thread=True, + ), + ) + else: + await loop.run_in_executor( + None, self._send_message_sync, rid_type, chat_id, "interactive", card + ) return # --- accumulate delta --- @@ -1365,8 +1427,16 @@ class FeishuChannel(BaseChannel): now = time.monotonic() if buf.card_id is None: + # Send the streaming card as a reply for group chats so it + # lands inside the originating topic/thread. Always target + # message_id (the actual inbound message) — the Feishu Reply + # API keeps the response in the same topic automatically. + is_group = meta.get("chat_type", "group") == "group" + reply_msg_id = meta.get("message_id") if is_group else None card_id = await loop.run_in_executor( - None, self._create_streaming_card_sync, rid_type, chat_id + None, + self._create_streaming_card_sync, + rid_type, chat_id, reply_msg_id, ) if card_id: buf.card_id = card_id @@ -1410,43 +1480,58 @@ class FeishuChannel(BaseChannel): return # No active streaming card — send as a regular # interactive card with the same 🔧 prefix style. + # Use reply API for group chats so the hint stays in topic. card = json.dumps( {"config": {"wide_screen_mode": True}, "elements": [ {"tag": "markdown", "content": self._format_tool_hint_delta(hint)}, ]}, ensure_ascii=False, ) - await loop.run_in_executor( - None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card - ) + _th_msg_id = msg.metadata.get("message_id") + _th_chat_type = msg.metadata.get("chat_type", "group") + if _th_msg_id and _th_chat_type == "group": + await loop.run_in_executor( + None, lambda: self._reply_message_sync( + _th_msg_id, "interactive", card, + reply_in_thread=True, + ), + ) + else: + await loop.run_in_executor( + None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card + ) return # Determine whether the first message should quote the user's message. # Only the very first send (media or text) in this call uses reply; subsequent # chunks/media fall back to plain create to avoid redundant quote bubbles. + # Always target message_id — the Feishu Reply API keeps replies in the + # same topic automatically when the target message is inside a topic. reply_message_id: str | None = None + _msg_id = msg.metadata.get("message_id") if self.config.reply_to_message and not msg.metadata.get("_progress", False): - reply_message_id = msg.metadata.get("message_id") or None + reply_message_id = _msg_id # For topic group messages, always reply to keep context in thread elif msg.metadata.get("thread_id"): - reply_message_id = ( - msg.metadata.get("root_id") or msg.metadata.get("message_id") or None - ) + reply_message_id = _msg_id first_send = True # tracks whether the reply has already been used def _do_send(m_type: str, content: str) -> None: """Send via reply (first message) or create (subsequent). - When reply_to_message is enabled, the first message uses - reply_in_thread=True to create a visual topic thread. + For group chats the reply API always uses reply_in_thread=True. + The Feishu API automatically keeps replies inside existing + topics — reply_in_thread only creates a *new* topic when the + target message is a plain (non-topic) message. """ nonlocal first_send if reply_message_id and first_send: first_send = False + chat_type = msg.metadata.get("chat_type", "group") ok = self._reply_message_sync( reply_message_id, m_type, content, - reply_in_thread=self.config.reply_to_message, + reply_in_thread=chat_type == "group", ) if ok: return @@ -1556,9 +1641,13 @@ class FeishuChannel(BaseChannel): logger.debug("Feishu: skipping group message (not mentioned)") return - # Add reaction (non-blocking — fire and forget) - reaction_id = None - asyncio.create_task(self._add_reaction(message_id, self.config.react_emoji)) + # Add reaction (non-blocking — tracked background task) + task = asyncio.create_task( + self._add_reaction(message_id, self.config.react_emoji) + ) + self._background_tasks.add(task) + task.add_done_callback(self._on_background_task_done) + task.add_done_callback(lambda t: self._on_reaction_added(message_id, t)) # Parse content content_parts = [] @@ -1656,7 +1745,6 @@ class FeishuChannel(BaseChannel): media=media_paths, metadata={ "message_id": message_id, - "reaction_id": reaction_id, "chat_type": chat_type, "msg_type": msg_type, "parent_id": parent_id, diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index f40ffabc9..c09d4ae55 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -606,3 +606,127 @@ async def test_reply_keeps_fallback_when_reply_fails() -> None: channel._client.im.v1.message.reply.assert_called() channel._client.im.v1.message.create.assert_called() + + +@pytest.mark.asyncio +async def test_reply_no_reply_in_thread_for_p2p_chat() -> None: + """reply_in_thread should NOT be set for p2p chats (identified by chat_type).""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", # p2p chats also use oc_ prefix + content="hello", + metadata={"message_id": "om_001", "chat_type": "p2p"}, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is not True + + +@pytest.mark.asyncio +async def test_reply_uses_reply_in_thread_for_group_chat() -> None: + """reply_in_thread should be True for group chats (identified by chat_type).""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={"message_id": "om_001", "chat_type": "group"}, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is True + + +@pytest.mark.asyncio +async def test_reply_targets_message_id_when_in_topic() -> None: + """When inbound message is inside a topic (root_id != message_id), + the reply should target the inbound message_id (not root_id). + The Feishu Reply API keeps the response in the same topic + automatically when the target message is already inside a topic.""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={ + "message_id": "om_child456", + "chat_type": "group", + "root_id": "om_root123", + }, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + # Should reply to the inbound message_id, not the root + assert request.message_id == "om_child456" + assert request.request_body.reply_in_thread is True + + +def test_on_reaction_added_stores_reaction_id() -> None: + """_on_reaction_added stores the returned reaction_id in _reaction_ids.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + task = loop.create_task(asyncio.sleep(0, result="reaction_abc")) + loop.run_until_complete(task) + channel._on_reaction_added("om_001", task) + finally: + loop.close() + + assert channel._reaction_ids["om_001"] == "reaction_abc" + + +def test_on_reaction_added_skips_none_result() -> None: + """_on_reaction_added does not store None results.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + task = loop.create_task(asyncio.sleep(0, result=None)) + loop.run_until_complete(task) + channel._on_reaction_added("om_001", task) + finally: + loop.close() + + assert "om_001" not in channel._reaction_ids + + +def test_on_background_task_done_removes_from_set() -> None: + """_on_background_task_done removes task from tracking set.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + async def _fail(): + raise RuntimeError("test failure") + + task = loop.create_task(_fail()) + channel._background_tasks.add(task) + try: + loop.run_until_complete(task) + except RuntimeError: + pass # expected + channel._on_background_task_done(task) + finally: + loop.close() + + assert task not in channel._background_tasks