Merge remote-tracking branch 'origin/main' into codex/coding-tooling-optimization

This commit is contained in:
Xubin Ren 2026-05-21 14:44:56 +08:00
commit 581faa34f7
8 changed files with 836 additions and 84 deletions

View File

@ -587,7 +587,7 @@ class ExecTool(Tool):
# Windows: match drive-root paths like `C:\` as well as `C:\path\to\file`, and UNC paths like `\\server\share`
# NOTE: `*` is required so `C:\` (nothing after the slash) is still extracted.
win_paths = re.findall(
r"(?:[A-Za-z]:[^\s\"'|><;]*|\\\\[^\s\"'|><;]+(?:\\[^\s\"'|><;]+)*)",
r"(?<![A-Za-z])(?:[A-Za-z]:[^\s\"'|><;]*|\\\\[^\s\"'|><;]+(?:\\[^\s\"'|><;]+)*)",
command
)
posix_paths = re.findall(r"(?:^|[\s|>'\"])(/[^\s\"'>;|<]+)", command) # POSIX: /absolute only

View File

@ -79,6 +79,12 @@ BASE_INFO: dict[str, str] = {"channel_version": WEIXIN_CHANNEL_VERSION}
ERRCODE_SESSION_EXPIRED = -14
SESSION_PAUSE_DURATION_S = 60 * 60
# iLink context_token is observed to expire server-side after ~90-160s of
# agent inactivity (openclaw/openclaw#61174). Proactively refresh before
# sending if the cached token is older than this threshold.
CONTEXT_TOKEN_MAX_AGE_S = 60
# Retry constants (matching the reference plugin's monitor.ts)
MAX_CONSECUTIVE_FAILURES = 3
BACKOFF_DELAY_S = 30
@ -159,6 +165,8 @@ class WeixinChannel(BaseChannel):
self._session_pause_until: float = 0.0
self._typing_tasks: dict[str, asyncio.Task] = {}
self._typing_tickets: dict[str, dict[str, Any]] = {}
self._context_token_at: dict[str, float] = {}
self._pending_tool_hints: dict[str, list[str]] = {}
# ------------------------------------------------------------------
# State persistence
@ -486,6 +494,7 @@ class WeixinChannel(BaseChannel):
except Exception:
if not self._running:
break
self.logger.exception("WeChat poll loop error")
consecutive_failures += 1
if consecutive_failures >= MAX_CONSECUTIVE_FAILURES:
consecutive_failures = 0
@ -495,6 +504,7 @@ class WeixinChannel(BaseChannel):
async def stop(self) -> None:
self._running = False
self._pending_tool_hints.clear()
if self._poll_task and not self._poll_task.done():
self._poll_task.cancel()
for chat_id in list(self._typing_tasks):
@ -545,6 +555,7 @@ class WeixinChannel(BaseChannel):
# Check for API-level errors (monitor.ts checks both ret and errcode)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
is_error = (ret is not None and ret != 0) or (errcode is not None and errcode != 0)
if is_error:
@ -575,8 +586,10 @@ class WeixinChannel(BaseChannel):
# Process messages (WeixinMessage[] from types.ts)
msgs: list[dict] = data.get("msgs", []) or []
for msg in msgs:
with suppress(Exception):
try:
await self._process_message(msg)
except Exception:
self.logger.exception("Failed to process WeChat message")
# ------------------------------------------------------------------
# Inbound message processing (matches inbound.ts + process-message.ts)
@ -610,6 +623,7 @@ class WeixinChannel(BaseChannel):
ctx_token = msg.get("context_token", "")
if ctx_token:
self._context_tokens[from_user_id] = ctx_token
self._context_token_at[from_user_id] = time.time()
self._save_state()
# Parse item_list (WeixinMessage.item_list — types.ts:161)
@ -915,6 +929,99 @@ class WeixinChannel(BaseChannel):
}
return ""
async def _refresh_context_token_if_stale(
self, chat_id: str, context_token: str
) -> str:
"""Return a fresh context_token if the cached one is too old.
iLink context_token expires server-side after a short idle period
(empirically ~90s). Proactively refreshing before sending prevents
silent message loss on long agent turns or cron pushes.
"""
if not context_token:
return context_token
now = time.time()
cached_at = self._context_token_at.get(chat_id, 0)
age = now - cached_at
if age < CONTEXT_TOKEN_MAX_AGE_S:
return context_token
self.logger.debug(
"WeChat context_token for {} is {:.0f}s old; refreshing via getconfig",
chat_id,
age,
)
body: dict[str, Any] = {
"ilink_user_id": chat_id,
"context_token": context_token,
"base_info": BASE_INFO,
}
try:
data = await self._api_post("ilink/bot/getconfig", body)
except Exception as e:
self.logger.warning("WeChat getconfig failed for {}: {}", chat_id, e)
return context_token
if data.get("ret", 0) != 0:
self.logger.warning(
"WeChat getconfig returned ret={} for {}: {}",
data.get("ret"),
chat_id,
data.get("errmsg", ""),
)
return context_token
new_token = str(data.get("context_token", "") or "")
if new_token and new_token != context_token:
self.logger.info(
"WeChat context_token refreshed for {} (age {:.0f}s -> fresh)",
chat_id,
age,
)
self._context_tokens[chat_id] = new_token
self._context_token_at[chat_id] = now
self._save_state()
return new_token
return context_token
async def _flush_tool_hints(self, chat_id: str) -> None:
"""Send any buffered tool hints for *chat_id* as a single message.
Tool hints are coalesced to reduce message count and avoid hitting the
WeChat iLink rate limit (~7 msgs / 5 min). Failures are logged but
not raised so that the main message send is never blocked.
"""
hints = self._pending_tool_hints.pop(chat_id, None)
if not hints:
return
self.logger.info(
"Flushing {} buffered tool hint(s) for {}",
len(hints),
chat_id,
)
ctx_token = self._context_tokens.get(chat_id, "")
ctx_token = await self._refresh_context_token_if_stale(chat_id, ctx_token)
if not ctx_token:
self.logger.warning(
"Dropped {} buffered tool hint(s) for {}: no context_token",
len(hints),
chat_id,
)
return
try:
await self._send_text(chat_id, "\n\n".join(hints), ctx_token)
except Exception:
self.logger.exception(
"Failed to flush buffered tool hints for {}", chat_id
)
async def _send_typing(self, user_id: str, typing_ticket: str, status: int) -> None:
"""Best-effort sendtyping wrapper."""
if not typing_ticket:
@ -944,11 +1051,47 @@ class WeixinChannel(BaseChannel):
self._assert_session_active()
is_progress = bool((msg.metadata or {}).get("_progress", False))
# Buffer tool hints to coalesce consecutive ones and avoid burning
# WeChat iLink rate-limit quota (~7 msgs / 5 min).
if is_progress and (msg.metadata or {}).get("_tool_hint"):
if not self.send_tool_hints:
return
self._pending_tool_hints.setdefault(msg.chat_id, []).append(msg.content)
self.logger.debug(
"Buffered tool hint for {} (count={})",
msg.chat_id,
len(self._pending_tool_hints[msg.chat_id]),
)
return
# Reasoning deltas are invisible in WeChat (there is no reasoning
# UI). Skip them entirely — do not send and do not flush buffer.
if is_progress and (msg.metadata or {}).get("_reasoning_delta"):
self.logger.debug(
"Dropped invisible reasoning delta for {}", msg.chat_id
)
return
content = msg.content.strip()
# Empty progress messages (e.g. after_iteration tool_events) must
# NOT act as separators — they have no visible content.
if is_progress and not content and not (msg.media or []):
self.logger.debug(
"Skipped empty progress message for {} (no visible content)",
msg.chat_id,
)
return
# Flush buffered hints before sending any visible message.
await self._flush_tool_hints(msg.chat_id)
if not is_progress:
await self._stop_typing(msg.chat_id, clear_remote=True)
content = msg.content.strip()
ctx_token = self._context_tokens.get(msg.chat_id, "")
ctx_token = await self._refresh_context_token_if_stale(msg.chat_id, ctx_token)
if not ctx_token:
raise RuntimeError(
f"WeChat context_token missing for chat_id={msg.chat_id}, cannot send"
@ -1037,6 +1180,18 @@ class WeixinChannel(BaseChannel):
with suppress(Exception):
await self._send_typing(msg.chat_id, typing_ticket, TYPING_STATUS_CANCEL)
async def send_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Weixin iLink does not support native streaming deltas.
We only hook ``_stream_end`` so buffered tool hints are flushed even
when the final answer carries the ``_streamed`` flag and bypasses
:meth:`send`.
"""
if metadata and metadata.get("_stream_end"):
await self._flush_tool_hints(chat_id)
async def _start_typing(self, chat_id: str, context_token: str = "") -> None:
"""Start typing indicator immediately when a message is received."""
if not self._client or not self._token or not chat_id:
@ -1120,10 +1275,11 @@ class WeixinChannel(BaseChannel):
}
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if errcode and errcode != 0:
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send text error (code {errcode}): {data.get('errmsg', '')}"
f"WeChat send text error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)
async def _send_media_file(
@ -1270,10 +1426,11 @@ class WeixinChannel(BaseChannel):
}
data = await self._api_post("ilink/bot/sendmessage", body)
ret = data.get("ret", 0)
errcode = data.get("errcode", 0)
if errcode and errcode != 0:
if (ret is not None and ret != 0) or (errcode is not None and errcode != 0):
raise RuntimeError(
f"WeChat send media error (code {errcode}): {data.get('errmsg', '')}"
f"WeChat send media error (ret={ret}, errcode={errcode}): {data.get('errmsg', '')}"
)

View File

@ -74,41 +74,43 @@ _THINKING_STYLE_MAP: dict[str, Any] = {
"enable_thinking": lambda on: {"enable_thinking": on},
"reasoning_split": lambda on: {"reasoning_split": on},
}
_GATEWAY_REASONING_STYLE_MAP: dict[str, Any] = {
"reasoning_effort": lambda effort: {"reasoning": {"effort": effort}},
}
_MODEL_THINKING_STYLES: dict[str, str] = {
**dict.fromkeys(_KIMI_THINKING_MODELS, "thinking_type"),
**dict.fromkeys(_MIMO_THINKING_MODELS, "thinking_type"),
}
def _is_kimi_thinking_model(model_name: str) -> bool:
"""Return True if model_name refers to a Kimi thinking-capable model.
Supports two forms:
- Exact match: e.g. kimi-k2.5 / kimi-k2.6 in _KIMI_THINKING_MODELS
- Slug match: moonshotai/kimi-k2.5 -> the part after the last "/"
is checked against _KIMI_THINKING_MODELS
This covers both the native Moonshot provider (bare slug) and
OpenRouter-style names (``"publisher/slug"``).
"""
name = model_name.lower()
if name in _KIMI_THINKING_MODELS:
return True
if "/" in name and name.rsplit("/", 1)[1] in _KIMI_THINKING_MODELS:
return True
return False
def _model_slug(model_name: str) -> str:
return model_name.lower().rsplit("/", 1)[-1]
def _is_mimo_thinking_model(model_name: str) -> bool:
"""Return True if model_name refers to a MiMo thinking-capable model.
def _model_thinking_style(model_name: str) -> str:
return _MODEL_THINKING_STYLES.get(_model_slug(model_name), "")
Mirrors _is_kimi_thinking_model: gateway providers (e.g. OpenRouter
routing ``xiaomi/mimo-v2.5-pro``) have no ``thinking_style`` on their
spec, so the spec-driven branch in _build_kwargs misses them. The
model-name path catches those cases.
"""
name = model_name.lower()
if name in _MIMO_THINKING_MODELS:
return True
if "/" in name and name.rsplit("/", 1)[1] in _MIMO_THINKING_MODELS:
return True
return False
def _thinking_styles_for(spec: ProviderSpec | None, model_name: str) -> list[str]:
styles: list[str] = []
if spec and spec.thinking_style:
styles.append(spec.thinking_style)
model_style = _model_thinking_style(model_name)
if model_style and model_style not in styles:
styles.append(model_style)
return styles
def _thinking_extra_body(style: str, thinking_enabled: bool) -> dict[str, Any] | None:
builder = _THINKING_STYLE_MAP.get(style)
return builder(thinking_enabled) if builder else None
def _gateway_reasoning_extra_body(style: str, effort: str | None) -> dict[str, Any] | None:
if not effort:
return None
builder = _GATEWAY_REASONING_STYLE_MAP.get(style)
return builder(effort) if builder else None
def _openai_compat_timeout_s() -> float:
@ -581,39 +583,19 @@ class OpenAICompatProvider(LLMProvider):
if wire_effort and semantic_effort != "none":
kwargs["reasoning_effort"] = wire_effort
# Provider-specific thinking parameters.
# Only sent when reasoning_effort is explicitly configured so that
# the provider default is preserved otherwise.
# The mapping is driven by ProviderSpec.thinking_style so that adding
# a new provider never requires touching this function.
if spec and spec.thinking_style and reasoning_effort is not None:
# Only send thinking controls when reasoning_effort is explicit so
# omitting the config preserves each provider's default.
if reasoning_effort is not None:
thinking_enabled = semantic_effort not in ("none", "minimal")
extra = _THINKING_STYLE_MAP.get(spec.thinking_style, lambda _: None)(thinking_enabled)
for thinking_style in _thinking_styles_for(spec, model_name):
extra = _thinking_extra_body(thinking_style, thinking_enabled)
if extra:
kwargs.setdefault("extra_body", {}).update(extra)
gateway_style = getattr(spec, "gateway_reasoning_style", "") if spec else ""
if gateway_style and _model_thinking_style(model_name):
extra = _gateway_reasoning_extra_body(gateway_style, semantic_effort)
if extra:
kwargs.setdefault("extra_body", {}).update(extra)
# Model-level thinking injection for Kimi thinking-capable models.
# Strip any provider prefix (e.g. "moonshotai/") before the set lookup
# so that OpenRouter-style names like "moonshotai/kimi-k2.5" are handled
# identically to bare names like "kimi-k2.5".
if reasoning_effort is not None and _is_kimi_thinking_model(model_name):
thinking_enabled = semantic_effort not in ("none", "minimal")
kwargs.setdefault("extra_body", {}).update(
{"thinking": {"type": "enabled" if thinking_enabled else "disabled"}}
)
# Model-level thinking injection for MiMo thinking-capable models.
# Same shape as Kimi: gateway providers (OpenRouter, etc.) lack the
# xiaomi_mimo spec's thinking_style, so the spec-driven branch above
# misses them — match by model name to catch "xiaomi/mimo-v2.5-pro"
# and friends. (Direct xiaomi_mimo requests are also covered here;
# both branches write the same payload, so the dict update is a
# safe no-op for already-handled cases.)
if reasoning_effort is not None and _is_mimo_thinking_model(model_name):
thinking_enabled = semantic_effort not in ("none", "minimal")
kwargs.setdefault("extra_body", {}).update(
{"thinking": {"type": "enabled" if thinking_enabled else "disabled"}}
)
if tools:
kwargs["tools"] = tools
@ -628,8 +610,7 @@ class OpenAICompatProvider(LLMProvider):
and semantic_effort not in ("none", "minimal")
and (
(spec and spec.thinking_style)
or _is_kimi_thinking_model(model_name)
or _is_mimo_thinking_model(model_name)
or _model_thinking_style(model_name)
)
)
implicit_deepseek_thinking = (

View File

@ -71,6 +71,11 @@ class ProviderSpec:
# "reasoning_split" — {"reasoning_split": true/false} (MiniMax)
thinking_style: str = ""
# Gateway-native reasoning control to pair with model-level thinking styles.
# "reasoning_effort" — {"reasoning": {"effort": <none|minimal|...>}}
# (OpenRouter)
gateway_reasoning_style: str = ""
# When True, treat the "reasoning" response field as formal content
# when "content" is empty. Only set this for providers (e.g. StepFun)
# whose API returns the actual answer in "reasoning" instead of "content".
@ -142,6 +147,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = (
detect_by_base_keyword="openrouter",
default_api_base="https://openrouter.ai/api/v1",
supports_prompt_caching=True,
gateway_reasoning_style="reasoning_effort",
),
# Hugging Face Inference Providers: OpenAI-compatible router for chat models.
ProviderSpec(

View File

@ -1,6 +1,7 @@
import asyncio
import json
import tempfile
import time
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock
@ -374,6 +375,7 @@ async def test_send_uses_typing_start_and_cancel_when_ticket_available() -> None
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-typing"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(
side_effect=[
@ -402,6 +404,7 @@ async def test_send_still_sends_text_when_typing_ticket_missing() -> None:
channel._client = object()
channel._token = "token"
channel._context_tokens["wx-user"] = "ctx-no-ticket"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
channel._api_post = AsyncMock(return_value={"ret": 1, "errmsg": "no config"})
@ -1254,3 +1257,526 @@ async def test_send_text_succeeds_on_zero_errcode() -> None:
await channel._send_text("wx-user", "hello", "ctx-ok")
channel._api_post.assert_awaited_once()
@pytest.mark.asyncio
async def test_send_text_raises_on_nonzero_ret_even_when_errcode_zero() -> None:
"""_send_text must raise when the API returns ret != 0, even if errcode is 0.
The iLink API signals failure through either field. Checking only errcode
caused silent message drops (responses generated but never delivered).
"""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel._api_post = AsyncMock(
return_value={"ret": -100, "errcode": 0, "errmsg": "internal error"}
)
with pytest.raises(RuntimeError, match="WeChat send text error.*ret=-100.*errcode=0"):
await channel._send_text("wx-user", "hello", "ctx-ok")
channel._api_post.assert_awaited_once()
# ---------------------------------------------------------------------------
# Tests for _poll_once not silently dropping messages on processing errors
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_poll_once_logs_exception_on_process_message_failure(monkeypatch) -> None:
"""When _process_message raises, _poll_once must log the error and continue
processing remaining messages instead of silently swallowing the exception."""
channel, _bus = _make_channel()
channel._client = SimpleNamespace(timeout=None)
channel._token = "token"
channel._get_updates_buf = "old-buf"
calls = []
logged_messages: list[str] = []
async def _failing_process(msg: dict) -> None:
calls.append(msg.get("message_id"))
if msg.get("message_id") == "msg-1":
raise RuntimeError("processing failed")
channel._process_message = _failing_process # type: ignore[method-assign]
monkeypatch.setattr(
channel.logger,
"exception",
lambda message, *args, **kwargs: logged_messages.append(str(message)),
)
channel._api_post = AsyncMock( # type: ignore[method-assign]
return_value={
"ret": 0,
"errcode": 0,
"get_updates_buf": "new-buf",
"msgs": [
{"message_id": "msg-1", "message_type": 1},
{"message_id": "msg-2", "message_type": 1},
],
}
)
await channel._poll_once()
# Both messages should have been attempted
assert calls == ["msg-1", "msg-2"]
# Buffer should still advance (already updated before processing)
assert channel._get_updates_buf == "new-buf"
# Error should be logged
assert any("Failed to process WeChat message" in m for m in logged_messages)
@pytest.mark.asyncio
async def test_poll_loop_logs_exception_and_continues_on_poll_failure(monkeypatch) -> None:
"""When _poll_once raises a non-timeout exception, the start() loop must log
the error and continue polling instead of exiting silently."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.config.token = "token" # skip QR login in start()
channel._running = True
call_count = 0
logged_messages: list[str] = []
async def _failing_poll() -> None:
nonlocal call_count
call_count += 1
if call_count == 1:
raise RuntimeError("poll exploded")
channel._running = False # Stop after second call
channel._poll_once = _failing_poll # type: ignore[method-assign]
monkeypatch.setattr(
channel.logger,
"exception",
lambda message, *args, **kwargs: logged_messages.append(str(message)),
)
# Use a tiny retry delay so the test finishes quickly
original_retry = weixin_mod.RETRY_DELAY_S
weixin_mod.RETRY_DELAY_S = 0.01
try:
await channel.start()
finally:
weixin_mod.RETRY_DELAY_S = original_retry
assert call_count == 2
assert any("WeChat poll loop error" in m for m in logged_messages)
# ---------------------------------------------------------------------------
# Tool-hint buffering
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_buffer_single_tool_hint_not_sent_immediately() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Using tool",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["Using tool"]
@pytest.mark.asyncio
async def test_buffer_multiple_tool_hints_flushed_on_final_answer() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
for hint in ["tool1", "tool2"]:
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": hint,
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 2
channel._send_text.assert_any_await("wx-user", "tool1\n\ntool2", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_thought_progress_flushes_tool_hints() -> None:
"""Thoughts are visible progress messages and must act as separators,
flushing buffered tool hints before they are sent."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send a thought — progress but not a tool_hint.
# It must act as a separator and flush the buffered hint.
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Let me think...",
"media": [],
"metadata": {"_progress": True},
},
)()
)
# The buffered hint was flushed before the thought was sent.
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Let me think...", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
# Final answer arrives with nothing left to flush.
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 3
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
@pytest.mark.asyncio
async def test_reasoning_delta_does_not_flush_tool_hints() -> None:
"""Reasoning deltas are invisible in WeChat and must NOT flush buffered
tool hints otherwise hints separated only by hidden reasoning would
fail to coalesce."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send a reasoning delta — invisible in WeChat, must NOT flush
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Thinking step 1...",
"media": [],
"metadata": {"_progress": True, "_reasoning_delta": True},
},
)()
)
# Reasoning is invisible; hint stays buffered, _send_text not called
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"]
# Final answer flushes the buffered hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_empty_progress_message_does_not_flush_tool_hints() -> None:
"""Empty progress messages (e.g. after_iteration tool_events) have no
visible content and must NOT act as separators."""
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
# Buffer a tool hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "search 'foo'",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
# Send an empty progress message (no content, no media)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "",
"media": [],
"metadata": {"_progress": True, "_tool_events": [{"phase": "end"}]},
},
)()
)
# Nothing should have been sent yet
channel._send_text.assert_not_awaited()
assert channel._pending_tool_hints["wx-user"] == ["search 'foo'"]
# Final answer flushes the buffered hint
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
channel._send_text.assert_any_await("wx-user", "search 'foo'", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_buffer_flush_refreshes_context_token() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-old"
channel._context_token_at["wx-user"] = time.time()
channel._refresh_context_token_if_stale = AsyncMock(return_value="ctx-refreshed")
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._refresh_context_token_if_stale.await_count == 2
channel._refresh_context_token_if_stale.assert_any_await("wx-user", "ctx-old")
channel._send_text.assert_any_await("wx-user", "hint", "ctx-refreshed")
@pytest.mark.asyncio
async def test_buffer_flush_failure_does_not_block_final_answer() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock(side_effect=[RuntimeError("boom"), None])
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "Done",
"media": [],
"metadata": {},
},
)()
)
assert channel._send_text.await_count == 2
channel._send_text.assert_any_await("wx-user", "hint", "ctx-1")
channel._send_text.assert_any_await("wx-user", "Done", "ctx-1")
@pytest.mark.asyncio
async def test_buffer_flushed_on_stream_end() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = True
channel._context_tokens["wx-user"] = "ctx-1"
channel._context_token_at["wx-user"] = time.time()
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
await channel.send_delta("wx-user", "", {"_stream_end": True})
channel._send_text.assert_awaited_once_with("wx-user", "hint", "ctx-1")
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_stop_clears_buffer() -> None:
channel, _bus = _make_channel()
channel._pending_tool_hints["wx-user"] = ["hint1", "hint2"]
await channel.stop()
assert "wx-user" not in channel._pending_tool_hints
@pytest.mark.asyncio
async def test_send_tool_hints_false_drops_tool_hints() -> None:
channel, _bus = _make_channel()
channel._client = object()
channel._token = "token"
channel.send_tool_hints = False
channel._send_text = AsyncMock()
await channel.send(
type(
"Msg",
(),
{
"chat_id": "wx-user",
"content": "hint",
"media": [],
"metadata": {"_progress": True, "_tool_hint": True},
},
)()
)
channel._send_text.assert_not_awaited()
assert "wx-user" not in channel._pending_tool_hints

View File

@ -1391,9 +1391,16 @@ def test_kimi_k25_no_extra_body_when_reasoning_effort_none() -> None:
def test_kimi_k25_thinking_enabled_with_openrouter_prefix() -> None:
"""OpenRouter-style model names like moonshotai/kimi-k2.5 must trigger thinking."""
"""OpenRouter-style model names like moonshotai/kimi-k2.5 must trigger thinking.
OR drops upstream-provider `thinking` fields, so the same intent also has
to go through OR's `reasoning.effort` shape (#3851 follow-up).
"""
kw = _build_kwargs_for("openrouter", "moonshotai/kimi-k2.5", reasoning_effort="medium")
assert kw.get("extra_body") == {"thinking": {"type": "enabled"}}
assert kw.get("extra_body") == {
"thinking": {"type": "enabled"},
"reasoning": {"effort": "medium"},
}
def test_kimi_k26_thinking_enabled() -> None:
@ -1403,9 +1410,13 @@ def test_kimi_k26_thinking_enabled() -> None:
def test_kimi_k26_thinking_enabled_with_openrouter_prefix() -> None:
"""OpenRouter-style names like moonshotai/kimi-k2.6 must trigger thinking."""
"""OpenRouter-style names like moonshotai/kimi-k2.6 must trigger thinking
via both upstream `thinking` and OR's `reasoning.effort`."""
kw = _build_kwargs_for("openrouter", "moonshotai/kimi-k2.6", reasoning_effort="medium")
assert kw.get("extra_body") == {"thinking": {"type": "enabled"}}
assert kw.get("extra_body") == {
"thinking": {"type": "enabled"},
"reasoning": {"effort": "medium"},
}
def test_moonshot_kimi_k26_temperature_override() -> None:

View File

@ -32,7 +32,7 @@ def _mimo_spec():
def _openrouter_spec():
"""Return the registered OpenRouter ProviderSpec (no thinking_style)."""
"""Return the registered OpenRouter ProviderSpec."""
specs = {s.name: s for s in PROVIDERS}
return specs["openrouter"]
@ -77,6 +77,13 @@ def test_xiaomi_mimo_uses_thinking_type_style():
assert spec.default_api_base == "https://api.xiaomimimo.com/v1"
def test_openrouter_declares_gateway_reasoning_style():
"""OpenRouter uses its own reasoning.effort field for routed thinking models."""
spec = _openrouter_spec()
assert spec.thinking_style == ""
assert spec.gateway_reasoning_style == "reasoning_effort"
# ---------------------------------------------------------------------------
# _build_kwargs wire-format
# ---------------------------------------------------------------------------
@ -142,9 +149,11 @@ def test_mimo_reasoning_effort_unset_preserves_provider_default():
def test_mimo_via_openrouter_reasoning_effort_none_disables_thinking():
"""OpenRouter routes MiMo as "xiaomi/mimo-v2.5-pro"; the openrouter spec
has no thinking_style, so the disable signal must come from the
model-name path (#3845)."""
"""OpenRouter routes MiMo as "xiaomi/mimo-v2.5-pro" and does NOT forward
extra_body.thinking to upstream, so a disable signal must also reach OR
in its own `reasoning.effort` shape. Verifies both the upstream-MiMo
payload (#3845) and the OR-native payload (#3851 follow-up) are sent.
"""
provider = _openrouter_provider("xiaomi/mimo-v2.5-pro")
kwargs = provider._build_kwargs(
messages=_simple_messages(),
@ -152,11 +161,15 @@ def test_mimo_via_openrouter_reasoning_effort_none_disables_thinking():
temperature=0.7, reasoning_effort="none", tool_choice=None,
)
assert "reasoning_effort" not in kwargs
assert kwargs["extra_body"] == {"thinking": {"type": "disabled"}}
assert kwargs["extra_body"] == {
"thinking": {"type": "disabled"},
"reasoning": {"effort": "none"},
}
def test_mimo_via_openrouter_reasoning_effort_medium_enables_thinking():
"""Same as the direct path: any non-none/minimal effort enables thinking."""
"""Non-none/minimal effort enables thinking and the OR `reasoning.effort`
field mirrors the requested effort level."""
provider = _openrouter_provider("xiaomi/mimo-v2.5-pro")
kwargs = provider._build_kwargs(
messages=_simple_messages(),
@ -164,7 +177,10 @@ def test_mimo_via_openrouter_reasoning_effort_medium_enables_thinking():
temperature=0.7, reasoning_effort="medium", tool_choice=None,
)
assert kwargs.get("reasoning_effort") == "medium"
assert kwargs["extra_body"] == {"thinking": {"type": "enabled"}}
assert kwargs["extra_body"] == {
"thinking": {"type": "enabled"},
"reasoning": {"effort": "medium"},
}
def test_mimo_via_openrouter_bare_slug_also_matches():
@ -176,12 +192,16 @@ def test_mimo_via_openrouter_bare_slug_also_matches():
tools=None, model=None, max_tokens=100,
temperature=0.7, reasoning_effort="none", tool_choice=None,
)
assert kwargs["extra_body"] == {"thinking": {"type": "disabled"}}
assert kwargs["extra_body"] == {
"thinking": {"type": "disabled"},
"reasoning": {"effort": "none"},
}
def test_mimo_flash_via_openrouter_does_not_inject_thinking():
"""mimo-v2-flash has no thinking mode per Xiaomi docs; the allowlist
excludes it, so no thinking field should be injected on the gateway path."""
excludes it, so neither the upstream `thinking` field nor OR's
`reasoning.effort` should be injected on the gateway path."""
provider = _openrouter_provider("xiaomi/mimo-v2-flash")
kwargs = provider._build_kwargs(
messages=_simple_messages(),
@ -200,3 +220,18 @@ def test_non_mimo_model_via_openrouter_unaffected():
temperature=0.7, reasoning_effort="none", tool_choice=None,
)
assert "extra_body" not in kwargs
def test_kimi_via_openrouter_also_injects_reasoning_effort():
"""Kimi has the same gateway problem as MiMo: OR drops the upstream
`thinking` field. The same OR-reasoning injection should fire."""
provider = _openrouter_provider("moonshotai/kimi-k2.5")
kwargs = provider._build_kwargs(
messages=_simple_messages(),
tools=None, model=None, max_tokens=100,
temperature=0.7, reasoning_effort="none", tool_choice=None,
)
assert kwargs["extra_body"] == {
"thinking": {"type": "disabled"},
"reasoning": {"effort": "none"},
}

View File

@ -3,6 +3,8 @@ import subprocess
import sys
from typing import Any
import pytest
from nanobot.agent.tools import (
ArraySchema,
IntegerSchema,
@ -15,6 +17,7 @@ from nanobot.agent.tools import (
from nanobot.agent.tools.base import Tool
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.shell import ExecTool
from nanobot.security.network import configure_ssrf_whitelist
class SampleTool(Tool):
@ -218,6 +221,39 @@ def test_exec_extract_absolute_paths_ignores_relative_posix_segments() -> None:
assert "/bin/python" not in paths
def test_exec_extract_absolute_paths_ignores_urls() -> None:
cmd = 'curl -s -o /dev/null -w "%{http_code}" https://www.google.com'
paths = ExecTool._extract_absolute_paths(cmd)
assert paths == ["/dev/null"]
@pytest.mark.parametrize(
"command",
[
'curl -s -o /dev/null -w "%{http_code}" https://www.google.com',
'wget -q -O - http://example.com 2>&1 | head -c 100',
'python3 -c "import urllib.request; print(urllib.request.urlopen(\'http://example.com\').read()[:100])"',
],
)
def test_exec_guard_allows_public_urls(tmp_path, command: str) -> None:
tool = ExecTool(restrict_to_workspace=True)
error = tool._guard_command(command, str(tmp_path))
assert error is None
def test_exec_guard_allows_whitelisted_internal_urls(tmp_path) -> None:
configure_ssrf_whitelist(["10.10.10.0/24"])
try:
tool = ExecTool(restrict_to_workspace=True)
error = tool._guard_command(
'curl -s -H "Authorization: Bearer ..." http://10.10.10.3:8123/api/',
str(tmp_path),
)
assert error is None
finally:
configure_ssrf_whitelist([])
def test_exec_extract_absolute_paths_captures_posix_absolute_paths() -> None:
cmd = "cat /tmp/data.txt > /tmp/out.txt"
paths = ExecTool._extract_absolute_paths(cmd)