fix(feishu): remove resuming to avoid 10-min streaming card timeout

Feishu streaming cards auto-close after 10 minutes from creation,
regardless of update activity. With resuming enabled, a single card
lives across multiple tool-call rounds and can exceed this limit,
causing the final response to be silently lost.

Remove the _resuming logic from send_delta so each tool-call round
gets its own short-lived streaming card (well under 10 min). Add a
fallback that sends a regular interactive card when the final
streaming update fails.
This commit is contained in:
chengyongru 2026-04-14 14:14:14 +08:00 committed by Xubin Ren
parent 12c12869b4
commit 0adce5405b
4 changed files with 93 additions and 172 deletions

View File

@ -290,7 +290,6 @@ async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] |
|------|---------| |------|---------|
| `_stream_delta: True` | A content chunk (delta contains the new text) | | `_stream_delta: True` | A content chunk (delta contains the new text) |
| `_stream_end: True` | Streaming finished (delta is empty) | | `_stream_end: True` | Streaming finished (delta is empty) |
| `_resuming: True` | More streaming rounds coming (e.g. tool call then another response) |
### Example: Webhook with Streaming ### Example: Webhook with Streaming

View File

@ -1290,7 +1290,6 @@ class FeishuChannel(BaseChannel):
Supported metadata keys: Supported metadata keys:
_stream_end: Finalize the streaming card. _stream_end: Finalize the streaming card.
_resuming: Mid-turn pause flush but keep the buffer alive.
_tool_hint: Delta is a formatted tool hint (for display only). _tool_hint: Delta is a formatted tool hint (for display only).
message_id: Original message id (used with _stream_end for reaction cleanup). message_id: Original message id (used with _stream_end for reaction cleanup).
reaction_id: Reaction id to remove on stream end. reaction_id: Reaction id to remove on stream end.
@ -1309,32 +1308,22 @@ class FeishuChannel(BaseChannel):
if self.config.done_emoji and message_id: if self.config.done_emoji and message_id:
await self._add_reaction(message_id, self.config.done_emoji) await self._add_reaction(message_id, self.config.done_emoji)
resuming = meta.get("_resuming", False)
if resuming:
# Mid-turn pause (e.g. tool call between streaming segments).
# Flush current text to card but keep the buffer alive so the
# next segment appends to the same card.
buf = self._stream_bufs.get(chat_id)
if buf and buf.card_id and buf.text:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
)
return
buf = self._stream_bufs.pop(chat_id, None) buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.text: if not buf or not buf.text:
return return
# Try to finalize via streaming card; if that fails (e.g.
# streaming mode was closed by Feishu due to timeout), fall
# back to sending a regular interactive card.
if buf.card_id: if buf.card_id:
buf.sequence += 1 buf.sequence += 1
await loop.run_in_executor( ok = await loop.run_in_executor(
None, None,
self._stream_update_text_sync, self._stream_update_text_sync,
buf.card_id, buf.card_id,
buf.text, buf.text,
buf.sequence, buf.sequence,
) )
# Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs). if ok:
buf.sequence += 1 buf.sequence += 1
await loop.run_in_executor( await loop.run_in_executor(
None, None,
@ -1342,7 +1331,11 @@ class FeishuChannel(BaseChannel):
buf.card_id, buf.card_id,
buf.sequence, buf.sequence,
) )
else: return
logger.warning(
"Streaming card {} final update failed, falling back to regular card",
buf.card_id,
)
for chunk in self._split_elements_by_table_limit( for chunk in self._split_elements_by_table_limit(
self._build_card_elements(buf.text) self._build_card_elements(buf.text)
): ):
@ -1404,14 +1397,21 @@ class FeishuChannel(BaseChannel):
if buf and buf.card_id: if buf and buf.card_id:
# Delegate to send_delta so tool hints get the same # Delegate to send_delta so tool hints get the same
# throttling (and card creation) as regular text deltas. # throttling (and card creation) as regular text deltas.
lines = self.__class__._format_tool_hint_lines(hint).split("\n") await self.send_delta(
delta = "\n\n" + "\n".join( msg.chat_id,
f"{self.config.tool_hint_prefix} {ln}" for ln in lines if ln.strip() "\n\n" + self._format_tool_hint_delta(hint) + "\n\n",
) + "\n\n" )
await self.send_delta(msg.chat_id, delta)
return return
await self._send_tool_hint_card( # No active streaming card — send as a regular
receive_id_type, msg.chat_id, hint # interactive card with the same 🔧 prefix style.
card = json.dumps(
{"config": {"wide_screen_mode": True}, "elements": [
{"tag": "markdown", "content": self._format_tool_hint_delta(hint)},
]},
ensure_ascii=False,
)
await loop.run_in_executor(
None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card
) )
return return
@ -1708,33 +1708,9 @@ class FeishuChannel(BaseChannel):
return "\n".join(part for part in parts if part) return "\n".join(part for part in parts if part)
async def _send_tool_hint_card( def _format_tool_hint_delta(self, tool_hint: str) -> str:
self, receive_id_type: str, receive_id: str, tool_hint: str """Format a tool hint string with the 🔧 prefix for each line."""
) -> None: lines = self.__class__._format_tool_hint_lines(tool_hint).split("\n")
"""Send tool hint as an interactive card with formatted code block. return "\n".join(
f"{self.config.tool_hint_prefix} {ln}" for ln in lines if ln.strip()
Args:
receive_id_type: "chat_id" or "open_id"
receive_id: The target chat or user ID
tool_hint: Formatted tool hint string (e.g., 'web_search("q"), read_file("path")')
"""
loop = asyncio.get_running_loop()
# Put each top-level tool call on its own line without altering commas inside arguments.
formatted_code = self.__class__._format_tool_hint_lines(tool_hint)
card = {
"config": {"wide_screen_mode": True},
"elements": [
{"tag": "markdown", "content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```"}
],
}
await loop.run_in_executor(
None,
self._send_message_sync,
receive_id_type,
receive_id,
"interactive",
json.dumps(card, ensure_ascii=False),
) )

View File

@ -205,53 +205,22 @@ class TestSendDelta:
ch._client.im.v1.message.create.assert_called_once() ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stream_end_resuming_keeps_buffer(self): async def test_stream_end_fallback_when_final_update_fails(self):
"""_resuming=True flushes text to card but keeps the buffer for the next segment.""" """If streaming mode was closed (e.g. Feishu timeout), fall back to a regular card."""
ch = _make_channel() ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0, text="Lost content", card_id="card_1", sequence=3, last_edit=0.0,
) )
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(success=False)
ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb")
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
buf = ch._stream_bufs["oc_chat1"]
assert buf.card_id == "card_1"
assert buf.sequence == 3
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.cardkit.v1.card.settings.assert_not_called()
@pytest.mark.asyncio
async def test_stream_end_resuming_then_final_end(self):
"""Full multi-segment flow: resuming mid-turn, then final end closes the card."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Seg1", card_id="card_1", sequence=1, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._stream_bufs["oc_chat1"].text += " Seg2"
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True}) await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card.settings.assert_called_once() # Should NOT attempt to close streaming mode since update failed
ch._client.cardkit.v1.card.settings.assert_not_called()
@pytest.mark.asyncio # Should fall back to sending a regular interactive card
async def test_stream_end_resuming_no_card_is_noop(self): ch._client.im.v1.message.create.assert_called_once()
"""_resuming with no card_id (card creation failed) is a safe no-op."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="text", card_id=None, sequence=0, last_edit=0.0,
)
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_stream_end_without_buf_is_noop(self): async def test_stream_end_without_buf_is_noop(self):
@ -375,22 +344,6 @@ class TestToolHintInlineStreaming:
assert "🔧 $ cd /project" in buf.text assert "🔧 $ cd /project" in buf.text
assert "🔧 $ git status" in buf.text assert "🔧 $ git status" in buf.text
@pytest.mark.asyncio
async def test_tool_hint_preserved_on_resuming_flush(self):
"""When _resuming flushes the buffer, tool hint is kept as permanent content."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer\n\n🔧 $ cd /project\n\n",
card_id="card_1", sequence=2, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
buf = ch._stream_bufs["oc_chat1"]
assert "Partial answer" in buf.text
assert "🔧 $ cd /project" in buf.text
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_tool_hint_preserved_on_final_stream_end(self): async def test_tool_hint_preserved_on_final_stream_end(self):
"""When final _stream_end closes the card, tool hint is kept in the final text.""" """When final _stream_end closes the card, tool hint is kept in the final text."""

View File

@ -1,6 +1,7 @@
"""Tests for FeishuChannel tool hint code block formatting.""" """Tests for FeishuChannel tool hint formatting."""
import json import json
from types import SimpleNamespace
from unittest.mock import MagicMock, patch from unittest.mock import MagicMock, patch
import pytest import pytest
@ -28,15 +29,24 @@ def mock_feishu_channel():
config.app_secret = "test_app_secret" config.app_secret = "test_app_secret"
config.encrypt_key = None config.encrypt_key = None
config.verification_token = None config.verification_token = None
config.tool_hint_prefix = "\U0001f527" # 🔧
bus = MagicMock() bus = MagicMock()
channel = FeishuChannel(config, bus) channel = FeishuChannel(config, bus)
channel._client = MagicMock() # Simulate initialized client channel._client = MagicMock()
return channel return channel
def _get_tool_hint_card(mock_send):
"""Extract the interactive card from _send_message_sync calls."""
call_args = mock_send.call_args[0]
_, _, msg_type, content = call_args
assert msg_type == "interactive"
return json.loads(content)
@mark.asyncio @mark.asyncio
async def test_tool_hint_sends_code_message(mock_feishu_channel): async def test_tool_hint_sends_interactive_card(mock_feishu_channel):
"""Tool hint messages should be sent as interactive cards with code blocks.""" """Tool hint without active buffer sends an interactive card with 🔧 style."""
msg = OutboundMessage( msg = OutboundMessage(
channel="feishu", channel="feishu",
chat_id="oc_123456", chat_id="oc_123456",
@ -47,23 +57,12 @@ async def test_tool_hint_sends_code_message(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
# Verify interactive message with card was sent
assert mock_send.call_count == 1 assert mock_send.call_count == 1
call_args = mock_send.call_args[0] card = _get_tool_hint_card(mock_send)
receive_id_type, receive_id, msg_type, content = call_args
assert receive_id_type == "chat_id"
assert receive_id == "oc_123456"
assert msg_type == "interactive"
# Parse content to verify card structure
card = json.loads(content)
assert card["config"]["wide_screen_mode"] is True assert card["config"]["wide_screen_mode"] is True
assert len(card["elements"]) == 1 md = card["elements"][0]["content"]
assert card["elements"][0]["tag"] == "markdown" assert "\U0001f527" in md
# Check that code block is properly formatted with language hint assert "web_search" in md
expected_md = "**Tool Calls**\n\n```text\nweb_search(\"test query\")\n```"
assert card["elements"][0]["content"] == expected_md
@mark.asyncio @mark.asyncio
@ -78,8 +77,6 @@ async def test_tool_hint_empty_content_does_not_send(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
# Should not send any message
mock_send.assert_not_called() mock_send.assert_not_called()
@ -96,7 +93,6 @@ async def test_tool_hint_without_metadata_sends_as_normal(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
# Should send as text message (detected format)
assert mock_send.call_count == 1 assert mock_send.call_count == 1
call_args = mock_send.call_args[0] call_args = mock_send.call_args[0]
_, _, msg_type, content = call_args _, _, msg_type, content = call_args
@ -106,7 +102,7 @@ async def test_tool_hint_without_metadata_sends_as_normal(mock_feishu_channel):
@mark.asyncio @mark.asyncio
async def test_tool_hint_multiple_tools_in_one_message(mock_feishu_channel): async def test_tool_hint_multiple_tools_in_one_message(mock_feishu_channel):
"""Multiple tool calls should be displayed each on its own line in a code block.""" """Multiple tool calls should each get the 🔧 prefix."""
msg = OutboundMessage( msg = OutboundMessage(
channel="feishu", channel="feishu",
chat_id="oc_123456", chat_id="oc_123456",
@ -117,13 +113,11 @@ async def test_tool_hint_multiple_tools_in_one_message(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
call_args = mock_send.call_args[0] card = _get_tool_hint_card(mock_send)
msg_type = call_args[2] md = card["elements"][0]["content"]
content = json.loads(call_args[3]) assert "web_search" in md
assert msg_type == "interactive" assert "read_file" in md
# Each tool call should be on its own line assert "\U0001f527" in md
expected_md = "**Tool Calls**\n\n```text\nweb_search(\"query\"),\nread_file(\"/path/to/file\")\n```"
assert content["elements"][0]["content"] == expected_md
@mark.asyncio @mark.asyncio
@ -139,8 +133,8 @@ async def test_tool_hint_new_format_basic(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3]) card = _get_tool_hint_card(mock_send)
md = content["elements"][0]["content"] md = card["elements"][0]["content"]
assert "read src/main.py" in md assert "read src/main.py" in md
assert 'grep "TODO"' in md assert 'grep "TODO"' in md
@ -158,16 +152,15 @@ async def test_tool_hint_new_format_with_comma_in_quotes(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3]) card = _get_tool_hint_card(mock_send)
md = content["elements"][0]["content"] md = card["elements"][0]["content"]
# The comma inside quotes should NOT cause a line break
assert 'grep "hello, world"' in md assert 'grep "hello, world"' in md
assert "$ echo test" in md assert "$ echo test" in md
@mark.asyncio @mark.asyncio
async def test_tool_hint_new_format_with_folding(mock_feishu_channel): async def test_tool_hint_new_format_with_folding(mock_feishu_channel):
"""Folded calls (× N) should display on separate lines.""" """Folded calls (× N) should display correctly."""
msg = OutboundMessage( msg = OutboundMessage(
channel="feishu", channel="feishu",
chat_id="oc_123456", chat_id="oc_123456",
@ -178,8 +171,8 @@ async def test_tool_hint_new_format_with_folding(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3]) card = _get_tool_hint_card(mock_send)
md = content["elements"][0]["content"] md = card["elements"][0]["content"]
assert "\u00d7 3" in md assert "\u00d7 3" in md
assert 'grep "pattern"' in md assert 'grep "pattern"' in md
@ -197,9 +190,12 @@ async def test_tool_hint_new_format_mcp(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3]) card = _get_tool_hint_card(mock_send)
md = content["elements"][0]["content"] md = card["elements"][0]["content"]
assert "4_5v::analyze_image" in md assert "4_5v::analyze_image" in md
@mark.asyncio
async def test_tool_hint_keeps_commas_inside_arguments(mock_feishu_channel): async def test_tool_hint_keeps_commas_inside_arguments(mock_feishu_channel):
"""Commas inside a single tool argument must not be split onto a new line.""" """Commas inside a single tool argument must not be split onto a new line."""
msg = OutboundMessage( msg = OutboundMessage(
@ -212,10 +208,7 @@ async def test_tool_hint_keeps_commas_inside_arguments(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send: with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg) await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3]) card = _get_tool_hint_card(mock_send)
expected_md = ( md = card["elements"][0]["content"]
"**Tool Calls**\n\n```text\n" assert 'web_search("foo, bar")' in md
"web_search(\"foo, bar\"),\n" assert 'read_file("/path/to/file")' in md
"read_file(\"/path/to/file\")\n```"
)
assert content["elements"][0]["content"] == expected_md