fix(feishu): remove resuming to avoid 10-min streaming card timeout

Feishu streaming cards auto-close after 10 minutes from creation,
regardless of update activity. With resuming enabled, a single card
lives across multiple tool-call rounds and can exceed this limit,
causing the final response to be silently lost.

Remove the _resuming logic from send_delta so each tool-call round
gets its own short-lived streaming card (well under 10 min). Add a
fallback that sends a regular interactive card when the final
streaming update fails.
This commit is contained in:
chengyongru 2026-04-14 14:14:14 +08:00 committed by chengyongru
parent 4c684540c5
commit dec26396ed
4 changed files with 93 additions and 172 deletions

View File

@ -290,7 +290,6 @@ async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] |
|------|---------|
| `_stream_delta: True` | A content chunk (delta contains the new text) |
| `_stream_end: True` | Streaming finished (delta is empty) |
| `_resuming: True` | More streaming rounds coming (e.g. tool call then another response) |
### Example: Webhook with Streaming

View File

@ -1290,7 +1290,6 @@ class FeishuChannel(BaseChannel):
Supported metadata keys:
_stream_end: Finalize the streaming card.
_resuming: Mid-turn pause flush but keep the buffer alive.
_tool_hint: Delta is a formatted tool hint (for display only).
message_id: Original message id (used with _stream_end for reaction cleanup).
reaction_id: Reaction id to remove on stream end.
@ -1309,50 +1308,44 @@ class FeishuChannel(BaseChannel):
if self.config.done_emoji and message_id:
await self._add_reaction(message_id, self.config.done_emoji)
resuming = meta.get("_resuming", False)
if resuming:
# Mid-turn pause (e.g. tool call between streaming segments).
# Flush current text to card but keep the buffer alive so the
# next segment appends to the same card.
buf = self._stream_bufs.get(chat_id)
if buf and buf.card_id and buf.text:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
)
return
buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.text:
return
# Try to finalize via streaming card; if that fails (e.g.
# streaming mode was closed by Feishu due to timeout), fall
# back to sending a regular interactive card.
if buf.card_id:
buf.sequence += 1
await loop.run_in_executor(
ok = await loop.run_in_executor(
None,
self._stream_update_text_sync,
buf.card_id,
buf.text,
buf.sequence,
)
# Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs).
buf.sequence += 1
await loop.run_in_executor(
None,
self._close_streaming_mode_sync,
buf.card_id,
buf.sequence,
)
else:
for chunk in self._split_elements_by_table_limit(
self._build_card_elements(buf.text)
):
card = json.dumps(
{"config": {"wide_screen_mode": True}, "elements": chunk},
ensure_ascii=False,
)
if ok:
buf.sequence += 1
await loop.run_in_executor(
None, self._send_message_sync, rid_type, chat_id, "interactive", card
None,
self._close_streaming_mode_sync,
buf.card_id,
buf.sequence,
)
return
logger.warning(
"Streaming card {} final update failed, falling back to regular card",
buf.card_id,
)
for chunk in self._split_elements_by_table_limit(
self._build_card_elements(buf.text)
):
card = json.dumps(
{"config": {"wide_screen_mode": True}, "elements": chunk},
ensure_ascii=False,
)
await loop.run_in_executor(
None, self._send_message_sync, rid_type, chat_id, "interactive", card
)
return
# --- accumulate delta ---
@ -1404,14 +1397,21 @@ class FeishuChannel(BaseChannel):
if buf and buf.card_id:
# Delegate to send_delta so tool hints get the same
# throttling (and card creation) as regular text deltas.
lines = self.__class__._format_tool_hint_lines(hint).split("\n")
delta = "\n\n" + "\n".join(
f"{self.config.tool_hint_prefix} {ln}" for ln in lines if ln.strip()
) + "\n\n"
await self.send_delta(msg.chat_id, delta)
await self.send_delta(
msg.chat_id,
"\n\n" + self._format_tool_hint_delta(hint) + "\n\n",
)
return
await self._send_tool_hint_card(
receive_id_type, msg.chat_id, hint
# No active streaming card — send as a regular
# interactive card with the same 🔧 prefix style.
card = json.dumps(
{"config": {"wide_screen_mode": True}, "elements": [
{"tag": "markdown", "content": self._format_tool_hint_delta(hint)},
]},
ensure_ascii=False,
)
await loop.run_in_executor(
None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card
)
return
@ -1708,33 +1708,9 @@ class FeishuChannel(BaseChannel):
return "\n".join(part for part in parts if part)
async def _send_tool_hint_card(
self, receive_id_type: str, receive_id: str, tool_hint: str
) -> None:
"""Send tool hint as an interactive card with formatted code block.
Args:
receive_id_type: "chat_id" or "open_id"
receive_id: The target chat or user ID
tool_hint: Formatted tool hint string (e.g., 'web_search("q"), read_file("path")')
"""
loop = asyncio.get_running_loop()
# Put each top-level tool call on its own line without altering commas inside arguments.
formatted_code = self.__class__._format_tool_hint_lines(tool_hint)
card = {
"config": {"wide_screen_mode": True},
"elements": [
{"tag": "markdown", "content": f"**Tool Calls**\n\n```text\n{formatted_code}\n```"}
],
}
await loop.run_in_executor(
None,
self._send_message_sync,
receive_id_type,
receive_id,
"interactive",
json.dumps(card, ensure_ascii=False),
def _format_tool_hint_delta(self, tool_hint: str) -> str:
"""Format a tool hint string with the 🔧 prefix for each line."""
lines = self.__class__._format_tool_hint_lines(tool_hint).split("\n")
return "\n".join(
f"{self.config.tool_hint_prefix} {ln}" for ln in lines if ln.strip()
)

View File

@ -205,53 +205,22 @@ class TestSendDelta:
ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_resuming_keeps_buffer(self):
"""_resuming=True flushes text to card but keeps the buffer for the next segment."""
async def test_stream_end_fallback_when_final_update_fails(self):
"""If streaming mode was closed (e.g. Feishu timeout), fall back to a regular card."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer", card_id="card_1", sequence=2, last_edit=0.0,
text="Lost content", card_id="card_1", sequence=3, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(success=False)
ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb")
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
buf = ch._stream_bufs["oc_chat1"]
assert buf.card_id == "card_1"
assert buf.sequence == 3
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.cardkit.v1.card.settings.assert_not_called()
@pytest.mark.asyncio
async def test_stream_end_resuming_then_final_end(self):
"""Full multi-segment flow: resuming mid-turn, then final end closes the card."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Seg1", card_id="card_1", sequence=1, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._stream_bufs["oc_chat1"].text += " Seg2"
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card.settings.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_resuming_no_card_is_noop(self):
"""_resuming with no card_id (card creation failed) is a safe no-op."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="text", card_id=None, sequence=0, last_edit=0.0,
)
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
assert "oc_chat1" in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_not_called()
# Should NOT attempt to close streaming mode since update failed
ch._client.cardkit.v1.card.settings.assert_not_called()
# Should fall back to sending a regular interactive card
ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_without_buf_is_noop(self):
@ -375,22 +344,6 @@ class TestToolHintInlineStreaming:
assert "🔧 $ cd /project" in buf.text
assert "🔧 $ git status" in buf.text
@pytest.mark.asyncio
async def test_tool_hint_preserved_on_resuming_flush(self):
"""When _resuming flushes the buffer, tool hint is kept as permanent content."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Partial answer\n\n🔧 $ cd /project\n\n",
card_id="card_1", sequence=2, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True, "_resuming": True})
buf = ch._stream_bufs["oc_chat1"]
assert "Partial answer" in buf.text
assert "🔧 $ cd /project" in buf.text
@pytest.mark.asyncio
async def test_tool_hint_preserved_on_final_stream_end(self):
"""When final _stream_end closes the card, tool hint is kept in the final text."""

View File

@ -1,6 +1,7 @@
"""Tests for FeishuChannel tool hint code block formatting."""
"""Tests for FeishuChannel tool hint formatting."""
import json
from types import SimpleNamespace
from unittest.mock import MagicMock, patch
import pytest
@ -28,15 +29,24 @@ def mock_feishu_channel():
config.app_secret = "test_app_secret"
config.encrypt_key = None
config.verification_token = None
config.tool_hint_prefix = "\U0001f527" # 🔧
bus = MagicMock()
channel = FeishuChannel(config, bus)
channel._client = MagicMock() # Simulate initialized client
channel._client = MagicMock()
return channel
def _get_tool_hint_card(mock_send):
"""Extract the interactive card from _send_message_sync calls."""
call_args = mock_send.call_args[0]
_, _, msg_type, content = call_args
assert msg_type == "interactive"
return json.loads(content)
@mark.asyncio
async def test_tool_hint_sends_code_message(mock_feishu_channel):
"""Tool hint messages should be sent as interactive cards with code blocks."""
async def test_tool_hint_sends_interactive_card(mock_feishu_channel):
"""Tool hint without active buffer sends an interactive card with 🔧 style."""
msg = OutboundMessage(
channel="feishu",
chat_id="oc_123456",
@ -47,23 +57,12 @@ async def test_tool_hint_sends_code_message(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
# Verify interactive message with card was sent
assert mock_send.call_count == 1
call_args = mock_send.call_args[0]
receive_id_type, receive_id, msg_type, content = call_args
assert receive_id_type == "chat_id"
assert receive_id == "oc_123456"
assert msg_type == "interactive"
# Parse content to verify card structure
card = json.loads(content)
card = _get_tool_hint_card(mock_send)
assert card["config"]["wide_screen_mode"] is True
assert len(card["elements"]) == 1
assert card["elements"][0]["tag"] == "markdown"
# Check that code block is properly formatted with language hint
expected_md = "**Tool Calls**\n\n```text\nweb_search(\"test query\")\n```"
assert card["elements"][0]["content"] == expected_md
md = card["elements"][0]["content"]
assert "\U0001f527" in md
assert "web_search" in md
@mark.asyncio
@ -78,8 +77,6 @@ async def test_tool_hint_empty_content_does_not_send(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
# Should not send any message
mock_send.assert_not_called()
@ -96,7 +93,6 @@ async def test_tool_hint_without_metadata_sends_as_normal(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
# Should send as text message (detected format)
assert mock_send.call_count == 1
call_args = mock_send.call_args[0]
_, _, msg_type, content = call_args
@ -106,7 +102,7 @@ async def test_tool_hint_without_metadata_sends_as_normal(mock_feishu_channel):
@mark.asyncio
async def test_tool_hint_multiple_tools_in_one_message(mock_feishu_channel):
"""Multiple tool calls should be displayed each on its own line in a code block."""
"""Multiple tool calls should each get the 🔧 prefix."""
msg = OutboundMessage(
channel="feishu",
chat_id="oc_123456",
@ -117,13 +113,11 @@ async def test_tool_hint_multiple_tools_in_one_message(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
call_args = mock_send.call_args[0]
msg_type = call_args[2]
content = json.loads(call_args[3])
assert msg_type == "interactive"
# Each tool call should be on its own line
expected_md = "**Tool Calls**\n\n```text\nweb_search(\"query\"),\nread_file(\"/path/to/file\")\n```"
assert content["elements"][0]["content"] == expected_md
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert "web_search" in md
assert "read_file" in md
assert "\U0001f527" in md
@mark.asyncio
@ -139,8 +133,8 @@ async def test_tool_hint_new_format_basic(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3])
md = content["elements"][0]["content"]
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert "read src/main.py" in md
assert 'grep "TODO"' in md
@ -158,16 +152,15 @@ async def test_tool_hint_new_format_with_comma_in_quotes(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3])
md = content["elements"][0]["content"]
# The comma inside quotes should NOT cause a line break
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert 'grep "hello, world"' in md
assert "$ echo test" in md
@mark.asyncio
async def test_tool_hint_new_format_with_folding(mock_feishu_channel):
"""Folded calls (× N) should display on separate lines."""
"""Folded calls (× N) should display correctly."""
msg = OutboundMessage(
channel="feishu",
chat_id="oc_123456",
@ -178,8 +171,8 @@ async def test_tool_hint_new_format_with_folding(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3])
md = content["elements"][0]["content"]
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert "\u00d7 3" in md
assert 'grep "pattern"' in md
@ -197,9 +190,12 @@ async def test_tool_hint_new_format_mcp(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3])
md = content["elements"][0]["content"]
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert "4_5v::analyze_image" in md
@mark.asyncio
async def test_tool_hint_keeps_commas_inside_arguments(mock_feishu_channel):
"""Commas inside a single tool argument must not be split onto a new line."""
msg = OutboundMessage(
@ -212,10 +208,7 @@ async def test_tool_hint_keeps_commas_inside_arguments(mock_feishu_channel):
with patch.object(mock_feishu_channel, '_send_message_sync') as mock_send:
await mock_feishu_channel.send(msg)
content = json.loads(mock_send.call_args[0][3])
expected_md = (
"**Tool Calls**\n\n```text\n"
"web_search(\"foo, bar\"),\n"
"read_file(\"/path/to/file\")\n```"
)
assert content["elements"][0]["content"] == expected_md
card = _get_tool_hint_card(mock_send)
md = card["elements"][0]["content"]
assert 'web_search("foo, bar")' in md
assert 'read_file("/path/to/file")' in md