From 8d6f41e484cfd983d1ee20b2739513de32fa6ed2 Mon Sep 17 00:00:00 2001 From: "xzq.xu" Date: Wed, 1 Apr 2026 17:32:55 +0800 Subject: [PATCH] feat(feishu): streaming resuming + inline tool hints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two improvements to Feishu streaming card experience: 1. Handle _resuming in send_delta: when a mid-turn _stream_end arrives with resuming=True (tool call between segments), flush current text to the card but keep the buffer alive so subsequent segments append to the same card instead of creating a new one. 2. Inline tool hints into streaming cards: when a tool hint arrives while a streaming card is active, append it to the card content (e.g. "šŸ”§ web_fetch(...)") instead of sending a separate card. The hint is automatically stripped when the next delta arrives. Made-with: Cursor --- nanobot/channels/feishu.py | 39 ++++++++- tests/channels/test_feishu_streaming.py | 111 ++++++++++++++++++++++++ 2 files changed, 146 insertions(+), 4 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index e18ed8b01..b5cedd8c0 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -267,6 +267,7 @@ class _FeishuStreamBuf: card_id: str | None = None sequence: int = 0 last_edit: float = 0.0 + tool_hint_len: int = 0 class FeishuChannel(BaseChannel): @@ -1279,6 +1280,19 @@ class FeishuChannel(BaseChannel): if self.config.done_emoji and message_id: await self._add_reaction(message_id, self.config.done_emoji) + resuming = meta.get("_resuming", False) + if resuming: + # Mid-turn pause (e.g. tool call between streaming segments). + # Flush current text to card but keep the buffer alive so the + # next segment appends to the same card. + buf = self._stream_bufs.get(chat_id) + if buf and buf.card_id and buf.text: + buf.sequence += 1 + await loop.run_in_executor( + None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence, + ) + return + buf = self._stream_bufs.pop(chat_id, None) if not buf or not buf.text: return @@ -1317,6 +1331,9 @@ class FeishuChannel(BaseChannel): if buf is None: buf = _FeishuStreamBuf() self._stream_bufs[chat_id] = buf + if buf.tool_hint_len > 0: + buf.text = buf.text[:-buf.tool_hint_len] + buf.tool_hint_len = 0 buf.text += delta if not buf.text.strip(): return @@ -1350,12 +1367,26 @@ class FeishuChannel(BaseChannel): receive_id_type = "chat_id" if msg.chat_id.startswith("oc_") else "open_id" loop = asyncio.get_running_loop() - # Handle tool hint messages as code blocks in interactive cards. - # These are progress-only messages and should bypass normal reply routing. + # Handle tool hint messages. When a streaming card is active for + # this chat, inline the hint into the card instead of sending a + # separate message so the user experience stays cohesive. if msg.metadata.get("_tool_hint"): - if msg.content and msg.content.strip(): + hint = (msg.content or "").strip() + if not hint: + return + buf = self._stream_bufs.get(msg.chat_id) + if buf and buf.card_id: + suffix = f"\n\n---\nšŸ”§ {hint}" + buf.text += suffix + buf.tool_hint_len = len(suffix) + buf.sequence += 1 + await loop.run_in_executor( + None, self._stream_update_text_sync, + buf.card_id, buf.text, buf.sequence, + ) + else: await self._send_tool_hint_card( - receive_id_type, msg.chat_id, msg.content.strip() + receive_id_type, msg.chat_id, hint ) return diff --git a/tests/channels/test_feishu_streaming.py b/tests/channels/test_feishu_streaming.py index 22ad8cbc6..2fc75bb8a 100644 --- a/tests/channels/test_feishu_streaming.py +++ b/tests/channels/test_feishu_streaming.py @@ -5,6 +5,7 @@ from unittest.mock import MagicMock import pytest +from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf @@ -203,6 +204,55 @@ class TestSendDelta: ch._client.cardkit.v1.card_element.content.assert_not_called() ch._client.im.v1.message.create.assert_called_once() + @pytest.mark.asyncio + async def test_stream_end_resuming_keeps_buffer(self): + """_resuming=True flushes text to card but keeps the buffer for the next segment.""" + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0, + ) + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True}) + + assert "oc_chat1" in ch._stream_bufs + buf = ch._stream_bufs["oc_chat1"] + assert buf.card_id == "card_1" + assert buf.sequence == 3 + ch._client.cardkit.v1.card_element.content.assert_called_once() + ch._client.cardkit.v1.card.settings.assert_not_called() + + @pytest.mark.asyncio + async def test_stream_end_resuming_then_final_end(self): + """Full multi-segment flow: resuming mid-turn, then final end closes the card.""" + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Seg1", card_id="card_1", sequence=1, last_edit=0.0, + ) + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + ch._client.cardkit.v1.card.settings.return_value = _mock_content_response() + + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True}) + assert "oc_chat1" in ch._stream_bufs + + ch._stream_bufs["oc_chat1"].text += " Seg2" + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True}) + + assert "oc_chat1" not in ch._stream_bufs + ch._client.cardkit.v1.card.settings.assert_called_once() + + @pytest.mark.asyncio + async def test_stream_end_resuming_no_card_is_noop(self): + """_resuming with no card_id (card creation failed) is a safe no-op.""" + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="text", card_id=None, sequence=0, last_edit=0.0, + ) + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True}) + + assert "oc_chat1" in ch._stream_bufs + ch._client.cardkit.v1.card_element.content.assert_not_called() + @pytest.mark.asyncio async def test_stream_end_without_buf_is_noop(self): ch = _make_channel() @@ -239,6 +289,67 @@ class TestSendDelta: assert buf.sequence == 7 +class TestToolHintInlineStreaming: + """Tool hint messages should be inlined into active streaming cards.""" + + @pytest.mark.asyncio + async def test_tool_hint_inlined_when_stream_active(self): + """With an active streaming buffer, tool hint appends to the card.""" + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0, + ) + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + msg = OutboundMessage( + channel="feishu", chat_id="oc_chat1", + content='web_fetch("https://example.com")', + metadata={"_tool_hint": True}, + ) + await ch.send(msg) + + buf = ch._stream_bufs["oc_chat1"] + assert buf.text.endswith('šŸ”§ web_fetch("https://example.com")') + assert buf.tool_hint_len > 0 + assert buf.sequence == 3 + ch._client.cardkit.v1.card_element.content.assert_called_once() + ch._client.im.v1.message.create.assert_not_called() + + @pytest.mark.asyncio + async def test_tool_hint_stripped_on_next_delta(self): + """When new delta arrives, the previously appended tool hint is removed.""" + ch = _make_channel() + suffix = "\n\n---\nšŸ”§ web_fetch(\"url\")" + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Partial answer" + suffix, + card_id="card_1", sequence=3, last_edit=0.0, + tool_hint_len=len(suffix), + ) + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + await ch.send_delta("oc_chat1", " continued") + + buf = ch._stream_bufs["oc_chat1"] + assert buf.text == "Partial answer continued" + assert buf.tool_hint_len == 0 + + @pytest.mark.asyncio + async def test_tool_hint_fallback_when_no_stream(self): + """Without an active buffer, tool hint falls back to a standalone card.""" + ch = _make_channel() + ch._client.im.v1.message.create.return_value = _mock_send_response("om_hint") + + msg = OutboundMessage( + channel="feishu", chat_id="oc_chat1", + content='read_file("path")', + metadata={"_tool_hint": True}, + ) + await ch.send(msg) + + assert "oc_chat1" not in ch._stream_bufs + ch._client.im.v1.message.create.assert_called_once() + + class TestSendMessageReturnsId: def test_returns_message_id_on_success(self): ch = _make_channel()