diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index b11c3b115..359b5f232 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -11,13 +11,13 @@ from __future__ import annotations import asyncio import base64 -import copy import hashlib import json import os import random import re import time +import uuid from collections import OrderedDict from contextlib import suppress from pathlib import Path @@ -47,14 +47,13 @@ ITEM_FILE = 4 ITEM_VIDEO = 5 # MessageType (1 = inbound from user, 2 = outbound from bot) -MESSAGE_TYPE_USER = 1 MESSAGE_TYPE_BOT = 2 # MessageState MESSAGE_STATE_FINISH = 2 WEIXIN_MAX_MESSAGE_LEN = 4000 -WEIXIN_CHANNEL_VERSION = "2.1.7" +WEIXIN_CHANNEL_VERSION = "2.1.1" ILINK_APP_ID = "bot" @@ -80,18 +79,10 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION} ERRCODE_SESSION_EXPIRED = -14 SESSION_PAUSE_DURATION_S = 60 * 60 -# iLink rate-limit / stale-session errcode -RATE_LIMIT_ERRCODE = -2 - -# iLink rate-limit backoff (wxclawbot-cli docs: ~7 msgs / 5 min per bot) -RATE_LIMIT_BACKOFF_S = 60 - - -def _is_api_error(data: dict) -> bool: - """True when iLink response signals failure via ``ret`` or ``errcode``.""" - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - return (ret is not None and ret != 0) or (errcode is not None and errcode != 0) +# iLink context_token is observed to expire server-side after ~90-160s of +# agent inactivity (openclaw/openclaw#61174). Proactively refresh before +# sending if the cached token is older than this threshold. +CONTEXT_TOKEN_MAX_AGE_S = 60 # Retry constants (matching the reference plugin's monitor.ts) @@ -174,7 +165,7 @@ class WeixinChannel(BaseChannel): self._session_pause_until: float = 0.0 self._typing_tasks: dict[str, asyncio.Task] = {} self._typing_tickets: dict[str, dict[str, Any]] = {} - self._pending_tool_hints: dict[str, list[str]] = {} + self._context_token_at: dict[str, float] = {} # ------------------------------------------------------------------ # State persistence @@ -511,7 +502,6 @@ class WeixinChannel(BaseChannel): async def stop(self) -> None: self._running = False - self._pending_tool_hints.clear() if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() for chat_id in list(self._typing_tasks): @@ -542,16 +532,6 @@ class WeixinChannel(BaseChannel): f"WeChat session paused, {remaining_min} min remaining (errcode {ERRCODE_SESSION_EXPIRED})" ) - def _check_response_error(self, data: dict, operation: str) -> None: - """Raise if *data* contains an iLink API error.""" - if not _is_api_error(data): - return - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - raise RuntimeError( - f"WeChat {operation} error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" - ) - async def _poll_once(self) -> None: remaining = self._session_pause_remaining_s() if remaining > 0: @@ -573,7 +553,9 @@ class WeixinChannel(BaseChannel): ret = data.get("ret", 0) errcode = data.get("errcode", 0) - if _is_api_error(data): + is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0) + + if is_error: if errcode == ERRCODE_SESSION_EXPIRED or ret == ERRCODE_SESSION_EXPIRED: self._pause_session() remaining = self._session_pause_remaining_s() @@ -638,6 +620,7 @@ class WeixinChannel(BaseChannel): ctx_token = msg.get("context_token", "") if ctx_token: self._context_tokens[from_user_id] = ctx_token + self._context_token_at[from_user_id] = time.time() self._save_state() # Parse item_list (WeixinMessage.item_list — types.ts:161) @@ -943,6 +926,65 @@ class WeixinChannel(BaseChannel): } return "" + async def _refresh_context_token_if_stale( + self, chat_id: str, context_token: str + ) -> str: + """Return a fresh context_token if the cached one is too old. + + iLink context_token expires server-side after a short idle period + (empirically ~90s). Proactively refreshing before sending prevents + silent message loss on long agent turns or cron pushes. + """ + if not context_token: + return context_token + + now = time.time() + cached_at = self._context_token_at.get(chat_id, 0) + age = now - cached_at + + if age < CONTEXT_TOKEN_MAX_AGE_S: + return context_token + + self.logger.debug( + "WeChat context_token for {} is {:.0f}s old; refreshing via getconfig", + chat_id, + age, + ) + + body: dict[str, Any] = { + "ilink_user_id": chat_id, + "context_token": context_token, + "base_info": BASE_INFO, + } + try: + data = await self._api_post("ilink/bot/getconfig", body) + except Exception as e: + self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e) + return context_token + + if data.get("ret", 0) != 0: + self.logger.warning( + "WeChat getconfig returned ret={} for {}: {}", + data.get("ret"), + chat_id, + data.get("errmsg", ""), + ) + return context_token + + new_token = str(data.get("context_token", "") or "") + if new_token and new_token != context_token: + self.logger.info( + "WeChat context_token refreshed for {} (age {:.0f}s -> fresh)", + chat_id, + age, + ) + self._context_tokens[chat_id] = new_token + self._context_token_at[chat_id] = now + self._save_state() + return new_token + + return context_token + async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None: """Best-effort sendtyping wrapper.""" if not typing_ticket: @@ -972,44 +1014,12 @@ class WeixinChannel(BaseChannel): self._assert_session_active() is_progress = bool((msg.metadata or {}).get("_progress", False)) - - # Buffer tool hints to coalesce consecutive ones and avoid burning - # WeChat iLink rate-limit quota (~7 msgs / 5 min). - if is_progress and (msg.metadata or {}).get("_tool_hint"): - if not self.send_tool_hints: - return - self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content) - self.logger.debug( - "Buffered tool hint for {} (count={})", - msg.chat_id, - len(self._pending_tool_hints[msg.chat_id]), - ) - return - - # Any non-tool-hint message (thought, final answer, /stop response, …) - # flushes the buffer so hints do not get stuck when the final answer - # is suppressed, streamed, or otherwise skips this path. - if not ((msg.metadata or {}).get("_tool_hint") and is_progress): - hints = self._pending_tool_hints.pop(msg.chat_id, None) - if hints: - ctx_token = self._context_tokens.get(msg.chat_id, "") - if ctx_token: - self.logger.info( - "Flushing {} buffered tool hint(s) for {}", - len(hints), msg.chat_id, - ) - await self._send_text(msg.chat_id, "\n".join(hints), ctx_token) - else: - self.logger.warning( - "Dropped {} buffered tool hint(s) for {}: no context_token", - len(hints), msg.chat_id, - ) - if not is_progress: await self._stop_typing(msg.chat_id, clear_remote=True) content = msg.content.strip() ctx_token = self._context_tokens.get(msg.chat_id, "") + ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token) if not ctx_token: raise RuntimeError( f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send" @@ -1098,31 +1108,6 @@ class WeixinChannel(BaseChannel): with suppress(Exception): await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) - async def send_delta( - self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None - ) -> None: - """Weixin iLink does not support native streaming deltas. - - We only hook ``_stream_end`` so buffered tool hints are flushed even - when the final answer carries the ``_streamed`` flag and bypasses - :meth:`send`. - """ - if metadata and metadata.get("_stream_end"): - hints = self._pending_tool_hints.pop(chat_id, None) - if hints: - ctx_token = self._context_tokens.get(chat_id, "") - if ctx_token: - self.logger.info( - "Flushing {} buffered tool hint(s) for {} (stream_end)", - len(hints), chat_id, - ) - await self._send_text(chat_id, "\n".join(hints), ctx_token) - else: - self.logger.warning( - "Dropped {} buffered tool hint(s) for {}: no context_token", - len(hints), chat_id, - ) - async def _start_typing(self, chat_id: str, context_token: str = "") -> None: """Start typing indicator immediately when a message is received.""" if not self._client or not self._token or not chat_id: @@ -1175,67 +1160,6 @@ class WeixinChannel(BaseChannel): except Exception as e: self.logger.debug("typing clear failed for {}: {}", chat_id, e) - @staticmethod - def _generate_client_id() -> str: - """Generate a client_id matching the reference plugin format. - - openclaw-weixin uses ``{prefix}:{timestamp}-{8-char hex}``. - """ - return f"nanobot:{int(time.time() * 1000)}-{os.urandom(4).hex()}" - - async def _send_message_with_retry( - self, - body: dict[str, Any], - context_token: str, - to_user_id: str, - client_id: str, - operation: str, - ) -> None: - """Post ``ilink/bot/sendmessage`` with stale-session and rate-limit handling.""" - data = await self._api_post("ilink/bot/sendmessage", body) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - - # Stale session (errmsg == "unknown error") — retry once without token. - if ( - (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE) - and (errmsg or "").strip().lower() == "unknown error" - and context_token - ): - self.logger.warning( - "WeChat {} stale-session signal for {} (client_id={}); " - "retrying without context_token", - operation, to_user_id, client_id, - ) - body_no_ctx = copy.deepcopy(body) - body_no_ctx["msg"].pop("context_token", None) - data = await self._api_post("ilink/bot/sendmessage", body_no_ctx) - ret = data.get("ret", 0) - errcode = data.get("errcode", 0) - errmsg = data.get("errmsg", "") - if ret == 0 and (errcode == 0 or errcode is None): - self.logger.warning( - "WeChat {} succeeded WITHOUT context_token for {}; " - "clearing expired token from cache", - operation, to_user_id, - ) - self._context_tokens.pop(to_user_id, None) - self._save_state() - return - - # Rate limit (-2) — wait and retry once. - if ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE: - self.logger.warning( - "WeChat {} rate limited for {} (client_id={}); " - "waiting {}s before retry", - operation, to_user_id, client_id, RATE_LIMIT_BACKOFF_S, - ) - await asyncio.sleep(RATE_LIMIT_BACKOFF_S) - data = await self._api_post("ilink/bot/sendmessage", body) - - self._check_response_error(data, operation) - async def _send_text( self, to_user_id: str, @@ -1243,7 +1167,7 @@ class WeixinChannel(BaseChannel): context_token: str, ) -> None: """Send a text message matching the exact protocol from send.ts.""" - client_id = self._generate_client_id() + client_id = f"nanobot-{uuid.uuid4().hex[:12]}" item_list: list[dict] = [] if text: @@ -1266,8 +1190,13 @@ class WeixinChannel(BaseChannel): "base_info": BASE_INFO, } - await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send text") - self.logger.debug("WeChat text sent to {} (client_id={})", to_user_id, client_id) + data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) + errcode = data.get("errcode", 0) + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): + raise RuntimeError( + f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" + ) async def _send_media_file( self, @@ -1393,7 +1322,7 @@ class WeixinChannel(BaseChannel): media_item["len"] = str(raw_size) # Send each media item as its own message (matching reference plugin) - client_id = self._generate_client_id() + client_id = f"nanobot-{uuid.uuid4().hex[:12]}" item_list: list[dict] = [{"type": item_type, item_key: media_item}] weixin_msg: dict[str, Any] = { @@ -1412,7 +1341,13 @@ class WeixinChannel(BaseChannel): "base_info": BASE_INFO, } - await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send media") + data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) + errcode = data.get("errcode", 0) + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): + raise RuntimeError( + f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" + ) # --------------------------------------------------------------------------- diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index fda0a82ca..d35eca0c7 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -1,6 +1,7 @@ import asyncio import json import tempfile +import time from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock @@ -9,7 +10,6 @@ import httpx import pytest import nanobot.channels.weixin as weixin_mod -from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.weixin import ( ITEM_IMAGE, @@ -49,11 +49,11 @@ def test_make_headers_includes_route_tag_when_configured() -> None: assert headers["Authorization"] == "Bearer token" assert headers["SKRouteTag"] == "123" assert headers["iLink-App-Id"] == "bot" - assert headers["iLink-App-ClientVersion"] == str((2 << 16) | (1 << 8) | 7) + assert headers["iLink-App-ClientVersion"] == str((2 << 16) | (1 << 8) | 1) def test_channel_version_matches_reference_plugin_version() -> None: - assert WEIXIN_CHANNEL_VERSION == "2.1.7" + assert WEIXIN_CHANNEL_VERSION == "2.1.1" def test_save_and_load_state_persists_context_tokens(tmp_path) -> None: @@ -375,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-typing" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock( side_effect=[ @@ -403,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None: channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-no-ticket" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"}) @@ -1363,355 +1365,3 @@ async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatc assert call_count == 2 assert any("WeChat poll loop error" in m for m in logged_messages) - - -@pytest.mark.asyncio -async def test_send_text_retries_without_context_token_on_ret_minus_two() -> None: - """If sendmessage returns ret=-2 with a context_token, retry without it.""" - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "expired-token" - - channel._api_post = AsyncMock( - side_effect=[ - {"ret": -2, "errmsg": "unknown error"}, # stale session with token - {"ret": 0}, # retry without token succeeds - ] - ) - - await channel._send_text("wx-user", "hello", "expired-token") - - # Should have called API twice - assert channel._api_post.await_count == 2 - # First call includes context_token - first_body = channel._api_post.await_args_list[0].args[1] - assert first_body["msg"]["context_token"] == "expired-token" - # Second call does NOT include context_token - second_body = channel._api_post.await_args_list[1].args[1] - assert "context_token" not in second_body["msg"] - # Expired token should be cleared from cache - assert "wx-user" not in channel._context_tokens - - -@pytest.mark.asyncio -async def test_send_text_stale_session_retries_without_token_then_rate_limit_backoff(monkeypatch) -> None: - """Stale-session 'unknown error' triggers tokenless retry, then rate-limit backoff.""" - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "bad-token" - - channel._api_post = AsyncMock( - side_effect=[ - {"ret": -2, "errmsg": "unknown error"}, # with token - {"ret": -2, "errmsg": "unknown error"}, # without token - {"ret": -2, "errmsg": "unknown error"}, # rate-limit retry - ] - ) - - # Speed up the 60-second backoff for testing - monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0) - - with pytest.raises(RuntimeError, match="WeChat send text error"): - await channel._send_text("wx-user", "hello", "bad-token") - - # 3 calls: original + tokenless + rate-limit retry - assert channel._api_post.await_count == 3 - # Token is NOT cleared because tokenless retry also failed - assert channel._context_tokens.get("wx-user") == "bad-token" - - -@pytest.mark.asyncio -async def test_send_text_rate_limit_with_empty_errmsg_waits_and_retries(monkeypatch) -> None: - """Empty errmsg ret=-2 is treated as rate limit: wait then retry once.""" - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - - channel._api_post = AsyncMock( - side_effect=[ - {"ret": -2}, # first attempt — rate limit - {"ret": -2}, # backoff retry — still rate limit - ] - ) - - monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0) - - with pytest.raises(RuntimeError, match="WeChat send text error"): - await channel._send_text("wx-user", "hello", "") - - assert channel._api_post.await_count == 2 - - -# --------------------------------------------------------------------------- -# Tests for _is_stale_session_ret (hermes-agent#17228 / #18105) -# --------------------------------------------------------------------------- - - -class TestIsApiError: - """Verify the shared ``_is_api_error`` predicate used by _poll_once and send helpers.""" - - def test_success_codes_are_not_error(self): - assert weixin_mod._is_api_error({"ret": 0, "errcode": 0}) is False - assert weixin_mod._is_api_error({"ret": 0}) is False - assert weixin_mod._is_api_error({"errcode": 0}) is False - assert weixin_mod._is_api_error({}) is False - - def test_nonzero_ret_is_error(self): - assert weixin_mod._is_api_error({"ret": -2}) is True - assert weixin_mod._is_api_error({"ret": -14}) is True - - def test_nonzero_errcode_is_error(self): - assert weixin_mod._is_api_error({"errcode": -2}) is True - assert weixin_mod._is_api_error({"ret": 0, "errcode": -2}) is True - - -@pytest.mark.asyncio -async def test_send_text_rate_limit_backoff_succeeds(monkeypatch) -> None: - """If rate-limit retry succeeds after backoff, return normally.""" - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - - channel._api_post = AsyncMock( - side_effect=[ - {"ret": -2}, # first attempt — rate limit - {"ret": 0}, # backoff retry — success - ] - ) - - monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0) - - await channel._send_text("wx-user", "hello", "") - - assert channel._api_post.await_count == 2 - - -class TestToolHintBuffering: - """Tool hints are buffered inside WeixinChannel to coalesce consecutive - ones and avoid burning the iLink rate-limit quota (~7 msgs / 5 min).""" - - @pytest.mark.asyncio - async def test_single_tool_hint_buffered_until_flush(self): - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "ctx-1" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="read_file(a.py)", - metadata={"_progress": True, "_tool_hint": True}, - )) - - # Buffered — not sent yet - channel._send_text.assert_not_awaited() - - # Non-tool-hint message flushes the buffer first - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="done", - metadata={}, - )) - - # First call is the coalesced hint, second is the trigger message - assert channel._send_text.await_count == 2 - assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" - assert channel._send_text.await_args_list[1].args[1] == "done" - - @pytest.mark.asyncio - async def test_multiple_tool_hints_coalesced(self): - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "ctx-1" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - - for hint in ["read_file(a.py)", "read_file(b.py)", "exec(cmd)"]: - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content=hint, - metadata={"_progress": True, "_tool_hint": True}, - )) - - channel._send_text.assert_not_awaited() - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="done", - metadata={}, - )) - - assert channel._send_text.await_count == 2 - assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)\nread_file(b.py)\nexec(cmd)" - assert channel._send_text.await_args_list[1].args[1] == "done" - - @pytest.mark.asyncio - async def test_tool_hints_different_chats_not_coalesced(self): - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["user-a"] = "ctx-a" - channel._context_tokens["user-b"] = "ctx-b" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="user-a", - content="tool-a", - metadata={"_progress": True, "_tool_hint": True}, - )) - await channel.send(OutboundMessage( - channel="weixin", - chat_id="user-b", - content="tool-b", - metadata={"_progress": True, "_tool_hint": True}, - )) - - channel._send_text.assert_not_awaited() - - # Flush chat-a - await channel.send(OutboundMessage( - channel="weixin", - chat_id="user-a", - content="done-a", - metadata={}, - )) - # Flush chat-b - await channel.send(OutboundMessage( - channel="weixin", - chat_id="user-b", - content="done-b", - metadata={}, - )) - - # 2 calls per chat (hint + trigger message) - assert channel._send_text.await_count == 4 - - @pytest.mark.asyncio - async def test_non_tool_hint_flushes_pending_hints(self): - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "ctx-1" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - channel._stop_typing = AsyncMock() - channel._get_typing_ticket = AsyncMock(return_value="") - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="read_file(a.py)", - metadata={"_progress": True, "_tool_hint": True}, - )) - channel._send_text.assert_not_awaited() - - # Final answer triggers flush before sending itself - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="final answer", - metadata={}, - )) - - assert channel._send_text.await_count == 2 - assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" - assert channel._send_text.await_args_list[1].args[1] == "final answer" - - @pytest.mark.asyncio - async def test_intermediate_progress_flushes_hints(self): - """Any non-tool-hint message (including thoughts) flushes the buffer - so hints never get stuck when the final answer is streamed or skipped.""" - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel._context_tokens["wx-user"] = "ctx-1" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - channel._stop_typing = AsyncMock() - channel._get_typing_ticket = AsyncMock(return_value="") - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="read_file(a.py)", - metadata={"_progress": True, "_tool_hint": True}, - )) - - # A thought message flushes the existing buffer before sending itself. - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="Thinking...", - metadata={"_progress": True}, - )) - - # Another tool hint starts a new buffer. - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="exec(cmd)", - metadata={"_progress": True, "_tool_hint": True}, - )) - - # Final answer flushes the remaining buffer before sending itself. - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="done", - metadata={}, - )) - - assert channel._send_text.await_count == 4 - assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)" - assert channel._send_text.await_args_list[1].args[1] == "Thinking..." - assert channel._send_text.await_args_list[2].args[1] == "exec(cmd)" - assert channel._send_text.await_args_list[3].args[1] == "done" - - @pytest.mark.asyncio - async def test_send_tool_hints_disabled_drops(self): - channel, _bus = _make_channel() - channel._client = object() - channel._token = "token" - channel.send_tool_hints = False - channel._send_text = AsyncMock() - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="read_file(a.py)", - metadata={"_progress": True, "_tool_hint": True}, - )) - - channel._send_text.assert_not_awaited() - - @pytest.mark.asyncio - async def test_stop_clears_pending_tool_hints(self): - channel, _bus = _make_channel() - channel._client = AsyncMock() - channel._token = "token" - channel._context_tokens["wx-user"] = "ctx-1" - channel.send_tool_hints = True - channel._send_text = AsyncMock() - - await channel.send(OutboundMessage( - channel="weixin", - chat_id="wx-user", - content="read_file(a.py)", - metadata={"_progress": True, "_tool_hint": True}, - )) - - await channel.stop() - - assert not channel._pending_tool_hints - channel._send_text.assert_not_awaited()