feat(feishu): support stream output (cardkit) (#2382)

* feat(feishu): add streaming support via CardKit PATCH API

Implement send_delta() for Feishu channel using interactive card
progressive editing:
- First delta creates a card with markdown content and typing cursor
- Subsequent deltas throttled at 0.5s to respect 5 QPS PATCH limit
- stream_end finalizes with full formatted card (tables, rich markdown)

Also refactors _send_message_sync to return message_id (str | None)
and adds _patch_card_sync for card updates.

Includes 17 new unit tests covering streaming lifecycle, config,
card building, and edge cases.

Made-with: Cursor

* feat(feishu): close CardKit streaming_mode on stream end

Call cardkit card.settings after final content update so chat preview
leaves default [生成中...] summary (Feishu streaming docs).

Made-with: Cursor

* style: polish Feishu streaming (PEP8 spacing, drop unused test imports)

Made-with: Cursor

* docs(feishu): document cardkit:card:write for streaming

- README: permissions, upgrade note for existing apps, streaming toggle
- CHANNEL_PLUGIN_GUIDE: Feishu CardKit scope and when to disable streaming

Made-with: Cursor

* docs: address PR 2382 review (test path, plugin guide, README, English docstrings)

- Move Feishu streaming tests to tests/channels/
- Remove Feishu CardKit scope from CHANNEL_PLUGIN_GUIDE (plugin-dev doc only)
- README Feishu permissions: consistent English
- feishu.py: replace Chinese in streaming docstrings/comments

Made-with: Cursor
This commit is contained in:
LeftX 2026-03-24 15:57:14 +08:00 committed by GitHub
parent 178216bcbc
commit b3e35e9476
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 412 additions and 8 deletions

View File

@ -514,14 +514,17 @@ nanobot gateway
</details> </details>
<details> <details>
<summary><b>Feishu (飞书)</b></summary> <summary><b>Feishu</b></summary>
Uses **WebSocket** long connection — no public IP required. Uses **WebSocket** long connection — no public IP required.
**1. Create a Feishu bot** **1. Create a Feishu bot**
- Visit [Feishu Open Platform](https://open.feishu.cn/app) - Visit [Feishu Open Platform](https://open.feishu.cn/app)
- Create a new app → Enable **Bot** capability - Create a new app → Enable **Bot** capability
- **Permissions**: Add `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages) - **Permissions**:
- `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages)
- **Streaming replies** (default in nanobot): add **`cardkit:card:write`** (often labeled **Create and update cards** in the Feishu developer console). Required for CardKit entities and streamed assistant text. Older apps may not have it yet — open **Permission management**, enable the scope, then **publish** a new app version if the console requires it.
- If you **cannot** add `cardkit:card:write`, set `"streaming": false` under `channels.feishu` (see below). The bot still works; replies use normal interactive cards without token-by-token streaming.
- **Events**: Add `im.message.receive_v1` (receive messages) - **Events**: Add `im.message.receive_v1` (receive messages)
- Select **Long Connection** mode (requires running nanobot first to establish connection) - Select **Long Connection** mode (requires running nanobot first to establish connection)
- Get **App ID** and **App Secret** from "Credentials & Basic Info" - Get **App ID** and **App Secret** from "Credentials & Basic Info"
@ -539,12 +542,14 @@ Uses **WebSocket** long connection — no public IP required.
"encryptKey": "", "encryptKey": "",
"verificationToken": "", "verificationToken": "",
"allowFrom": ["ou_YOUR_OPEN_ID"], "allowFrom": ["ou_YOUR_OPEN_ID"],
"groupPolicy": "mention" "groupPolicy": "mention",
"streaming": true
} }
} }
} }
``` ```
> `streaming` defaults to `true`. Use `false` if your app does not have **`cardkit:card:write`** (see permissions above).
> `encryptKey` and `verificationToken` are optional for Long Connection mode. > `encryptKey` and `verificationToken` are optional for Long Connection mode.
> `allowFrom`: Add your open_id (find it in nanobot logs when you message the bot). Use `["*"]` to allow all users. > `allowFrom`: Add your open_id (find it in nanobot logs when you message the bot). Use `["*"]` to allow all users.
> `groupPolicy`: `"mention"` (default — respond only when @mentioned), `"open"` (respond to all group messages). Private chats always respond. > `groupPolicy`: `"mention"` (default — respond only when @mentioned), `"open"` (respond to all group messages). Private chats always respond.

View File

@ -5,7 +5,10 @@ import json
import os import os
import re import re
import threading import threading
import time
import uuid
from collections import OrderedDict from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any, Literal from typing import Any, Literal
@ -248,6 +251,19 @@ class FeishuConfig(Base):
react_emoji: str = "THUMBSUP" react_emoji: str = "THUMBSUP"
group_policy: Literal["open", "mention"] = "mention" group_policy: Literal["open", "mention"] = "mention"
reply_to_message: bool = False # If True, bot replies quote the user's original message reply_to_message: bool = False # If True, bot replies quote the user's original message
streaming: bool = True
_STREAM_ELEMENT_ID = "streaming_md"
@dataclass
class _FeishuStreamBuf:
"""Per-chat streaming accumulator using CardKit streaming API."""
text: str = ""
card_id: str | None = None
sequence: int = 0
last_edit: float = 0.0
class FeishuChannel(BaseChannel): class FeishuChannel(BaseChannel):
@ -265,6 +281,8 @@ class FeishuChannel(BaseChannel):
name = "feishu" name = "feishu"
display_name = "Feishu" display_name = "Feishu"
_STREAM_EDIT_INTERVAL = 0.5 # throttle between CardKit streaming updates
@classmethod @classmethod
def default_config(cls) -> dict[str, Any]: def default_config(cls) -> dict[str, Any]:
return FeishuConfig().model_dump(by_alias=True) return FeishuConfig().model_dump(by_alias=True)
@ -279,6 +297,7 @@ class FeishuChannel(BaseChannel):
self._ws_thread: threading.Thread | None = None self._ws_thread: threading.Thread | None = None
self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache
self._loop: asyncio.AbstractEventLoop | None = None self._loop: asyncio.AbstractEventLoop | None = None
self._stream_bufs: dict[str, _FeishuStreamBuf] = {}
@staticmethod @staticmethod
def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any: def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any:
@ -906,8 +925,8 @@ class FeishuChannel(BaseChannel):
logger.error("Error replying to Feishu message {}: {}", parent_message_id, e) logger.error("Error replying to Feishu message {}: {}", parent_message_id, e)
return False return False
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> str | None:
"""Send a single message (text/image/file/interactive) synchronously.""" """Send a single message and return the message_id on success."""
from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody
try: try:
request = CreateMessageRequest.builder() \ request = CreateMessageRequest.builder() \
@ -925,13 +944,146 @@ class FeishuChannel(BaseChannel):
"Failed to send Feishu {} message: code={}, msg={}, log_id={}", "Failed to send Feishu {} message: code={}, msg={}, log_id={}",
msg_type, response.code, response.msg, response.get_log_id() msg_type, response.code, response.msg, response.get_log_id()
) )
return False return None
logger.debug("Feishu {} message sent to {}", msg_type, receive_id) msg_id = getattr(response.data, "message_id", None)
return True logger.debug("Feishu {} message sent to {}: {}", msg_type, receive_id, msg_id)
return msg_id
except Exception as e: except Exception as e:
logger.error("Error sending Feishu {} message: {}", msg_type, e) logger.error("Error sending Feishu {} message: {}", msg_type, e)
return None
def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None:
"""Create a CardKit streaming card, send it to chat, return card_id."""
from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody
card_json = {
"schema": "2.0",
"config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True},
"body": {"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]},
}
try:
request = CreateCardRequest.builder().request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_json, ensure_ascii=False))
.build()
).build()
response = self._client.cardkit.v1.card.create(request)
if not response.success():
logger.warning("Failed to create streaming card: code={}, msg={}", response.code, response.msg)
return None
card_id = getattr(response.data, "card_id", None)
if card_id:
self._send_message_sync(
receive_id_type, chat_id, "interactive",
json.dumps({"type": "card", "data": {"card_id": card_id}}),
)
return card_id
except Exception as e:
logger.warning("Error creating streaming card: {}", e)
return None
def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool:
"""Stream-update the markdown element on a CardKit card (typewriter effect)."""
from lark_oapi.api.cardkit.v1 import ContentCardElementRequest, ContentCardElementRequestBody
try:
request = ContentCardElementRequest.builder() \
.card_id(card_id) \
.element_id(_STREAM_ELEMENT_ID) \
.request_body(
ContentCardElementRequestBody.builder()
.content(content).sequence(sequence).build()
).build()
response = self._client.cardkit.v1.card_element.content(request)
if not response.success():
logger.warning("Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg)
return False
return True
except Exception as e:
logger.warning("Error stream-updating card {}: {}", card_id, e)
return False return False
def _close_streaming_mode_sync(self, card_id: str, sequence: int) -> bool:
"""Turn off CardKit streaming_mode so the chat list preview exits the streaming placeholder.
Per Feishu docs, streaming cards keep a generating-style summary in the session list until
streaming_mode is set to false via card settings (after final content update).
Sequence must strictly exceed the previous card OpenAPI operation on this entity.
"""
from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody
settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False)
try:
request = SettingsCardRequest.builder() \
.card_id(card_id) \
.request_body(
SettingsCardRequestBody.builder()
.settings(settings_payload)
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.build()
).build()
response = self._client.cardkit.v1.card.settings(request)
if not response.success():
logger.warning(
"Failed to close streaming on card {}: code={}, msg={}",
card_id, response.code, response.msg,
)
return False
return True
except Exception as e:
logger.warning("Error closing streaming on card {}: {}", card_id, e)
return False
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Progressive streaming via CardKit: create card on first delta, stream-update on subsequent."""
if not self._client:
return
meta = metadata or {}
loop = asyncio.get_running_loop()
rid_type = "chat_id" if chat_id.startswith("oc_") else "open_id"
# --- stream end: final update or fallback ---
if meta.get("_stream_end"):
buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.text:
return
if buf.card_id:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
)
# Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs).
buf.sequence += 1
await loop.run_in_executor(
None, self._close_streaming_mode_sync, buf.card_id, buf.sequence,
)
else:
for chunk in self._split_elements_by_table_limit(self._build_card_elements(buf.text)):
card = json.dumps({"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False)
await loop.run_in_executor(None, self._send_message_sync, rid_type, chat_id, "interactive", card)
return
# --- accumulate delta ---
buf = self._stream_bufs.get(chat_id)
if buf is None:
buf = _FeishuStreamBuf()
self._stream_bufs[chat_id] = buf
buf.text += delta
if not buf.text.strip():
return
now = time.monotonic()
if buf.card_id is None:
card_id = await loop.run_in_executor(None, self._create_streaming_card_sync, rid_type, chat_id)
if card_id:
buf.card_id = card_id
buf.sequence = 1
await loop.run_in_executor(None, self._stream_update_text_sync, card_id, buf.text, 1)
buf.last_edit = now
elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
buf.sequence += 1
await loop.run_in_executor(None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence)
buf.last_edit = now
async def send(self, msg: OutboundMessage) -> None: async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Feishu, including media (images/files) if present.""" """Send a message through Feishu, including media (images/files) if present."""
if not self._client: if not self._client:

View File

@ -0,0 +1,247 @@
"""Tests for Feishu streaming (send_delta) via CardKit streaming API."""
import time
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from nanobot.bus.queue import MessageBus
from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf
def _make_channel(streaming: bool = True) -> FeishuChannel:
config = FeishuConfig(
enabled=True,
app_id="cli_test",
app_secret="secret",
allow_from=["*"],
streaming=streaming,
)
ch = FeishuChannel(config, MessageBus())
ch._client = MagicMock()
ch._loop = None
return ch
def _mock_create_card_response(card_id: str = "card_stream_001"):
resp = MagicMock()
resp.success.return_value = True
resp.data = SimpleNamespace(card_id=card_id)
return resp
def _mock_send_response(message_id: str = "om_stream_001"):
resp = MagicMock()
resp.success.return_value = True
resp.data = SimpleNamespace(message_id=message_id)
return resp
def _mock_content_response(success: bool = True):
resp = MagicMock()
resp.success.return_value = success
resp.code = 0 if success else 99999
resp.msg = "ok" if success else "error"
return resp
class TestFeishuStreamingConfig:
def test_streaming_default_true(self):
assert FeishuConfig().streaming is True
def test_supports_streaming_when_enabled(self):
ch = _make_channel(streaming=True)
assert ch.supports_streaming is True
def test_supports_streaming_disabled(self):
ch = _make_channel(streaming=False)
assert ch.supports_streaming is False
class TestCreateStreamingCard:
def test_returns_card_id_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_123")
ch._client.im.v1.message.create.return_value = _mock_send_response()
result = ch._create_streaming_card_sync("chat_id", "oc_chat1")
assert result == "card_123"
ch._client.cardkit.v1.card.create.assert_called_once()
ch._client.im.v1.message.create.assert_called_once()
def test_returns_none_on_failure(self):
ch = _make_channel()
resp = MagicMock()
resp.success.return_value = False
resp.code = 99999
resp.msg = "error"
ch._client.cardkit.v1.card.create.return_value = resp
assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None
def test_returns_none_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.side_effect = RuntimeError("network")
assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None
class TestCloseStreamingMode:
def test_returns_true_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(True)
assert ch._close_streaming_mode_sync("card_1", 10) is True
def test_returns_false_on_failure(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(False)
assert ch._close_streaming_mode_sync("card_1", 10) is False
def test_returns_false_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.side_effect = RuntimeError("err")
assert ch._close_streaming_mode_sync("card_1", 10) is False
class TestStreamUpdateText:
def test_returns_true_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(True)
assert ch._stream_update_text_sync("card_1", "hello", 1) is True
def test_returns_false_on_failure(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(False)
assert ch._stream_update_text_sync("card_1", "hello", 1) is False
def test_returns_false_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.side_effect = RuntimeError("err")
assert ch._stream_update_text_sync("card_1", "hello", 1) is False
class TestSendDelta:
@pytest.mark.asyncio
async def test_first_delta_creates_card_and_sends(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_new")
ch._client.im.v1.message.create.return_value = _mock_send_response("om_new")
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "Hello ")
assert "oc_chat1" in ch._stream_bufs
buf = ch._stream_bufs["oc_chat1"]
assert buf.text == "Hello "
assert buf.card_id == "card_new"
assert buf.sequence == 1
ch._client.cardkit.v1.card.create.assert_called_once()
ch._client.im.v1.message.create.assert_called_once()
ch._client.cardkit.v1.card_element.content.assert_called_once()
@pytest.mark.asyncio
async def test_second_delta_within_interval_skips_update(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic())
ch._stream_bufs["oc_chat1"] = buf
await ch.send_delta("oc_chat1", "world")
assert buf.text == "Hello world"
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio
async def test_delta_after_interval_updates_text(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic() - 1.0)
ch._stream_bufs["oc_chat1"] = buf
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "world")
assert buf.text == "Hello world"
assert buf.sequence == 2
ch._client.cardkit.v1.card_element.content.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_sends_final_update(self):
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Final content", card_id="card_1", sequence=3, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.cardkit.v1.card.settings.assert_called_once()
settings_call = ch._client.cardkit.v1.card.settings.call_args[0][0]
assert settings_call.body.sequence == 5 # after final content seq 4
@pytest.mark.asyncio
async def test_stream_end_fallback_when_no_card_id(self):
"""If card creation failed, stream_end falls back to a plain card message."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Fallback content", card_id=None, sequence=0, last_edit=0.0,
)
ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb")
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_not_called()
ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_without_buf_is_noop(self):
ch = _make_channel()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio
async def test_empty_delta_skips_send(self):
ch = _make_channel()
await ch.send_delta("oc_chat1", " ")
assert "oc_chat1" in ch._stream_bufs
ch._client.cardkit.v1.card.create.assert_not_called()
@pytest.mark.asyncio
async def test_no_client_returns_early(self):
ch = _make_channel()
ch._client = None
await ch.send_delta("oc_chat1", "text")
assert "oc_chat1" not in ch._stream_bufs
@pytest.mark.asyncio
async def test_sequence_increments_correctly(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="a", card_id="card_1", sequence=5, last_edit=0.0)
ch._stream_bufs["oc_chat1"] = buf
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "b")
assert buf.sequence == 6
buf.last_edit = 0.0 # reset to bypass throttle
await ch.send_delta("oc_chat1", "c")
assert buf.sequence == 7
class TestSendMessageReturnsId:
def test_returns_message_id_on_success(self):
ch = _make_channel()
ch._client.im.v1.message.create.return_value = _mock_send_response("om_abc")
result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}')
assert result == "om_abc"
def test_returns_none_on_failure(self):
ch = _make_channel()
resp = MagicMock()
resp.success.return_value = False
resp.code = 99999
resp.msg = "error"
resp.get_log_id.return_value = "log1"
ch._client.im.v1.message.create.return_value = resp
result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}')
assert result is None