feat(feishu): streaming resuming + inline tool hints

Two improvements to Feishu streaming card experience:

1. Handle _resuming in send_delta: when a mid-turn _stream_end arrives
   with resuming=True (tool call between segments), flush current text
   to the card but keep the buffer alive so subsequent segments append
   to the same card instead of creating a new one.

2. Inline tool hints into streaming cards: when a tool hint arrives
   while a streaming card is active, append it to the card content
   (e.g. "🔧 web_fetch(...)") instead of sending a separate card.
   The hint is automatically stripped when the next delta arrives.

Made-with: Cursor
This commit is contained in:
xzq.xu 2026-04-01 17:32:55 +08:00 committed by chengyongru
parent 4962867112
commit 8d6f41e484
2 changed files with 146 additions and 4 deletions

View File

@ -267,6 +267,7 @@ class _FeishuStreamBuf:
card_id: str | None = None
sequence: int = 0
last_edit: float = 0.0
tool_hint_len: int = 0
class FeishuChannel(BaseChannel):
@ -1279,6 +1280,19 @@ class FeishuChannel(BaseChannel):
if self.config.done_emoji and message_id:
await self._add_reaction(message_id, self.config.done_emoji)
resuming = meta.get("_resuming", False)
if resuming:
# Mid-turn pause (e.g. tool call between streaming segments).
# Flush current text to card but keep the buffer alive so the
# next segment appends to the same card.
buf = self._stream_bufs.get(chat_id)
if buf and buf.card_id and buf.text:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
)
return
buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.text:
return
@ -1317,6 +1331,9 @@ class FeishuChannel(BaseChannel):
if buf is None:
buf = _FeishuStreamBuf()
self._stream_bufs[chat_id] = buf
if buf.tool_hint_len > 0:
buf.text = buf.text[:-buf.tool_hint_len]
buf.tool_hint_len = 0
buf.text += delta
if not buf.text.strip():
return
@ -1350,12 +1367,26 @@ class FeishuChannel(BaseChannel):
receive_id_type = "chat_id" if msg.chat_id.startswith("oc_") else "open_id"
loop = asyncio.get_running_loop()
# Handle tool hint messages as code blocks in interactive cards.
# These are progress-only messages and should bypass normal reply routing.
# Handle tool hint messages. When a streaming card is active for
# this chat, inline the hint into the card instead of sending a
# separate message so the user experience stays cohesive.
if msg.metadata.get("_tool_hint"):
if msg.content and msg.content.strip():
hint = (msg.content or "").strip()
if not hint:
return
buf = self._stream_bufs.get(msg.chat_id)
if buf and buf.card_id:
suffix = f"\n\n---\n🔧 {hint}"
buf.text += suffix
buf.tool_hint_len = len(suffix)
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync,
buf.card_id, buf.text, buf.sequence,
)
else:
await self._send_tool_hint_card(
receive_id_type, msg.chat_id, msg.content.strip()
receive_id_type, msg.chat_id, hint
)
return

View File

@ -5,6 +5,7 @@ from unittest.mock import MagicMock
import pytest
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf
@ -203,6 +204,55 @@ class TestSendDelta:
ch._client.cardkit.v1.card_element.content.assert_not_called()
ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_resuming_keeps_buffer(self):
"""_resuming=True flushes text to card but keeps the buffer for the next segment."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
buf = ch._stream_bufs["oc_chat1"]
assert buf.card_id == "card_1"
assert buf.sequence == 3
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.cardkit.v1.card.settings.assert_not_called()
@pytest.mark.asyncio
async def test_stream_end_resuming_then_final_end(self):
"""Full multi-segment flow: resuming mid-turn, then final end closes the card."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Seg1", card_id="card_1", sequence=1, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._stream_bufs["oc_chat1"].text += " Seg2"
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card.settings.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_resuming_no_card_is_noop(self):
"""_resuming with no card_id (card creation failed) is a safe no-op."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="text", card_id=None, sequence=0, last_edit=0.0,
)
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio
async def test_stream_end_without_buf_is_noop(self):
ch = _make_channel()
@ -239,6 +289,67 @@ class TestSendDelta:
assert buf.sequence == 7
class TestToolHintInlineStreaming:
"""Tool hint messages should be inlined into active streaming cards."""
@pytest.mark.asyncio
async def test_tool_hint_inlined_when_stream_active(self):
"""With an active streaming buffer, tool hint appends to the card."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
msg = OutboundMessage(
channel="feishu", chat_id="oc_chat1",
content='web_fetch("https://example.com")',
metadata={"_tool_hint": True},
)
await ch.send(msg)
buf = ch._stream_bufs["oc_chat1"]
assert buf.text.endswith('🔧 web_fetch("https://example.com")')
assert buf.tool_hint_len > 0
assert buf.sequence == 3
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.im.v1.message.create.assert_not_called()
@pytest.mark.asyncio
async def test_tool_hint_stripped_on_next_delta(self):
"""When new delta arrives, the previously appended tool hint is removed."""
ch = _make_channel()
suffix = "\n\n---\n🔧 web_fetch(\"url\")"
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer" + suffix,
card_id="card_1", sequence=3, last_edit=0.0,
tool_hint_len=len(suffix),
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", " continued")
buf = ch._stream_bufs["oc_chat1"]
assert buf.text == "Partial answer continued"
assert buf.tool_hint_len == 0
@pytest.mark.asyncio
async def test_tool_hint_fallback_when_no_stream(self):
"""Without an active buffer, tool hint falls back to a standalone card."""
ch = _make_channel()
ch._client.im.v1.message.create.return_value = _mock_send_response("om_hint")
msg = OutboundMessage(
channel="feishu", chat_id="oc_chat1",
content='read_file("path")',
metadata={"_tool_hint": True},
)
await ch.send(msg)
assert "oc_chat1" not in ch._stream_bufs
ch._client.im.v1.message.create.assert_called_once()
class TestSendMessageReturnsId:
def test_returns_message_id_on_success(self):
ch = _make_channel()