diff --git a/README.md b/README.md index 8929d3612..c5b5d9f2f 100644 --- a/README.md +++ b/README.md @@ -505,14 +505,17 @@ nanobot gateway
-Feishu (飞书) +Feishu Uses **WebSocket** long connection — no public IP required. **1. Create a Feishu bot** - Visit [Feishu Open Platform](https://open.feishu.cn/app) - Create a new app → Enable **Bot** capability -- **Permissions**: Add `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages) +- **Permissions**: + - `im:message` (send messages) and `im:message.p2p_msg:readonly` (receive messages) + - **Streaming replies** (default in nanobot): add **`cardkit:card:write`** (often labeled **Create and update cards** in the Feishu developer console). Required for CardKit entities and streamed assistant text. Older apps may not have it yet — open **Permission management**, enable the scope, then **publish** a new app version if the console requires it. + - If you **cannot** add `cardkit:card:write`, set `"streaming": false` under `channels.feishu` (see below). The bot still works; replies use normal interactive cards without token-by-token streaming. - **Events**: Add `im.message.receive_v1` (receive messages) - Select **Long Connection** mode (requires running nanobot first to establish connection) - Get **App ID** and **App Secret** from "Credentials & Basic Info" @@ -530,12 +533,14 @@ Uses **WebSocket** long connection — no public IP required. "encryptKey": "", "verificationToken": "", "allowFrom": ["ou_YOUR_OPEN_ID"], - "groupPolicy": "mention" + "groupPolicy": "mention", + "streaming": true } } } ``` +> `streaming` defaults to `true`. Use `false` if your app does not have **`cardkit:card:write`** (see permissions above). > `encryptKey` and `verificationToken` are optional for Long Connection mode. > `allowFrom`: Add your open_id (find it in nanobot logs when you message the bot). Use `["*"]` to allow all users. > `groupPolicy`: `"mention"` (default — respond only when @mentioned), `"open"` (respond to all group messages). Private chats always respond. diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 0ffca601e..3e9db3f4e 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -5,7 +5,10 @@ import json import os import re import threading +import time +import uuid from collections import OrderedDict +from dataclasses import dataclass from pathlib import Path from typing import Any, Literal @@ -248,6 +251,19 @@ class FeishuConfig(Base): react_emoji: str = "THUMBSUP" group_policy: Literal["open", "mention"] = "mention" reply_to_message: bool = False # If True, bot replies quote the user's original message + streaming: bool = True + + +_STREAM_ELEMENT_ID = "streaming_md" + + +@dataclass +class _FeishuStreamBuf: + """Per-chat streaming accumulator using CardKit streaming API.""" + text: str = "" + card_id: str | None = None + sequence: int = 0 + last_edit: float = 0.0 class FeishuChannel(BaseChannel): @@ -265,6 +281,8 @@ class FeishuChannel(BaseChannel): name = "feishu" display_name = "Feishu" + _STREAM_EDIT_INTERVAL = 0.5 # throttle between CardKit streaming updates + @classmethod def default_config(cls) -> dict[str, Any]: return FeishuConfig().model_dump(by_alias=True) @@ -279,6 +297,7 @@ class FeishuChannel(BaseChannel): self._ws_thread: threading.Thread | None = None self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache self._loop: asyncio.AbstractEventLoop | None = None + self._stream_bufs: dict[str, _FeishuStreamBuf] = {} @staticmethod def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any: @@ -906,8 +925,8 @@ class FeishuChannel(BaseChannel): logger.error("Error replying to Feishu message {}: {}", parent_message_id, e) return False - def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> bool: - """Send a single message (text/image/file/interactive) synchronously.""" + def _send_message_sync(self, receive_id_type: str, receive_id: str, msg_type: str, content: str) -> str | None: + """Send a single message and return the message_id on success.""" from lark_oapi.api.im.v1 import CreateMessageRequest, CreateMessageRequestBody try: request = CreateMessageRequest.builder() \ @@ -925,13 +944,146 @@ class FeishuChannel(BaseChannel): "Failed to send Feishu {} message: code={}, msg={}, log_id={}", msg_type, response.code, response.msg, response.get_log_id() ) - return False - logger.debug("Feishu {} message sent to {}", msg_type, receive_id) - return True + return None + msg_id = getattr(response.data, "message_id", None) + logger.debug("Feishu {} message sent to {}: {}", msg_type, receive_id, msg_id) + return msg_id except Exception as e: logger.error("Error sending Feishu {} message: {}", msg_type, e) + return None + + def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None: + """Create a CardKit streaming card, send it to chat, return card_id.""" + from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody + card_json = { + "schema": "2.0", + "config": {"wide_screen_mode": True, "update_multi": True, "streaming_mode": True}, + "body": {"elements": [{"tag": "markdown", "content": "", "element_id": _STREAM_ELEMENT_ID}]}, + } + try: + request = CreateCardRequest.builder().request_body( + CreateCardRequestBody.builder() + .type("card_json") + .data(json.dumps(card_json, ensure_ascii=False)) + .build() + ).build() + response = self._client.cardkit.v1.card.create(request) + if not response.success(): + logger.warning("Failed to create streaming card: code={}, msg={}", response.code, response.msg) + return None + card_id = getattr(response.data, "card_id", None) + if card_id: + self._send_message_sync( + receive_id_type, chat_id, "interactive", + json.dumps({"type": "card", "data": {"card_id": card_id}}), + ) + return card_id + except Exception as e: + logger.warning("Error creating streaming card: {}", e) + return None + + def _stream_update_text_sync(self, card_id: str, content: str, sequence: int) -> bool: + """Stream-update the markdown element on a CardKit card (typewriter effect).""" + from lark_oapi.api.cardkit.v1 import ContentCardElementRequest, ContentCardElementRequestBody + try: + request = ContentCardElementRequest.builder() \ + .card_id(card_id) \ + .element_id(_STREAM_ELEMENT_ID) \ + .request_body( + ContentCardElementRequestBody.builder() + .content(content).sequence(sequence).build() + ).build() + response = self._client.cardkit.v1.card_element.content(request) + if not response.success(): + logger.warning("Failed to stream-update card {}: code={}, msg={}", card_id, response.code, response.msg) + return False + return True + except Exception as e: + logger.warning("Error stream-updating card {}: {}", card_id, e) return False + def _close_streaming_mode_sync(self, card_id: str, sequence: int) -> bool: + """Turn off CardKit streaming_mode so the chat list preview exits the streaming placeholder. + + Per Feishu docs, streaming cards keep a generating-style summary in the session list until + streaming_mode is set to false via card settings (after final content update). + Sequence must strictly exceed the previous card OpenAPI operation on this entity. + """ + from lark_oapi.api.cardkit.v1 import SettingsCardRequest, SettingsCardRequestBody + settings_payload = json.dumps({"config": {"streaming_mode": False}}, ensure_ascii=False) + try: + request = SettingsCardRequest.builder() \ + .card_id(card_id) \ + .request_body( + SettingsCardRequestBody.builder() + .settings(settings_payload) + .sequence(sequence) + .uuid(str(uuid.uuid4())) + .build() + ).build() + response = self._client.cardkit.v1.card.settings(request) + if not response.success(): + logger.warning( + "Failed to close streaming on card {}: code={}, msg={}", + card_id, response.code, response.msg, + ) + return False + return True + except Exception as e: + logger.warning("Error closing streaming on card {}: {}", card_id, e) + return False + + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + """Progressive streaming via CardKit: create card on first delta, stream-update on subsequent.""" + if not self._client: + return + meta = metadata or {} + loop = asyncio.get_running_loop() + rid_type = "chat_id" if chat_id.startswith("oc_") else "open_id" + + # --- stream end: final update or fallback --- + if meta.get("_stream_end"): + buf = self._stream_bufs.pop(chat_id, None) + if not buf or not buf.text: + return + if buf.card_id: + buf.sequence += 1 + await loop.run_in_executor( + None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence, + ) + # Required so the chat list preview exits the streaming placeholder (Feishu streaming card docs). + buf.sequence += 1 + await loop.run_in_executor( + None, self._close_streaming_mode_sync, buf.card_id, buf.sequence, + ) + else: + for chunk in self._split_elements_by_table_limit(self._build_card_elements(buf.text)): + card = json.dumps({"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False) + await loop.run_in_executor(None, self._send_message_sync, rid_type, chat_id, "interactive", card) + return + + # --- accumulate delta --- + buf = self._stream_bufs.get(chat_id) + if buf is None: + buf = _FeishuStreamBuf() + self._stream_bufs[chat_id] = buf + buf.text += delta + if not buf.text.strip(): + return + + now = time.monotonic() + if buf.card_id is None: + card_id = await loop.run_in_executor(None, self._create_streaming_card_sync, rid_type, chat_id) + if card_id: + buf.card_id = card_id + buf.sequence = 1 + await loop.run_in_executor(None, self._stream_update_text_sync, card_id, buf.text, 1) + buf.last_edit = now + elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: + buf.sequence += 1 + await loop.run_in_executor(None, self._stream_update_text_sync, buf.card_id, buf.text, buf.sequence) + buf.last_edit = now + async def send(self, msg: OutboundMessage) -> None: """Send a message through Feishu, including media (images/files) if present.""" if not self._client: diff --git a/tests/channels/test_feishu_streaming.py b/tests/channels/test_feishu_streaming.py new file mode 100644 index 000000000..5532f0635 --- /dev/null +++ b/tests/channels/test_feishu_streaming.py @@ -0,0 +1,247 @@ +"""Tests for Feishu streaming (send_delta) via CardKit streaming API.""" +import time +from types import SimpleNamespace +from unittest.mock import MagicMock + +import pytest + +from nanobot.bus.queue import MessageBus +from nanobot.channels.feishu import FeishuChannel, FeishuConfig, _FeishuStreamBuf + + +def _make_channel(streaming: bool = True) -> FeishuChannel: + config = FeishuConfig( + enabled=True, + app_id="cli_test", + app_secret="secret", + allow_from=["*"], + streaming=streaming, + ) + ch = FeishuChannel(config, MessageBus()) + ch._client = MagicMock() + ch._loop = None + return ch + + +def _mock_create_card_response(card_id: str = "card_stream_001"): + resp = MagicMock() + resp.success.return_value = True + resp.data = SimpleNamespace(card_id=card_id) + return resp + + +def _mock_send_response(message_id: str = "om_stream_001"): + resp = MagicMock() + resp.success.return_value = True + resp.data = SimpleNamespace(message_id=message_id) + return resp + + +def _mock_content_response(success: bool = True): + resp = MagicMock() + resp.success.return_value = success + resp.code = 0 if success else 99999 + resp.msg = "ok" if success else "error" + return resp + + +class TestFeishuStreamingConfig: + def test_streaming_default_true(self): + assert FeishuConfig().streaming is True + + def test_supports_streaming_when_enabled(self): + ch = _make_channel(streaming=True) + assert ch.supports_streaming is True + + def test_supports_streaming_disabled(self): + ch = _make_channel(streaming=False) + assert ch.supports_streaming is False + + +class TestCreateStreamingCard: + def test_returns_card_id_on_success(self): + ch = _make_channel() + ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_123") + ch._client.im.v1.message.create.return_value = _mock_send_response() + result = ch._create_streaming_card_sync("chat_id", "oc_chat1") + assert result == "card_123" + ch._client.cardkit.v1.card.create.assert_called_once() + ch._client.im.v1.message.create.assert_called_once() + + def test_returns_none_on_failure(self): + ch = _make_channel() + resp = MagicMock() + resp.success.return_value = False + resp.code = 99999 + resp.msg = "error" + ch._client.cardkit.v1.card.create.return_value = resp + assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None + + def test_returns_none_on_exception(self): + ch = _make_channel() + ch._client.cardkit.v1.card.create.side_effect = RuntimeError("network") + assert ch._create_streaming_card_sync("chat_id", "oc_chat1") is None + + +class TestCloseStreamingMode: + def test_returns_true_on_success(self): + ch = _make_channel() + ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(True) + assert ch._close_streaming_mode_sync("card_1", 10) is True + + def test_returns_false_on_failure(self): + ch = _make_channel() + ch._client.cardkit.v1.card.settings.return_value = _mock_content_response(False) + assert ch._close_streaming_mode_sync("card_1", 10) is False + + def test_returns_false_on_exception(self): + ch = _make_channel() + ch._client.cardkit.v1.card.settings.side_effect = RuntimeError("err") + assert ch._close_streaming_mode_sync("card_1", 10) is False + + +class TestStreamUpdateText: + def test_returns_true_on_success(self): + ch = _make_channel() + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(True) + assert ch._stream_update_text_sync("card_1", "hello", 1) is True + + def test_returns_false_on_failure(self): + ch = _make_channel() + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response(False) + assert ch._stream_update_text_sync("card_1", "hello", 1) is False + + def test_returns_false_on_exception(self): + ch = _make_channel() + ch._client.cardkit.v1.card_element.content.side_effect = RuntimeError("err") + assert ch._stream_update_text_sync("card_1", "hello", 1) is False + + +class TestSendDelta: + @pytest.mark.asyncio + async def test_first_delta_creates_card_and_sends(self): + ch = _make_channel() + ch._client.cardkit.v1.card.create.return_value = _mock_create_card_response("card_new") + ch._client.im.v1.message.create.return_value = _mock_send_response("om_new") + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + + await ch.send_delta("oc_chat1", "Hello ") + + assert "oc_chat1" in ch._stream_bufs + buf = ch._stream_bufs["oc_chat1"] + assert buf.text == "Hello " + assert buf.card_id == "card_new" + assert buf.sequence == 1 + ch._client.cardkit.v1.card.create.assert_called_once() + ch._client.im.v1.message.create.assert_called_once() + ch._client.cardkit.v1.card_element.content.assert_called_once() + + @pytest.mark.asyncio + async def test_second_delta_within_interval_skips_update(self): + ch = _make_channel() + buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic()) + ch._stream_bufs["oc_chat1"] = buf + + await ch.send_delta("oc_chat1", "world") + + assert buf.text == "Hello world" + ch._client.cardkit.v1.card_element.content.assert_not_called() + + @pytest.mark.asyncio + async def test_delta_after_interval_updates_text(self): + ch = _make_channel() + buf = _FeishuStreamBuf(text="Hello ", card_id="card_1", sequence=1, last_edit=time.monotonic() - 1.0) + ch._stream_bufs["oc_chat1"] = buf + + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + await ch.send_delta("oc_chat1", "world") + + assert buf.text == "Hello world" + assert buf.sequence == 2 + ch._client.cardkit.v1.card_element.content.assert_called_once() + + @pytest.mark.asyncio + async def test_stream_end_sends_final_update(self): + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Final content", card_id="card_1", sequence=3, last_edit=0.0, + ) + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + ch._client.cardkit.v1.card.settings.return_value = _mock_content_response() + + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True}) + + assert "oc_chat1" not in ch._stream_bufs + ch._client.cardkit.v1.card_element.content.assert_called_once() + ch._client.cardkit.v1.card.settings.assert_called_once() + settings_call = ch._client.cardkit.v1.card.settings.call_args[0][0] + assert settings_call.body.sequence == 5 # after final content seq 4 + + @pytest.mark.asyncio + async def test_stream_end_fallback_when_no_card_id(self): + """If card creation failed, stream_end falls back to a plain card message.""" + ch = _make_channel() + ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( + text="Fallback content", card_id=None, sequence=0, last_edit=0.0, + ) + ch._client.im.v1.message.create.return_value = _mock_send_response("om_fb") + + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True}) + + assert "oc_chat1" not in ch._stream_bufs + ch._client.cardkit.v1.card_element.content.assert_not_called() + ch._client.im.v1.message.create.assert_called_once() + + @pytest.mark.asyncio + async def test_stream_end_without_buf_is_noop(self): + ch = _make_channel() + await ch.send_delta("oc_chat1", "", metadata={"_stream_end": True}) + ch._client.cardkit.v1.card_element.content.assert_not_called() + + @pytest.mark.asyncio + async def test_empty_delta_skips_send(self): + ch = _make_channel() + await ch.send_delta("oc_chat1", " ") + + assert "oc_chat1" in ch._stream_bufs + ch._client.cardkit.v1.card.create.assert_not_called() + + @pytest.mark.asyncio + async def test_no_client_returns_early(self): + ch = _make_channel() + ch._client = None + await ch.send_delta("oc_chat1", "text") + assert "oc_chat1" not in ch._stream_bufs + + @pytest.mark.asyncio + async def test_sequence_increments_correctly(self): + ch = _make_channel() + buf = _FeishuStreamBuf(text="a", card_id="card_1", sequence=5, last_edit=0.0) + ch._stream_bufs["oc_chat1"] = buf + + ch._client.cardkit.v1.card_element.content.return_value = _mock_content_response() + await ch.send_delta("oc_chat1", "b") + assert buf.sequence == 6 + + buf.last_edit = 0.0 # reset to bypass throttle + await ch.send_delta("oc_chat1", "c") + assert buf.sequence == 7 + + +class TestSendMessageReturnsId: + def test_returns_message_id_on_success(self): + ch = _make_channel() + ch._client.im.v1.message.create.return_value = _mock_send_response("om_abc") + result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}') + assert result == "om_abc" + + def test_returns_none_on_failure(self): + ch = _make_channel() + resp = MagicMock() + resp.success.return_value = False + resp.code = 99999 + resp.msg = "error" + resp.get_log_id.return_value = "log1" + ch._client.im.v1.message.create.return_value = resp + result = ch._send_message_sync("chat_id", "oc_chat1", "text", '{"text":"hi"}') + assert result is None