diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 41390f8b3..a75c897f4 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -79,6 +79,12 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION} ERRCODE_SESSION_EXPIRED = -14 SESSION_PAUSE_DURATION_S = 60 * 60 +# iLink context_token is observed to expire server-side after ~90-160s of +# agent inactivity (openclaw/openclaw#61174). Proactively refresh before +# sending if the cached token is older than this threshold. +CONTEXT_TOKEN_MAX_AGE_S = 60 + + # Retry constants (matching the reference plugin's monitor.ts) MAX_CONSECUTIVE_FAILURES = 3 BACKOFF_DELAY_S = 30 @@ -159,6 +165,8 @@ class WeixinChannel(BaseChannel): self._session_pause_until: float = 0.0 self._typing_tasks: dict[str, asyncio.Task] = {} self._typing_tickets: dict[str, dict[str, Any]] = {} + self._context_token_at: dict[str, float] = {} + self._pending_tool_hints: dict[str, list[str]] = {} # ------------------------------------------------------------------ # State persistence @@ -486,6 +494,7 @@ class WeixinChannel(BaseChannel): except Exception: if not self._running: break + self.logger.exception("WeChat poll loop error") consecutive_failures += 1 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: consecutive_failures = 0 @@ -495,6 +504,7 @@ class WeixinChannel(BaseChannel): async def stop(self) -> None: self._running = False + self._pending_tool_hints.clear() if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() for chat_id in list(self._typing_tasks): @@ -545,6 +555,7 @@ class WeixinChannel(BaseChannel): # Check for API-level errors (monitor.ts checks both ret and errcode) ret = data.get("ret", 0) errcode = data.get("errcode", 0) + is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0) if is_error: @@ -575,8 +586,10 @@ class WeixinChannel(BaseChannel): # Process messages (WeixinMessage[] from types.ts) msgs: list[dict] = data.get("msgs", []) or [] for msg in msgs: - with suppress(Exception): + try: await self._process_message(msg) + except Exception: + self.logger.exception("Failed to process WeChat message") # ------------------------------------------------------------------ # Inbound message processing (matches inbound.ts + process-message.ts) @@ -610,6 +623,7 @@ class WeixinChannel(BaseChannel): ctx_token = msg.get("context_token", "") if ctx_token: self._context_tokens[from_user_id] = ctx_token + self._context_token_at[from_user_id] = time.time() self._save_state() # Parse item_list (WeixinMessage.item_list — types.ts:161) @@ -915,6 +929,99 @@ class WeixinChannel(BaseChannel): } return "" + async def _refresh_context_token_if_stale( + self, chat_id: str, context_token: str + ) -> str: + """Return a fresh context_token if the cached one is too old. + + iLink context_token expires server-side after a short idle period + (empirically ~90s). Proactively refreshing before sending prevents + silent message loss on long agent turns or cron pushes. + """ + if not context_token: + return context_token + + now = time.time() + cached_at = self._context_token_at.get(chat_id, 0) + age = now - cached_at + + if age < CONTEXT_TOKEN_MAX_AGE_S: + return context_token + + self.logger.debug( + "WeChat context_token for {} is {:.0f}s old; refreshing via getconfig", + chat_id, + age, + ) + + body: dict[str, Any] = { + "ilink_user_id": chat_id, + "context_token": context_token, + "base_info": BASE_INFO, + } + try: + data = await self._api_post("ilink/bot/getconfig", body) + except Exception as e: + self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e) + return context_token + + if data.get("ret", 0) != 0: + self.logger.warning( + "WeChat getconfig returned ret={} for {}: {}", + data.get("ret"), + chat_id, + data.get("errmsg", ""), + ) + return context_token + + new_token = str(data.get("context_token", "") or "") + if new_token and new_token != context_token: + self.logger.info( + "WeChat context_token refreshed for {} (age {:.0f}s -> fresh)", + chat_id, + age, + ) + self._context_tokens[chat_id] = new_token + self._context_token_at[chat_id] = now + self._save_state() + return new_token + + return context_token + + async def _flush_tool_hints(self, chat_id: str) -> None: + """Send any buffered tool hints for *chat_id* as a single message. + + Tool hints are coalesced to reduce message count and avoid hitting the + WeChat iLink rate limit (~7 msgs / 5 min). Failures are logged but + not raised so that the main message send is never blocked. + """ + hints = self._pending_tool_hints.pop(chat_id, None) + if not hints: + return + + self.logger.info( + "Flushing {} buffered tool hint(s) for {}", + len(hints), + chat_id, + ) + + ctx_token = self._context_tokens.get(chat_id, "") + ctx_token = await self._refresh_context_token_if_stale(chat_id, ctx_token) + if not ctx_token: + self.logger.warning( + "Dropped {} buffered tool hint(s) for {}: no context_token", + len(hints), + chat_id, + ) + return + + try: + await self._send_text(chat_id, "\n\n".join(hints), ctx_token) + except Exception: + self.logger.exception( + "Failed to flush buffered tool hints for {}", chat_id + ) + async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None: """Best-effort sendtyping wrapper.""" if not typing_ticket: @@ -944,11 +1051,47 @@ class WeixinChannel(BaseChannel): self._assert_session_active() is_progress = bool((msg.metadata or {}).get("_progress", False)) + + # Buffer tool hints to coalesce consecutive ones and avoid burning + # WeChat iLink rate-limit quota (~7 msgs / 5 min). + if is_progress and (msg.metadata or {}).get("_tool_hint"): + if not self.send_tool_hints: + return + self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content) + self.logger.debug( + "Buffered tool hint for {} (count={})", + msg.chat_id, + len(self._pending_tool_hints[msg.chat_id]), + ) + return + + # Reasoning deltas are invisible in WeChat (there is no reasoning + # UI). Skip them entirely — do not send and do not flush buffer. + if is_progress and (msg.metadata or {}).get("_reasoning_delta"): + self.logger.debug( + "Dropped invisible reasoning delta for {}", msg.chat_id + ) + return + + content = msg.content.strip() + + # Empty progress messages (e.g. after_iteration tool_events) must + # NOT act as separators — they have no visible content. + if is_progress and not content and not (msg.media or []): + self.logger.debug( + "Skipped empty progress message for {} (no visible content)", + msg.chat_id, + ) + return + + # Flush buffered hints before sending any visible message. + await self._flush_tool_hints(msg.chat_id) + if not is_progress: await self._stop_typing(msg.chat_id, clear_remote=True) - content = msg.content.strip() ctx_token = self._context_tokens.get(msg.chat_id, "") + ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token) if not ctx_token: raise RuntimeError( f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send" @@ -1037,6 +1180,18 @@ class WeixinChannel(BaseChannel): with suppress(Exception): await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) + async def send_delta( + self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None + ) -> None: + """Weixin iLink does not support native streaming deltas. + + We only hook ``_stream_end`` so buffered tool hints are flushed even + when the final answer carries the ``_streamed`` flag and bypasses + :meth:`send`. + """ + if metadata and metadata.get("_stream_end"): + await self._flush_tool_hints(chat_id) + async def _start_typing(self, chat_id: str, context_token: str = "") -> None: """Start typing indicator immediately when a message is received.""" if not self._client or not self._token or not chat_id: @@ -1120,10 +1275,11 @@ class WeixinChannel(BaseChannel): } data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) errcode = data.get("errcode", 0) - if errcode and errcode != 0: + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): raise RuntimeError( - f"WeChat send text error (code {errcode}): {data.get('errmsg', '')}" + f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" ) async def _send_media_file( @@ -1270,10 +1426,11 @@ class WeixinChannel(BaseChannel): } data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) errcode = data.get("errcode", 0) - if errcode and errcode != 0: + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): raise RuntimeError( - f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}" + f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" ) diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index a695ba936..3d3606e75 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -1,6 +1,7 @@ import asyncio import json import tempfile +import time from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock @@ -374,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-typing" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock( side_effect=[ @@ -402,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None: channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-no-ticket" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"}) @@ -1254,3 +1257,526 @@ async def test_send_text_succeeds_on_zero_errcode() -> None: await channel._send_text("wx-user", "hello", "ctx-ok") channel._api_post.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_send_text_raises_on_nonzero_ret_even_when_errcode_zero() -> None: + """_send_text must raise when the API returns ret != 0, even if errcode is 0. + + The iLink API signals failure through either field. Checking only errcode + caused silent message drops (responses generated but never delivered). + """ + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._api_post = AsyncMock( + return_value={"ret": -100, "errcode": 0, "errmsg": "internal error"} + ) + + with pytest.raises(RuntimeError, match="WeChat send text error.*ret=-100.*errcode=0"): + await channel._send_text("wx-user", "hello", "ctx-ok") + + channel._api_post.assert_awaited_once() + + +# --------------------------------------------------------------------------- +# Tests for _poll_once not silently dropping messages on processing errors +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_poll_once_logs_exception_on_process_message_failure(monkeypatch) -> None: + """When _process_message raises, _poll_once must log the error and continue + processing remaining messages instead of silently swallowing the exception.""" + channel, _bus = _make_channel() + channel._client = SimpleNamespace(timeout=None) + channel._token = "token" + channel._get_updates_buf = "old-buf" + + calls = [] + logged_messages: list[str] = [] + + async def _failing_process(msg: dict) -> None: + calls.append(msg.get("message_id")) + if msg.get("message_id") == "msg-1": + raise RuntimeError("processing failed") + + channel._process_message = _failing_process # type: ignore[method-assign] + + monkeypatch.setattr( + channel.logger, + "exception", + lambda message, *args, **kwargs: logged_messages.append(str(message)), + ) + + channel._api_post = AsyncMock( # type: ignore[method-assign] + return_value={ + "ret": 0, + "errcode": 0, + "get_updates_buf": "new-buf", + "msgs": [ + {"message_id": "msg-1", "message_type": 1}, + {"message_id": "msg-2", "message_type": 1}, + ], + } + ) + + await channel._poll_once() + + # Both messages should have been attempted + assert calls == ["msg-1", "msg-2"] + # Buffer should still advance (already updated before processing) + assert channel._get_updates_buf == "new-buf" + # Error should be logged + assert any("Failed to process WeChat message" in m for m in logged_messages) + + +@pytest.mark.asyncio +async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatch) -> None: + """When _poll_once raises a non-timeout exception, the start() loop must log + the error and continue polling instead of exiting silently.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.config.token = "token" # skip QR login in start() + channel._running = True + + call_count = 0 + logged_messages: list[str] = [] + + async def _failing_poll() -> None: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("poll exploded") + channel._running = False # Stop after second call + + channel._poll_once = _failing_poll # type: ignore[method-assign] + + monkeypatch.setattr( + channel.logger, + "exception", + lambda message, *args, **kwargs: logged_messages.append(str(message)), + ) + + # Use a tiny retry delay so the test finishes quickly + original_retry = weixin_mod.RETRY_DELAY_S + weixin_mod.RETRY_DELAY_S = 0.01 + try: + await channel.start() + finally: + weixin_mod.RETRY_DELAY_S = original_retry + + assert call_count == 2 + assert any("WeChat poll loop error" in m for m in logged_messages) + + +# --------------------------------------------------------------------------- +# Tool-hint buffering +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_buffer_single_tool_hint_not_sent_immediately() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Using tool", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["Using tool"] + + +@pytest.mark.asyncio +async def test_buffer_multiple_tool_hints_flushed_on_final_answer() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + for hint in ["tool1", "tool2"]: + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": hint, + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 2 + channel._send_text.assert_any_await("wx-user", "tool1\n\ntool2", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_thought_progress_flushes_tool_hints() -> None: + """Thoughts are visible progress messages and must act as separators, + flushing buffered tool hints before they are sent.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send a thought — progress but not a tool_hint. + # It must act as a separator and flush the buffered hint. + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Let me think...", + "media": [], + "metadata": {"_progress": True}, + }, + )() + ) + + # The buffered hint was flushed before the thought was sent. + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Let me think...", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + # Final answer arrives with nothing left to flush. + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 3 + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + + +@pytest.mark.asyncio +async def test_reasoning_delta_does_not_flush_tool_hints() -> None: + """Reasoning deltas are invisible in WeChat and must NOT flush buffered + tool hints — otherwise hints separated only by hidden reasoning would + fail to coalesce.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send a reasoning delta — invisible in WeChat, must NOT flush + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Thinking step 1...", + "media": [], + "metadata": {"_progress": True, "_reasoning_delta": True}, + }, + )() + ) + + # Reasoning is invisible; hint stays buffered, _send_text not called + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"] + + # Final answer flushes the buffered hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_empty_progress_message_does_not_flush_tool_hints() -> None: + """Empty progress messages (e.g. after_iteration tool_events) have no + visible content and must NOT act as separators.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send an empty progress message (no content, no media) + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "", + "media": [], + "metadata": {"_progress": True, "_tool_events": [{"phase": "end"}]}, + }, + )() + ) + + # Nothing should have been sent yet + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"] + + # Final answer flushes the buffered hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_buffer_flush_refreshes_context_token() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-old" + channel._context_token_at["wx-user"] = time.time() + channel._refresh_context_token_if_stale = AsyncMock(return_value="ctx-refreshed") + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._refresh_context_token_if_stale.await_count == 2 + channel._refresh_context_token_if_stale.assert_any_await("wx-user", "ctx-old") + channel._send_text.assert_any_await("wx-user", "hint", "ctx-refreshed") + + +@pytest.mark.asyncio +async def test_buffer_flush_failure_does_not_block_final_answer() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock(side_effect=[RuntimeError("boom"), None]) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 2 + channel._send_text.assert_any_await("wx-user", "hint", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + + +@pytest.mark.asyncio +async def test_buffer_flushed_on_stream_end() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send_delta("wx-user", "", {"_stream_end": True}) + + channel._send_text.assert_awaited_once_with("wx-user", "hint", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_stop_clears_buffer() -> None: + channel, _bus = _make_channel() + channel._pending_tool_hints["wx-user"] = ["hint1", "hint2"] + await channel.stop() + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_send_tool_hints_false_drops_tool_hints() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = False + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + channel._send_text.assert_not_awaited() + assert "wx-user" not in channel._pending_tool_hints