feat(feishu): support stream output (cardkit) (#2382)

* feat(feishu): add streaming support via CardKit PATCH API

Implement send_delta() for Feishu channel using interactive card
progressive editing:
- First delta creates a card with markdown content and typing cursor
- Subsequent deltas throttled at 0.5s to respect 5 QPS PATCH limit
- stream_end finalizes with full formatted card (tables, rich markdown)

Also refactors _send_message_sync to return message_id (str | None)
and adds _patch_card_sync for card updates.

Includes 17 new unit tests covering streaming lifecycle, config,
card building, and edge cases.

Made-with: Cursor

* feat(feishu): close CardKit streaming_mode on stream end

Call cardkit card.settings after final content update so chat preview
leaves default [生成中...] summary (Feishu streaming docs).

Made-with: Cursor

* style: polish Feishu streaming (PEP8 spacing, drop unused test imports)

Made-with: Cursor

* docs(feishu): document cardkit:card:write for streaming

- README: permissions, upgrade note for existing apps, streaming toggle
- CHANNEL_PLUGIN_GUIDE: Feishu CardKit scope and when to disable streaming

Made-with: Cursor

* docs: address PR 2382 review (test path, plugin guide, README, English docstrings)

- Move Feishu streaming tests to tests/channels/
- Remove Feishu CardKit scope from CHANNEL_PLUGIN_GUIDE (plugin-dev doc only)
- README Feishu permissions: consistent English
- feishu.py: replace Chinese in streaming docstrings/comments

Made-with: Cursor
This commit is contained in:
LeftX 2026-03-24 15:57:14 +08:00 committed by Xubin Ren
parent cf25a582ba
commit 0ba71298e6
3 changed files with 412 additions and 8 deletions

View File

@ -505,14 +505,17 @@ nanobot gateway
</details>
<details>
<summary><b>Feishu (飞书)</b></summary>
<summary><b>Feishu</b></summary>
Uses **WebSocket** long connection — no public IP required.
**1. Create a Feishu bot**
- Visit [Feishu Open Platform](https://open.feishu.cn/app)
- Create a new app → Enable **Bot** capability
- **Permissions**: Add `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages)
- **Permissions**:
- `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages)
- **Streaming replies** (default in nanobot): add **`cardkit:card:write`** (often labeled **Create and update cards** in the Feishu developer console). Required for CardKit entities and streamed assistant text. Older apps may not have it yet — open **Permission management**, enable the scope, then **publish** a new app version if the console requires it.
- If you **cannot** add `cardkit:card:write`, set `"streaming": false` under `channels.feishu` (see below). The bot still works; replies use normal interactive cards without token-by-token streaming.
- **Events**: Add `im.message.receive_v1` (receive messages)
- Select **Long Connection** mode (requires running nanobot first to establish connection)
- Get **App ID** and **App Secret** from "Credentials & Basic Info"
@ -530,12 +533,14 @@ Uses **WebSocket** long connection — no public IP required.
"encryptKey": "",
"verificationToken": "",
"allowFrom": ["ou_YOUR_OPEN_ID"],
"groupPolicy": "mention"
"groupPolicy": "mention",
"streaming": true
}
}
}
```
> `streaming` defaults to `true`. Use `false` if your app does not have **`cardkit:card:write`** (see permissions above).
> `encryptKey` and `verificationToken` are optional for Long Connection mode.
> `allowFrom`: Add your open_id (find it in nanobot logs when you message the bot). Use `["*"]` to allow all users.
> `groupPolicy`: `"mention"` (default — respond only when @mentioned), `"open"` (respond to all group messages). Private chats always respond.

View File

@ -5,7 +5,10 @@ import json
import os
import re
import threading
import time
import uuid
from collections import OrderedDict
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
@ -248,6 +251,19 @@ class FeishuConfig(Base):
react_emoji: str = "THUMBSUP"
group_policy: Literal["open", "mention"] = "mention"
reply_to_message: bool = False # If True, bot replies quote the user's original message
streaming: bool = True
_STREAM_ELEMENT_ID = "streaming_md"
@dataclass
class _FeishuStreamBuf:
"""Per-chat streaming accumulator using CardKit streaming API."""
text: str = ""
card_id: str | None = None
sequence: int = 0
last_edit: float = 0.0
class FeishuChannel(BaseChannel):
@ -265,6 +281,8 @@ class FeishuChannel(BaseChannel):
name = "feishu"
display_name = "Feishu"
_STREAM_EDIT_INTERVAL = 0.5 # throttle between CardKit streaming updates
@classmethod
def default_config(cls) -> dict[str, Any]:
return FeishuConfig().model_dump(by_alias=True)
@ -279,6 +297,7 @@ class FeishuChannel(BaseChannel):
self._ws_thread: threading.Thread | None = None
self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache
self._loop: asyncio.AbstractEventLoop | None = None
self._stream_bufs: dict[str, _FeishuStreamBuf] = {}
@staticmethod
def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any:
@ -906,8 +925,8 @@ class FeishuChannel(BaseChannel):
logger.error("Error replying to Feishu message {}: {}", parent_message_id, e)
return False
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool:
"""Send a single message (text/image/file/interactive) synchronously."""
def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> str | None:
"""Send a single message and return the message_id on success."""
from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody
try:
request = CreateMessageRequest.builder() \
@ -925,13 +944,146 @@ class FeishuChannel(BaseChannel):
"Failed to send Feishu {} message: code={}, msg={}, log_id={}",
msg_type, response.code, response.msg, response.get_log_id()
)
return False
logger.debug("Feishu {} message sent to {}", msg_type, receive_id)
return True
return None
msg_id = getattr(response.data, "message_id", None)
logger.debug("Feishu {} message sent to {}: {}", msg_type, receive_id, msg_id)
return msg_id
except Exception as e:
logger.error("Error sending Feishu {} message: {}", msg_type, e)
return None
def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None:
"""Create a CardKit streaming card, send it to chat, return card_id."""
from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody
card_json = {
"schema": "2.0",
"config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True},
"body": {"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]},
}
try:
request = CreateCardRequest.builder().request_body(
CreateCardRequestBody.builder()
.type("card_json")
.data(json.dumps(card_json, ensure_ascii=False))
.build()
).build()
response = self._client.cardkit.v1.card.create(request)
if not response.success():
logger.warning("Failed to create streaming card: code={}, msg={}", response.code, response.msg)
return None
card_id = getattr(response.data, "card_id", None)
if card_id:
self._send_message_sync(
receive_id_type, chat_id, "interactive",
json.dumps({"type": "card", "data": {"card_id": card_id}}),
)
return card_id
except Exception as e:
logger.warning("Error creating streaming card: {}", e)
return None
def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool:
"""Stream-update the markdown element on a CardKit card (typewriter effect)."""
from lark_oapi.api.cardkit.v1 import ContentCardElementRequest, ContentCardElementRequestBody
try:
request = ContentCardElementRequest.builder() \
.card_id(card_id) \
.element_id(_STREAM_ELEMENT_ID) \
.request_body(
ContentCardElementRequestBody.builder()
.content(content).sequence(sequence).build()
).build()
response = self._client.cardkit.v1.card_element.content(request)
if not response.success():
logger.warning("Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg)
return False
return True
except Exception as e:
logger.warning("Error stream-updating card {}: {}", card_id, e)
return False
def _close_streaming_mode_sync(self, card_id: str, sequence: int) -> bool:
"""Turn off CardKit streaming_mode so the chat list preview exits the streaming placeholder.
Per Feishu docs, streaming cards keep a generating-style summary in the session list until
streaming_mode is set to false via card settings (after final content update).
Sequence must strictly exceed the previous card OpenAPI operation on this entity.
"""
from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody
settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False)
try:
request = SettingsCardRequest.builder() \
.card_id(card_id) \
.request_body(
SettingsCardRequestBody.builder()
.settings(settings_payload)
.sequence(sequence)
.uuid(str(uuid.uuid4()))
.build()
).build()
response = self._client.cardkit.v1.card.settings(request)
if not response.success():
logger.warning(
"Failed to close streaming on card {}: code={}, msg={}",
card_id, response.code, response.msg,
)
return False
return True
except Exception as e:
logger.warning("Error closing streaming on card {}: {}", card_id, e)
return False
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Progressive streaming via CardKit: create card on first delta, stream-update on subsequent."""
if not self._client:
return
meta = metadata or {}
loop = asyncio.get_running_loop()
rid_type = "chat_id" if chat_id.startswith("oc_") else "open_id"
# --- stream end: final update or fallback ---
if meta.get("_stream_end"):
buf = self._stream_bufs.pop(chat_id, None)
if not buf or not buf.text:
return
if buf.card_id:
buf.sequence += 1
await loop.run_in_executor(
None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence,
)
# Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs).
buf.sequence += 1
await loop.run_in_executor(
None, self._close_streaming_mode_sync, buf.card_id, buf.sequence,
)
else:
for chunk in self._split_elements_by_table_limit(self._build_card_elements(buf.text)):
card = json.dumps({"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False)
await loop.run_in_executor(None, self._send_message_sync, rid_type, chat_id, "interactive", card)
return
# --- accumulate delta ---
buf = self._stream_bufs.get(chat_id)
if buf is None:
buf = _FeishuStreamBuf()
self._stream_bufs[chat_id] = buf
buf.text += delta
if not buf.text.strip():
return
now = time.monotonic()
if buf.card_id is None:
card_id = await loop.run_in_executor(None, self._create_streaming_card_sync, rid_type, chat_id)
if card_id:
buf.card_id = card_id
buf.sequence = 1
await loop.run_in_executor(None, self._stream_update_text_sync, card_id, buf.text, 1)
buf.last_edit = now
elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL:
buf.sequence += 1
await loop.run_in_executor(None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence)
buf.last_edit = now
async def send(self, msg: OutboundMessage) -> None:
"""Send a message through Feishu, including media (images/files) if present."""
if not self._client:

View File

@ -0,0 +1,247 @@
"""Tests for Feishu streaming (send_delta) via CardKit streaming API."""
import time
from types import SimpleNamespace
from unittest.mock import MagicMock
import pytest
from nanobot.bus.queue import MessageBus
from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf
def _make_channel(streaming: bool = True) -> FeishuChannel:
config = FeishuConfig(
enabled=True,
app_id="cli_test",
app_secret="secret",
allow_from=["*"],
streaming=streaming,
)
ch = FeishuChannel(config, MessageBus())
ch._client = MagicMock()
ch._loop = None
return ch
def _mock_create_card_response(card_id: str = "card_stream_001"):
resp = MagicMock()
resp.success.return_value = True
resp.data = SimpleNamespace(card_id=card_id)
return resp
def _mock_send_response(message_id: str = "om_stream_001"):
resp = MagicMock()
resp.success.return_value = True
resp.data = SimpleNamespace(message_id=message_id)
return resp
def _mock_content_response(success: bool = True):
resp = MagicMock()
resp.success.return_value = success
resp.code = 0 if success else 99999
resp.msg = "ok" if success else "error"
return resp
class TestFeishuStreamingConfig:
def test_streaming_default_true(self):
assert FeishuConfig().streaming is True
def test_supports_streaming_when_enabled(self):
ch = _make_channel(streaming=True)
assert ch.supports_streaming is True
def test_supports_streaming_disabled(self):
ch = _make_channel(streaming=False)
assert ch.supports_streaming is False
class TestCreateStreamingCard:
def test_returns_card_id_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_123")
ch._client.im.v1.message.create.return_value = _mock_send_response()
result = ch._create_streaming_card_sync("chat_id", "oc_chat1")
assert result == "card_123"
ch._client.cardkit.v1.card.create.assert_called_once()
ch._client.im.v1.message.create.assert_called_once()
def test_returns_none_on_failure(self):
ch = _make_channel()
resp = MagicMock()
resp.success.return_value = False
resp.code = 99999
resp.msg = "error"
ch._client.cardkit.v1.card.create.return_value = resp
assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None
def test_returns_none_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.side_effect = RuntimeError("network")
assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None
class TestCloseStreamingMode:
def test_returns_true_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(True)
assert ch._close_streaming_mode_sync("card_1", 10) is True
def test_returns_false_on_failure(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(False)
assert ch._close_streaming_mode_sync("card_1", 10) is False
def test_returns_false_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card.settings.side_effect = RuntimeError("err")
assert ch._close_streaming_mode_sync("card_1", 10) is False
class TestStreamUpdateText:
def test_returns_true_on_success(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(True)
assert ch._stream_update_text_sync("card_1", "hello", 1) is True
def test_returns_false_on_failure(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(False)
assert ch._stream_update_text_sync("card_1", "hello", 1) is False
def test_returns_false_on_exception(self):
ch = _make_channel()
ch._client.cardkit.v1.card_element.content.side_effect = RuntimeError("err")
assert ch._stream_update_text_sync("card_1", "hello", 1) is False
class TestSendDelta:
@pytest.mark.asyncio
async def test_first_delta_creates_card_and_sends(self):
ch = _make_channel()
ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_new")
ch._client.im.v1.message.create.return_value = _mock_send_response("om_new")
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "Hello ")
assert "oc_chat1" in ch._stream_bufs
buf = ch._stream_bufs["oc_chat1"]
assert buf.text == "Hello "
assert buf.card_id == "card_new"
assert buf.sequence == 1
ch._client.cardkit.v1.card.create.assert_called_once()
ch._client.im.v1.message.create.assert_called_once()
ch._client.cardkit.v1.card_element.content.assert_called_once()
@pytest.mark.asyncio
async def test_second_delta_within_interval_skips_update(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic())
ch._stream_bufs["oc_chat1"] = buf
await ch.send_delta("oc_chat1", "world")
assert buf.text == "Hello world"
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio
async def test_delta_after_interval_updates_text(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic() - 1.0)
ch._stream_bufs["oc_chat1"] = buf
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "world")
assert buf.text == "Hello world"
assert buf.sequence == 2
ch._client.cardkit.v1.card_element.content.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_sends_final_update(self):
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Final content", card_id="card_1", sequence=3, last_edit=0.0,
)
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
ch._client.cardkit.v1.card.settings.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_called_once()
ch._client.cardkit.v1.card.settings.assert_called_once()
settings_call = ch._client.cardkit.v1.card.settings.call_args[0][0]
assert settings_call.body.sequence == 5 # after final content seq 4
@pytest.mark.asyncio
async def test_stream_end_fallback_when_no_card_id(self):
"""If card creation failed, stream_end falls back to a plain card message."""
ch = _make_channel()
ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf(
text="Fallback content", card_id=None, sequence=0, last_edit=0.0,
)
ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb")
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
assert "oc_chat1" not in ch._stream_bufs
ch._client.cardkit.v1.card_element.content.assert_not_called()
ch._client.im.v1.message.create.assert_called_once()
@pytest.mark.asyncio
async def test_stream_end_without_buf_is_noop(self):
ch = _make_channel()
await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True})
ch._client.cardkit.v1.card_element.content.assert_not_called()
@pytest.mark.asyncio
async def test_empty_delta_skips_send(self):
ch = _make_channel()
await ch.send_delta("oc_chat1", " ")
assert "oc_chat1" in ch._stream_bufs
ch._client.cardkit.v1.card.create.assert_not_called()
@pytest.mark.asyncio
async def test_no_client_returns_early(self):
ch = _make_channel()
ch._client = None
await ch.send_delta("oc_chat1", "text")
assert "oc_chat1" not in ch._stream_bufs
@pytest.mark.asyncio
async def test_sequence_increments_correctly(self):
ch = _make_channel()
buf = _FeishuStreamBuf(text="a", card_id="card_1", sequence=5, last_edit=0.0)
ch._stream_bufs["oc_chat1"] = buf
ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response()
await ch.send_delta("oc_chat1", "b")
assert buf.sequence == 6
buf.last_edit = 0.0 # reset to bypass throttle
await ch.send_delta("oc_chat1", "c")
assert buf.sequence == 7
class TestSendMessageReturnsId:
def test_returns_message_id_on_success(self):
ch = _make_channel()
ch._client.im.v1.message.create.return_value = _mock_send_response("om_abc")
result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}')
assert result == "om_abc"
def test_returns_none_on_failure(self):
ch = _make_channel()
resp = MagicMock()
resp.success.return_value = False
resp.code = 99999
resp.msg = "error"
resp.get_log_id.return_value = "log1"
ch._client.im.v1.message.create.return_value = resp
result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}')
assert result is None