fix(feishu): streaming card and tool hint respect reply_to_message in groups

This commit is contained in:
04cb 2026-04-30 08:09:26 +08:00 committed by Xubin Ren
parent 71eff09653
commit 651b6b933f
2 changed files with 110 additions and 14 deletions

View File

@ -1135,6 +1135,25 @@ class FeishuChannel(BaseChannel):
logger.debug("Feishu: error fetching parent message {}: {}", message_id, e) logger.debug("Feishu: error fetching parent message {}: {}", message_id, e)
return None return None
def _thread_reply_target(self, meta: dict) -> str | None:
"""Pick the inbound message_id to reply to with reply_in_thread=True.
Returns the message_id when the response must travel through the
Reply API (continuation of an existing topic, or user opted in via
``reply_to_message=True``); returns None to signal a plain send.
Used by streaming-card / fallback / tool-hint paths so they honor
the same gating as the regular send() path and don't unilaterally
spawn new topics in groups when ``reply_to_message`` is off.
"""
if meta.get("chat_type", "group") != "group":
return None
msg_id = meta.get("message_id")
if not msg_id:
return None
if meta.get("thread_id") or self.config.reply_to_message:
return msg_id
return None
def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str, *, reply_in_thread: bool = False) -> bool: def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str, *, reply_in_thread: bool = False) -> bool:
"""Reply to an existing Feishu message using the Reply API (synchronous). """Reply to an existing Feishu message using the Reply API (synchronous).
@ -1409,11 +1428,7 @@ class FeishuChannel(BaseChannel):
{"config": {"wide_screen_mode": True}, "elements": chunk}, {"config": {"wide_screen_mode": True}, "elements": chunk},
ensure_ascii=False, ensure_ascii=False,
) )
# Fallback: reply via the Reply API for group chats. fallback_msg_id = self._thread_reply_target(meta)
# Target message_id — the Feishu API keeps the reply in
# the same topic automatically.
_f_msg = meta.get("message_id")
fallback_msg_id = _f_msg if meta.get("chat_type", "group") == "group" else None
if fallback_msg_id: if fallback_msg_id:
await loop.run_in_executor( await loop.run_in_executor(
None, lambda: self._reply_message_sync( None, lambda: self._reply_message_sync(
@ -1438,12 +1453,7 @@ class FeishuChannel(BaseChannel):
now = time.monotonic() now = time.monotonic()
if buf.card_id is None: if buf.card_id is None:
# Send the streaming card as a reply for group chats so it reply_msg_id = self._thread_reply_target(meta)
# lands inside the originating topic/thread. Always target
# message_id (the actual inbound message) — the Feishu Reply
# API keeps the response in the same topic automatically.
is_group = meta.get("chat_type", "group") == "group"
reply_msg_id = meta.get("message_id") if is_group else None
card_id = await loop.run_in_executor( card_id = await loop.run_in_executor(
None, None,
self._create_streaming_card_sync, self._create_streaming_card_sync,
@ -1498,9 +1508,8 @@ class FeishuChannel(BaseChannel):
]}, ]},
ensure_ascii=False, ensure_ascii=False,
) )
_th_msg_id = msg.metadata.get("message_id") _th_msg_id = self._thread_reply_target(msg.metadata)
_th_chat_type = msg.metadata.get("chat_type", "group") if _th_msg_id:
if _th_msg_id and _th_chat_type == "group":
await loop.run_in_executor( await loop.run_in_executor(
None, lambda: self._reply_message_sync( None, lambda: self._reply_message_sync(
_th_msg_id, "interactive", card, _th_msg_id, "interactive", card,

View File

@ -728,3 +728,90 @@ def test_on_background_task_done_removes_from_set() -> None:
loop.close() loop.close()
assert task not in channel._background_tasks assert task not in channel._background_tasks
# ---------------------------------------------------------------------------
# Issue #3533: streaming card / tool hint must respect reply_to_message
# ---------------------------------------------------------------------------
@pytest.mark.parametrize(
("reply_to_message", "meta", "expected"),
[
(True, {"chat_type": "p2p", "message_id": "om_1"}, None),
(False, {"chat_type": "group", "message_id": "om_1"}, None),
(True, {"chat_type": "group", "message_id": "om_1"}, "om_1"),
(False, {"chat_type": "group", "message_id": "om_1", "thread_id": "ot_1"}, "om_1"),
(True, {"chat_type": "group"}, None),
],
)
def test_thread_reply_target_gating(reply_to_message, meta, expected) -> None:
channel = _make_feishu_channel(reply_to_message=reply_to_message)
assert channel._thread_reply_target(meta) == expected
@pytest.mark.asyncio
async def test_tool_hint_skips_reply_for_top_level_group_when_disabled() -> None:
"""Bug case: tool-hint card on a top-level group msg must NOT spawn a topic."""
channel = _make_feishu_channel(reply_to_message=False)
create_resp = MagicMock()
create_resp.success.return_value = True
create_resp.data = SimpleNamespace(message_id="om_hint")
channel._client.im.v1.message.create.return_value = create_resp
await channel.send(OutboundMessage(
channel="feishu", chat_id="oc_abc", content='web_search("q")',
metadata={"_tool_hint": True, "message_id": "om_user", "chat_type": "group"},
))
channel._client.im.v1.message.create.assert_called_once()
channel._client.im.v1.message.reply.assert_not_called()
@pytest.mark.asyncio
async def test_streaming_card_skips_reply_for_top_level_group_when_disabled() -> None:
"""Bug case: first streaming delta on a top-level group msg must NOT spawn a topic."""
channel = _make_feishu_channel(reply_to_message=False)
card_resp = MagicMock()
card_resp.success.return_value = True
card_resp.data = SimpleNamespace(card_id="card_1")
channel._client.cardkit.v1.card.create.return_value = card_resp
send_resp = MagicMock()
send_resp.success.return_value = True
send_resp.data = SimpleNamespace(message_id="om_card")
channel._client.im.v1.message.create.return_value = send_resp
update_resp = MagicMock()
update_resp.success.return_value = True
channel._client.cardkit.v1.card_element.content.return_value = update_resp
await channel.send_delta(
"oc_abc", "hello",
metadata={"message_id": "om_user", "chat_type": "group"},
)
channel._client.im.v1.message.create.assert_called_once()
channel._client.im.v1.message.reply.assert_not_called()
@pytest.mark.asyncio
async def test_streaming_card_keeps_reply_in_topic_even_when_disabled() -> None:
"""Regression guard: in-topic continuation must keep using Reply API
so the response stays inside the existing topic independent of config."""
channel = _make_feishu_channel(reply_to_message=False)
card_resp = MagicMock()
card_resp.success.return_value = True
card_resp.data = SimpleNamespace(card_id="card_1")
channel._client.cardkit.v1.card.create.return_value = card_resp
reply_resp = MagicMock()
reply_resp.success.return_value = True
channel._client.im.v1.message.reply.return_value = reply_resp
update_resp = MagicMock()
update_resp.success.return_value = True
channel._client.cardkit.v1.card_element.content.return_value = update_resp
await channel.send_delta(
"oc_abc", "hello",
metadata={"message_id": "om_user", "chat_type": "group", "thread_id": "ot_1"},
)
channel._client.im.v1.message.reply.assert_called_once()