feat(weixin): buffer and coalesce tool hints inside WeixinChannel

WeChat iLink has a strict ~7 msgs / 5 min rate limit. A busy agent turn
can trigger 8+ tool-call hints, each sent as a separate message, quickly
burning the quota and causing silent message drops.

Implement buffering entirely inside WeixinChannel (no global changes):

- Tool hints are appended to a per-chat_id buffer instead of being
  sent immediately.
- A non-tool-hint message arriving for the same chat flushes pending
  hints first (joined with newlines, sent as a single message).
- stop() clears any remaining buffered hints.
- send_tool_hints=False still drops hints as before.

- Add 6 tests covering: single hint, multiple hints coalesced,
  different chats isolated, non-tool-hint flush, disabled dropping,
  and stop clearing buffers.
This commit is contained in:
chengyongru 2026-05-08 11:35:23 +08:00
parent 2e56fb95b6
commit 1672f20d6e
2 changed files with 368 additions and 160 deletions

View File

@ -87,22 +87,11 @@ RATE_LIMIT_ERRCODE = -2
RATE_LIMIT_BACKOFF_S = 60
def _is_stale_session_ret(
ret: int | None,
errcode: int | None,
errmsg: str | None,
) -> bool:
"""True when iLink returns ret=-2 / errcode=-2 with 'unknown error',
which historically correlates with a stale context_token.
Note: per wxclawbot-cli docs, ret=-2 is primarily a *rate limit*
(~7 msgs / 5 min per bot). We only treat the 'unknown error' variant
as stale-session because empty/missing errmsg is far more commonly
the rate-limit signal in practice.
"""
if ret != RATE_LIMIT_ERRCODE and errcode != RATE_LIMIT_ERRCODE:
return False
return (errmsg or "").strip().lower() == "unknown error"
def _is_api_error(data: dict) -> bool:
"""True when iLink response signals failure via ``ret`` or ``errcode``."""
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
return (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
# Retry constants (matching the reference plugin's monitor.ts)
@ -185,6 +174,7 @@ class WeixinChannel(BaseChannel):
self._session_pause_until: float = 0.0
self._typing_tasks: dict[str, asyncio.Task] = {}
self._typing_tickets: dict[str, dict[str, Any]] = {}
self._pending_tool_hints: dict[str, list[str]] = {}
# ------------------------------------------------------------------
# State persistence
@ -521,6 +511,7 @@ class WeixinChannel(BaseChannel):
async def stop(self) -> None:
self._running = False
self._pending_tool_hints.clear()
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
for chat_id in list(self._typing_tasks):
@ -551,18 +542,12 @@ class WeixinChannel(BaseChannel):
f"WeChat session paused, {remaining_min} min remaining (errcode {ERRCODE_SESSION_EXPIRED})"
)
def _check_response_error(self, data: dict, operation: str, *, body: dict | None = None) -> None:
"""Check both ``ret`` and ``errcode`` like the reference TS code.
The iLink API may signal failure through either field (or both).
``_poll_once`` already checks both; outbound send helpers must do
the same to avoid silent drops.
"""
def _check_response_error(self, data: dict, operation: str) -> None:
"""Raise if *data* contains an iLink API error."""
if not _is_api_error(data):
return
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
if not is_error:
return
raise RuntimeError(
f"WeChat {operation} error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
@ -587,9 +572,8 @@ class WeixinChannel(BaseChannel):
# Check for API-level errors (monitor.ts checks both ret and errcode)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
if is_error:
if _is_api_error(data):
if errcode == ERRCODE_SESSION_EXPIRED or ret == ERRCODE_SESSION_EXPIRED:
self._pause_session()
remaining = self._session_pause_remaining_s()
@ -988,6 +972,39 @@ class WeixinChannel(BaseChannel):
self._assert_session_active()
is_progress = bool((msg.metadata or {}).get("_progress", False))
# Buffer tool hints to coalesce consecutive ones and avoid burning
# WeChat iLink rate-limit quota (~7 msgs / 5 min).
if is_progress and (msg.metadata or {}).get("_tool_hint"):
if not self.send_tool_hints:
return
self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content)
self.logger.debug(
"Buffered tool hint for {} (count={})",
msg.chat_id,
len(self._pending_tool_hints[msg.chat_id]),
)
return
# Any non-tool-hint message (thought, final answer, /stop response, …)
# flushes the buffer so hints do not get stuck when the final answer
# is suppressed, streamed, or otherwise skips this path.
if not ((msg.metadata or {}).get("_tool_hint") and is_progress):
hints = self._pending_tool_hints.pop(msg.chat_id, None)
if hints:
ctx_token = self._context_tokens.get(msg.chat_id, "")
if ctx_token:
self.logger.info(
"Flushing {} buffered tool hint(s) for {}",
len(hints), msg.chat_id,
)
await self._send_text(msg.chat_id, "\n".join(hints), ctx_token)
else:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints), msg.chat_id,
)
if not is_progress:
await self._stop_typing(msg.chat_id, clear_remote=True)
@ -1081,6 +1098,31 @@ class WeixinChannel(BaseChannel):
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
async def send_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Weixin iLink does not support native streaming deltas.
We only hook ``_stream_end`` so buffered tool hints are flushed even
when the final answer carries the ``_streamed`` flag and bypasses
:meth:`send`.
"""
if metadata and metadata.get("_stream_end"):
hints = self._pending_tool_hints.pop(chat_id, None)
if hints:
ctx_token = self._context_tokens.get(chat_id, "")
if ctx_token:
self.logger.info(
"Flushing {} buffered tool hint(s) for {} (stream_end)",
len(hints), chat_id,
)
await self._send_text(chat_id, "\n".join(hints), ctx_token)
else:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints), chat_id,
)
async def _start_typing(self, chat_id: str, context_token: str = "") -> None:
"""Start typing indicator immediately when a message is received."""
if not self._client or not self._token or not chat_id:
@ -1141,6 +1183,59 @@ class WeixinChannel(BaseChannel):
"""
return f"nanobot:{int(time.time() * 1000)}-{os.urandom(4).hex()}"
async def _send_message_with_retry(
self,
body: dict[str, Any],
context_token: str,
to_user_id: str,
client_id: str,
operation: str,
) -> None:
"""Post ``ilink/bot/sendmessage`` with stale-session and rate-limit handling."""
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
# Stale session (errmsg == "unknown error") — retry once without token.
if (
(ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE)
and (errmsg or "").strip().lower() == "unknown error"
and context_token
):
self.logger.warning(
"WeChat {} stale-session signal for {} (client_id={}); "
"retrying without context_token",
operation, to_user_id, client_id,
)
body_no_ctx = copy.deepcopy(body)
body_no_ctx["msg"].pop("context_token", None)
data = await self._api_post("ilink/bot/sendmessage", body_no_ctx)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
if ret == 0 and (errcode == 0 or errcode is None):
self.logger.warning(
"WeChat {} succeeded WITHOUT context_token for {}; "
"clearing expired token from cache",
operation, to_user_id,
)
self._context_tokens.pop(to_user_id, None)
self._save_state()
return
# Rate limit (-2) — wait and retry once.
if ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE:
self.logger.warning(
"WeChat {} rate limited for {} (client_id={}); "
"waiting {}s before retry",
operation, to_user_id, client_id, RATE_LIMIT_BACKOFF_S,
)
await asyncio.sleep(RATE_LIMIT_BACKOFF_S)
data = await self._api_post("ilink/bot/sendmessage", body)
self._check_response_error(data, operation)
async def _send_text(
self,
to_user_id: str,
@ -1171,60 +1266,7 @@ class WeixinChannel(BaseChannel):
"base_info": BASE_INFO,
}
async def _do_send(_body: dict[str, Any]) -> dict:
return await self._api_post("ilink/bot/sendmessage", _body)
data = await _do_send(body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
# Stale session (errmsg == "unknown error") — retry once without token.
# This is distinct from the far more common rate-limit signal.
if _is_stale_session_ret(ret, errcode, errmsg) and context_token:
self.logger.warning(
"WeChat send text returned stale-session signal for {} (client_id={}); "
"retrying without context_token",
to_user_id,
client_id,
)
body_no_ctx = copy.deepcopy(body)
body_no_ctx["msg"].pop("context_token", None)
data = await _do_send(body_no_ctx)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
if ret == 0 and (errcode == 0 or errcode is None):
self.logger.warning(
"WeChat send text succeeded WITHOUT context_token for {}; "
"clearing expired token from cache",
to_user_id,
)
self._context_tokens.pop(to_user_id, None)
self._save_state()
self.logger.debug(
"WeChat text sent to {} (client_id={})", to_user_id, client_id
)
return
# Rate limit (-2) — per wxclawbot-cli docs this is ~7 msgs / 5 min.
# Wait 60 s and retry once; do NOT strip context_token (rate limit is
# per-bot, not per-token).
if (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE):
self.logger.warning(
"WeChat send text rate limited for {} (client_id={}); "
"waiting {}s before retry",
to_user_id,
client_id,
RATE_LIMIT_BACKOFF_S,
)
await asyncio.sleep(RATE_LIMIT_BACKOFF_S)
data = await _do_send(body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
self._check_response_error(data, "send text", body=body)
await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send text")
self.logger.debug("WeChat text sent to {} (client_id={})", to_user_id, client_id)
async def _send_media_file(
@ -1370,54 +1412,7 @@ class WeixinChannel(BaseChannel):
"base_info": BASE_INFO,
}
async def _do_send(_body: dict[str, Any]) -> dict:
return await self._api_post("ilink/bot/sendmessage", _body)
data = await _do_send(body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
# Same stale-session handling as _send_text.
if _is_stale_session_ret(ret, errcode, errmsg) and context_token:
self.logger.warning(
"WeChat send media returned stale-session signal for {} (client_id={}); "
"retrying without context_token",
to_user_id,
client_id,
)
body_no_ctx = copy.deepcopy(body)
body_no_ctx["msg"].pop("context_token", None)
data = await _do_send(body_no_ctx)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
if ret == 0 and (errcode == 0 or errcode is None):
self.logger.warning(
"WeChat send media succeeded WITHOUT context_token for {}; "
"clearing expired token from cache",
to_user_id,
)
self._context_tokens.pop(to_user_id, None)
self._save_state()
return
# Rate limit (-2) — wait and retry once (see _send_text for rationale).
if (ret == RATE_LIMIT_ERRCODE or errcode == RATE_LIMIT_ERRCODE):
self.logger.warning(
"WeChat send media rate limited for {} (client_id={}); "
"waiting {}s before retry",
to_user_id,
client_id,
RATE_LIMIT_BACKOFF_S,
)
await asyncio.sleep(RATE_LIMIT_BACKOFF_S)
data = await _do_send(body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
errmsg = data.get("errmsg", "")
self._check_response_error(data, "send media", body=body)
await self._send_message_with_retry(body, context_token, to_user_id, client_id, "send media")
# ---------------------------------------------------------------------------

View File

@ -9,6 +9,7 @@ import httpx
import pytest
import nanobot.channels.weixin as weixin_mod
from nanobot.bus.events import OutboundMessage
from nanobot.bus.queue import MessageBus
from nanobot.channels.weixin import (
ITEM_IMAGE,
@ -1448,39 +1449,22 @@ async def test_send_text_rate_limit_with_empty_errmsg_waits_and_retries(monkeypa
# ---------------------------------------------------------------------------
class TestIsStaleSessionRet:
"""Verify stale-session detection for iLink ret=-2 / errcode=-2 responses."""
class TestIsApiError:
"""Verify the shared ``_is_api_error`` predicate used by _poll_once and send helpers."""
def test_ret_minus_2_with_empty_errmsg_is_not_stale(self):
# Empty errmsg is the rate-limit signal per wxclawbot-cli docs.
assert weixin_mod._is_stale_session_ret(-2, 0, "") is False
assert weixin_mod._is_stale_session_ret(-2, 0, None) is False
def test_success_codes_are_not_error(self):
assert weixin_mod._is_api_error({"ret": 0, "errcode": 0}) is False
assert weixin_mod._is_api_error({"ret": 0}) is False
assert weixin_mod._is_api_error({"errcode": 0}) is False
assert weixin_mod._is_api_error({}) is False
def test_errcode_minus_2_with_empty_errmsg_is_not_stale(self):
assert weixin_mod._is_stale_session_ret(0, -2, "") is False
assert weixin_mod._is_stale_session_ret(0, -2, None) is False
def test_nonzero_ret_is_error(self):
assert weixin_mod._is_api_error({"ret": -2}) is True
assert weixin_mod._is_api_error({"ret": -14}) is True
def test_ret_minus_2_with_unknown_error_is_stale(self):
assert weixin_mod._is_stale_session_ret(-2, 0, "unknown error") is True
assert weixin_mod._is_stale_session_ret(-2, 0, "UNKNOWN ERROR") is True
def test_errcode_minus_2_with_unknown_error_is_stale(self):
assert weixin_mod._is_stale_session_ret(0, -2, "unknown error") is True
def test_ret_minus_2_with_frequency_limit_is_not_stale(self):
assert weixin_mod._is_stale_session_ret(-2, 0, "frequency limit") is False
assert weixin_mod._is_stale_session_ret(-2, 0, "too frequently") is False
def test_errcode_minus_2_with_frequency_limit_is_not_stale(self):
assert weixin_mod._is_stale_session_ret(0, -2, "freq limit") is False
def test_success_codes_are_not_stale(self):
assert weixin_mod._is_stale_session_ret(0, 0, "") is False
assert weixin_mod._is_stale_session_ret(0, 0, None) is False
def test_other_errors_are_not_stale(self):
assert weixin_mod._is_stale_session_ret(-14, -14, "session timeout") is False
assert weixin_mod._is_stale_session_ret(-100, 0, "internal error") is False
def test_nonzero_errcode_is_error(self):
assert weixin_mod._is_api_error({"errcode": -2}) is True
assert weixin_mod._is_api_error({"ret": 0, "errcode": -2}) is True
@pytest.mark.asyncio
@ -1502,3 +1486,232 @@ async def test_send_text_rate_limit_backoff_succeeds(monkeypatch) -> None:
await channel._send_text("wx-user", "hello", "")
assert channel._api_post.await_count == 2
class TestToolHintBuffering:
"""Tool hints are buffered inside WeixinChannel to coalesce consecutive
ones and avoid burning the iLink rate-limit quota (~7 msgs / 5 min)."""
@pytest.mark.asyncio
async def test_single_tool_hint_buffered_until_flush(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
# Buffered — not sent yet
channel._send_text.assert_not_awaited()
# Non-tool-hint message flushes the buffer first
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
# First call is the coalesced hint, second is the trigger message
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "done"
@pytest.mark.asyncio
async def test_multiple_tool_hints_coalesced(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
for hint in ["read_file(a.py)", "read_file(b.py)", "exec(cmd)"]:
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content=hint,
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)\nread_file(b.py)\nexec(cmd)"
assert channel._send_text.await_args_list[1].args[1] == "done"
@pytest.mark.asyncio
async def test_tool_hints_different_chats_not_coalesced(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["user-a"] = "ctx-a"
channel._context_tokens["user-b"] = "ctx-b"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-a",
content="tool-a",
metadata={"_progress": True, "_tool_hint": True},
))
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-b",
content="tool-b",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
# Flush chat-a
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-a",
content="done-a",
metadata={},
))
# Flush chat-b
await channel.send(OutboundMessage(
channel="weixin",
chat_id="user-b",
content="done-b",
metadata={},
))
# 2 calls per chat (hint + trigger message)
assert channel._send_text.await_count == 4
@pytest.mark.asyncio
async def test_non_tool_hint_flushes_pending_hints(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
channel._stop_typing = AsyncMock()
channel._get_typing_ticket = AsyncMock(return_value="")
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
# Final answer triggers flush before sending itself
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="final answer",
metadata={},
))
assert channel._send_text.await_count == 2
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "final answer"
@pytest.mark.asyncio
async def test_intermediate_progress_flushes_hints(self):
"""Any non-tool-hint message (including thoughts) flushes the buffer
so hints never get stuck when the final answer is streamed or skipped."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
channel._stop_typing = AsyncMock()
channel._get_typing_ticket = AsyncMock(return_value="")
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
# A thought message flushes the existing buffer before sending itself.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="Thinking...",
metadata={"_progress": True},
))
# Another tool hint starts a new buffer.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="exec(cmd)",
metadata={"_progress": True, "_tool_hint": True},
))
# Final answer flushes the remaining buffer before sending itself.
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="done",
metadata={},
))
assert channel._send_text.await_count == 4
assert channel._send_text.await_args_list[0].args[1] == "read_file(a.py)"
assert channel._send_text.await_args_list[1].args[1] == "Thinking..."
assert channel._send_text.await_args_list[2].args[1] == "exec(cmd)"
assert channel._send_text.await_args_list[3].args[1] == "done"
@pytest.mark.asyncio
async def test_send_tool_hints_disabled_drops(self):
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = False
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
channel._send_text.assert_not_awaited()
@pytest.mark.asyncio
async def test_stop_clears_pending_tool_hints(self):
channel, _bus = _make_channel()
channel._client = AsyncMock()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-1"
channel.send_tool_hints = True
channel._send_text = AsyncMock()
await channel.send(OutboundMessage(
channel="weixin",
chat_id="wx-user",
content="read_file(a.py)",
metadata={"_progress": True, "_tool_hint": True},
))
await channel.stop()
assert not channel._pending_tool_hints
channel._send_text.assert_not_awaited()