fix(weixin): prevent silent message drops from poll exceptions and expired tokens

- Remove suppress(Exception) from poll loop and message processing; add
  logger.exception so inbound errors are visible.
- Check both ret and errcode on send to avoid silent drops when iLink
  returns ret != 0 with errcode == 0.
- Proactively refresh context_token via getconfig before sending if the
  cached token is older than 60s. This prevents message loss on long
  agent turns and cron pushes without relying on complex retry logic.

Refs: openclaw/openclaw#61174, NousResearch/hermes-agent#21011
This commit is contained in:
chengyongru 2026-05-19 14:09:41 +08:00
parent 1672f20d6e
commit e2955a2e12
2 changed files with 92 additions and 507 deletions

View File

@ -11,13 +11,13 @@ from __future__ import annotations
import asyncio
import base64
import copy
import hashlib
import json
import os
import random
import re
import time
import uuid
from collections import OrderedDict
from contextlib import suppress
from pathlib import Path
@ -47,14 +47,13 @@ ITEM_FILE = 4
ITEM_VIDEO = 5
# MessageType (1 = inbound from user, 2 = outbound from bot)
MESSAGE_TYPE_USER = 1
MESSAGE_TYPE_BOT = 2
# MessageState
MESSAGE_STATE_FINISH = 2
WEIXIN_MAX_MESSAGE_LEN = 4000
WEIXIN_CHANNEL_VERSION = "2.1.7"
WEIXIN_CHANNEL_VERSION = "2.1.1"
ILINK_APP_ID = "bot"
@ -80,18 +79,10 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION}
ERRCODE_SESSION_EXPIRED = -14
SESSION_PAUSE_DURATION_S = 60 * 60
# iLink rate-limit / stale-session errcode
RATE_LIMIT_ERRCODE = -2
# iLink rate-limit backoff (wxclawbot-cli docs: ~7 msgs / 5 min per bot)
RATE_LIMIT_BACKOFF_S = 60
def _is_api_error(data: dict) -> bool:
"""True when iLink response signals failure via ``ret`` or ``errcode``."""
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
return (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
# iLink context_token is observed to expire server-side after ~90-160s of
# agent inactivity (openclaw/openclaw#61174). Proactively refresh before
# sending if the cached token is older than this threshold.
CONTEXT_TOKEN_MAX_AGE_S = 60
# Retry constants (matching the reference plugin's monitor.ts)
@ -174,7 +165,7 @@ class WeixinChannel(BaseChannel):
self._session_pause_until: float = 0.0
self._typing_tasks: dict[str, asyncio.Task] = {}
self._typing_tickets: dict[str, dict[str, Any]] = {}
self._pending_tool_hints: dict[str, list[str]] = {}
self._context_token_at: dict[str, float] = {}
# ------------------------------------------------------------------
# State persistence
@ -511,7 +502,6 @@ class WeixinChannel(BaseChannel):
async def stop(self) -> None:
self._running = False
self._pending_tool_hints.clear()
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
for chat_id in list(self._typing_tasks):
@ -542,16 +532,6 @@ class WeixinChannel(BaseChannel):
f"WeChat session paused, {remaining_min} min remaining (errcode {ERRCODE_SESSION_EXPIRED})"
)
def _check_response_error(self, data: dict, operation: str) -> None:
"""Raise if *data* contains an iLink API error."""
if not _is_api_error(data):
return
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
raise RuntimeError(
f"WeChat {operation} error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
async def _poll_once(self) -> None:
remaining = self._session_pause_remaining_s()
if remaining > 0:
@ -573,7 +553,9 @@ class WeixinChannel(BaseChannel):
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if _is_api_error(data):
is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
if is_error:
if errcode == ERRCODE_SESSION_EXPIRED or ret == ERRCODE_SESSION_EXPIRED:
self._pause_session()
remaining = self._session_pause_remaining_s()
@ -638,6 +620,7 @@ class WeixinChannel(BaseChannel):
ctx_token = msg.get("context_token", "")
if ctx_token:
self._context_tokens[from_user_id] = ctx_token
self._context_token_at[from_user_id] = time.time()
self._save_state()
# Parse item_list (WeixinMessage.item_list — types.ts:161)
@ -943,6 +926,65 @@ class WeixinChannel(BaseChannel):
}
return ""
async def _refresh_context_token_if_stale(
self, chat_id: str, context_token: str
) -> str:
"""Return a fresh context_token if the cached one is too old.
iLink context_token expires server-side after a short idle period
(empirically ~90s). Proactively refreshing before sending prevents
silent message loss on long agent turns or cron pushes.
"""
if not context_token:
return context_token
now = time.time()
cached_at = self._context_token_at.get(chat_id, 0)
age = now - cached_at
if age < CONTEXT_TOKEN_MAX_AGE_S:
return context_token
self.logger.debug(
"WeChat context_token for {} is {:.0f}s old; refreshing via getconfig",
chat_id,
age,
)
body: dict[str, Any] = {
"ilink_user_id": chat_id,
"context_token": context_token,
"base_info": BASE_INFO,
}
try:
data = await self._api_post("ilink/bot/getconfig", body)
except Exception as e:
self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e)
return context_token
if data.get("ret", 0) != 0:
self.logger.warning(
"WeChat getconfig returned ret={} for {}: {}",
data.get("ret"),
chat_id,
data.get("errmsg", ""),
)
return context_token
new_token = str(data.get("context_token", "") or "")
if new_token and new_token != context_token:
self.logger.info(
"WeChat context_token refreshed for {} (age {:.0f}s -> fresh)",
chat_id,
age,
)
self._context_tokens[chat_id] = new_token
self._context_token_at[chat_id] = now
self._save_state()
return new_token
return context_token
async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None:
"""Best-effort sendtyping wrapper."""
if not typing_ticket:
@ -972,44 +1014,12 @@ class WeixinChannel(BaseChannel):
self._assert_session_active()
is_progress = bool((msg.metadata or {}).get("_progress", False))
# Buffer tool hints to coalesce consecutive ones and avoid burning
# WeChat iLink rate-limit quota (~7 msgs / 5 min).
if is_progress and (msg.metadata or {}).get("_tool_hint"):
if not self.send_tool_hints:
return
self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content)
self.logger.debug(
"Buffered tool hint for {} (count={})",
msg.chat_id,
len(self._pending_tool_hints[msg.chat_id]),
)
return
# Any non-tool-hint message (thought, final answer, /stop response, …)
# flushes the buffer so hints do not get stuck when the final answer
# is suppressed, streamed, or otherwise skips this path.
if not ((msg.metadata or {}).get("_tool_hint") and is_progress):
hints = self._pending_tool_hints.pop(msg.chat_id, None)
if hints:
ctx_token = self._context_tokens.get(msg.chat_id, "")
if ctx_token:
self.logger.info(
"Flushing {} buffered tool hint(s) for {}",
len(hints), msg.chat_id,
)
await self._send_text(msg.chat_id, "\n".join(hints), ctx_token)
else:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints), msg.chat_id,
)
if not is_progress:
await self._stop_typing(msg.chat_id, clear_remote=True)
content = msg.content.strip()
ctx_token = self._context_tokens.get(msg.chat_id, "")
ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token)
if not ctx_token:
raise RuntimeError(
f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send"
@ -1098,31 +1108,6 @@ class WeixinChannel(BaseChannel):
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
async def send_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Weixin iLink does not support native streaming deltas.
We only hook ``_stream_end`` so buffered tool hints are flushed even
when the final answer carries the ``_streamed`` flag and bypasses
:meth:`send`.
"""
if metadata and metadata.get("_stream_end"):
hints = self._pending_tool_hints.pop(chat_id, None)
if hints:
ctx_token = self._context_tokens.get(chat_id, "")
if ctx_token:
self.logger.info(
"Flushing {} buffered tool hint(s) for {} (stream_end)",
len(hints), chat_id,
)
await self._send_text(chat_id, "\n".join(hints), ctx_token)
else:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints), chat_id,
)
async def _start_typing(self, chat_id: str, context_token: str = "") -> None:
"""Start typing indicator immediately when a message is received."""
if not self._client or not self._token or not chat_id:
@ -1175,67 +1160,6 @@ class WeixinChannel(BaseChannel):
except Exception as e:
self.logger.debug("typing clear failed for {}: {}", chat_id, e)
@staticmethod
def _generate_client_id() -> str:
"""Generate a client_id matching the reference plugin format.
openclaw-weixin uses ``{prefix}:{timestamp}-{8-char hex}``.
"""
return f"nanobot:{int(time.time() * 1000)}-{os.urandom(4).hex()}"
async def _send_message_with_retry(
self,
body: dict[str, Any],
context_token: str,
to_user_id: str,
client_id: str,
operation: str,
) -> None:
"""Post ``ilink/bot/sendmessage`` with stale-session and rate-limit handling."""
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
# Stale session (errmsg == "unknown error") — retry once without token.
if (
(ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE)
and (errmsg or "").strip().lower() == "unknown error"
and context_token
):
self.logger.warning(
"WeChat {} stale-session signal for {} (client_id={}); "
"retrying without context_token",
operation, to_user_id, client_id,
)
body_no_ctx = copy.deepcopy(body)
body_no_ctx["msg"].pop("context_token", None)
data = await self._api_post("ilink/bot/sendmessage", body_no_ctx)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
if ret == 0 and (errcode == 0 or errcode is None):
self.logger.warning(
"WeChat {} succeeded WITHOUT context_token for {}; "
"clearing expired token from cache",
operation, to_user_id,
)
self._context_tokens.pop(to_user_id, None)
self._save_state()
return
# Rate limit (-2) — wait and retry once.
if ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE:
self.logger.warning(
"WeChat {} rate limited for {} (client_id={}); "
"waiting {}s before retry",
operation, to_user_id, client_id, RATE_LIMIT_BACKOFF_S,
)
await asyncio.sleep(RATE_LIMIT_BACKOFF_S)
data = await self._api_post("ilink/bot/sendmessage", body)
self._check_response_error(data, operation)
async def _send_text(
self,
to_user_id: str,
@ -1243,7 +1167,7 @@ class WeixinChannel(BaseChannel):
context_token: str,
) -> None:
"""Send a text message matching the exact protocol from send.ts."""
client_id = self._generate_client_id()
client_id = f"nanobot-{uuid.uuid4().hex[:12]}"
item_list: list[dict] = []
if text:
@ -1266,8 +1190,13 @@ class WeixinChannel(BaseChannel):
"base_info": BASE_INFO,
}
await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send text")
self.logger.debug("WeChat text sent to {} (client_id={})", to_user_id, client_id)
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
async def _send_media_file(
self,
@ -1393,7 +1322,7 @@ class WeixinChannel(BaseChannel):
media_item["len"] = str(raw_size)
# Send each media item as its own message (matching reference plugin)
client_id = self._generate_client_id()
client_id = f"nanobot-{uuid.uuid4().hex[:12]}"
item_list: list[dict] = [{"type": item_type, item_key: media_item}]
weixin_msg: dict[str, Any] = {
@ -1412,7 +1341,13 @@ class WeixinChannel(BaseChannel):
"base_info": BASE_INFO,
}
await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send media")
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
# ---------------------------------------------------------------------------

View File

@ -1,6 +1,7 @@
import asyncio
import json
import tempfile
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
@ -9,7 +10,6 @@ import httpx
import pytest
import nanobot.channels.weixin as weixin_mod
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.weixin import (
ITEM_IMAGE,
@ -49,11 +49,11 @@ def test_make_headers_includes_route_tag_when_configured() -> None:
assert headers["Authorization"] == "Bearer token"
assert headers["SKRouteTag"] == "123"
assert headers["iLink-App-Id"] == "bot"
assert headers["iLink-App-ClientVersion"] == str((2 << 16) | (1 << 8) | 7)
assert headers["iLink-App-ClientVersion"] == str((2 << 16) | (1 << 8) | 1)
def test_channel_version_matches_reference_plugin_version() -> None:
assert WEIXIN_CHANNEL_VERSION == "2.1.7"
assert WEIXIN_CHANNEL_VERSION == "2.1.1"
def test_save_and_load_state_persists_context_tokens(tmp_path) -> None:
@ -375,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-typing"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(
side_effect=[
@ -403,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None:
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-no-ticket"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"})
@ -1363,355 +1365,3 @@ async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatc
assert call_count == 2
assert any("WeChat poll loop error" in m for m in logged_messages)
@pytest.mark.asyncio
async def test_send_text_retries_without_context_token_on_ret_minus_two() -> None:
"""If sendmessage returns ret=-2 with a context_token, retry without it."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "expired-token"
channel._api_post = AsyncMock(
side_effect=[
{"ret": -2, "errmsg": "unknown error"}, # stale session with token
{"ret": 0}, # retry without token succeeds
]
)
await channel._send_text("wx-user", "hello", "expired-token")
# Should have called API twice
assert channel._api_post.await_count == 2
# First call includes context_token
first_body = channel._api_post.await_args_list[0].args[1]
assert first_body["msg"]["context_token"] == "expired-token"
# Second call does NOT include context_token
second_body = channel._api_post.await_args_list[1].args[1]
assert "context_token" not in second_body["msg"]
# Expired token should be cleared from cache
assert "wx-user" not in channel._context_tokens
@pytest.mark.asyncio
async def test_send_text_stale_session_retries_without_token_then_rate_limit_backoff(monkeypatch) -> None:
"""Stale-session 'unknown error' triggers tokenless retry, then rate-limit backoff."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "bad-token"
channel._api_post = AsyncMock(
side_effect=[
{"ret": -2, "errmsg": "unknown error"}, # with token
{"ret": -2, "errmsg": "unknown error"}, # without token
{"ret": -2, "errmsg": "unknown error"}, # rate-limit retry
]
)
# Speed up the 60-second backoff for testing
monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0)
with pytest.raises(RuntimeError, match="WeChat send text error"):
await channel._send_text("wx-user", "hello", "bad-token")
# 3 calls: original + tokenless + rate-limit retry
assert channel._api_post.await_count == 3
# Token is NOT cleared because tokenless retry also failed
assert channel._context_tokens.get("wx-user") == "bad-token"
@pytest.mark.asyncio
async def test_send_text_rate_limit_with_empty_errmsg_waits_and_retries(monkeypatch) -> None:
"""Empty errmsg ret=-2 is treated as rate limit: wait then retry once."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._api_post = AsyncMock(
side_effect=[
{"ret": -2}, # first attempt — rate limit
{"ret": -2}, # backoff retry — still rate limit
]
)
monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0)
with pytest.raises(RuntimeError, match="WeChat send text error"):
await channel._send_text("wx-user", "hello", "")
assert channel._api_post.await_count == 2
# ---------------------------------------------------------------------------
# Tests for _is_stale_session_ret (hermes-agent#17228 / #18105)
# ---------------------------------------------------------------------------
class TestIsApiError:
"""Verify the shared ``_is_api_error`` predicate used by _poll_once and send helpers."""
def test_success_codes_are_not_error(self):
assert weixin_mod._is_api_error({"ret": 0, "errcode": 0}) is False
assert weixin_mod._is_api_error({"ret": 0}) is False
assert weixin_mod._is_api_error({"errcode": 0}) is False
assert weixin_mod._is_api_error({}) is False
def test_nonzero_ret_is_error(self):
assert weixin_mod._is_api_error({"ret": -2}) is True
assert weixin_mod._is_api_error({"ret": -14}) is True
def test_nonzero_errcode_is_error(self):
assert weixin_mod._is_api_error({"errcode": -2}) is True
assert weixin_mod._is_api_error({"ret": 0, "errcode": -2}) is True
@pytest.mark.asyncio
async def test_send_text_rate_limit_backoff_succeeds(monkeypatch) -> None:
"""If rate-limit retry succeeds after backoff, return normally."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._api_post = AsyncMock(
side_effect=[
{"ret": -2}, # first attempt — rate limit
{"ret": 0}, # backoff retry — success
]
)
monkeypatch.setattr(weixin_mod, "RATE_LIMIT_BACKOFF_S", 0)
await channel._send_text("wx-user", "hello", "")
assert channel._api_post.await_count == 2
class TestToolHintBuffering:
"""Tool hints are buffered inside WeixinChannel to coalesce consecutive
ones and avoid burning the iLink rate-limit quota (~7 msgs / 5 min)."""
@pytest.mark.asyncio
async def test_single_tool_hint_buffered_until_flush(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
# Buffered — not sent yet
channel._send_text.assert_not_awaited()
# Non-tool-hint message flushes the buffer first
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
# First call is the coalesced hint, second is the trigger message
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "done"
@pytest.mark.asyncio
async def test_multiple_tool_hints_coalesced(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
for hint in ["read_file(a.py)", "read_file(b.py)", "exec(cmd)"]:
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content=hint,
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)\nread_file(b.py)\nexec(cmd)"
assert channel._send_text.await_args_list[1].args[1] == "done"
@pytest.mark.asyncio
async def test_tool_hints_different_chats_not_coalesced(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["user-a"] = "ctx-a"
channel._context_tokens["user-b"] = "ctx-b"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-a",
content="tool-a",
metadata={"_progress": True, "_tool_hint": True},
))
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-b",
content="tool-b",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
# Flush chat-a
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-a",
content="done-a",
metadata={},
))
# Flush chat-b
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-b",
content="done-b",
metadata={},
))
# 2 calls per chat (hint + trigger message)
assert channel._send_text.await_count == 4
@pytest.mark.asyncio
async def test_non_tool_hint_flushes_pending_hints(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
channel._stop_typing = AsyncMock()
channel._get_typing_ticket = AsyncMock(return_value="")
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
# Final answer triggers flush before sending itself
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="final answer",
metadata={},
))
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "final answer"
@pytest.mark.asyncio
async def test_intermediate_progress_flushes_hints(self):
"""Any non-tool-hint message (including thoughts) flushes the buffer
so hints never get stuck when the final answer is streamed or skipped."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
channel._stop_typing = AsyncMock()
channel._get_typing_ticket = AsyncMock(return_value="")
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
# A thought message flushes the existing buffer before sending itself.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="Thinking...",
metadata={"_progress": True},
))
# Another tool hint starts a new buffer.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="exec(cmd)",
metadata={"_progress": True, "_tool_hint": True},
))
# Final answer flushes the remaining buffer before sending itself.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
assert channel._send_text.await_count == 4
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "Thinking..."
assert channel._send_text.await_args_list[2].args[1] == "exec(cmd)"
assert channel._send_text.await_args_list[3].args[1] == "done"
@pytest.mark.asyncio
async def test_send_tool_hints_disabled_drops(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = False
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
@pytest.mark.asyncio
async def test_stop_clears_pending_tool_hints(self):
channel, _bus = _make_channel()
channel._client = AsyncMock()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
await channel.stop()
assert not channel._pending_tool_hints
channel._send_text.assert_not_awaited()