From e2b51fa5dca86db7d0638848f4c72d390842c28a Mon Sep 17 00:00:00 2001 From: chengyongru Date: Tue, 19 May 2026 16:00:37 +0800 Subject: [PATCH 1/4] fix(weixin): prevent silent message drops from poll exceptions and expired tokens - Remove suppress(Exception) from poll loop and message processing; add logger.exception so inbound errors are visible. - Check both ret and errcode on send to avoid silent drops when iLink returns ret != 0 with errcode == 0. - Proactively refresh context_token via getconfig before sending if the cached token is older than 60s. This prevents message loss on long agent turns and cron pushes without relying on complex retry logic. Refs: openclaw/openclaw#61174, NousResearch/hermes-agent#21011 --- nanobot/channels/weixin.py | 169 ++++++++- tests/channels/test_weixin_channel.py | 526 ++++++++++++++++++++++++++ 2 files changed, 689 insertions(+), 6 deletions(-) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 41390f8b3..a75c897f4 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -79,6 +79,12 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION} ERRCODE_SESSION_EXPIRED = -14 SESSION_PAUSE_DURATION_S = 60 * 60 +# iLink context_token is observed to expire server-side after ~90-160s of +# agent inactivity (openclaw/openclaw#61174). Proactively refresh before +# sending if the cached token is older than this threshold. +CONTEXT_TOKEN_MAX_AGE_S = 60 + + # Retry constants (matching the reference plugin's monitor.ts) MAX_CONSECUTIVE_FAILURES = 3 BACKOFF_DELAY_S = 30 @@ -159,6 +165,8 @@ class WeixinChannel(BaseChannel): self._session_pause_until: float = 0.0 self._typing_tasks: dict[str, asyncio.Task] = {} self._typing_tickets: dict[str, dict[str, Any]] = {} + self._context_token_at: dict[str, float] = {} + self._pending_tool_hints: dict[str, list[str]] = {} # ------------------------------------------------------------------ # State persistence @@ -486,6 +494,7 @@ class WeixinChannel(BaseChannel): except Exception: if not self._running: break + self.logger.exception("WeChat poll loop error") consecutive_failures += 1 if consecutive_failures >= MAX_CONSECUTIVE_FAILURES: consecutive_failures = 0 @@ -495,6 +504,7 @@ class WeixinChannel(BaseChannel): async def stop(self) -> None: self._running = False + self._pending_tool_hints.clear() if self._poll_task and not self._poll_task.done(): self._poll_task.cancel() for chat_id in list(self._typing_tasks): @@ -545,6 +555,7 @@ class WeixinChannel(BaseChannel): # Check for API-level errors (monitor.ts checks both ret and errcode) ret = data.get("ret", 0) errcode = data.get("errcode", 0) + is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0) if is_error: @@ -575,8 +586,10 @@ class WeixinChannel(BaseChannel): # Process messages (WeixinMessage[] from types.ts) msgs: list[dict] = data.get("msgs", []) or [] for msg in msgs: - with suppress(Exception): + try: await self._process_message(msg) + except Exception: + self.logger.exception("Failed to process WeChat message") # ------------------------------------------------------------------ # Inbound message processing (matches inbound.ts + process-message.ts) @@ -610,6 +623,7 @@ class WeixinChannel(BaseChannel): ctx_token = msg.get("context_token", "") if ctx_token: self._context_tokens[from_user_id] = ctx_token + self._context_token_at[from_user_id] = time.time() self._save_state() # Parse item_list (WeixinMessage.item_list — types.ts:161) @@ -915,6 +929,99 @@ class WeixinChannel(BaseChannel): } return "" + async def _refresh_context_token_if_stale( + self, chat_id: str, context_token: str + ) -> str: + """Return a fresh context_token if the cached one is too old. + + iLink context_token expires server-side after a short idle period + (empirically ~90s). Proactively refreshing before sending prevents + silent message loss on long agent turns or cron pushes. + """ + if not context_token: + return context_token + + now = time.time() + cached_at = self._context_token_at.get(chat_id, 0) + age = now - cached_at + + if age < CONTEXT_TOKEN_MAX_AGE_S: + return context_token + + self.logger.debug( + "WeChat context_token for {} is {:.0f}s old; refreshing via getconfig", + chat_id, + age, + ) + + body: dict[str, Any] = { + "ilink_user_id": chat_id, + "context_token": context_token, + "base_info": BASE_INFO, + } + try: + data = await self._api_post("ilink/bot/getconfig", body) + except Exception as e: + self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e) + return context_token + + if data.get("ret", 0) != 0: + self.logger.warning( + "WeChat getconfig returned ret={} for {}: {}", + data.get("ret"), + chat_id, + data.get("errmsg", ""), + ) + return context_token + + new_token = str(data.get("context_token", "") or "") + if new_token and new_token != context_token: + self.logger.info( + "WeChat context_token refreshed for {} (age {:.0f}s -> fresh)", + chat_id, + age, + ) + self._context_tokens[chat_id] = new_token + self._context_token_at[chat_id] = now + self._save_state() + return new_token + + return context_token + + async def _flush_tool_hints(self, chat_id: str) -> None: + """Send any buffered tool hints for *chat_id* as a single message. + + Tool hints are coalesced to reduce message count and avoid hitting the + WeChat iLink rate limit (~7 msgs / 5 min). Failures are logged but + not raised so that the main message send is never blocked. + """ + hints = self._pending_tool_hints.pop(chat_id, None) + if not hints: + return + + self.logger.info( + "Flushing {} buffered tool hint(s) for {}", + len(hints), + chat_id, + ) + + ctx_token = self._context_tokens.get(chat_id, "") + ctx_token = await self._refresh_context_token_if_stale(chat_id, ctx_token) + if not ctx_token: + self.logger.warning( + "Dropped {} buffered tool hint(s) for {}: no context_token", + len(hints), + chat_id, + ) + return + + try: + await self._send_text(chat_id, "\n\n".join(hints), ctx_token) + except Exception: + self.logger.exception( + "Failed to flush buffered tool hints for {}", chat_id + ) + async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None: """Best-effort sendtyping wrapper.""" if not typing_ticket: @@ -944,11 +1051,47 @@ class WeixinChannel(BaseChannel): self._assert_session_active() is_progress = bool((msg.metadata or {}).get("_progress", False)) + + # Buffer tool hints to coalesce consecutive ones and avoid burning + # WeChat iLink rate-limit quota (~7 msgs / 5 min). + if is_progress and (msg.metadata or {}).get("_tool_hint"): + if not self.send_tool_hints: + return + self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content) + self.logger.debug( + "Buffered tool hint for {} (count={})", + msg.chat_id, + len(self._pending_tool_hints[msg.chat_id]), + ) + return + + # Reasoning deltas are invisible in WeChat (there is no reasoning + # UI). Skip them entirely — do not send and do not flush buffer. + if is_progress and (msg.metadata or {}).get("_reasoning_delta"): + self.logger.debug( + "Dropped invisible reasoning delta for {}", msg.chat_id + ) + return + + content = msg.content.strip() + + # Empty progress messages (e.g. after_iteration tool_events) must + # NOT act as separators — they have no visible content. + if is_progress and not content and not (msg.media or []): + self.logger.debug( + "Skipped empty progress message for {} (no visible content)", + msg.chat_id, + ) + return + + # Flush buffered hints before sending any visible message. + await self._flush_tool_hints(msg.chat_id) + if not is_progress: await self._stop_typing(msg.chat_id, clear_remote=True) - content = msg.content.strip() ctx_token = self._context_tokens.get(msg.chat_id, "") + ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token) if not ctx_token: raise RuntimeError( f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send" @@ -1037,6 +1180,18 @@ class WeixinChannel(BaseChannel): with suppress(Exception): await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL) + async def send_delta( + self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None + ) -> None: + """Weixin iLink does not support native streaming deltas. + + We only hook ``_stream_end`` so buffered tool hints are flushed even + when the final answer carries the ``_streamed`` flag and bypasses + :meth:`send`. + """ + if metadata and metadata.get("_stream_end"): + await self._flush_tool_hints(chat_id) + async def _start_typing(self, chat_id: str, context_token: str = "") -> None: """Start typing indicator immediately when a message is received.""" if not self._client or not self._token or not chat_id: @@ -1120,10 +1275,11 @@ class WeixinChannel(BaseChannel): } data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) errcode = data.get("errcode", 0) - if errcode and errcode != 0: + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): raise RuntimeError( - f"WeChat send text error (code {errcode}): {data.get('errmsg', '')}" + f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" ) async def _send_media_file( @@ -1270,10 +1426,11 @@ class WeixinChannel(BaseChannel): } data = await self._api_post("ilink/bot/sendmessage", body) + ret = data.get("ret", 0) errcode = data.get("errcode", 0) - if errcode and errcode != 0: + if (ret is not None and ret != 0) or (errcode is not None and errcode != 0): raise RuntimeError( - f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}" + f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}" ) diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index a695ba936..3d3606e75 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -1,6 +1,7 @@ import asyncio import json import tempfile +import time from pathlib import Path from types import SimpleNamespace from unittest.mock import AsyncMock @@ -374,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-typing" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock( side_effect=[ @@ -402,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None: channel._client = object() channel._token = "token" channel._context_tokens["wx-user"] = "ctx-no-ticket" + channel._context_token_at["wx-user"] = time.time() channel._send_text = AsyncMock() channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"}) @@ -1254,3 +1257,526 @@ async def test_send_text_succeeds_on_zero_errcode() -> None: await channel._send_text("wx-user", "hello", "ctx-ok") channel._api_post.assert_awaited_once() + + +@pytest.mark.asyncio +async def test_send_text_raises_on_nonzero_ret_even_when_errcode_zero() -> None: + """_send_text must raise when the API returns ret != 0, even if errcode is 0. + + The iLink API signals failure through either field. Checking only errcode + caused silent message drops (responses generated but never delivered). + """ + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._api_post = AsyncMock( + return_value={"ret": -100, "errcode": 0, "errmsg": "internal error"} + ) + + with pytest.raises(RuntimeError, match="WeChat send text error.*ret=-100.*errcode=0"): + await channel._send_text("wx-user", "hello", "ctx-ok") + + channel._api_post.assert_awaited_once() + + +# --------------------------------------------------------------------------- +# Tests for _poll_once not silently dropping messages on processing errors +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_poll_once_logs_exception_on_process_message_failure(monkeypatch) -> None: + """When _process_message raises, _poll_once must log the error and continue + processing remaining messages instead of silently swallowing the exception.""" + channel, _bus = _make_channel() + channel._client = SimpleNamespace(timeout=None) + channel._token = "token" + channel._get_updates_buf = "old-buf" + + calls = [] + logged_messages: list[str] = [] + + async def _failing_process(msg: dict) -> None: + calls.append(msg.get("message_id")) + if msg.get("message_id") == "msg-1": + raise RuntimeError("processing failed") + + channel._process_message = _failing_process # type: ignore[method-assign] + + monkeypatch.setattr( + channel.logger, + "exception", + lambda message, *args, **kwargs: logged_messages.append(str(message)), + ) + + channel._api_post = AsyncMock( # type: ignore[method-assign] + return_value={ + "ret": 0, + "errcode": 0, + "get_updates_buf": "new-buf", + "msgs": [ + {"message_id": "msg-1", "message_type": 1}, + {"message_id": "msg-2", "message_type": 1}, + ], + } + ) + + await channel._poll_once() + + # Both messages should have been attempted + assert calls == ["msg-1", "msg-2"] + # Buffer should still advance (already updated before processing) + assert channel._get_updates_buf == "new-buf" + # Error should be logged + assert any("Failed to process WeChat message" in m for m in logged_messages) + + +@pytest.mark.asyncio +async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatch) -> None: + """When _poll_once raises a non-timeout exception, the start() loop must log + the error and continue polling instead of exiting silently.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.config.token = "token" # skip QR login in start() + channel._running = True + + call_count = 0 + logged_messages: list[str] = [] + + async def _failing_poll() -> None: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise RuntimeError("poll exploded") + channel._running = False # Stop after second call + + channel._poll_once = _failing_poll # type: ignore[method-assign] + + monkeypatch.setattr( + channel.logger, + "exception", + lambda message, *args, **kwargs: logged_messages.append(str(message)), + ) + + # Use a tiny retry delay so the test finishes quickly + original_retry = weixin_mod.RETRY_DELAY_S + weixin_mod.RETRY_DELAY_S = 0.01 + try: + await channel.start() + finally: + weixin_mod.RETRY_DELAY_S = original_retry + + assert call_count == 2 + assert any("WeChat poll loop error" in m for m in logged_messages) + + +# --------------------------------------------------------------------------- +# Tool-hint buffering +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_buffer_single_tool_hint_not_sent_immediately() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Using tool", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["Using tool"] + + +@pytest.mark.asyncio +async def test_buffer_multiple_tool_hints_flushed_on_final_answer() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + for hint in ["tool1", "tool2"]: + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": hint, + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 2 + channel._send_text.assert_any_await("wx-user", "tool1\n\ntool2", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_thought_progress_flushes_tool_hints() -> None: + """Thoughts are visible progress messages and must act as separators, + flushing buffered tool hints before they are sent.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send a thought — progress but not a tool_hint. + # It must act as a separator and flush the buffered hint. + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Let me think...", + "media": [], + "metadata": {"_progress": True}, + }, + )() + ) + + # The buffered hint was flushed before the thought was sent. + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Let me think...", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + # Final answer arrives with nothing left to flush. + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 3 + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + + +@pytest.mark.asyncio +async def test_reasoning_delta_does_not_flush_tool_hints() -> None: + """Reasoning deltas are invisible in WeChat and must NOT flush buffered + tool hints — otherwise hints separated only by hidden reasoning would + fail to coalesce.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send a reasoning delta — invisible in WeChat, must NOT flush + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Thinking step 1...", + "media": [], + "metadata": {"_progress": True, "_reasoning_delta": True}, + }, + )() + ) + + # Reasoning is invisible; hint stays buffered, _send_text not called + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"] + + # Final answer flushes the buffered hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_empty_progress_message_does_not_flush_tool_hints() -> None: + """Empty progress messages (e.g. after_iteration tool_events) have no + visible content and must NOT act as separators.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + # Buffer a tool hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "search 'foo'", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + # Send an empty progress message (no content, no media) + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "", + "media": [], + "metadata": {"_progress": True, "_tool_events": [{"phase": "end"}]}, + }, + )() + ) + + # Nothing should have been sent yet + channel._send_text.assert_not_awaited() + assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"] + + # Final answer flushes the buffered hint + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_buffer_flush_refreshes_context_token() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-old" + channel._context_token_at["wx-user"] = time.time() + channel._refresh_context_token_if_stale = AsyncMock(return_value="ctx-refreshed") + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._refresh_context_token_if_stale.await_count == 2 + channel._refresh_context_token_if_stale.assert_any_await("wx-user", "ctx-old") + channel._send_text.assert_any_await("wx-user", "hint", "ctx-refreshed") + + +@pytest.mark.asyncio +async def test_buffer_flush_failure_does_not_block_final_answer() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock(side_effect=[RuntimeError("boom"), None]) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "Done", + "media": [], + "metadata": {}, + }, + )() + ) + + assert channel._send_text.await_count == 2 + channel._send_text.assert_any_await("wx-user", "hint", "ctx-1") + channel._send_text.assert_any_await("wx-user", "Done", "ctx-1") + + +@pytest.mark.asyncio +async def test_buffer_flushed_on_stream_end() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = True + channel._context_tokens["wx-user"] = "ctx-1" + channel._context_token_at["wx-user"] = time.time() + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + await channel.send_delta("wx-user", "", {"_stream_end": True}) + + channel._send_text.assert_awaited_once_with("wx-user", "hint", "ctx-1") + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_stop_clears_buffer() -> None: + channel, _bus = _make_channel() + channel._pending_tool_hints["wx-user"] = ["hint1", "hint2"] + await channel.stop() + assert "wx-user" not in channel._pending_tool_hints + + +@pytest.mark.asyncio +async def test_send_tool_hints_false_drops_tool_hints() -> None: + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel.send_tool_hints = False + channel._send_text = AsyncMock() + + await channel.send( + type( + "Msg", + (), + { + "chat_id": "wx-user", + "content": "hint", + "media": [], + "metadata": {"_progress": True, "_tool_hint": True}, + }, + )() + ) + + channel._send_text.assert_not_awaited() + assert "wx-user" not in channel._pending_tool_hints From 0cd2f626c054572f9eb0787daa9c817c470b3249 Mon Sep 17 00:00:00 2001 From: olgagaga Date: Sat, 16 May 2026 12:19:30 -0400 Subject: [PATCH 2/4] fix(providers): inject OpenRouter `reasoning.effort` for thinking models MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Follow-up to #3851: that PR added `extra_body.thinking={type: disabled}` for MiMo via OpenRouter, but OR doesn't forward provider-specific thinking shapes to upstream — it strips unknown extra_body fields and uses its own unified `reasoning` parameter. So MiMo via OR kept thinking despite the injection (reproduced by @ClearPlume on #3851 with identical kwargs but provider switched from openrouter → xiaomi_mimo). For known thinking-capable models (Kimi, MiMo) routed via the openrouter spec, also inject `extra_body.reasoning = {effort: }` in OR's documented enum ("none"|"minimal"|"low"|"medium"|"high"|"xhigh"). OR translates this to the upstream model's native shape. Existing tests updated to expect both fields on the OR path. The direct xiaomi_mimo and moonshot paths are unchanged (the new branch is gated on spec.name == "openrouter"). Flash and non-MiMo models on OR continue to receive no injection. --- nanobot/providers/openai_compat_provider.py | 21 ++++++++++ tests/providers/test_litellm_kwargs.py | 19 +++++++-- tests/providers/test_xiaomi_mimo_thinking.py | 44 ++++++++++++++++---- 3 files changed, 72 insertions(+), 12 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index 03ab35a0e..222159dda 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -615,6 +615,27 @@ class OpenAICompatProvider(LLMProvider): {"thinking": {"type": "enabled" if thinking_enabled else "disabled"}} ) + # OpenRouter uses its own unified `reasoning` field and does not + # forward provider-specific thinking shapes (the Kimi/MiMo + # extra_body.thinking above) to upstream. Reported as the follow-up + # to #3845/#3851: MiMo via OR kept thinking despite our injection. + # For known thinking-capable models routed via OR, mirror the + # effort signal into reasoning.effort (OR's documented enum: + # "none"|"minimal"|"low"|"medium"|"high"|"xhigh"), which OR + # translates to the upstream model's native shape. + if ( + spec + and spec.name == "openrouter" + and reasoning_effort is not None + and ( + _is_kimi_thinking_model(model_name) + or _is_mimo_thinking_model(model_name) + ) + ): + kwargs.setdefault("extra_body", {}).update( + {"reasoning": {"effort": semantic_effort}} + ) + if tools: kwargs["tools"] = tools kwargs["tool_choice"] = tool_choice or "auto" diff --git a/tests/providers/test_litellm_kwargs.py b/tests/providers/test_litellm_kwargs.py index 5f2ffec59..461913c93 100644 --- a/tests/providers/test_litellm_kwargs.py +++ b/tests/providers/test_litellm_kwargs.py @@ -1391,9 +1391,16 @@ def test_kimi_k25_no_extra_body_when_reasoning_effort_none() -> None: def test_kimi_k25_thinking_enabled_with_openrouter_prefix() -> None: - """OpenRouter-style model names like moonshotai/kimi-k2.5 must trigger thinking.""" + """OpenRouter-style model names like moonshotai/kimi-k2.5 must trigger thinking. + + OR drops upstream-provider `thinking` fields, so the same intent also has + to go through OR's `reasoning.effort` shape (#3851 follow-up). + """ kw = _build_kwargs_for("openrouter", "moonshotai/kimi-k2.5", reasoning_effort="medium") - assert kw.get("extra_body") == {"thinking": {"type": "enabled"}} + assert kw.get("extra_body") == { + "thinking": {"type": "enabled"}, + "reasoning": {"effort": "medium"}, + } def test_kimi_k26_thinking_enabled() -> None: @@ -1403,9 +1410,13 @@ def test_kimi_k26_thinking_enabled() -> None: def test_kimi_k26_thinking_enabled_with_openrouter_prefix() -> None: - """OpenRouter-style names like moonshotai/kimi-k2.6 must trigger thinking.""" + """OpenRouter-style names like moonshotai/kimi-k2.6 must trigger thinking + via both upstream `thinking` and OR's `reasoning.effort`.""" kw = _build_kwargs_for("openrouter", "moonshotai/kimi-k2.6", reasoning_effort="medium") - assert kw.get("extra_body") == {"thinking": {"type": "enabled"}} + assert kw.get("extra_body") == { + "thinking": {"type": "enabled"}, + "reasoning": {"effort": "medium"}, + } def test_moonshot_kimi_k26_temperature_override() -> None: diff --git a/tests/providers/test_xiaomi_mimo_thinking.py b/tests/providers/test_xiaomi_mimo_thinking.py index 68ca6dd80..43dfec537 100644 --- a/tests/providers/test_xiaomi_mimo_thinking.py +++ b/tests/providers/test_xiaomi_mimo_thinking.py @@ -142,9 +142,11 @@ def test_mimo_reasoning_effort_unset_preserves_provider_default(): def test_mimo_via_openrouter_reasoning_effort_none_disables_thinking(): - """OpenRouter routes MiMo as "xiaomi/mimo-v2.5-pro"; the openrouter spec - has no thinking_style, so the disable signal must come from the - model-name path (#3845).""" + """OpenRouter routes MiMo as "xiaomi/mimo-v2.5-pro" and does NOT forward + extra_body.thinking to upstream, so a disable signal must also reach OR + in its own `reasoning.effort` shape. Verifies both the upstream-MiMo + payload (#3845) and the OR-native payload (#3851 follow-up) are sent. + """ provider = _openrouter_provider("xiaomi/mimo-v2.5-pro") kwargs = provider._build_kwargs( messages=_simple_messages(), @@ -152,11 +154,15 @@ def test_mimo_via_openrouter_reasoning_effort_none_disables_thinking(): temperature=0.7, reasoning_effort="none", tool_choice=None, ) assert "reasoning_effort" not in kwargs - assert kwargs["extra_body"] == {"thinking": {"type": "disabled"}} + assert kwargs["extra_body"] == { + "thinking": {"type": "disabled"}, + "reasoning": {"effort": "none"}, + } def test_mimo_via_openrouter_reasoning_effort_medium_enables_thinking(): - """Same as the direct path: any non-none/minimal effort enables thinking.""" + """Non-none/minimal effort enables thinking and the OR `reasoning.effort` + field mirrors the requested effort level.""" provider = _openrouter_provider("xiaomi/mimo-v2.5-pro") kwargs = provider._build_kwargs( messages=_simple_messages(), @@ -164,7 +170,10 @@ def test_mimo_via_openrouter_reasoning_effort_medium_enables_thinking(): temperature=0.7, reasoning_effort="medium", tool_choice=None, ) assert kwargs.get("reasoning_effort") == "medium" - assert kwargs["extra_body"] == {"thinking": {"type": "enabled"}} + assert kwargs["extra_body"] == { + "thinking": {"type": "enabled"}, + "reasoning": {"effort": "medium"}, + } def test_mimo_via_openrouter_bare_slug_also_matches(): @@ -176,12 +185,16 @@ def test_mimo_via_openrouter_bare_slug_also_matches(): tools=None, model=None, max_tokens=100, temperature=0.7, reasoning_effort="none", tool_choice=None, ) - assert kwargs["extra_body"] == {"thinking": {"type": "disabled"}} + assert kwargs["extra_body"] == { + "thinking": {"type": "disabled"}, + "reasoning": {"effort": "none"}, + } def test_mimo_flash_via_openrouter_does_not_inject_thinking(): """mimo-v2-flash has no thinking mode per Xiaomi docs; the allowlist - excludes it, so no thinking field should be injected on the gateway path.""" + excludes it, so neither the upstream `thinking` field nor OR's + `reasoning.effort` should be injected on the gateway path.""" provider = _openrouter_provider("xiaomi/mimo-v2-flash") kwargs = provider._build_kwargs( messages=_simple_messages(), @@ -200,3 +213,18 @@ def test_non_mimo_model_via_openrouter_unaffected(): temperature=0.7, reasoning_effort="none", tool_choice=None, ) assert "extra_body" not in kwargs + + +def test_kimi_via_openrouter_also_injects_reasoning_effort(): + """Kimi has the same gateway problem as MiMo: OR drops the upstream + `thinking` field. The same OR-reasoning injection should fire.""" + provider = _openrouter_provider("moonshotai/kimi-k2.5") + kwargs = provider._build_kwargs( + messages=_simple_messages(), + tools=None, model=None, max_tokens=100, + temperature=0.7, reasoning_effort="none", tool_choice=None, + ) + assert kwargs["extra_body"] == { + "thinking": {"type": "disabled"}, + "reasoning": {"effort": "none"}, + } From 4f895e6307cab731e86f4e67b6a044bc957dbf9e Mon Sep 17 00:00:00 2001 From: Xubin Ren <52506698+Re-bin@users.noreply.github.com> Date: Thu, 21 May 2026 14:34:45 +0800 Subject: [PATCH 3/4] refactor(providers): centralize gateway reasoning control --- nanobot/providers/openai_compat_provider.py | 130 +++++++------------ nanobot/providers/registry.py | 6 + tests/providers/test_xiaomi_mimo_thinking.py | 9 +- 3 files changed, 59 insertions(+), 86 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index 222159dda..a61439025 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -74,41 +74,43 @@ _THINKING_STYLE_MAP: dict[str, Any] = { "enable_thinking": lambda on: {"enable_thinking": on}, "reasoning_split": lambda on: {"reasoning_split": on}, } +_GATEWAY_REASONING_STYLE_MAP: dict[str, Any] = { + "reasoning_effort": lambda effort: {"reasoning": {"effort": effort}}, +} +_MODEL_THINKING_STYLES: dict[str, str] = { + **dict.fromkeys(_KIMI_THINKING_MODELS, "thinking_type"), + **dict.fromkeys(_MIMO_THINKING_MODELS, "thinking_type"), +} -def _is_kimi_thinking_model(model_name: str) -> bool: - """Return True if model_name refers to a Kimi thinking-capable model. - - Supports two forms: - - Exact match: e.g. kimi-k2.5 / kimi-k2.6 in _KIMI_THINKING_MODELS - - Slug match: moonshotai/kimi-k2.5 -> the part after the last "/" - is checked against _KIMI_THINKING_MODELS - - This covers both the native Moonshot provider (bare slug) and - OpenRouter-style names (``"publisher/slug"``). - """ - name = model_name.lower() - if name in _KIMI_THINKING_MODELS: - return True - if "/" in name and name.rsplit("/", 1)[1] in _KIMI_THINKING_MODELS: - return True - return False +def _model_slug(model_name: str) -> str: + return model_name.lower().rsplit("/", 1)[-1] -def _is_mimo_thinking_model(model_name: str) -> bool: - """Return True if model_name refers to a MiMo thinking-capable model. +def _model_thinking_style(model_name: str) -> str: + return _MODEL_THINKING_STYLES.get(_model_slug(model_name), "") - Mirrors _is_kimi_thinking_model: gateway providers (e.g. OpenRouter - routing ``xiaomi/mimo-v2.5-pro``) have no ``thinking_style`` on their - spec, so the spec-driven branch in _build_kwargs misses them. The - model-name path catches those cases. - """ - name = model_name.lower() - if name in _MIMO_THINKING_MODELS: - return True - if "/" in name and name.rsplit("/", 1)[1] in _MIMO_THINKING_MODELS: - return True - return False + +def _thinking_styles_for(spec: ProviderSpec | None, model_name: str) -> list[str]: + styles: list[str] = [] + if spec and spec.thinking_style: + styles.append(spec.thinking_style) + model_style = _model_thinking_style(model_name) + if model_style and model_style not in styles: + styles.append(model_style) + return styles + + +def _thinking_extra_body(style: str, thinking_enabled: bool) -> dict[str, Any] | None: + builder = _THINKING_STYLE_MAP.get(style) + return builder(thinking_enabled) if builder else None + + +def _gateway_reasoning_extra_body(style: str, effort: str | None) -> dict[str, Any] | None: + if not effort: + return None + builder = _GATEWAY_REASONING_STYLE_MAP.get(style) + return builder(effort) if builder else None def _openai_compat_timeout_s() -> float: @@ -581,60 +583,19 @@ class OpenAICompatProvider(LLMProvider): if wire_effort and semantic_effort != "none": kwargs["reasoning_effort"] = wire_effort - # Provider-specific thinking parameters. - # Only sent when reasoning_effort is explicitly configured so that - # the provider default is preserved otherwise. - # The mapping is driven by ProviderSpec.thinking_style so that adding - # a new provider never requires touching this function. - if spec and spec.thinking_style and reasoning_effort is not None: + # Only send thinking controls when reasoning_effort is explicit so + # omitting the config preserves each provider's default. + if reasoning_effort is not None: thinking_enabled = semantic_effort not in ("none", "minimal") - extra = _THINKING_STYLE_MAP.get(spec.thinking_style, lambda _: None)(thinking_enabled) - if extra: - kwargs.setdefault("extra_body", {}).update(extra) - - # Model-level thinking injection for Kimi thinking-capable models. - # Strip any provider prefix (e.g. "moonshotai/") before the set lookup - # so that OpenRouter-style names like "moonshotai/kimi-k2.5" are handled - # identically to bare names like "kimi-k2.5". - if reasoning_effort is not None and _is_kimi_thinking_model(model_name): - thinking_enabled = semantic_effort not in ("none", "minimal") - kwargs.setdefault("extra_body", {}).update( - {"thinking": {"type": "enabled" if thinking_enabled else "disabled"}} - ) - - # Model-level thinking injection for MiMo thinking-capable models. - # Same shape as Kimi: gateway providers (OpenRouter, etc.) lack the - # xiaomi_mimo spec's thinking_style, so the spec-driven branch above - # misses them — match by model name to catch "xiaomi/mimo-v2.5-pro" - # and friends. (Direct xiaomi_mimo requests are also covered here; - # both branches write the same payload, so the dict update is a - # safe no-op for already-handled cases.) - if reasoning_effort is not None and _is_mimo_thinking_model(model_name): - thinking_enabled = semantic_effort not in ("none", "minimal") - kwargs.setdefault("extra_body", {}).update( - {"thinking": {"type": "enabled" if thinking_enabled else "disabled"}} - ) - - # OpenRouter uses its own unified `reasoning` field and does not - # forward provider-specific thinking shapes (the Kimi/MiMo - # extra_body.thinking above) to upstream. Reported as the follow-up - # to #3845/#3851: MiMo via OR kept thinking despite our injection. - # For known thinking-capable models routed via OR, mirror the - # effort signal into reasoning.effort (OR's documented enum: - # "none"|"minimal"|"low"|"medium"|"high"|"xhigh"), which OR - # translates to the upstream model's native shape. - if ( - spec - and spec.name == "openrouter" - and reasoning_effort is not None - and ( - _is_kimi_thinking_model(model_name) - or _is_mimo_thinking_model(model_name) - ) - ): - kwargs.setdefault("extra_body", {}).update( - {"reasoning": {"effort": semantic_effort}} - ) + for thinking_style in _thinking_styles_for(spec, model_name): + extra = _thinking_extra_body(thinking_style, thinking_enabled) + if extra: + kwargs.setdefault("extra_body", {}).update(extra) + gateway_style = getattr(spec, "gateway_reasoning_style", "") if spec else "" + if gateway_style and _model_thinking_style(model_name): + extra = _gateway_reasoning_extra_body(gateway_style, semantic_effort) + if extra: + kwargs.setdefault("extra_body", {}).update(extra) if tools: kwargs["tools"] = tools @@ -649,8 +610,7 @@ class OpenAICompatProvider(LLMProvider): and semantic_effort not in ("none", "minimal") and ( (spec and spec.thinking_style) - or _is_kimi_thinking_model(model_name) - or _is_mimo_thinking_model(model_name) + or _model_thinking_style(model_name) ) ) implicit_deepseek_thinking = ( diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 7c8edd271..d942c03bf 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -71,6 +71,11 @@ class ProviderSpec: # "reasoning_split" — {"reasoning_split": true/false} (MiniMax) thinking_style: str = "" + # Gateway-native reasoning control to pair with model-level thinking styles. + # "reasoning_effort" — {"reasoning": {"effort": }} + # (OpenRouter) + gateway_reasoning_style: str = "" + # When True, treat the "reasoning" response field as formal content # when "content" is empty. Only set this for providers (e.g. StepFun) # whose API returns the actual answer in "reasoning" instead of "content". @@ -142,6 +147,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( detect_by_base_keyword="openrouter", default_api_base="https://openrouter.ai/api/v1", supports_prompt_caching=True, + gateway_reasoning_style="reasoning_effort", ), # Hugging Face Inference Providers: OpenAI-compatible router for chat models. ProviderSpec( diff --git a/tests/providers/test_xiaomi_mimo_thinking.py b/tests/providers/test_xiaomi_mimo_thinking.py index 43dfec537..92161803f 100644 --- a/tests/providers/test_xiaomi_mimo_thinking.py +++ b/tests/providers/test_xiaomi_mimo_thinking.py @@ -32,7 +32,7 @@ def _mimo_spec(): def _openrouter_spec(): - """Return the registered OpenRouter ProviderSpec (no thinking_style).""" + """Return the registered OpenRouter ProviderSpec.""" specs = {s.name: s for s in PROVIDERS} return specs["openrouter"] @@ -77,6 +77,13 @@ def test_xiaomi_mimo_uses_thinking_type_style(): assert spec.default_api_base == "https://api.xiaomimimo.com/v1" +def test_openrouter_declares_gateway_reasoning_style(): + """OpenRouter uses its own reasoning.effort field for routed thinking models.""" + spec = _openrouter_spec() + assert spec.thinking_style == "" + assert spec.gateway_reasoning_style == "reasoning_effort" + + # --------------------------------------------------------------------------- # _build_kwargs wire-format # --------------------------------------------------------------------------- From e645fbcb34ee61884167c6b2b260545d6d3c8aba Mon Sep 17 00:00:00 2001 From: Haisam Abbas Date: Wed, 20 May 2026 17:16:53 +0500 Subject: [PATCH 4/4] fix shell guard url path detection --- nanobot/agent/tools/shell.py | 2 +- tests/tools/test_tool_validation.py | 36 +++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 0252b9746..7e0ef57a8 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -418,7 +418,7 @@ class ExecTool(Tool): # Windows: match drive-root paths like `C:\` as well as `C:\path\to\file`, and UNC paths like `\\server\share` # NOTE: `*` is required so `C:\` (nothing after the slash) is still extracted. win_paths = re.findall( - r"(?:[A-Za-z]:[^\s\"'|><;]*|\\\\[^\s\"'|><;]+(?:\\[^\s\"'|><;]+)*)", + r"(?<;]*|\\\\[^\s\"'|><;]+(?:\\[^\s\"'|><;]+)*)", command ) posix_paths = re.findall(r"(?:^|[\s|>'\"])(/[^\s\"'>;|<]+)", command) # POSIX: /absolute only diff --git a/tests/tools/test_tool_validation.py b/tests/tools/test_tool_validation.py index 42620dcc6..188a8952f 100644 --- a/tests/tools/test_tool_validation.py +++ b/tests/tools/test_tool_validation.py @@ -3,6 +3,8 @@ import subprocess import sys from typing import Any +import pytest + from nanobot.agent.tools import ( ArraySchema, IntegerSchema, @@ -15,6 +17,7 @@ from nanobot.agent.tools import ( from nanobot.agent.tools.base import Tool from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.shell import ExecTool +from nanobot.security.network import configure_ssrf_whitelist class SampleTool(Tool): @@ -218,6 +221,39 @@ def test_exec_extract_absolute_paths_ignores_relative_posix_segments() -> None: assert "/bin/python" not in paths +def test_exec_extract_absolute_paths_ignores_urls() -> None: + cmd = 'curl -s -o /dev/null -w "%{http_code}" https://www.google.com' + paths = ExecTool._extract_absolute_paths(cmd) + assert paths == ["/dev/null"] + + +@pytest.mark.parametrize( + "command", + [ + 'curl -s -o /dev/null -w "%{http_code}" https://www.google.com', + 'wget -q -O - http://example.com 2>&1 | head -c 100', + 'python3 -c "import urllib.request; print(urllib.request.urlopen(\'http://example.com\').read()[:100])"', + ], +) +def test_exec_guard_allows_public_urls(tmp_path, command: str) -> None: + tool = ExecTool(restrict_to_workspace=True) + error = tool._guard_command(command, str(tmp_path)) + assert error is None + + +def test_exec_guard_allows_whitelisted_internal_urls(tmp_path) -> None: + configure_ssrf_whitelist(["10.10.10.0/24"]) + try: + tool = ExecTool(restrict_to_workspace=True) + error = tool._guard_command( + 'curl -s -H "Authorization: Bearer ..." http://10.10.10.3:8123/api/', + str(tmp_path), + ) + assert error is None + finally: + configure_ssrf_whitelist([]) + + def test_exec_extract_absolute_paths_captures_posix_absolute_paths() -> None: cmd = "cat /tmp/data.txt > /tmp/out.txt" paths = ExecTool._extract_absolute_paths(cmd)