fix(weixin): prevent silent message drops from poll exceptions and expired tokens

- Remove suppress(Exception) from poll loop and message processing; add
  logger.exception so inbound errors are visible.
- Check both ret and errcode on send to avoid silent drops when iLink
  returns ret != 0 with errcode == 0.
- Proactively refresh context_token via getconfig before sending if the
  cached token is older than 60s. This prevents message loss on long
  agent turns and cron pushes without relying on complex retry logic.

Refs: openclaw/openclaw#61174, NousResearch/hermes-agent#21011
This commit is contained in:
chengyongru 2026-05-19 16:00:37 +08:00 committed by Xubin Ren
parent de0a8f5e41
commit e2b51fa5dc
2 changed files with 689 additions and 6 deletions

View File

@ -79,6 +79,12 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION}
ERRCODE_SESSION_EXPIRED = -14
SESSION_PAUSE_DURATION_S = 60 * 60
# iLink context_token is observed to expire server-side after ~90-160s of
# agent inactivity (openclaw/openclaw#61174). Proactively refresh before
# sending if the cached token is older than this threshold.
CONTEXT_TOKEN_MAX_AGE_S = 60
# Retry constants (matching the reference plugin's monitor.ts)
MAX_CONSECUTIVE_FAILURES = 3
BACKOFF_DELAY_S = 30
@ -159,6 +165,8 @@ class WeixinChannel(BaseChannel):
self._session_pause_until: float = 0.0
self._typing_tasks: dict[str, asyncio.Task] = {}
self._typing_tickets: dict[str, dict[str, Any]] = {}
self._context_token_at: dict[str, float] = {}
self._pending_tool_hints: dict[str, list[str]] = {}
# ------------------------------------------------------------------
# State persistence
@ -486,6 +494,7 @@ class WeixinChannel(BaseChannel):
except Exception:
if not self._running:
break
self.logger.exception("WeChat poll loop error")
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
consecutive_failures = 0
@ -495,6 +504,7 @@ class WeixinChannel(BaseChannel):
async def stop(self) -> None:
self._running = False
self._pending_tool_hints.clear()
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
for chat_id in list(self._typing_tasks):
@ -545,6 +555,7 @@ class WeixinChannel(BaseChannel):
# Check for API-level errors (monitor.ts checks both ret and errcode)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
if is_error:
@ -575,8 +586,10 @@ class WeixinChannel(BaseChannel):
# Process messages (WeixinMessage[] from types.ts)
msgs: list[dict] = data.get("msgs", []) or []
for msg in msgs:
with suppress(Exception):
try:
await self._process_message(msg)
except Exception:
self.logger.exception("Failed to process WeChat message")
# ------------------------------------------------------------------
# Inbound message processing (matches inbound.ts + process-message.ts)
@ -610,6 +623,7 @@ class WeixinChannel(BaseChannel):
ctx_token = msg.get("context_token", "")
if ctx_token:
self._context_tokens[from_user_id] = ctx_token
self._context_token_at[from_user_id] = time.time()
self._save_state()
# Parse item_list (WeixinMessage.item_list — types.ts:161)
@ -915,6 +929,99 @@ class WeixinChannel(BaseChannel):
}
return ""
async def _refresh_context_token_if_stale(
self, chat_id: str, context_token: str
) -> str:
"""Return a fresh context_token if the cached one is too old.
iLink context_token expires server-side after a short idle period
(empirically ~90s). Proactively refreshing before sending prevents
silent message loss on long agent turns or cron pushes.
"""
if not context_token:
return context_token
now = time.time()
cached_at = self._context_token_at.get(chat_id, 0)
age = now - cached_at
if age < CONTEXT_TOKEN_MAX_AGE_S:
return context_token
self.logger.debug(
"WeChat context_token for {} is {:.0f}s old; refreshing via getconfig",
chat_id,
age,
)
body: dict[str, Any] = {
"ilink_user_id": chat_id,
"context_token": context_token,
"base_info": BASE_INFO,
}
try:
data = await self._api_post("ilink/bot/getconfig", body)
except Exception as e:
self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e)
return context_token
if data.get("ret", 0) != 0:
self.logger.warning(
"WeChat getconfig returned ret={} for {}: {}",
data.get("ret"),
chat_id,
data.get("errmsg", ""),
)
return context_token
new_token = str(data.get("context_token", "") or "")
if new_token and new_token != context_token:
self.logger.info(
"WeChat context_token refreshed for {} (age {:.0f}s -> fresh)",
chat_id,
age,
)
self._context_tokens[chat_id] = new_token
self._context_token_at[chat_id] = now
self._save_state()
return new_token
return context_token
async def _flush_tool_hints(self, chat_id: str) -> None:
"""Send any buffered tool hints for *chat_id* as a single message.
Tool hints are coalesced to reduce message count and avoid hitting the
WeChat iLink rate limit (~7 msgs / 5 min). Failures are logged but
not raised so that the main message send is never blocked.
"""
hints = self._pending_tool_hints.pop(chat_id, None)
if not hints:
return
self.logger.info(
"Flushing {} buffered tool hint(s) for {}",
len(hints),
chat_id,
)
ctx_token = self._context_tokens.get(chat_id, "")
ctx_token = await self._refresh_context_token_if_stale(chat_id, ctx_token)
if not ctx_token:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints),
chat_id,
)
return
try:
await self._send_text(chat_id, "\n\n".join(hints), ctx_token)
except Exception:
self.logger.exception(
"Failed to flush buffered tool hints for {}", chat_id
)
async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None:
"""Best-effort sendtyping wrapper."""
if not typing_ticket:
@ -944,11 +1051,47 @@ class WeixinChannel(BaseChannel):
self._assert_session_active()
is_progress = bool((msg.metadata or {}).get("_progress", False))
# Buffer tool hints to coalesce consecutive ones and avoid burning
# WeChat iLink rate-limit quota (~7 msgs / 5 min).
if is_progress and (msg.metadata or {}).get("_tool_hint"):
if not self.send_tool_hints:
return
self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content)
self.logger.debug(
"Buffered tool hint for {} (count={})",
msg.chat_id,
len(self._pending_tool_hints[msg.chat_id]),
)
return
# Reasoning deltas are invisible in WeChat (there is no reasoning
# UI). Skip them entirely — do not send and do not flush buffer.
if is_progress and (msg.metadata or {}).get("_reasoning_delta"):
self.logger.debug(
"Dropped invisible reasoning delta for {}", msg.chat_id
)
return
content = msg.content.strip()
# Empty progress messages (e.g. after_iteration tool_events) must
# NOT act as separators — they have no visible content.
if is_progress and not content and not (msg.media or []):
self.logger.debug(
"Skipped empty progress message for {} (no visible content)",
msg.chat_id,
)
return
# Flush buffered hints before sending any visible message.
await self._flush_tool_hints(msg.chat_id)
if not is_progress:
await self._stop_typing(msg.chat_id, clear_remote=True)
content = msg.content.strip()
ctx_token = self._context_tokens.get(msg.chat_id, "")
ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token)
if not ctx_token:
raise RuntimeError(
f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send"
@ -1037,6 +1180,18 @@ class WeixinChannel(BaseChannel):
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
async def send_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Weixin iLink does not support native streaming deltas.
We only hook ``_stream_end`` so buffered tool hints are flushed even
when the final answer carries the ``_streamed`` flag and bypasses
:meth:`send`.
"""
if metadata and metadata.get("_stream_end"):
await self._flush_tool_hints(chat_id)
async def _start_typing(self, chat_id: str, context_token: str = "") -> None:
"""Start typing indicator immediately when a message is received."""
if not self._client or not self._token or not chat_id:
@ -1120,10 +1275,11 @@ class WeixinChannel(BaseChannel):
}
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if errcode and errcode != 0:
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send text error (code {errcode}): {data.get('errmsg', '')}"
f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
async def _send_media_file(
@ -1270,10 +1426,11 @@ class WeixinChannel(BaseChannel):
}
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if errcode and errcode != 0:
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}"
f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)

View File

@ -1,6 +1,7 @@
import asyncio
import json
import tempfile
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
@ -374,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-typing"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(
side_effect=[
@ -402,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None:
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-no-ticket"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"})
@ -1254,3 +1257,526 @@ async def test_send_text_succeeds_on_zero_errcode() -> None:
await channel._send_text("wx-user", "hello", "ctx-ok")
channel._api_post.assert_awaited_once()
@pytest.mark.asyncio
async def test_send_text_raises_on_nonzero_ret_even_when_errcode_zero() -> None:
"""_send_text must raise when the API returns ret != 0, even if errcode is 0.
The iLink API signals failure through either field. Checking only errcode
caused silent message drops (responses generated but never delivered).
"""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._api_post = AsyncMock(
return_value={"ret": -100, "errcode": 0, "errmsg": "internal error"}
)
with pytest.raises(RuntimeError, match="WeChat send text error.*ret=-100.*errcode=0"):
await channel._send_text("wx-user", "hello", "ctx-ok")
channel._api_post.assert_awaited_once()
# ---------------------------------------------------------------------------
# Tests for _poll_once not silently dropping messages on processing errors
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_poll_once_logs_exception_on_process_message_failure(monkeypatch) -> None:
"""When _process_message raises, _poll_once must log the error and continue
processing remaining messages instead of silently swallowing the exception."""
channel, _bus = _make_channel()
channel._client = SimpleNamespace(timeout=None)
channel._token = "token"
channel._get_updates_buf = "old-buf"
calls = []
logged_messages: list[str] = []
async def _failing_process(msg: dict) -> None:
calls.append(msg.get("message_id"))
if msg.get("message_id") == "msg-1":
raise RuntimeError("processing failed")
channel._process_message = _failing_process # type: ignore[method-assign]
monkeypatch.setattr(
channel.logger,
"exception",
lambda message, *args, **kwargs: logged_messages.append(str(message)),
)
channel._api_post = AsyncMock( # type: ignore[method-assign]
return_value={
"ret": 0,
"errcode": 0,
"get_updates_buf": "new-buf",
"msgs": [
{"message_id": "msg-1", "message_type": 1},
{"message_id": "msg-2", "message_type": 1},
],
}
)
await channel._poll_once()
# Both messages should have been attempted
assert calls == ["msg-1", "msg-2"]
# Buffer should still advance (already updated before processing)
assert channel._get_updates_buf == "new-buf"
# Error should be logged
assert any("Failed to process WeChat message" in m for m in logged_messages)
@pytest.mark.asyncio
async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatch) -> None:
"""When _poll_once raises a non-timeout exception, the start() loop must log
the error and continue polling instead of exiting silently."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.config.token = "token" # skip QR login in start()
channel._running = True
call_count = 0
logged_messages: list[str] = []
async def _failing_poll() -> None:
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("poll exploded")
channel._running = False # Stop after second call
channel._poll_once = _failing_poll # type: ignore[method-assign]
monkeypatch.setattr(
channel.logger,
"exception",
lambda message, *args, **kwargs: logged_messages.append(str(message)),
)
# Use a tiny retry delay so the test finishes quickly
original_retry = weixin_mod.RETRY_DELAY_S
weixin_mod.RETRY_DELAY_S = 0.01
try:
await channel.start()
finally:
weixin_mod.RETRY_DELAY_S = original_retry
assert call_count == 2
assert any("WeChat poll loop error" in m for m in logged_messages)
# ---------------------------------------------------------------------------
# Tool-hint buffering
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_buffer_single_tool_hint_not_sent_immediately() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Using tool",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["Using tool"]
@pytest.mark.asyncio
async def test_buffer_multiple_tool_hints_flushed_on_final_answer() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
for hint in ["tool1", "tool2"]:
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": hint,
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 2
channel._send_text.assert_any_await("wx-user", "tool1\n\ntool2", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_thought_progress_flushes_tool_hints() -> None:
"""Thoughts are visible progress messages and must act as separators,
flushing buffered tool hints before they are sent."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send a thought — progress but not a tool_hint.
# It must act as a separator and flush the buffered hint.
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Let me think...",
"media": [],
"metadata": {"_progress": True},
},
)()
)
# The buffered hint was flushed before the thought was sent.
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Let me think...", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
# Final answer arrives with nothing left to flush.
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 3
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
@pytest.mark.asyncio
async def test_reasoning_delta_does_not_flush_tool_hints() -> None:
"""Reasoning deltas are invisible in WeChat and must NOT flush buffered
tool hints otherwise hints separated only by hidden reasoning would
fail to coalesce."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send a reasoning delta — invisible in WeChat, must NOT flush
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Thinking step 1...",
"media": [],
"metadata": {"_progress": True, "_reasoning_delta": True},
},
)()
)
# Reasoning is invisible; hint stays buffered, _send_text not called
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"]
# Final answer flushes the buffered hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_empty_progress_message_does_not_flush_tool_hints() -> None:
"""Empty progress messages (e.g. after_iteration tool_events) have no
visible content and must NOT act as separators."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send an empty progress message (no content, no media)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "",
"media": [],
"metadata": {"_progress": True, "_tool_events": [{"phase": "end"}]},
},
)()
)
# Nothing should have been sent yet
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"]
# Final answer flushes the buffered hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_buffer_flush_refreshes_context_token() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-old"
channel._context_token_at["wx-user"] = time.time()
channel._refresh_context_token_if_stale = AsyncMock(return_value="ctx-refreshed")
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._refresh_context_token_if_stale.await_count == 2
channel._refresh_context_token_if_stale.assert_any_await("wx-user", "ctx-old")
channel._send_text.assert_any_await("wx-user", "hint", "ctx-refreshed")
@pytest.mark.asyncio
async def test_buffer_flush_failure_does_not_block_final_answer() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock(side_effect=[RuntimeError("boom"), None])
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 2
channel._send_text.assert_any_await("wx-user", "hint", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
@pytest.mark.asyncio
async def test_buffer_flushed_on_stream_end() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send_delta("wx-user", "", {"_stream_end": True})
channel._send_text.assert_awaited_once_with("wx-user", "hint", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_stop_clears_buffer() -> None:
channel, _bus = _make_channel()
channel._pending_tool_hints["wx-user"] = ["hint1", "hint2"]
await channel.stop()
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_send_tool_hints_false_drops_tool_hints() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = False
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
channel._send_text.assert_not_awaited()
assert "wx-user" not in channel._pending_tool_hints