From 1672f20d6e72e824ddf5697682757945849c0275 Mon Sep 17 00:00:00 2001 From: chengyongru Date: Fri, 8 May 2026 11:35:23 +0800 Subject: [PATCH] feat(weixin): buffer and coalesce tool hints inside WeixinChannel WeChat iLink has a strict ~7 msgs / 5 min rate limit. A busy agent turn can trigger 8+ tool-call hints, each sent as a separate message, quickly burning the quota and causing silent message drops. Implement buffering entirely inside WeixinChannel (no global changes): - Tool hints are appended to a per-chat_id buffer instead of being sent immediately. - A non-tool-hint message arriving for the same chat flushes pending hints first (joined with newlines, sent as a single message). - A send_delta() call carrying _stream_end also flushes pending hints, so they are not stranded when the final answer is streamed and bypasses send(). - stop() clears any remaining buffered hints. - send_tool_hints=False still drops hints as before. - Add 7 tests covering: single hint, multiple hints coalesced, different chats isolated, non-tool-hint flush, intermediate-progress flush, disabled dropping, and stop clearing buffers. --- nanobot/channels/weixin.py | 255 ++++++++++++------------ tests/channels/test_weixin_channel.py | 273 +++++++++++++++++++++++--- 2 files changed, 368 insertions(+), 160 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 4c2586b2b..b11c3b115 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -87,22 +87,11 @@ RATE_LIMIT_ERRCODE = -2 RATE_LIMIT_BACKOFF_S = 60 -def _is_stale_session_ret( - ret: int | None, - errcode: int | None, - errmsg: str | None, -) -> bool: - """True when iLink returns ret=-2 / errcode=-2 with 'unknown error', - which historically correlates with a stale context_token. - - Note: per wxclawbot-cli docs, ret=-2 is primarily a *rate limit* - (~7 msgs / 5 min per bot). We only treat the 'unknown error' variant - as stale-session because empty/missing errmsg is far more commonly - the rate-limit signal in practice.
- """ - if ret != RATE_LIMIT_ERRCODE and errcode != RATE_LIMIT_ERRCODE: - return False - return (errmsg or "").strip().lower() == "unknown error" +def _is_api_error(data: dict) -> bool: + """True when iLink response signals failure via ``ret`` or ``errcode``.""" + ret = data.get("ret", 0) + errcode = data.get("errcode", 0) + return (ret is not None and ret != 0) or (errcode is not None and errcode != 0) # Retry constants (matching the reference plugin's monitor.ts) @@ -185,6 +174,7 @@ class WeixinChannel(BaseChannel): self._session_pause_until: float = 0.0 self._typing_tasks: dict[str, asyncio.Task] = {} self._typing_tickets: dict[str, dict[str, Any]] = {} + self._pending_tool_hints: dict[str, list[str]] = {} # ------------------------------------------------------------------ # State persistence @@ -521,6 +511,7 @@ class WeixinChannel(BaseChannel): async def stop(self) -> None: self._running = False + self._pending_tool_hints.clear() if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() for chat_id in list(self._typing_tasks): @@ -551,18 +542,12 @@ class WeixinChannel(BaseChannel): f"WeChat session paused, {remaining_min} min remaining (errcode {ERRCODE_SESSION_EXPIRED})" ) - def _check_response_error(self, data: dict, operation: str, *, body: dict | None = None) -> None: - """Check both ``ret`` and ``errcode`` like the reference TS code. - - The iLink API may signal failure through either field (or both). - ``_poll_once`` already checks both; outbound send helpers must do - the same to avoid silent drops. 
- """ + def _check_response_error(self, data: dict, operation: str) -> None: + """Raise if *data* contains an iLink API error.""" + if not _is_api_error(data): + return ret = data.get("ret", 0) errcode = data.get("errcode", 0) - is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0) - if not is_error: - return raise RuntimeError( f"WeChat {operation} error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" ) @@ -587,9 +572,8 @@ class WeixinChannel(BaseChannel): # Check for API-level errors (monitor.ts checks both ret and errcode) ret = data.get("ret", 0) errcode = data.get("errcode", 0) - is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0) - if is_error: + if _is_api_error(data): if errcode == ERRCODE_SESSION_EXPIRED or ret == ERRCODE_SESSION_EXPIRED: self._pause_session() remaining = self._session_pause_remaining_s() @@ -988,6 +972,39 @@ class WeixinChannel(BaseChannel): self._assert_session_active() is_progress = bool((msg.metadata or {}).get("_progress", False)) + + # Buffer tool hints to coalesce consecutive ones and avoid burning + # WeChat iLink rate-limit quota (~7 msgs / 5 min). + if is_progress and (msg.metadata or {}).get("_tool_hint"): + if not self.send_tool_hints: + return + self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content) + self.logger.debug( + "Buffered tool hint for {} (count={})", + msg.chat_id, + len(self._pending_tool_hints[msg.chat_id]), + ) + return + + # Any non-tool-hint message (thought, final answer, /stop response, …) + # flushes the buffer so hints do not get stuck when the final answer + # is suppressed, streamed, or otherwise skips this path. 
+ if not ((msg.metadata or {}).get("_tool_hint") and is_progress): + hints = self._pending_tool_hints.pop(msg.chat_id, None) + if hints: + ctx_token = self._context_tokens.get(msg.chat_id, "") + if ctx_token: + self.logger.info( + "Flushing {} buffered tool hint(s) for {}", + len(hints), msg.chat_id, + ) + await self._send_text(msg.chat_id, "\n".join(hints), ctx_token) + else: + self.logger.warning( + "Dropped {} buffered tool hint(s) for {}: no context_token", + len(hints), msg.chat_id, + ) + if not is_progress: await self._stop_typing(msg.chat_id, clear_remote=True) @@ -1081,6 +1098,31 @@ class WeixinChannel(BaseChannel): with suppress(Exception): await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) + async def send_delta( + self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None + ) -> None: + """Weixin iLink does not support native streaming deltas. + + We only hook ``_stream_end`` so buffered tool hints are flushed even + when the final answer carries the ``_streamed`` flag and bypasses + :meth:`send`. 
+ """ + if metadata and metadata.get("_stream_end"): + hints = self._pending_tool_hints.pop(chat_id, None) + if hints: + ctx_token = self._context_tokens.get(chat_id, "") + if ctx_token: + self.logger.info( + "Flushing {} buffered tool hint(s) for {} (stream_end)", + len(hints), chat_id, + ) + await self._send_text(chat_id, "\n".join(hints), ctx_token) + else: + self.logger.warning( + "Dropped {} buffered tool hint(s) for {}: no context_token", + len(hints), chat_id, + ) + async def _start_typing(self, chat_id: str, context_token: str = "") -> None: """Start typing indicator immediately when a message is received.""" if not self._client or not self._token or not chat_id: @@ -1141,6 +1183,59 @@ class WeixinChannel(BaseChannel): """ return f"nanobot:{int(time.time() * 1000)}-{os.urandom(4).hex()}" + async def _send_message_with_retry( + self, + body: dict[str, Any], + context_token: str, + to_user_id: str, + client_id: str, + operation: str, + ) -> None: + """Post ``ilink/bot/sendmessage`` with stale-session and rate-limit handling.""" + data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) + errcode = data.get("errcode", 0) + errmsg = data.get("errmsg", "") + + # Stale session (errmsg == "unknown error") — retry once without token. 
+ if ( + (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE) + and (errmsg or "").strip().lower() == "unknown error" + and context_token + ): + self.logger.warning( + "WeChat {} stale-session signal for {} (client_id={}); " + "retrying without context_token", + operation, to_user_id, client_id, + ) + body_no_ctx = copy.deepcopy(body) + body_no_ctx["msg"].pop("context_token", None) + data = await self._api_post("ilink/bot/sendmessage", body_no_ctx) + ret = data.get("ret", 0) + errcode = data.get("errcode", 0) + errmsg = data.get("errmsg", "") + if ret == 0 and (errcode == 0 or errcode is None): + self.logger.warning( + "WeChat {} succeeded WITHOUT context_token for {}; " + "clearing expired token from cache", + operation, to_user_id, + ) + self._context_tokens.pop(to_user_id, None) + self._save_state() + return + + # Rate limit (-2) — wait and retry once. + if ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE: + self.logger.warning( + "WeChat {} rate limited for {} (client_id={}); " + "waiting {}s before retry", + operation, to_user_id, client_id, RATE_LIMIT_BACKOFF_S, + ) + await asyncio.sleep(RATE_LIMIT_BACKOFF_S) + data = await self._api_post("ilink/bot/sendmessage", body) + + self._check_response_error(data, operation) + async def _send_text( self, to_user_id: str, @@ -1171,60 +1266,7 @@ class WeixinChannel(BaseChannel): "base_info": BASE_INFO, } - async def _do_send(_body: dict[str, Any]) -> dict: - return await self._api_post("ilink/bot/sendmessage", _body) - - data = await _do_send(body) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - - # Stale session (errmsg == "unknown error") — retry once without token. - # This is distinct from the far more common rate-limit signal. 
- if _is_stale_session_ret(ret, errcode, errmsg) and context_token: - self.logger.warning( - "WeChat send text returned stale-session signal for {} (client_id={}); " - "retrying without context_token", - to_user_id, - client_id, - ) - body_no_ctx = copy.deepcopy(body) - body_no_ctx["msg"].pop("context_token", None) - data = await _do_send(body_no_ctx) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - if ret == 0 and (errcode == 0 or errcode is None): - self.logger.warning( - "WeChat send text succeeded WITHOUT context_token for {}; " - "clearing expired token from cache", - to_user_id, - ) - self._context_tokens.pop(to_user_id, None) - self._save_state() - self.logger.debug( - "WeChat text sent to {} (client_id={})", to_user_id, client_id - ) - return - - # Rate limit (-2) — per wxclawbot-cli docs this is ~7 msgs / 5 min. - # Wait 60 s and retry once; do NOT strip context_token (rate limit is - # per-bot, not per-token). - if (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE): - self.logger.warning( - "WeChat send text rate limited for {} (client_id={}); " - "waiting {}s before retry", - to_user_id, - client_id, - RATE_LIMIT_BACKOFF_S, - ) - await asyncio.sleep(RATE_LIMIT_BACKOFF_S) - data = await _do_send(body) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - - self._check_response_error(data, "send text", body=body) + await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send text") self.logger.debug("WeChat text sent to {} (client_id={})", to_user_id, client_id) async def _send_media_file( @@ -1370,54 +1412,7 @@ class WeixinChannel(BaseChannel): "base_info": BASE_INFO, } - async def _do_send(_body: dict[str, Any]) -> dict: - return await self._api_post("ilink/bot/sendmessage", _body) - - data = await _do_send(body) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - - # Same 
stale-session handling as _send_text. - if _is_stale_session_ret(ret, errcode, errmsg) and context_token: - self.logger.warning( - "WeChat send media returned stale-session signal for {} (client_id={}); " - "retrying without context_token", - to_user_id, - client_id, - ) - body_no_ctx = copy.deepcopy(body) - body_no_ctx["msg"].pop("context_token", None) - data = await _do_send(body_no_ctx) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - if ret == 0 and (errcode == 0 or errcode is None): - self.logger.warning( - "WeChat send media succeeded WITHOUT context_token for {}; " - "clearing expired token from cache", - to_user_id, - ) - self._context_tokens.pop(to_user_id, None) - self._save_state() - return - - # Rate limit (-2) — wait and retry once (see _send_text for rationale). - if (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE): - self.logger.warning( - "WeChat send media rate limited for {} (client_id={}); " - "waiting {}s before retry", - to_user_id, - client_id, - RATE_LIMIT_BACKOFF_S, - ) - await asyncio.sleep(RATE_LIMIT_BACKOFF_S) - data = await _do_send(body) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - - self._check_response_error(data, "send media", body=body) + await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send media") # --------------------------------------------------------------------------- diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 68b8825d7..fda0a82ca 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -9,6 +9,7 @@ import httpx import pytest import nanobot.channels.weixin as weixin_mod +from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.weixin import ( ITEM_IMAGE, @@ -1448,39 +1449,22 @@ async def 
test_send_text_rate_limit_with_empty_errmsg_waits_and_retries(monkeypa # --------------------------------------------------------------------------- -class TestIsStaleSessionRet: - """Verify stale-session detection for iLink ret=-2 / errcode=-2 responses.""" +class TestIsApiError: + """Verify the shared ``_is_api_error`` predicate used by _poll_once and send helpers.""" - def test_ret_minus_2_with_empty_errmsg_is_not_stale(self): - # Empty errmsg is the rate-limit signal per wxclawbot-cli docs. - assert weixin_mod._is_stale_session_ret(-2, 0, "") is False - assert weixin_mod._is_stale_session_ret(-2, 0, None) is False + def test_success_codes_are_not_error(self): + assert weixin_mod._is_api_error({"ret": 0, "errcode": 0}) is False + assert weixin_mod._is_api_error({"ret": 0}) is False + assert weixin_mod._is_api_error({"errcode": 0}) is False + assert weixin_mod._is_api_error({}) is False - def test_errcode_minus_2_with_empty_errmsg_is_not_stale(self): - assert weixin_mod._is_stale_session_ret(0, -2, "") is False - assert weixin_mod._is_stale_session_ret(0, -2, None) is False + def test_nonzero_ret_is_error(self): + assert weixin_mod._is_api_error({"ret": -2}) is True + assert weixin_mod._is_api_error({"ret": -14}) is True - def test_ret_minus_2_with_unknown_error_is_stale(self): - assert weixin_mod._is_stale_session_ret(-2, 0, "unknown error") is True - assert weixin_mod._is_stale_session_ret(-2, 0, "UNKNOWN ERROR") is True - - def test_errcode_minus_2_with_unknown_error_is_stale(self): - assert weixin_mod._is_stale_session_ret(0, -2, "unknown error") is True - - def test_ret_minus_2_with_frequency_limit_is_not_stale(self): - assert weixin_mod._is_stale_session_ret(-2, 0, "frequency limit") is False - assert weixin_mod._is_stale_session_ret(-2, 0, "too frequently") is False - - def test_errcode_minus_2_with_frequency_limit_is_not_stale(self): - assert weixin_mod._is_stale_session_ret(0, -2, "freq limit") is False - - def test_success_codes_are_not_stale(self): - 
assert weixin_mod._is_stale_session_ret(0, 0, "") is False - assert weixin_mod._is_stale_session_ret(0, 0, None) is False - - def test_other_errors_are_not_stale(self): - assert weixin_mod._is_stale_session_ret(-14, -14, "session timeout") is False - assert weixin_mod._is_stale_session_ret(-100, 0, "internal error") is False + def test_nonzero_errcode_is_error(self): + assert weixin_mod._is_api_error({"errcode": -2}) is True + assert weixin_mod._is_api_error({"ret": 0, "errcode": -2}) is True @pytest.mark.asyncio @@ -1502,3 +1486,232 @@ async def test_send_text_rate_limit_backoff_succeeds(monkeypatch) -> None: await channel._send_text("wx-user", "hello", "") assert channel._api_post.await_count == 2 + + +class TestToolHintBuffering: + """Tool hints are buffered inside WeixinChannel to coalesce consecutive + ones and avoid burning the iLink rate-limit quota (~7 msgs / 5 min).""" + + @pytest.mark.asyncio + async def test_single_tool_hint_buffered_until_flush(self): + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="read_file(a.py)", + metadata={"_progress": True, "_tool_hint": True}, + )) + + # Buffered — not sent yet + channel._send_text.assert_not_awaited() + + # Non-tool-hint message flushes the buffer first + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="done", + metadata={}, + )) + + # First call is the coalesced hint, second is the trigger message + assert channel._send_text.await_count == 2 + assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" + assert channel._send_text.await_args_list[1].args[1] == "done" + + @pytest.mark.asyncio + async def test_multiple_tool_hints_coalesced(self): + channel, _bus = _make_channel() + channel._client = object() + 
channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + + for hint in ["read_file(a.py)", "read_file(b.py)", "exec(cmd)"]: + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content=hint, + metadata={"_progress": True, "_tool_hint": True}, + )) + + channel._send_text.assert_not_awaited() + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="done", + metadata={}, + )) + + assert channel._send_text.await_count == 2 + assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)\nread_file(b.py)\nexec(cmd)" + assert channel._send_text.await_args_list[1].args[1] == "done" + + @pytest.mark.asyncio + async def test_tool_hints_different_chats_not_coalesced(self): + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["user-a"] = "ctx-a" + channel._context_tokens["user-b"] = "ctx-b" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="user-a", + content="tool-a", + metadata={"_progress": True, "_tool_hint": True}, + )) + await channel.send(OutboundMessage( + channel="weixin", + chat_id="user-b", + content="tool-b", + metadata={"_progress": True, "_tool_hint": True}, + )) + + channel._send_text.assert_not_awaited() + + # Flush chat-a + await channel.send(OutboundMessage( + channel="weixin", + chat_id="user-a", + content="done-a", + metadata={}, + )) + # Flush chat-b + await channel.send(OutboundMessage( + channel="weixin", + chat_id="user-b", + content="done-b", + metadata={}, + )) + + # 2 calls per chat (hint + trigger message) + assert channel._send_text.await_count == 4 + + @pytest.mark.asyncio + async def test_non_tool_hint_flushes_pending_hints(self): + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + 
channel._context_tokens["wx-user"] = "ctx-1" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + channel._stop_typing = AsyncMock() + channel._get_typing_ticket = AsyncMock(return_value="") + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="read_file(a.py)", + metadata={"_progress": True, "_tool_hint": True}, + )) + channel._send_text.assert_not_awaited() + + # Final answer triggers flush before sending itself + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="final answer", + metadata={}, + )) + + assert channel._send_text.await_count == 2 + assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" + assert channel._send_text.await_args_list[1].args[1] == "final answer" + + @pytest.mark.asyncio + async def test_intermediate_progress_flushes_hints(self): + """Any non-tool-hint message (including thoughts) flushes the buffer + so hints never get stuck when the final answer is streamed or skipped.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + channel._stop_typing = AsyncMock() + channel._get_typing_ticket = AsyncMock(return_value="") + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="read_file(a.py)", + metadata={"_progress": True, "_tool_hint": True}, + )) + + # A thought message flushes the existing buffer before sending itself. + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="Thinking...", + metadata={"_progress": True}, + )) + + # Another tool hint starts a new buffer. + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="exec(cmd)", + metadata={"_progress": True, "_tool_hint": True}, + )) + + # Final answer flushes the remaining buffer before sending itself. 
+ await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="done", + metadata={}, + )) + + assert channel._send_text.await_count == 4 + assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" + assert channel._send_text.await_args_list[1].args[1] == "Thinking..." + assert channel._send_text.await_args_list[2].args[1] == "exec(cmd)" + assert channel._send_text.await_args_list[3].args[1] == "done" + + @pytest.mark.asyncio + async def test_send_tool_hints_disabled_drops(self): + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = False + channel._send_text = AsyncMock() + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="read_file(a.py)", + metadata={"_progress": True, "_tool_hint": True}, + )) + + channel._send_text.assert_not_awaited() + + @pytest.mark.asyncio + async def test_stop_clears_pending_tool_hints(self): + channel, _bus = _make_channel() + channel._client = AsyncMock() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel.send_tool_hints = True + channel._send_text = AsyncMock() + + await channel.send(OutboundMessage( + channel="weixin", + chat_id="wx-user", + content="read_file(a.py)", + metadata={"_progress": True, "_tool_hint": True}, + )) + + await channel.stop() + + assert not channel._pending_tool_hints + channel._send_text.assert_not_awaited()