mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-20 00:22:31 +00:00
fix(feishu): use message_id as reply target and fix keyword-only arg
Align reply targeting with deer-flow: always reply to the inbound message_id (not root_id). The Feishu Reply API keeps responses in the same topic automatically when the target message is inside a topic. Also fix run_in_executor calls that passed reply_in_thread as a positional arg to a keyword-only parameter, and route standalone tool hints through the reply API for group chats.
This commit is contained in:
parent
3ece7256d1
commit
91d5f14fbd
@ -308,6 +308,8 @@ class FeishuChannel(BaseChannel):
|
|||||||
self._loop: asyncio.AbstractEventLoop | None = None
|
self._loop: asyncio.AbstractEventLoop | None = None
|
||||||
self._stream_bufs: dict[str, _FeishuStreamBuf] = {}
|
self._stream_bufs: dict[str, _FeishuStreamBuf] = {}
|
||||||
self._bot_open_id: str | None = None
|
self._bot_open_id: str | None = None
|
||||||
|
self._background_tasks: set[asyncio.Task] = set()
|
||||||
|
self._reaction_ids: dict[str, str] = {} # message_id → reaction_id
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any:
|
def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any:
|
||||||
@ -549,8 +551,11 @@ class FeishuChannel(BaseChannel):
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> str | None:
|
async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> str | None:
|
||||||
"""
|
"""Add a reaction emoji to a message.
|
||||||
Add a reaction emoji to a message (non-blocking).
|
|
||||||
|
Returns the reaction_id on success, None on failure.
|
||||||
|
When called via a tracked background task, the returned reaction_id
|
||||||
|
is stored in ``_reaction_ids`` for later cleanup by ``send_delta``.
|
||||||
|
|
||||||
Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART
|
Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART
|
||||||
"""
|
"""
|
||||||
@ -594,6 +599,30 @@ class FeishuChannel(BaseChannel):
|
|||||||
loop = asyncio.get_running_loop()
|
loop = asyncio.get_running_loop()
|
||||||
await loop.run_in_executor(None, self._remove_reaction_sync, message_id, reaction_id)
|
await loop.run_in_executor(None, self._remove_reaction_sync, message_id, reaction_id)
|
||||||
|
|
||||||
|
def _on_background_task_done(self, task: asyncio.Task) -> None:
|
||||||
|
"""Callback: remove from tracking set and log unhandled exceptions."""
|
||||||
|
self._background_tasks.discard(task)
|
||||||
|
if task.cancelled():
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
task.result()
|
||||||
|
except Exception as exc:
|
||||||
|
logger.warning("Background task failed: {}", exc)
|
||||||
|
|
||||||
|
def _on_reaction_added(self, message_id: str, task: asyncio.Task) -> None:
|
||||||
|
"""Callback: store reaction_id after background add-reaction completes."""
|
||||||
|
if task.cancelled():
|
||||||
|
return
|
||||||
|
try:
|
||||||
|
reaction_id = task.result()
|
||||||
|
if reaction_id:
|
||||||
|
self._reaction_ids[message_id] = reaction_id
|
||||||
|
except Exception:
|
||||||
|
pass # already logged by _on_background_task_done
|
||||||
|
# Trim cache to prevent unbounded growth
|
||||||
|
if len(self._reaction_ids) > 500:
|
||||||
|
self._reaction_ids.pop(next(iter(self._reaction_ids)))
|
||||||
|
|
||||||
# Regex to match markdown tables (header + separator + data rows)
|
# Regex to match markdown tables (header + separator + data rows)
|
||||||
_TABLE_RE = re.compile(
|
_TABLE_RE = re.compile(
|
||||||
r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)",
|
r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)",
|
||||||
@ -1172,8 +1201,19 @@ class FeishuChannel(BaseChannel):
|
|||||||
logger.error("Error sending Feishu {} message: {}", msg_type, e)
|
logger.error("Error sending Feishu {} message: {}", msg_type, e)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None:
|
def _create_streaming_card_sync(
|
||||||
"""Create a CardKit streaming card, send it to chat, return card_id."""
|
self,
|
||||||
|
receive_id_type: str,
|
||||||
|
chat_id: str,
|
||||||
|
reply_message_id: str | None = None,
|
||||||
|
) -> str | None:
|
||||||
|
"""Create a CardKit streaming card, send it to chat, return card_id.
|
||||||
|
|
||||||
|
When *reply_message_id* is provided the card is delivered via the
|
||||||
|
reply API (with reply_in_thread=True) so it lands inside the
|
||||||
|
originating thread / topic. Otherwise the plain create-message
|
||||||
|
API is used.
|
||||||
|
"""
|
||||||
from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody
|
from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody
|
||||||
|
|
||||||
card_json = {
|
card_json = {
|
||||||
@ -1202,13 +1242,19 @@ class FeishuChannel(BaseChannel):
|
|||||||
return None
|
return None
|
||||||
card_id = getattr(response.data, "card_id", None)
|
card_id = getattr(response.data, "card_id", None)
|
||||||
if card_id:
|
if card_id:
|
||||||
message_id = self._send_message_sync(
|
card_content = json.dumps(
|
||||||
receive_id_type,
|
{"type": "card", "data": {"card_id": card_id}}, ensure_ascii=False
|
||||||
chat_id,
|
|
||||||
"interactive",
|
|
||||||
json.dumps({"type": "card", "data": {"card_id": card_id}}),
|
|
||||||
)
|
)
|
||||||
if message_id:
|
if reply_message_id:
|
||||||
|
sent = self._reply_message_sync(
|
||||||
|
reply_message_id, "interactive", card_content,
|
||||||
|
reply_in_thread=True,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
sent = self._send_message_sync(
|
||||||
|
receive_id_type, chat_id, "interactive", card_content,
|
||||||
|
) is not None
|
||||||
|
if sent:
|
||||||
return card_id
|
return card_id
|
||||||
logger.warning(
|
logger.warning(
|
||||||
"Created streaming card {} but failed to send it to {}", card_id, chat_id
|
"Created streaming card {} but failed to send it to {}", card_id, chat_id
|
||||||
@ -1298,7 +1344,7 @@ class FeishuChannel(BaseChannel):
|
|||||||
_stream_end: Finalize the streaming card.
|
_stream_end: Finalize the streaming card.
|
||||||
_tool_hint: Delta is a formatted tool hint (for display only).
|
_tool_hint: Delta is a formatted tool hint (for display only).
|
||||||
message_id: Original message id (used with _stream_end for reaction cleanup).
|
message_id: Original message id (used with _stream_end for reaction cleanup).
|
||||||
reaction_id: Reaction id to remove on stream end.
|
chat_type: "group" or "p2p" — controls reply-in-thread for streaming cards.
|
||||||
"""
|
"""
|
||||||
if not self._client:
|
if not self._client:
|
||||||
return
|
return
|
||||||
@ -1308,10 +1354,13 @@ class FeishuChannel(BaseChannel):
|
|||||||
|
|
||||||
# --- stream end: final update or fallback ---
|
# --- stream end: final update or fallback ---
|
||||||
if meta.get("_stream_end"):
|
if meta.get("_stream_end"):
|
||||||
if (message_id := meta.get("message_id")) and (reaction_id := meta.get("reaction_id")):
|
message_id = meta.get("message_id")
|
||||||
await self._remove_reaction(message_id, reaction_id)
|
if message_id:
|
||||||
|
reaction_id = self._reaction_ids.pop(message_id, None)
|
||||||
|
if reaction_id:
|
||||||
|
await self._remove_reaction(message_id, reaction_id)
|
||||||
# Add completion emoji if configured
|
# Add completion emoji if configured
|
||||||
if self.config.done_emoji and message_id:
|
if self.config.done_emoji:
|
||||||
await self._add_reaction(message_id, self.config.done_emoji)
|
await self._add_reaction(message_id, self.config.done_emoji)
|
||||||
|
|
||||||
buf = self._stream_bufs.pop(chat_id, None)
|
buf = self._stream_bufs.pop(chat_id, None)
|
||||||
@ -1349,9 +1398,22 @@ class FeishuChannel(BaseChannel):
|
|||||||
{"config": {"wide_screen_mode": True}, "elements": chunk},
|
{"config": {"wide_screen_mode": True}, "elements": chunk},
|
||||||
ensure_ascii=False,
|
ensure_ascii=False,
|
||||||
)
|
)
|
||||||
await loop.run_in_executor(
|
# Fallback: reply via the Reply API for group chats.
|
||||||
None, self._send_message_sync, rid_type, chat_id, "interactive", card
|
# Target message_id — the Feishu API keeps the reply in
|
||||||
)
|
# the same topic automatically.
|
||||||
|
_f_msg = meta.get("message_id")
|
||||||
|
fallback_msg_id = _f_msg if meta.get("chat_type", "group") == "group" else None
|
||||||
|
if fallback_msg_id:
|
||||||
|
await loop.run_in_executor(
|
||||||
|
None, lambda: self._reply_message_sync(
|
||||||
|
fallback_msg_id, "interactive", card,
|
||||||
|
reply_in_thread=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await loop.run_in_executor(
|
||||||
|
None, self._send_message_sync, rid_type, chat_id, "interactive", card
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# --- accumulate delta ---
|
# --- accumulate delta ---
|
||||||
@ -1365,8 +1427,16 @@ class FeishuChannel(BaseChannel):
|
|||||||
|
|
||||||
now = time.monotonic()
|
now = time.monotonic()
|
||||||
if buf.card_id is None:
|
if buf.card_id is None:
|
||||||
|
# Send the streaming card as a reply for group chats so it
|
||||||
|
# lands inside the originating topic/thread. Always target
|
||||||
|
# message_id (the actual inbound message) — the Feishu Reply
|
||||||
|
# API keeps the response in the same topic automatically.
|
||||||
|
is_group = meta.get("chat_type", "group") == "group"
|
||||||
|
reply_msg_id = meta.get("message_id") if is_group else None
|
||||||
card_id = await loop.run_in_executor(
|
card_id = await loop.run_in_executor(
|
||||||
None, self._create_streaming_card_sync, rid_type, chat_id
|
None,
|
||||||
|
self._create_streaming_card_sync,
|
||||||
|
rid_type, chat_id, reply_msg_id,
|
||||||
)
|
)
|
||||||
if card_id:
|
if card_id:
|
||||||
buf.card_id = card_id
|
buf.card_id = card_id
|
||||||
@ -1410,43 +1480,58 @@ class FeishuChannel(BaseChannel):
|
|||||||
return
|
return
|
||||||
# No active streaming card — send as a regular
|
# No active streaming card — send as a regular
|
||||||
# interactive card with the same 🔧 prefix style.
|
# interactive card with the same 🔧 prefix style.
|
||||||
|
# Use reply API for group chats so the hint stays in topic.
|
||||||
card = json.dumps(
|
card = json.dumps(
|
||||||
{"config": {"wide_screen_mode": True}, "elements": [
|
{"config": {"wide_screen_mode": True}, "elements": [
|
||||||
{"tag": "markdown", "content": self._format_tool_hint_delta(hint)},
|
{"tag": "markdown", "content": self._format_tool_hint_delta(hint)},
|
||||||
]},
|
]},
|
||||||
ensure_ascii=False,
|
ensure_ascii=False,
|
||||||
)
|
)
|
||||||
await loop.run_in_executor(
|
_th_msg_id = msg.metadata.get("message_id")
|
||||||
None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card
|
_th_chat_type = msg.metadata.get("chat_type", "group")
|
||||||
)
|
if _th_msg_id and _th_chat_type == "group":
|
||||||
|
await loop.run_in_executor(
|
||||||
|
None, lambda: self._reply_message_sync(
|
||||||
|
_th_msg_id, "interactive", card,
|
||||||
|
reply_in_thread=True,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await loop.run_in_executor(
|
||||||
|
None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card
|
||||||
|
)
|
||||||
return
|
return
|
||||||
|
|
||||||
# Determine whether the first message should quote the user's message.
|
# Determine whether the first message should quote the user's message.
|
||||||
# Only the very first send (media or text) in this call uses reply; subsequent
|
# Only the very first send (media or text) in this call uses reply; subsequent
|
||||||
# chunks/media fall back to plain create to avoid redundant quote bubbles.
|
# chunks/media fall back to plain create to avoid redundant quote bubbles.
|
||||||
|
# Always target message_id — the Feishu Reply API keeps replies in the
|
||||||
|
# same topic automatically when the target message is inside a topic.
|
||||||
reply_message_id: str | None = None
|
reply_message_id: str | None = None
|
||||||
|
_msg_id = msg.metadata.get("message_id")
|
||||||
if self.config.reply_to_message and not msg.metadata.get("_progress", False):
|
if self.config.reply_to_message and not msg.metadata.get("_progress", False):
|
||||||
reply_message_id = msg.metadata.get("message_id") or None
|
reply_message_id = _msg_id
|
||||||
# For topic group messages, always reply to keep context in thread
|
# For topic group messages, always reply to keep context in thread
|
||||||
elif msg.metadata.get("thread_id"):
|
elif msg.metadata.get("thread_id"):
|
||||||
reply_message_id = (
|
reply_message_id = _msg_id
|
||||||
msg.metadata.get("root_id") or msg.metadata.get("message_id") or None
|
|
||||||
)
|
|
||||||
|
|
||||||
first_send = True # tracks whether the reply has already been used
|
first_send = True # tracks whether the reply has already been used
|
||||||
|
|
||||||
def _do_send(m_type: str, content: str) -> None:
|
def _do_send(m_type: str, content: str) -> None:
|
||||||
"""Send via reply (first message) or create (subsequent).
|
"""Send via reply (first message) or create (subsequent).
|
||||||
|
|
||||||
When reply_to_message is enabled, the first message uses
|
For group chats the reply API always uses reply_in_thread=True.
|
||||||
reply_in_thread=True to create a visual topic thread.
|
The Feishu API automatically keeps replies inside existing
|
||||||
|
topics — reply_in_thread only creates a *new* topic when the
|
||||||
|
target message is a plain (non-topic) message.
|
||||||
"""
|
"""
|
||||||
nonlocal first_send
|
nonlocal first_send
|
||||||
if reply_message_id and first_send:
|
if reply_message_id and first_send:
|
||||||
first_send = False
|
first_send = False
|
||||||
|
chat_type = msg.metadata.get("chat_type", "group")
|
||||||
ok = self._reply_message_sync(
|
ok = self._reply_message_sync(
|
||||||
reply_message_id, m_type, content,
|
reply_message_id, m_type, content,
|
||||||
reply_in_thread=self.config.reply_to_message,
|
reply_in_thread=chat_type == "group",
|
||||||
)
|
)
|
||||||
if ok:
|
if ok:
|
||||||
return
|
return
|
||||||
@ -1556,9 +1641,13 @@ class FeishuChannel(BaseChannel):
|
|||||||
logger.debug("Feishu: skipping group message (not mentioned)")
|
logger.debug("Feishu: skipping group message (not mentioned)")
|
||||||
return
|
return
|
||||||
|
|
||||||
# Add reaction (non-blocking — fire and forget)
|
# Add reaction (non-blocking — tracked background task)
|
||||||
reaction_id = None
|
task = asyncio.create_task(
|
||||||
asyncio.create_task(self._add_reaction(message_id, self.config.react_emoji))
|
self._add_reaction(message_id, self.config.react_emoji)
|
||||||
|
)
|
||||||
|
self._background_tasks.add(task)
|
||||||
|
task.add_done_callback(self._on_background_task_done)
|
||||||
|
task.add_done_callback(lambda t: self._on_reaction_added(message_id, t))
|
||||||
|
|
||||||
# Parse content
|
# Parse content
|
||||||
content_parts = []
|
content_parts = []
|
||||||
@ -1656,7 +1745,6 @@ class FeishuChannel(BaseChannel):
|
|||||||
media=media_paths,
|
media=media_paths,
|
||||||
metadata={
|
metadata={
|
||||||
"message_id": message_id,
|
"message_id": message_id,
|
||||||
"reaction_id": reaction_id,
|
|
||||||
"chat_type": chat_type,
|
"chat_type": chat_type,
|
||||||
"msg_type": msg_type,
|
"msg_type": msg_type,
|
||||||
"parent_id": parent_id,
|
"parent_id": parent_id,
|
||||||
|
|||||||
@ -606,3 +606,127 @@ async def test_reply_keeps_fallback_when_reply_fails() -> None:
|
|||||||
|
|
||||||
channel._client.im.v1.message.reply.assert_called()
|
channel._client.im.v1.message.reply.assert_called()
|
||||||
channel._client.im.v1.message.create.assert_called()
|
channel._client.im.v1.message.create.assert_called()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_reply_no_reply_in_thread_for_p2p_chat() -> None:
|
||||||
|
"""reply_in_thread should NOT be set for p2p chats (identified by chat_type)."""
|
||||||
|
channel = _make_feishu_channel(reply_to_message=True)
|
||||||
|
|
||||||
|
reply_resp = MagicMock()
|
||||||
|
reply_resp.success.return_value = True
|
||||||
|
channel._client.im.v1.message.reply.return_value = reply_resp
|
||||||
|
|
||||||
|
await channel.send(OutboundMessage(
|
||||||
|
channel="feishu",
|
||||||
|
chat_id="oc_abc", # p2p chats also use oc_ prefix
|
||||||
|
content="hello",
|
||||||
|
metadata={"message_id": "om_001", "chat_type": "p2p"},
|
||||||
|
))
|
||||||
|
|
||||||
|
channel._client.im.v1.message.reply.assert_called_once()
|
||||||
|
call_args = channel._client.im.v1.message.reply.call_args
|
||||||
|
request = call_args[0][0]
|
||||||
|
assert request.request_body.reply_in_thread is not True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_reply_uses_reply_in_thread_for_group_chat() -> None:
|
||||||
|
"""reply_in_thread should be True for group chats (identified by chat_type)."""
|
||||||
|
channel = _make_feishu_channel(reply_to_message=True)
|
||||||
|
|
||||||
|
reply_resp = MagicMock()
|
||||||
|
reply_resp.success.return_value = True
|
||||||
|
channel._client.im.v1.message.reply.return_value = reply_resp
|
||||||
|
|
||||||
|
await channel.send(OutboundMessage(
|
||||||
|
channel="feishu",
|
||||||
|
chat_id="oc_abc",
|
||||||
|
content="hello",
|
||||||
|
metadata={"message_id": "om_001", "chat_type": "group"},
|
||||||
|
))
|
||||||
|
|
||||||
|
channel._client.im.v1.message.reply.assert_called_once()
|
||||||
|
call_args = channel._client.im.v1.message.reply.call_args
|
||||||
|
request = call_args[0][0]
|
||||||
|
assert request.request_body.reply_in_thread is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_reply_targets_message_id_when_in_topic() -> None:
|
||||||
|
"""When inbound message is inside a topic (root_id != message_id),
|
||||||
|
the reply should target the inbound message_id (not root_id).
|
||||||
|
The Feishu Reply API keeps the response in the same topic
|
||||||
|
automatically when the target message is already inside a topic."""
|
||||||
|
channel = _make_feishu_channel(reply_to_message=True)
|
||||||
|
|
||||||
|
reply_resp = MagicMock()
|
||||||
|
reply_resp.success.return_value = True
|
||||||
|
channel._client.im.v1.message.reply.return_value = reply_resp
|
||||||
|
|
||||||
|
await channel.send(OutboundMessage(
|
||||||
|
channel="feishu",
|
||||||
|
chat_id="oc_abc",
|
||||||
|
content="hello",
|
||||||
|
metadata={
|
||||||
|
"message_id": "om_child456",
|
||||||
|
"chat_type": "group",
|
||||||
|
"root_id": "om_root123",
|
||||||
|
},
|
||||||
|
))
|
||||||
|
|
||||||
|
channel._client.im.v1.message.reply.assert_called_once()
|
||||||
|
call_args = channel._client.im.v1.message.reply.call_args
|
||||||
|
request = call_args[0][0]
|
||||||
|
# Should reply to the inbound message_id, not the root
|
||||||
|
assert request.message_id == "om_child456"
|
||||||
|
assert request.request_body.reply_in_thread is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_on_reaction_added_stores_reaction_id() -> None:
|
||||||
|
"""_on_reaction_added stores the returned reaction_id in _reaction_ids."""
|
||||||
|
channel = _make_feishu_channel()
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
try:
|
||||||
|
task = loop.create_task(asyncio.sleep(0, result="reaction_abc"))
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
channel._on_reaction_added("om_001", task)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
assert channel._reaction_ids["om_001"] == "reaction_abc"
|
||||||
|
|
||||||
|
|
||||||
|
def test_on_reaction_added_skips_none_result() -> None:
|
||||||
|
"""_on_reaction_added does not store None results."""
|
||||||
|
channel = _make_feishu_channel()
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
try:
|
||||||
|
task = loop.create_task(asyncio.sleep(0, result=None))
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
channel._on_reaction_added("om_001", task)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
assert "om_001" not in channel._reaction_ids
|
||||||
|
|
||||||
|
|
||||||
|
def test_on_background_task_done_removes_from_set() -> None:
|
||||||
|
"""_on_background_task_done removes task from tracking set."""
|
||||||
|
channel = _make_feishu_channel()
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
try:
|
||||||
|
async def _fail():
|
||||||
|
raise RuntimeError("test failure")
|
||||||
|
|
||||||
|
task = loop.create_task(_fail())
|
||||||
|
channel._background_tasks.add(task)
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(task)
|
||||||
|
except RuntimeError:
|
||||||
|
pass # expected
|
||||||
|
channel._on_background_task_done(task)
|
||||||
|
finally:
|
||||||
|
loop.close()
|
||||||
|
|
||||||
|
assert task not in channel._background_tasks
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user