From 7e91aecd7dfc932557548940b1a147e7897a4c4e Mon Sep 17 00:00:00 2001 From: bahtya Date: Sun, 12 Apr 2026 02:48:22 +0800 Subject: [PATCH 01/19] fix(telegram): narrow exception catch in _send_text to prevent retry amplification MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Previously _send_text() caught all exceptions (except Exception) when sending HTML-formatted messages, falling back to plain text even for network errors like TimedOut and NetworkError. This caused connection demand to double during pool exhaustion scenarios (3 retries × 2 fallback attempts = 6 calls per message instead of 3). Now only catches BadRequest (HTML parse errors), letting network errors propagate immediately to the retry layer where they belong. Fixes: HKUDS/nanobot#3050 --- nanobot/channels/telegram.py | 5 +- tests/channels/test_telegram_channel.py | 156 ++++++++++++++++++++++++ 2 files changed, 160 insertions(+), 1 deletion(-) diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index 2dde232b1..d2572fac3 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -520,7 +520,10 @@ class TelegramChannel(BaseChannel): reply_parameters=reply_params, **(thread_kwargs or {}), ) - except Exception as e: + except BadRequest as e: + # Only fall back to plain text on actual HTML parse/format errors. + # Network errors (TimedOut, NetworkError) should propagate immediately + # to avoid doubling connection demand during pool exhaustion. logger.warning("HTML parse failed, falling back to plain text: {}", e) try: await self._call_with_retry( diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 5a1964127..7dfb094f9 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -1159,3 +1159,159 @@ async def test_on_message_location_with_text() -> None: assert len(handled) == 1 assert "meet me here" in handled[0]["content"] assert "[location: 51.5074, -0.1278]" in handled[0]["content"] + + +# --------------------------------------------------------------------------- +# Tests for retry amplification fix (issue #3050) +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_send_text_does_not_fallback_on_network_timeout() -> None: + """TimedOut should propagate immediately, NOT trigger plain-text fallback. + + Before the fix, _send_text caught ALL exceptions (including TimedOut) + and retried as plain text, doubling connection demand during pool + exhaustion — see issue #3050. + """ + from telegram.error import TimedOut + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + call_count = 0 + + async def always_timeout(**kwargs): + nonlocal call_count + call_count += 1 + raise TimedOut() + + channel._app.bot.send_message = always_timeout + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + with pytest.raises(TimedOut): + await channel._send_text(123, "hello", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + # With the fix: only _call_with_retry's 3 HTML attempts (no plain fallback). + # Before the fix: 3 HTML + 3 plain = 6 attempts. + assert call_count == 3, ( + f"Expected 3 calls (HTML retries only), got {call_count} " + "(plain-text fallback should not trigger on TimedOut)" + ) + + +@pytest.mark.asyncio +async def test_send_text_does_not_fallback_on_network_error() -> None: + """NetworkError should propagate immediately, NOT trigger plain-text fallback.""" + from telegram.error import NetworkError + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + call_count = 0 + + async def always_network_error(**kwargs): + nonlocal call_count + call_count += 1 + raise NetworkError("Connection reset") + + channel._app.bot.send_message = always_network_error + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + with pytest.raises(NetworkError): + await channel._send_text(123, "hello", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + # _call_with_retry does NOT retry NetworkError (only TimedOut/RetryAfter), + # so it raises after 1 attempt. The fix prevents plain-text fallback. + # Before the fix: 1 HTML + 1 plain = 2. After the fix: 1 HTML only. + assert call_count == 1, ( + f"Expected 1 call (HTML only, no plain fallback), got {call_count}" + ) + + +@pytest.mark.asyncio +async def test_send_text_falls_back_on_bad_request() -> None: + """BadRequest (HTML parse error) should still trigger plain-text fallback.""" + from telegram.error import BadRequest + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + original_send = channel._app.bot.send_message + html_call_count = 0 + + async def html_fails(**kwargs): + nonlocal html_call_count + if kwargs.get("parse_mode") == "HTML": + html_call_count += 1 + raise BadRequest("Can't parse entities") + return await original_send(**kwargs) + + channel._app.bot.send_message = html_fails + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + await channel._send_text(123, "hello **world**", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + # HTML attempt failed with BadRequest → fallback to plain text succeeds. + assert html_call_count == 1, f"Expected 1 HTML attempt, got {html_call_count}" + assert len(channel._app.bot.sent_messages) == 1 + # Plain text send should NOT have parse_mode + assert channel._app.bot.sent_messages[0].get("parse_mode") is None + + +@pytest.mark.asyncio +async def test_send_text_bad_request_plain_fallback_exhausted() -> None: + """When both HTML and plain-text fallback fail with BadRequest, the error propagates.""" + from telegram.error import BadRequest + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + call_count = 0 + + async def always_bad_request(**kwargs): + nonlocal call_count + call_count += 1 + raise BadRequest("Bad request") + + channel._app.bot.send_message = always_bad_request + + import nanobot.channels.telegram as tg_mod + orig_delay = tg_mod._SEND_RETRY_BASE_DELAY + tg_mod._SEND_RETRY_BASE_DELAY = 0.01 + try: + with pytest.raises(BadRequest): + await channel._send_text(123, "hello", None, {}) + finally: + tg_mod._SEND_RETRY_BASE_DELAY = orig_delay + + # _call_with_retry does NOT retry BadRequest (only TimedOut/RetryAfter), + # so HTML fails after 1 attempt → fallback to plain also fails after 1 attempt. + # Before the fix: 2 total. After the fix: still 2 (BadRequest SHOULD fallback). + assert call_count == 2, f"Expected 2 calls (1 HTML + 1 plain), got {call_count}" From fa9852494416c851c53067363ddcfaac83a9e6a8 Mon Sep 17 00:00:00 2001 From: bahtya Date: Sun, 12 Apr 2026 03:28:38 +0800 Subject: [PATCH 02/19] fix(channels): prevent retry amplification and silent message loss across channels MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Audited all channel implementations for overly broad exception handling that causes retry amplification or silent message loss during network errors. This is the same class of bug as #3050 (Telegram _send_text). Fixes by channel: Telegram (send_delta): - _stream_end path used except Exception for HTML edit fallback - Network errors (TimedOut, NetworkError) triggered redundant plain text edit, doubling connection demand during pool exhaustion - Changed to except BadRequest, matching the _send_text fix Discord: - send() caught all exceptions without re-raising - ChannelManager._send_with_retry() saw successful return, never retried - Messages silently dropped on any send failure - Added raise after error logging DingTalk: - _send_batch_message() returned False on all exceptions including network errors — no retry, fallback text sent unnecessarily - _read_media_bytes() and _upload_media() swallowed transport errors, causing _send_media_ref() to cascade through doomed fallback attempts - Added except httpx.TransportError handlers that re-raise immediately WeChat: - Media send failure triggered text fallback even for network errors - During network issues: 3×(media + text) = 6 API calls per message - Added specific catches: TimeoutException/TransportError re-raise, 5xx HTTPStatusError re-raises, 4xx falls back to text QQ: - _send_media() returned False on all exceptions - Network errors triggered fallback text instead of retry - Added except (aiohttp.ClientError, OSError) that re-raises Tests: 331 passed (283 existing + 48 new across 5 channel test files) Fixes: #3054 Related: #3050, #3053 --- nanobot/channels/dingtalk.py | 9 + nanobot/channels/discord.py | 1 + nanobot/channels/qq.py | 5 + nanobot/channels/telegram.py | 5 +- nanobot/channels/weixin.py | 36 ++++ tests/channels/test_dingtalk_channel.py | 155 +++++++++++++++++ tests/channels/test_discord_channel.py | 97 +++++++++++ tests/channels/test_qq_channel.py | 221 ++++++++++++++++++++++++ tests/channels/test_telegram_channel.py | 78 +++++++++ tests/channels/test_weixin_channel.py | 182 +++++++++++++++++++ 10 files changed, 788 insertions(+), 1 deletion(-) diff --git a/nanobot/channels/dingtalk.py b/nanobot/channels/dingtalk.py index 39b5818bd..a863ba0df 100644 --- a/nanobot/channels/dingtalk.py +++ b/nanobot/channels/dingtalk.py @@ -337,6 +337,9 @@ class DingTalkChannel(BaseChannel): content_type = (resp.headers.get("content-type") or "").split(";")[0].strip() filename = self._guess_filename(media_ref, self._guess_upload_type(media_ref)) return resp.content, filename, content_type or None + except httpx.TransportError as e: + logger.error("DingTalk media download network error ref={} err={}", media_ref, e) + raise except Exception as e: logger.error("DingTalk media download error ref={} err={}", media_ref, e) return None, None, None @@ -388,6 +391,9 @@ class DingTalkChannel(BaseChannel): logger.error("DingTalk media upload missing media_id body={}", text[:500]) return None return str(media_id) + except httpx.TransportError as e: + logger.error("DingTalk media upload network error type={} err={}", media_type, e) + raise except Exception as e: logger.error("DingTalk media upload error type={} err={}", media_type, e) return None @@ -437,6 +443,9 @@ class DingTalkChannel(BaseChannel): return False logger.debug("DingTalk message sent to {} with msgKey={}", chat_id, msg_key) return True + except httpx.TransportError as e: + logger.error("DingTalk network error sending message msgKey={} err={}", msg_key, e) + raise except Exception as e: logger.error("Error sending DingTalk message msgKey={} err={}", msg_key, e) return False diff --git a/nanobot/channels/discord.py b/nanobot/channels/discord.py index 6e8c673a3..336b6148d 100644 --- a/nanobot/channels/discord.py +++ b/nanobot/channels/discord.py @@ -366,6 +366,7 @@ class DiscordChannel(BaseChannel): await client.send_outbound(msg) except Exception as e: logger.error("Error sending Discord message: {}", e) + raise finally: if not is_progress: await self._stop_typing(msg.chat_id) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 484eed6e2..96d9d5ecd 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -362,7 +362,12 @@ class QQChannel(BaseChannel): logger.info("QQ media sent: {}", filename) return True + except (aiohttp.ClientError, OSError) as e: + # Network / transport errors — propagate for retry by caller + logger.warning("QQ send media network error filename={} err={}", filename, e) + raise except Exception as e: + # API-level or other non-network errors — return False so send() can fallback logger.error("QQ send media failed filename={} err={}", filename, e) return False diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index d2572fac3..f63704aa7 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -570,7 +570,10 @@ class TelegramChannel(BaseChannel): chat_id=int_chat_id, message_id=buf.message_id, text=html, parse_mode="HTML", ) - except Exception as e: + except BadRequest as e: + # Only fall back to plain text on actual HTML parse/format errors. + # Network errors (TimedOut, NetworkError) should propagate immediately + # to avoid doubling connection demand during pool exhaustion. if self._is_not_modified_error(e): logger.debug("Final stream edit already applied for {}", chat_id) self._stream_bufs.pop(chat_id, None) diff --git a/nanobot/channels/weixin.py b/nanobot/channels/weixin.py index 3f87e2203..fbe84bcf8 100644 --- a/nanobot/channels/weixin.py +++ b/nanobot/channels/weixin.py @@ -985,7 +985,43 @@ class WeixinChannel(BaseChannel): for media_path in (msg.media or []): try: await self._send_media_file(msg.chat_id, media_path, ctx_token) + except (httpx.TimeoutException, httpx.TransportError) as net_err: + # Network/transport errors: do NOT fall back to text — + # the text send would also likely fail, and the outer + # except will re-raise so ChannelManager retries properly. + logger.error( + "Network error sending WeChat media {}: {}", + media_path, + net_err, + ) + raise + except httpx.HTTPStatusError as http_err: + status_code = ( + http_err.response.status_code + if http_err.response is not None + else 0 + ) + if status_code >= 500: + # Server-side / retryable HTTP error — same as network. + logger.error( + "Server error ({} {}) sending WeChat media {}: {}", + status_code, + http_err.response.reason_phrase + if http_err.response is not None + else "", + media_path, + http_err, + ) + raise + # 4xx client errors are NOT retryable — fall back to text. + filename = Path(media_path).name + logger.error("Failed to send WeChat media {}: {}", media_path, http_err) + await self._send_text( + msg.chat_id, f"[Failed to send: {filename}]", ctx_token, + ) except Exception as e: + # Non-network errors (format, file-not-found, etc.): + # notify the user via text fallback. filename = Path(media_path).name logger.error("Failed to send WeChat media {}: {}", media_path, e) # Notify user about failure via text diff --git a/tests/channels/test_dingtalk_channel.py b/tests/channels/test_dingtalk_channel.py index f743c4e62..86de99bb5 100644 --- a/tests/channels/test_dingtalk_channel.py +++ b/tests/channels/test_dingtalk_channel.py @@ -2,7 +2,9 @@ import asyncio import zipfile from io import BytesIO from types import SimpleNamespace +from unittest.mock import AsyncMock +import httpx import pytest # Check optional dingtalk dependencies before running tests @@ -52,6 +54,21 @@ class _FakeHttp: return self._next_response() +class _NetworkErrorHttp: + """HTTP client stub that raises httpx.TransportError on every request.""" + + def __init__(self) -> None: + self.calls: list[dict] = [] + + async def post(self, url: str, json=None, headers=None, **kwargs): + self.calls.append({"method": "POST", "url": url, "json": json, "headers": headers}) + raise httpx.ConnectError("Connection refused") + + async def get(self, url: str, **kwargs): + self.calls.append({"method": "GET", "url": url}) + raise httpx.ConnectError("Connection refused") + + @pytest.mark.asyncio async def test_group_message_keeps_sender_id_and_routes_chat_id() -> None: config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["user1"]) @@ -298,3 +315,141 @@ async def test_send_media_ref_zips_html_before_upload(tmp_path, monkeypatch) -> archive = zipfile.ZipFile(BytesIO(captured["data"])) assert archive.namelist() == ["report.html"] + + +# ── Exception handling tests ────────────────────────────────────────── + + +@pytest.mark.asyncio +async def test_send_batch_message_propagates_transport_error() -> None: + """Network/transport errors must re-raise so callers can retry.""" + config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]) + channel = DingTalkChannel(config, MessageBus()) + channel._http = _NetworkErrorHttp() + + with pytest.raises(httpx.ConnectError, match="Connection refused"): + await channel._send_batch_message( + "token", + "user123", + "sampleMarkdown", + {"text": "hello", "title": "Nanobot Reply"}, + ) + + # The POST was attempted exactly once + assert len(channel._http.calls) == 1 + assert channel._http.calls[0]["method"] == "POST" + + +@pytest.mark.asyncio +async def test_send_batch_message_returns_false_on_api_error() -> None: + """DingTalk API-level errors (non-200 status, errcode != 0) should return False.""" + config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]) + channel = DingTalkChannel(config, MessageBus()) + + # Non-200 status code → API error → return False + channel._http = _FakeHttp(responses=[_FakeResponse(400, {"errcode": 400})]) + result = await channel._send_batch_message( + "token", "user123", "sampleMarkdown", {"text": "hello"} + ) + assert result is False + + # 200 with non-zero errcode → API error → return False + channel._http = _FakeHttp(responses=[_FakeResponse(200, {"errcode": 100})]) + result = await channel._send_batch_message( + "token", "user123", "sampleMarkdown", {"text": "hello"} + ) + assert result is False + + # 200 with errcode=0 → success → return True + channel._http = _FakeHttp(responses=[_FakeResponse(200, {"errcode": 0})]) + result = await channel._send_batch_message( + "token", "user123", "sampleMarkdown", {"text": "hello"} + ) + assert result is True + + +@pytest.mark.asyncio +async def test_send_media_ref_short_circuits_on_transport_error() -> None: + """When the first send fails with a transport error, _send_media_ref must + re-raise immediately instead of trying download+upload+fallback.""" + config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]) + channel = DingTalkChannel(config, MessageBus()) + channel._http = _NetworkErrorHttp() + + # An image URL triggers the sampleImageMsg path first + with pytest.raises(httpx.ConnectError, match="Connection refused"): + await channel._send_media_ref("token", "user123", "https://example.com/photo.jpg") + + # Only one POST should have been attempted — no download/upload/fallback + assert len(channel._http.calls) == 1 + assert channel._http.calls[0]["method"] == "POST" + + +@pytest.mark.asyncio +async def test_send_media_ref_short_circuits_on_download_transport_error() -> None: + """When the image URL send returns an API error (False) but the download + for the fallback hits a transport error, it must re-raise rather than + silently returning False.""" + config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]) + channel = DingTalkChannel(config, MessageBus()) + + # First POST (sampleImageMsg) returns API error → False, then GET (download) raises transport error + class _MixedHttp: + def __init__(self) -> None: + self.calls: list[dict] = [] + + async def post(self, url, json=None, headers=None, **kwargs): + self.calls.append({"method": "POST", "url": url}) + # API-level failure: 200 with errcode != 0 + return _FakeResponse(200, {"errcode": 100}) + + async def get(self, url, **kwargs): + self.calls.append({"method": "GET", "url": url}) + raise httpx.ConnectError("Connection refused") + + channel._http = _MixedHttp() + + with pytest.raises(httpx.ConnectError, match="Connection refused"): + await channel._send_media_ref("token", "user123", "https://example.com/photo.jpg") + + # Should have attempted POST (image URL) and GET (download), but NOT upload + assert len(channel._http.calls) == 2 + assert channel._http.calls[0]["method"] == "POST" + assert channel._http.calls[1]["method"] == "GET" + + +@pytest.mark.asyncio +async def test_send_media_ref_short_circuits_on_upload_transport_error() -> None: + """When download succeeds but upload hits a transport error, must re-raise.""" + config = DingTalkConfig(client_id="app", client_secret="secret", allow_from=["*"]) + channel = DingTalkChannel(config, MessageBus()) + + image_bytes = b"\xff\xd8\xff\xe0" + b"\x00" * 100 # minimal JPEG-ish data + + class _UploadFailsHttp: + def __init__(self) -> None: + self.calls: list[dict] = [] + + async def post(self, url, json=None, headers=None, files=None, **kwargs): + self.calls.append({"method": "POST", "url": url}) + # If it's the upload endpoint, raise transport error + if "media/upload" in url: + raise httpx.ConnectError("Connection refused") + # Otherwise (sampleImageMsg), return API error to trigger fallback + return _FakeResponse(200, {"errcode": 100}) + + async def get(self, url, **kwargs): + self.calls.append({"method": "GET", "url": url}) + resp = _FakeResponse(200) + resp.content = image_bytes + resp.headers = {"content-type": "image/jpeg"} + return resp + + channel._http = _UploadFailsHttp() + + with pytest.raises(httpx.ConnectError, match="Connection refused"): + await channel._send_media_ref("token", "user123", "https://example.com/photo.jpg") + + # POST (image URL), GET (download), POST (upload) attempted — no further sends + methods = [c["method"] for c in channel._http.calls] + assert methods == ["POST", "GET", "POST"] diff --git a/tests/channels/test_discord_channel.py b/tests/channels/test_discord_channel.py index 3a31a5912..7a39bff2b 100644 --- a/tests/channels/test_discord_channel.py +++ b/tests/channels/test_discord_channel.py @@ -867,3 +867,100 @@ async def test_start_no_proxy_auth_when_only_password(monkeypatch) -> None: assert channel.is_running is False assert _FakeDiscordClient.instances[0].proxy == "http://127.0.0.1:7890" assert _FakeDiscordClient.instances[0].proxy_auth is None + + +# --------------------------------------------------------------------------- +# Tests for the send() exception propagation fix +# --------------------------------------------------------------------------- + +@pytest.mark.asyncio +async def test_send_re_raises_network_error() -> None: + """Network errors during send must propagate so ChannelManager can retry.""" + channel = DiscordChannel(DiscordConfig(enabled=True, allow_from=["*"]), MessageBus()) + client = _FakeDiscordClient(channel, intents=None) + channel._client = client + channel._running = True + + async def _failing_send_outbound(msg: OutboundMessage) -> None: + raise ConnectionError("network unreachable") + + client.send_outbound = _failing_send_outbound # type: ignore[method-assign] + + with pytest.raises(ConnectionError, match="network unreachable"): + await channel.send(OutboundMessage(channel="discord", chat_id="123", content="hello")) + + +@pytest.mark.asyncio +async def test_send_re_raises_generic_exception() -> None: + """Any exception from send_outbound must propagate, not be swallowed.""" + channel = DiscordChannel(DiscordConfig(enabled=True, allow_from=["*"]), MessageBus()) + client = _FakeDiscordClient(channel, intents=None) + channel._client = client + channel._running = True + + async def _failing_send_outbound(msg: OutboundMessage) -> None: + raise RuntimeError("discord API failure") + + client.send_outbound = _failing_send_outbound # type: ignore[method-assign] + + with pytest.raises(RuntimeError, match="discord API failure"): + await channel.send(OutboundMessage(channel="discord", chat_id="123", content="hello")) + + +@pytest.mark.asyncio +async def test_send_still_stops_typing_on_error() -> None: + """Typing cleanup must still run in the finally block even when send raises.""" + channel = DiscordChannel(DiscordConfig(enabled=True, allow_from=["*"]), MessageBus()) + client = _FakeDiscordClient(channel, intents=None) + channel._client = client + channel._running = True + + # Start a typing task so we can verify it gets cleaned up + start = asyncio.Event() + release = asyncio.Event() + + async def slow_typing() -> None: + start.set() + await release.wait() + + typing_channel = _FakeChannel(channel_id=123) + typing_channel.typing_enter_hook = slow_typing + await channel._start_typing(typing_channel) + await asyncio.wait_for(start.wait(), timeout=1.0) + + async def _failing_send_outbound(msg: OutboundMessage) -> None: + raise ConnectionError("timeout") + + client.send_outbound = _failing_send_outbound # type: ignore[method-assign] + + with pytest.raises(ConnectionError, match="timeout"): + await channel.send(OutboundMessage(channel="discord", chat_id="123", content="hello")) + + release.set() + await asyncio.sleep(0) + + # Typing should have been cleaned up by the finally block + assert channel._typing_tasks == {} + + +@pytest.mark.asyncio +async def test_send_succeeds_normally() -> None: + """Successful sends should work without raising.""" + channel = DiscordChannel(DiscordConfig(enabled=True, allow_from=["*"]), MessageBus()) + client = _FakeDiscordClient(channel, intents=None) + channel._client = client + channel._running = True + + sent_messages: list[OutboundMessage] = [] + + async def _capture_send_outbound(msg: OutboundMessage) -> None: + sent_messages.append(msg) + + client.send_outbound = _capture_send_outbound # type: ignore[method-assign] + + msg = OutboundMessage(channel="discord", chat_id="123", content="hello world") + await channel.send(msg) + + assert len(sent_messages) == 1 + assert sent_messages[0].content == "hello world" + assert sent_messages[0].chat_id == "123" diff --git a/tests/channels/test_qq_channel.py b/tests/channels/test_qq_channel.py index 729442a13..417648adf 100644 --- a/tests/channels/test_qq_channel.py +++ b/tests/channels/test_qq_channel.py @@ -1,6 +1,7 @@ import tempfile from pathlib import Path from types import SimpleNamespace +from unittest.mock import AsyncMock, patch import pytest @@ -14,6 +15,8 @@ except ImportError: if not QQ_AVAILABLE: pytest.skip("QQ dependencies not installed (qq-botpy)", allow_module_level=True) +import aiohttp + from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.qq import QQChannel, QQConfig @@ -170,3 +173,221 @@ async def test_read_media_bytes_missing_file() -> None: data, filename = await channel._read_media_bytes("/nonexistent/path/image.png") assert data is None assert filename is None + + +# ------------------------------------------------------- +# Tests for _send_media exception handling +# ------------------------------------------------------- + +def _make_channel_with_local_file(suffix: str = ".png", content: bytes = b"\x89PNG\r\n"): + """Create a QQChannel with a fake client and a temp file for media.""" + channel = QQChannel( + QQConfig(app_id="app", secret="secret", allow_from=["*"]), + MessageBus(), + ) + channel._client = _FakeClient() + channel._chat_type_cache["user1"] = "c2c" + + tmp = tempfile.NamedTemporaryFile(suffix=suffix, delete=False) + tmp.write(content) + tmp.close() + return channel, tmp.name + + +@pytest.mark.asyncio +async def test_send_media_network_error_propagates() -> None: + """aiohttp.ClientError (network/transport) should re-raise, not return False.""" + channel, tmp_path = _make_channel_with_local_file() + + # Make the base64 upload raise a network error + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=aiohttp.ServerDisconnectedError("connection lost"), + ) + + with pytest.raises(aiohttp.ServerDisconnectedError): + await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + + +@pytest.mark.asyncio +async def test_send_media_client_connector_error_propagates() -> None: + """aiohttp.ClientConnectorError (DNS/connection refused) should re-raise.""" + channel, tmp_path = _make_channel_with_local_file() + + from aiohttp.client_reqrep import ConnectionKey + conn_key = ConnectionKey("api.qq.com", 443, True, None, None, None, None) + connector_error = aiohttp.ClientConnectorError( + connection_key=conn_key, + os_error=OSError("Connection refused"), + ) + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=connector_error, + ) + + with pytest.raises(aiohttp.ClientConnectorError): + await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + + +@pytest.mark.asyncio +async def test_send_media_oserror_propagates() -> None: + """OSError (low-level I/O) should re-raise for retry.""" + channel, tmp_path = _make_channel_with_local_file() + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=OSError("Network is unreachable"), + ) + + with pytest.raises(OSError): + await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + + +@pytest.mark.asyncio +async def test_send_media_api_error_returns_false() -> None: + """API-level errors (botpy RuntimeError subclasses) should return False, not raise.""" + channel, tmp_path = _make_channel_with_local_file() + + # Simulate a botpy API error (e.g. ServerError is a RuntimeError subclass) + from botpy.errors import ServerError + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=ServerError("internal server error"), + ) + + result = await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + assert result is False + + +@pytest.mark.asyncio +async def test_send_media_generic_runtime_error_returns_false() -> None: + """Generic RuntimeError (not network) should return False.""" + channel, tmp_path = _make_channel_with_local_file() + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=RuntimeError("some API error"), + ) + + result = await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + assert result is False + + +@pytest.mark.asyncio +async def test_send_media_value_error_returns_false() -> None: + """ValueError (bad API response data) should return False.""" + channel, tmp_path = _make_channel_with_local_file() + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=ValueError("bad response data"), + ) + + result = await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + assert result is False + + +@pytest.mark.asyncio +async def test_send_media_timeout_error_propagates() -> None: + """asyncio.TimeoutError inherits from Exception but not ClientError/OSError. + However, aiohttp.ServerTimeoutError IS a ClientError subclass, so that propagates. + For a plain TimeoutError (which is also OSError in Python 3.11+), it should propagate.""" + channel, tmp_path = _make_channel_with_local_file() + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=aiohttp.ServerTimeoutError("request timed out"), + ) + + with pytest.raises(aiohttp.ServerTimeoutError): + await channel._send_media( + chat_id="user1", + media_ref=tmp_path, + msg_id="msg1", + is_group=False, + ) + + +@pytest.mark.asyncio +async def test_send_fallback_text_on_api_error() -> None: + """When _send_media returns False (API error), send() should emit fallback text.""" + channel, tmp_path = _make_channel_with_local_file() + + from botpy.errors import ServerError + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=ServerError("internal server error"), + ) + + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user1", + content="", + media=[tmp_path], + metadata={"message_id": "msg1"}, + ) + ) + + # Should have sent a fallback text message + assert len(channel._client.api.c2c_calls) == 1 + fallback_content = channel._client.api.c2c_calls[0]["content"] + assert "Attachment send failed" in fallback_content + + +@pytest.mark.asyncio +async def test_send_propagates_network_error_no_fallback() -> None: + """When _send_media raises a network error, send() should NOT silently fallback.""" + channel, tmp_path = _make_channel_with_local_file() + + channel._client.api._http = SimpleNamespace() + channel._client.api._http.request = AsyncMock( + side_effect=aiohttp.ServerDisconnectedError("connection lost"), + ) + + with pytest.raises(aiohttp.ServerDisconnectedError): + await channel.send( + OutboundMessage( + channel="qq", + chat_id="user1", + content="hello", + media=[tmp_path], + metadata={"message_id": "msg1"}, + ) + ) + + # No fallback text should have been sent + assert len(channel._client.api.c2c_calls) == 0 diff --git a/tests/channels/test_telegram_channel.py b/tests/channels/test_telegram_channel.py index 7dfb094f9..8d9431ba6 100644 --- a/tests/channels/test_telegram_channel.py +++ b/tests/channels/test_telegram_channel.py @@ -387,6 +387,84 @@ async def test_send_delta_stream_end_treats_not_modified_as_success() -> None: assert "123" not in channel._stream_bufs +@pytest.mark.asyncio +async def test_send_delta_stream_end_does_not_fallback_on_network_timeout() -> None: + """TimedOut during HTML edit should propagate, never fall back to plain text.""" + from telegram.error import TimedOut + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + # _call_with_retry retries TimedOut up to 3 times, so the mock will be called + # multiple times – but all calls must be with parse_mode="HTML" (no plain fallback). + channel._app.bot.edit_message_text = AsyncMock(side_effect=TimedOut("network timeout")) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0) + + with pytest.raises(TimedOut, match="network timeout"): + await channel.send_delta("123", "", {"_stream_end": True}) + + # Every call to edit_message_text must have used parse_mode="HTML" — + # no plain-text fallback call should have been made. + for call in channel._app.bot.edit_message_text.call_args_list: + assert call.kwargs.get("parse_mode") == "HTML" + # Buffer should still be present (not cleaned up on error) + assert "123" in channel._stream_bufs + + +@pytest.mark.asyncio +async def test_send_delta_stream_end_does_not_fallback_on_network_error() -> None: + """NetworkError during HTML edit should propagate, never fall back to plain text.""" + from telegram.error import NetworkError + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + channel._app.bot.edit_message_text = AsyncMock(side_effect=NetworkError("connection reset")) + channel._stream_bufs["123"] = _StreamBuf(text="hello", message_id=7, last_edit=0.0) + + with pytest.raises(NetworkError, match="connection reset"): + await channel.send_delta("123", "", {"_stream_end": True}) + + # Every call to edit_message_text must have used parse_mode="HTML" — + # no plain-text fallback call should have been made. + for call in channel._app.bot.edit_message_text.call_args_list: + assert call.kwargs.get("parse_mode") == "HTML" + # Buffer should still be present (not cleaned up on error) + assert "123" in channel._stream_bufs + + +@pytest.mark.asyncio +async def test_send_delta_stream_end_falls_back_on_bad_request() -> None: + """BadRequest (HTML parse error) should still trigger plain-text fallback.""" + from telegram.error import BadRequest + + channel = TelegramChannel( + TelegramConfig(enabled=True, token="123:abc", allow_from=["*"]), + MessageBus(), + ) + channel._app = _FakeApp(lambda: None) + + # First call (HTML) raises BadRequest, second call (plain) succeeds + channel._app.bot.edit_message_text = AsyncMock( + side_effect=[BadRequest("Can't parse entities"), None] + ) + channel._stream_bufs["123"] = _StreamBuf(text="hello ", message_id=7, last_edit=0.0) + + await channel.send_delta("123", "", {"_stream_end": True}) + + # edit_message_text should have been called twice: once for HTML, once for plain fallback + assert channel._app.bot.edit_message_text.call_count == 2 + # Second call should not use parse_mode="HTML" + second_call_kwargs = channel._app.bot.edit_message_text.call_args_list[1].kwargs + assert "parse_mode" not in second_call_kwargs or second_call_kwargs.get("parse_mode") is None + # Buffer should be cleaned up on success + assert "123" not in channel._stream_bufs + + @pytest.mark.asyncio async def test_send_delta_stream_end_splits_oversized_reply() -> None: """Final streamed reply exceeding Telegram limit is split into chunks.""" diff --git a/tests/channels/test_weixin_channel.py b/tests/channels/test_weixin_channel.py index 3a847411b..2b455fca6 100644 --- a/tests/channels/test_weixin_channel.py +++ b/tests/channels/test_weixin_channel.py @@ -1003,3 +1003,185 @@ async def test_download_media_item_non_image_requires_aes_key_even_with_full_url assert saved_path is None channel._client.get.assert_not_awaited() + + +# --------------------------------------------------------------------------- +# Tests for media-send error classification (network vs non-network errors) +# --------------------------------------------------------------------------- + + +def _make_outbound_msg(chat_id: str = "wx-user", content: str = "", media: list | None = None): + """Build a minimal OutboundMessage-like object for send() tests.""" + from nanobot.bus.events import OutboundMessage + + return OutboundMessage( + channel="weixin", + chat_id=chat_id, + content=content, + media=media or [], + metadata={}, + ) + + +@pytest.mark.asyncio +async def test_send_media_timeout_error_propagates_without_text_fallback() -> None: + """httpx.TimeoutException during media send must re-raise immediately, + NOT fall back to _send_text (which would also fail during network issues).""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel._send_media_file = AsyncMock(side_effect=httpx.TimeoutException("timed out")) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/photo.jpg"]) + + with pytest.raises(httpx.TimeoutException, match="timed out"): + await channel.send(msg) + + # _send_text must NOT have been called as a fallback + channel._send_text.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_send_media_transport_error_propagates_without_text_fallback() -> None: + """httpx.TransportError during media send must re-raise immediately.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel._send_media_file = AsyncMock( + side_effect=httpx.TransportError("connection reset") + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/photo.jpg"]) + + with pytest.raises(httpx.TransportError, match="connection reset"): + await channel.send(msg) + + channel._send_text.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_send_media_5xx_http_status_error_propagates_without_text_fallback() -> None: + """httpx.HTTPStatusError with a 5xx status must re-raise immediately.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + + fake_response = httpx.Response( + status_code=503, + request=httpx.Request("POST", "https://example.test/upload"), + ) + channel._send_media_file = AsyncMock( + side_effect=httpx.HTTPStatusError( + "Service Unavailable", request=fake_response.request, response=fake_response + ) + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/photo.jpg"]) + + with pytest.raises(httpx.HTTPStatusError, match="Service Unavailable"): + await channel.send(msg) + + channel._send_text.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_send_media_4xx_http_status_error_falls_back_to_text() -> None: + """httpx.HTTPStatusError with a 4xx status should fall back to text, not re-raise.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + + fake_response = httpx.Response( + status_code=400, + request=httpx.Request("POST", "https://example.test/upload"), + ) + channel._send_media_file = AsyncMock( + side_effect=httpx.HTTPStatusError( + "Bad Request", request=fake_response.request, response=fake_response + ) + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/photo.jpg"]) + + # Should NOT raise — 4xx is a client error, non-retryable + await channel.send(msg) + + # _send_text should have been called with the fallback message + channel._send_text.assert_awaited_once_with( + "wx-user", "[Failed to send: photo.jpg]", "ctx-1" + ) + + +@pytest.mark.asyncio +async def test_send_media_file_not_found_falls_back_to_text() -> None: + """FileNotFoundError (a non-network error) should fall back to text.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel._send_media_file = AsyncMock( + side_effect=FileNotFoundError("Media file not found: /tmp/missing.jpg") + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/missing.jpg"]) + + # Should NOT raise + await channel.send(msg) + + channel._send_text.assert_awaited_once_with( + "wx-user", "[Failed to send: missing.jpg]", "ctx-1" + ) + + +@pytest.mark.asyncio +async def test_send_media_value_error_falls_back_to_text() -> None: + """ValueError (e.g. unsupported format) should fall back to text.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel._send_media_file = AsyncMock( + side_effect=ValueError("Unsupported media format") + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", media=["/tmp/file.xyz"]) + + # Should NOT raise + await channel.send(msg) + + channel._send_text.assert_awaited_once_with( + "wx-user", "[Failed to send: file.xyz]", "ctx-1" + ) + + +@pytest.mark.asyncio +async def test_send_media_network_error_does_not_double_api_calls() -> None: + """During network issues, media send should make exactly 1 API call attempt, + not 2 (media + text fallback). Verify total call count.""" + channel, _bus = _make_channel() + channel._client = object() + channel._token = "token" + channel._context_tokens["wx-user"] = "ctx-1" + channel._send_media_file = AsyncMock( + side_effect=httpx.ConnectError("connection refused") + ) + channel._send_text = AsyncMock() + + msg = _make_outbound_msg(chat_id="wx-user", content="hello", media=["/tmp/img.png"]) + + with pytest.raises(httpx.ConnectError): + await channel.send(msg) + + # _send_media_file called once, _send_text never called + channel._send_media_file.assert_awaited_once() + channel._send_text.assert_not_awaited() From f879d81b28cea4bc7ff07ffc2db2362080138c71 Mon Sep 17 00:00:00 2001 From: bahtya Date: Sun, 12 Apr 2026 09:24:06 +0800 Subject: [PATCH 03/19] fix(channels/qq): propagate network errors in send() instead of swallowing The catch-all except Exception in QQ send() was swallowing aiohttp.ClientError and OSError that _send_media correctly re-raises. Add explicit catch for network errors before the generic handler. --- nanobot/channels/qq.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/nanobot/channels/qq.py b/nanobot/channels/qq.py index 96d9d5ecd..f109f6da6 100644 --- a/nanobot/channels/qq.py +++ b/nanobot/channels/qq.py @@ -280,6 +280,9 @@ class QQChannel(BaseChannel): msg_id=msg_id, content=msg.content.strip(), ) + except (aiohttp.ClientError, OSError): + # Network / transport errors — propagate so ChannelManager can retry + raise except Exception: logger.exception("Error sending QQ message to chat_id={}", msg.chat_id) From c68b3edb9d085e2c3f3b0018a41e3d61dc07cf26 Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Sun, 12 Apr 2026 20:06:47 +0000 Subject: [PATCH 04/19] fix(provider): clarify local 502 recovery hints --- nanobot/providers/openai_compat_provider.py | 13 +++++++++++-- tests/providers/test_custom_provider.py | 14 ++++++++++++++ 2 files changed, 25 insertions(+), 2 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index 101ee6c33..83fbd7fb3 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -798,8 +798,7 @@ class OpenAICompatProvider(LLMProvider): "error_should_retry": should_retry, } - @staticmethod - def _handle_error(e: Exception) -> LLMResponse: + def _handle_error(self, e: Exception) -> LLMResponse: body = ( getattr(e, "doc", None) or getattr(e, "body", None) @@ -807,6 +806,16 @@ class OpenAICompatProvider(LLMProvider): ) body_text = body if isinstance(body, str) else str(body) if body is not None else "" msg = f"Error: {body_text.strip()[:500]}" if body_text.strip() else f"Error calling LLM: {e}" + + spec = self._spec + text = f"{body_text} {e}".lower() + if spec and spec.is_local and ("502" in text or "connection" in text or "refused" in text): + msg += ( + "\nHint: this is a local model endpoint. Check that the local server is reachable at " + f"{self.api_base or spec.default_api_base}, and if you are using a proxy/tunnel, make sure it " + "can reach your local Ollama/vLLM service instead of routing localhost through the remote host." + ) + response = getattr(e, "response", None) retry_after = LLMProvider._extract_retry_after_from_headers(getattr(response, "headers", None)) if retry_after is None: diff --git a/tests/providers/test_custom_provider.py b/tests/providers/test_custom_provider.py index d2a9f4247..33c5d027a 100644 --- a/tests/providers/test_custom_provider.py +++ b/tests/providers/test_custom_provider.py @@ -4,6 +4,7 @@ from types import SimpleNamespace from unittest.mock import patch from nanobot.providers.openai_compat_provider import OpenAICompatProvider +from nanobot.providers.registry import find_by_name def test_custom_provider_parse_handles_empty_choices() -> None: @@ -53,3 +54,16 @@ def test_custom_provider_parse_chunks_accepts_plain_text_chunks() -> None: assert result.finish_reason == "stop" assert result.content == "hello world" + + +def test_local_provider_502_error_includes_reachability_hint() -> None: + spec = find_by_name("ollama") + with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): + provider = OpenAICompatProvider(api_base="http://localhost:11434/v1", spec=spec) + + result = provider._handle_error(Exception("Error code: 502")) + + assert result.finish_reason == "error" + assert "local model endpoint" in result.content + assert "http://localhost:11434/v1" in result.content + assert "proxy/tunnel" in result.content From 3573109408f2d9bb9cdda60938b12def6a53f0fb Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Sun, 12 Apr 2026 20:53:18 +0000 Subject: [PATCH 05/19] fix(provider): preserve static error helper compatibility --- nanobot/providers/openai_compat_provider.py | 15 ++++++++++----- tests/providers/test_custom_provider.py | 6 +++++- 2 files changed, 15 insertions(+), 6 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index 83fbd7fb3..4dea2d5fc 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -798,7 +798,13 @@ class OpenAICompatProvider(LLMProvider): "error_should_retry": should_retry, } - def _handle_error(self, e: Exception) -> LLMResponse: + @staticmethod + def _handle_error( + e: Exception, + *, + spec: ProviderSpec | None = None, + api_base: str | None = None, + ) -> LLMResponse: body = ( getattr(e, "doc", None) or getattr(e, "body", None) @@ -807,12 +813,11 @@ class OpenAICompatProvider(LLMProvider): body_text = body if isinstance(body, str) else str(body) if body is not None else "" msg = f"Error: {body_text.strip()[:500]}" if body_text.strip() else f"Error calling LLM: {e}" - spec = self._spec text = f"{body_text} {e}".lower() if spec and spec.is_local and ("502" in text or "connection" in text or "refused" in text): msg += ( "\nHint: this is a local model endpoint. Check that the local server is reachable at " - f"{self.api_base or spec.default_api_base}, and if you are using a proxy/tunnel, make sure it " + f"{api_base or spec.default_api_base}, and if you are using a proxy/tunnel, make sure it " "can reach your local Ollama/vLLM service instead of routing localhost through the remote host." ) @@ -859,7 +864,7 @@ class OpenAICompatProvider(LLMProvider): ) return self._parse(await self._client.chat.completions.create(**kwargs)) except Exception as e: - return self._handle_error(e) + return self._handle_error(e, spec=self._spec, api_base=self.api_base) async def chat_stream( self, @@ -942,7 +947,7 @@ class OpenAICompatProvider(LLMProvider): error_kind="timeout", ) except Exception as e: - return self._handle_error(e) + return self._handle_error(e, spec=self._spec, api_base=self.api_base) def get_default_model(self) -> str: return self.default_model diff --git a/tests/providers/test_custom_provider.py b/tests/providers/test_custom_provider.py index 33c5d027a..85314dc79 100644 --- a/tests/providers/test_custom_provider.py +++ b/tests/providers/test_custom_provider.py @@ -61,7 +61,11 @@ def test_local_provider_502_error_includes_reachability_hint() -> None: with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): provider = OpenAICompatProvider(api_base="http://localhost:11434/v1", spec=spec) - result = provider._handle_error(Exception("Error code: 502")) + result = provider._handle_error( + Exception("Error code: 502"), + spec=spec, + api_base="http://localhost:11434/v1", + ) assert result.finish_reason == "error" assert "local model endpoint" in result.content From 92ef594b6a196a8084187d163058b82250695e76 Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Mon, 13 Apr 2026 01:07:08 +0000 Subject: [PATCH 06/19] fix(mcp): hint on stdio protocol pollution --- nanobot/agent/tools/mcp.py | 18 +++++++++++++++++- tests/tools/test_mcp_tool.py | 27 +++++++++++++++++++++++++++ 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/tools/mcp.py b/nanobot/agent/tools/mcp.py index 1b5a71322..2aea19279 100644 --- a/nanobot/agent/tools/mcp.py +++ b/nanobot/agent/tools/mcp.py @@ -454,7 +454,23 @@ async def connect_mcp_servers( return name, server_stack except Exception as e: - logger.error("MCP server '{}': failed to connect: {}", name, e) + hint = "" + text = str(e).lower() + if any( + marker in text + for marker in ( + "parse error", + "invalid json", + "unexpected token", + "jsonrpc", + "content-length", + ) + ): + hint = ( + " Hint: this looks like stdio protocol pollution. Make sure the MCP server writes " + "only JSON-RPC to stdout and sends logs/debug output to stderr instead." + ) + logger.error("MCP server '{}': failed to connect: {}{}", name, e, hint) try: await server_stack.aclose() except Exception: diff --git a/tests/tools/test_mcp_tool.py b/tests/tools/test_mcp_tool.py index da90c4d0d..a133f53db 100644 --- a/tests/tools/test_mcp_tool.py +++ b/tests/tools/test_mcp_tool.py @@ -356,6 +356,33 @@ async def test_connect_mcp_servers_enabled_tools_warns_on_unknown_entries( assert "Available wrapped names: mcp_test_demo" in warnings[-1] +@pytest.mark.asyncio +async def test_connect_mcp_servers_logs_stdio_pollution_hint( + monkeypatch: pytest.MonkeyPatch, +) -> None: + messages: list[str] = [] + + def _error(message: str, *args: object) -> None: + messages.append(message.format(*args)) + + @asynccontextmanager + async def _broken_stdio_client(_params: object): + raise RuntimeError("Parse error: Unexpected token 'INFO' before JSON-RPC headers") + yield # pragma: no cover + + monkeypatch.setattr(sys.modules["mcp.client.stdio"], "stdio_client", _broken_stdio_client) + monkeypatch.setattr("nanobot.agent.tools.mcp.logger.error", _error) + + registry = ToolRegistry() + stacks = await connect_mcp_servers({"gh": MCPServerConfig(command="github-mcp")}, registry) + + assert stacks == {} + assert messages + assert "stdio protocol pollution" in messages[-1] + assert "stdout" in messages[-1] + assert "stderr" in messages[-1] + + @pytest.mark.asyncio async def test_connect_mcp_servers_one_failure_does_not_block_others( monkeypatch: pytest.MonkeyPatch, From 830644c35292402befa22a4fe01cfb2845c36805 Mon Sep 17 00:00:00 2001 From: ramonpaolo Date: Sun, 12 Apr 2026 20:56:36 -0300 Subject: [PATCH 07/19] fix: add guard for non-dict tool call parameters - Add type validation in registry.prepare_call() to catch list/other invalid params - Add logger.warning() in provider layer when non-dict args detected - Works for OpenAI-compatible and Anthropic providers - Registry returns clear error hint for model to self-correct --- nanobot/agent/tools/registry.py | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/nanobot/agent/tools/registry.py b/nanobot/agent/tools/registry.py index 99d3ec63a..137038c0c 100644 --- a/nanobot/agent/tools/registry.py +++ b/nanobot/agent/tools/registry.py @@ -68,6 +68,13 @@ class ToolRegistry: params: dict[str, Any], ) -> tuple[Tool | None, dict[str, Any], str | None]: """Resolve, cast, and validate one tool call.""" + # Guard against invalid parameter types (e.g., list instead of dict) + if not isinstance(params, dict) and name in ('write_file', 'read_file'): + return None, params, ( + f"Error: Tool '{name}' parameters must be a JSON object, got {type(params).__name__}. " + "Use named parameters: tool_name(param1=\"value1\", param2=\"value2\")" + ) + tool = self._tools.get(name) if not tool: return None, params, ( From 49355b2bd6025a44f2e8328c8956ac47be7c0e8b Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 13 Apr 2026 01:53:18 +0000 Subject: [PATCH 08/19] test(tools): lock non-object parameter validation Add focused registry coverage so the new read_file/read_write parameter guard stays actionable without changing generic validation behavior for other tools. Made-with: Cursor --- tests/tools/test_tool_registry.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/tests/tools/test_tool_registry.py b/tests/tools/test_tool_registry.py index 5b259119e..f9e8ce5e1 100644 --- a/tests/tools/test_tool_registry.py +++ b/tests/tools/test_tool_registry.py @@ -47,3 +47,27 @@ def test_get_definitions_orders_builtins_then_mcp_tools() -> None: "mcp_fs_list", "mcp_git_status", ] + + +def test_prepare_call_read_file_rejects_non_object_params_with_actionable_hint() -> None: + registry = ToolRegistry() + registry.register(_FakeTool("read_file")) + + tool, params, error = registry.prepare_call("read_file", ["foo.txt"]) + + assert tool is None + assert params == ["foo.txt"] + assert error is not None + assert "must be a JSON object" in error + assert "Use named parameters" in error + + +def test_prepare_call_other_tools_keep_generic_object_validation() -> None: + registry = ToolRegistry() + registry.register(_FakeTool("grep")) + + tool, params, error = registry.prepare_call("grep", ["TODO"]) + + assert tool is not None + assert params == ["TODO"] + assert error == "Error: Invalid parameters for tool 'grep': parameters must be an object, got list" From ea94a9c088bb12275fa67e8567ec145ab7231454 Mon Sep 17 00:00:00 2001 From: nikube Date: Sun, 12 Apr 2026 20:57:11 +0000 Subject: [PATCH 09/19] fix(agent): persist user message before running turn loop The existing runtime_checkpoint mechanism preserves the in-flight assistant/tool state if the process dies mid-turn, but the triggering user message is only written to session history at the end of the turn via _save_turn(). If the worker is killed (OOM, SIGKILL, a self- triggered systemctl restart, container eviction, etc.) before the turn completes, the user's message is silently lost: on restart, the session log only shows the interrupted assistant turn without any record of what the user asked. Any recovery tooling built on top of session logs cannot reply because it has no prompt to reply to. This patch appends the incoming user message to the session and flushes it to disk immediately after the session is loaded and before the agent loop runs, then adjusts the _save_turn skip offset so the final persistence step does not duplicate it. Limited to textual content (isinstance(msg.content, str)); list-shaped content (media blocks) still flows through _save_turn's sanitization at end of turn, preserving existing behavior for those cases. --- nanobot/agent/loop.py | 21 ++++++++++++++++++++- 1 file changed, 20 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 5631e12a0..8bc65b7d3 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -693,6 +693,23 @@ class AgentLoop: ) ) + # Persist the triggering user message immediately, before running the + # agent loop. If the process is killed mid-turn (OOM, SIGKILL, self- + # restart, etc.), the existing runtime_checkpoint preserves the + # in-flight assistant/tool state but NOT the user message itself, so + # the user's prompt is silently lost on recovery. Saving it up front + # makes recovery possible from the session log alone. + user_persisted_early = False + if isinstance(msg.content, str) and msg.content.strip(): + from datetime import datetime as _dt + session.messages.append({ + "role": "user", + "content": msg.content, + "timestamp": _dt.now().isoformat(), + }) + self.sessions.save(session) + user_persisted_early = True + final_content, _, all_msgs, stop_reason, had_injections = await self._run_agent_loop( initial_messages, on_progress=on_progress or _bus_progress, @@ -708,7 +725,9 @@ class AgentLoop: if final_content is None or not final_content.strip(): final_content = EMPTY_FINAL_RESPONSE_MESSAGE - self._save_turn(session, all_msgs, 1 + len(history)) + # Skip the already-persisted user message when saving the turn + save_skip = 1 + len(history) + (1 if user_persisted_early else 0) + self._save_turn(session, all_msgs, save_skip) self._clear_runtime_checkpoint(session) self.sessions.save(session) self._schedule_background(self.consolidator.maybe_consolidate_by_tokens(session)) From b964a894d216c8c716647491fe0e63328bc93a70 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 13 Apr 2026 02:09:40 +0000 Subject: [PATCH 10/19] test(agent): cover early user-message persistence Use session.add_message for the pre-turn user-message flush and add focused regression tests for crash-time persistence and duplicate-free successful saves. Made-with: Cursor --- nanobot/agent/loop.py | 7 +--- tests/agent/test_loop_save_turn.py | 62 ++++++++++++++++++++++++++++++ 2 files changed, 63 insertions(+), 6 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 8bc65b7d3..96b5b30c6 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -701,12 +701,7 @@ class AgentLoop: # makes recovery possible from the session log alone. user_persisted_early = False if isinstance(msg.content, str) and msg.content.strip(): - from datetime import datetime as _dt - session.messages.append({ - "role": "user", - "content": msg.content, - "timestamp": _dt.now().isoformat(), - }) + session.add_message("user", msg.content) self.sessions.save(session) user_persisted_early = True diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py index 8a0b54b86..c499282ab 100644 --- a/tests/agent/test_loop_save_turn.py +++ b/tests/agent/test_loop_save_turn.py @@ -1,5 +1,12 @@ +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + from nanobot.agent.context import ContextBuilder from nanobot.agent.loop import AgentLoop +from nanobot.bus.events import InboundMessage +from nanobot.bus.queue import MessageBus from nanobot.session.manager import Session @@ -11,6 +18,12 @@ def _mk_loop() -> AgentLoop: return loop +def _make_full_loop(tmp_path: Path) -> AgentLoop: + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + return AgentLoop(bus=MessageBus(), provider=provider, workspace=tmp_path, model="test-model") + + def test_save_turn_skips_multimodal_user_when_only_runtime_context() -> None: loop = _mk_loop() session = Session(key="test:runtime-only") @@ -200,3 +213,52 @@ def test_restore_runtime_checkpoint_dedupes_overlapping_tail() -> None: assert session.messages[0]["role"] == "assistant" assert session.messages[1]["tool_call_id"] == "call_done" assert session.messages[2]["tool_call_id"] == "call_pending" + + +@pytest.mark.asyncio +async def test_process_message_persists_user_message_before_turn_completes(tmp_path: Path) -> None: + loop = _make_full_loop(tmp_path) + loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign] + loop._run_agent_loop = AsyncMock(side_effect=RuntimeError("boom")) # type: ignore[method-assign] + + msg = InboundMessage(channel="feishu", sender_id="u1", chat_id="c1", content="persist me") + with pytest.raises(RuntimeError, match="boom"): + await loop._process_message(msg) + + loop.sessions.invalidate("feishu:c1") + persisted = loop.sessions.get_or_create("feishu:c1") + assert [m["role"] for m in persisted.messages] == ["user"] + assert persisted.messages[0]["content"] == "persist me" + assert persisted.updated_at >= persisted.created_at + + +@pytest.mark.asyncio +async def test_process_message_does_not_duplicate_early_persisted_user_message(tmp_path: Path) -> None: + loop = _make_full_loop(tmp_path) + loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign] + loop._run_agent_loop = AsyncMock(return_value=( + "done", + None, + [ + {"role": "system", "content": "system"}, + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "done"}, + ], + "stop", + False, + )) # type: ignore[method-assign] + + result = await loop._process_message( + InboundMessage(channel="feishu", sender_id="u1", chat_id="c2", content="hello") + ) + + assert result is not None + assert result.content == "done" + session = loop.sessions.get_or_create("feishu:c2") + assert [ + {k: v for k, v in m.items() if k in {"role", "content"}} + for m in session.messages + ] == [ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "done"}, + ] From 6484c7c47a74b157432b8e1e3b866fe3ad4711d7 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 13 Apr 2026 02:21:39 +0000 Subject: [PATCH 11/19] fix(agent): close interrupted early-persisted user turns Track text-only user messages that were flushed before the turn loop completes, then materialize an interrupted assistant placeholder on the next request so session history stays legal and later turns do not skip their own assistant reply. Made-with: Cursor --- nanobot/agent/loop.py | 34 ++++++++++++++++++++++ tests/agent/test_loop_save_turn.py | 46 ++++++++++++++++++++++++++++++ 2 files changed, 80 insertions(+) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 96b5b30c6..0031c90c5 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -129,6 +129,7 @@ class AgentLoop: """ _RUNTIME_CHECKPOINT_KEY = "runtime_checkpoint" + _PENDING_USER_TURN_KEY = "pending_user_turn" def __init__( self, @@ -618,6 +619,8 @@ class AgentLoop: session = self.sessions.get_or_create(key) if self._restore_runtime_checkpoint(session): self.sessions.save(session) + if self._restore_pending_user_turn(session): + self.sessions.save(session) session, pending = self.auto_compact.prepare_session(session, key) @@ -653,6 +656,8 @@ class AgentLoop: session = self.sessions.get_or_create(key) if self._restore_runtime_checkpoint(session): self.sessions.save(session) + if self._restore_pending_user_turn(session): + self.sessions.save(session) session, pending = self.auto_compact.prepare_session(session, key) @@ -702,6 +707,7 @@ class AgentLoop: user_persisted_early = False if isinstance(msg.content, str) and msg.content.strip(): session.add_message("user", msg.content) + self._mark_pending_user_turn(session) self.sessions.save(session) user_persisted_early = True @@ -723,6 +729,7 @@ class AgentLoop: # Skip the already-persisted user message when saving the turn save_skip = 1 + len(history) + (1 if user_persisted_early else 0) self._save_turn(session, all_msgs, save_skip) + self._clear_pending_user_turn(session) self._clear_runtime_checkpoint(session) self.sessions.save(session) self._schedule_background(self.consolidator.maybe_consolidate_by_tokens(session)) @@ -840,6 +847,12 @@ class AgentLoop: session.metadata[self._RUNTIME_CHECKPOINT_KEY] = payload self.sessions.save(session) + def _mark_pending_user_turn(self, session: Session) -> None: + session.metadata[self._PENDING_USER_TURN_KEY] = True + + def _clear_pending_user_turn(self, session: Session) -> None: + session.metadata.pop(self._PENDING_USER_TURN_KEY, None) + def _clear_runtime_checkpoint(self, session: Session) -> None: if self._RUNTIME_CHECKPOINT_KEY in session.metadata: session.metadata.pop(self._RUNTIME_CHECKPOINT_KEY, None) @@ -906,9 +919,30 @@ class AgentLoop: break session.messages.extend(restored_messages[overlap:]) + self._clear_pending_user_turn(session) self._clear_runtime_checkpoint(session) return True + def _restore_pending_user_turn(self, session: Session) -> bool: + """Close a turn that only persisted the user message before crashing.""" + from datetime import datetime + + if not session.metadata.get(self._PENDING_USER_TURN_KEY): + return False + + if session.messages and session.messages[-1].get("role") == "user": + session.messages.append( + { + "role": "assistant", + "content": "Error: Task interrupted before a response was generated.", + "timestamp": datetime.now().isoformat(), + } + ) + session.updated_at = datetime.now() + + self._clear_pending_user_turn(session) + return True + async def process_direct( self, content: str, diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py index c499282ab..c965ccd8c 100644 --- a/tests/agent/test_loop_save_turn.py +++ b/tests/agent/test_loop_save_turn.py @@ -229,6 +229,7 @@ async def test_process_message_persists_user_message_before_turn_completes(tmp_p persisted = loop.sessions.get_or_create("feishu:c1") assert [m["role"] for m in persisted.messages] == ["user"] assert persisted.messages[0]["content"] == "persist me" + assert persisted.metadata.get(AgentLoop._PENDING_USER_TURN_KEY) is True assert persisted.updated_at >= persisted.created_at @@ -262,3 +263,48 @@ async def test_process_message_does_not_duplicate_early_persisted_user_message(t {"role": "user", "content": "hello"}, {"role": "assistant", "content": "done"}, ] + assert AgentLoop._PENDING_USER_TURN_KEY not in session.metadata + + +@pytest.mark.asyncio +async def test_next_turn_after_crash_closes_pending_user_turn_before_new_input(tmp_path: Path) -> None: + loop = _make_full_loop(tmp_path) + loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign] + loop.provider.chat_with_retry = AsyncMock(return_value=MagicMock()) # unused because _run_agent_loop is stubbed + + session = loop.sessions.get_or_create("feishu:c3") + session.add_message("user", "old question") + session.metadata[AgentLoop._PENDING_USER_TURN_KEY] = True + loop.sessions.save(session) + + loop._run_agent_loop = AsyncMock(return_value=( + "new answer", + None, + [ + {"role": "system", "content": "system"}, + {"role": "user", "content": "old question"}, + {"role": "assistant", "content": "Error: Task interrupted before a response was generated."}, + {"role": "user", "content": "new question"}, + {"role": "assistant", "content": "new answer"}, + ], + "stop", + False, + )) # type: ignore[method-assign] + + result = await loop._process_message( + InboundMessage(channel="feishu", sender_id="u1", chat_id="c3", content="new question") + ) + + assert result is not None + assert result.content == "new answer" + session = loop.sessions.get_or_create("feishu:c3") + assert [ + {k: v for k, v in m.items() if k in {"role", "content"}} + for m in session.messages + ] == [ + {"role": "user", "content": "old question"}, + {"role": "assistant", "content": "Error: Task interrupted before a response was generated."}, + {"role": "user", "content": "new question"}, + {"role": "assistant", "content": "new answer"}, + ] + assert AgentLoop._PENDING_USER_TURN_KEY not in session.metadata From becaff3e9d9710fe5a4d8d23d4cb8c64d46ef431 Mon Sep 17 00:00:00 2001 From: chengyongru Date: Mon, 13 Apr 2026 11:27:16 +0800 Subject: [PATCH 12/19] fix(agent): skip auto-compact for sessions with active agent tasks Prevent proactive compaction from archiving sessions that have an in-flight agent task, avoiding mid-turn context truncation when a task runs longer than the idle TTL. --- nanobot/agent/autocompact.py | 17 ++++-- nanobot/agent/loop.py | 5 +- tests/agent/test_auto_compact.py | 100 ++++++++++++++++++++++++++++++- 3 files changed, 115 insertions(+), 7 deletions(-) diff --git a/nanobot/agent/autocompact.py b/nanobot/agent/autocompact.py index 47c7b5a36..ce70337cd 100644 --- a/nanobot/agent/autocompact.py +++ b/nanobot/agent/autocompact.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Collection from datetime import datetime from typing import TYPE_CHECKING, Any, Callable, Coroutine @@ -23,12 +24,13 @@ class AutoCompact: self._archiving: set[str] = set() self._summaries: dict[str, tuple[str, datetime]] = {} - def _is_expired(self, ts: datetime | str | None) -> bool: + def _is_expired(self, ts: datetime | str | None, + now: datetime | None = None) -> bool: if self._ttl <= 0 or not ts: return False if isinstance(ts, str): ts = datetime.fromisoformat(ts) - return (datetime.now() - ts).total_seconds() >= self._ttl * 60 + return ((now or datetime.now()) - ts).total_seconds() >= self._ttl * 60 @staticmethod def _format_summary(text: str, last_active: datetime) -> str: @@ -56,10 +58,17 @@ class AutoCompact: cut = len(tail) - len(kept) return tail[:cut], kept - def check_expired(self, schedule_background: Callable[[Coroutine], None]) -> None: + def check_expired(self, schedule_background: Callable[[Coroutine], None], + active_session_keys: Collection[str] = ()) -> None: + """Schedule archival for idle sessions, skipping those with in-flight agent tasks.""" + now = datetime.now() for info in self.sessions.list_sessions(): key = info.get("key", "") - if key and key not in self._archiving and self._is_expired(info.get("updated_at")): + if not key or key in self._archiving: + continue + if key in active_session_keys: + continue + if self._is_expired(info.get("updated_at"), now): self._archiving.add(key) logger.debug("Auto-compact: scheduling archival for {} (idle > {} min)", key, self._ttl) schedule_background(self._archive(key)) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 0031c90c5..39e1ce23a 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -434,7 +434,10 @@ class AgentLoop: try: msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: - self.auto_compact.check_expired(self._schedule_background) + self.auto_compact.check_expired( + self._schedule_background, + active_session_keys=self._pending_queues.keys(), + ) continue except asyncio.CancelledError: # Preserve real task cancellation so shutdown can complete cleanly. diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py index b3462820b..1f6886ed0 100644 --- a/tests/agent/test_auto_compact.py +++ b/tests/agent/test_auto_compact.py @@ -560,9 +560,12 @@ class TestProactiveAutoCompact: """Test proactive auto-new on idle ticks (TimeoutError path in run loop).""" @staticmethod - async def _run_check_expired(loop): + async def _run_check_expired(loop, active_session_keys=()): """Helper: run check_expired via callback and wait for background tasks.""" - loop.auto_compact.check_expired(loop._schedule_background) + loop.auto_compact.check_expired( + loop._schedule_background, + active_session_keys=active_session_keys, + ) await asyncio.sleep(0.1) @pytest.mark.asyncio @@ -701,6 +704,99 @@ class TestProactiveAutoCompact: assert not archive_called await loop.close_mcp() + @pytest.mark.asyncio + async def test_skip_expired_session_with_active_agent_task(self, tmp_path): + """Expired session with an active agent task should NOT be archived.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + _add_turns(session, 6, prefix="old") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return "Summary." + + loop.consolidator.archive = _fake_archive + + # Simulate an active agent task for this session + await self._run_check_expired(loop, active_session_keys={"cli:test"}) + assert archive_count == 0 + + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 12 # All messages preserved + + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_archive_after_active_task_completes(self, tmp_path): + """Session should be archived on next tick after active task completes.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + _add_turns(session, 6, prefix="old") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return "Summary." + + loop.consolidator.archive = _fake_archive + + # First tick: active task, skip + await self._run_check_expired(loop, active_session_keys={"cli:test"}) + assert archive_count == 0 + + # Second tick: task completed, should archive + await self._run_check_expired(loop) + assert archive_count == 1 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_partial_active_set_only_archives_inactive_expired(self, tmp_path): + """With multiple sessions, only the expired+inactive one should be archived.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + # Session A: expired, no active task -> should be archived + s1 = loop.sessions.get_or_create("cli:expired_idle") + _add_turns(s1, 6, prefix="old_a") + s1.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(s1) + # Session B: expired, has active task -> should be skipped + s2 = loop.sessions.get_or_create("cli:expired_active") + _add_turns(s2, 6, prefix="old_b") + s2.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(s2) + # Session C: recent, no active task -> should be skipped + s3 = loop.sessions.get_or_create("cli:recent") + s3.add_message("user", "recent") + loop.sessions.save(s3) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return "Summary." + + loop.consolidator.archive = _fake_archive + + await self._run_check_expired(loop, active_session_keys={"cli:expired_active"}) + + assert archive_count == 1 + s1_after = loop.sessions.get_or_create("cli:expired_idle") + assert len(s1_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES + s2_after = loop.sessions.get_or_create("cli:expired_active") + assert len(s2_after.messages) == 12 # Preserved + s3_after = loop.sessions.get_or_create("cli:recent") + assert len(s3_after.messages) == 1 # Preserved + await loop.close_mcp() + @pytest.mark.asyncio async def test_no_reschedule_after_successful_archive(self, tmp_path): """Already-archived session should NOT be re-scheduled on subsequent ticks.""" From ac714803f67171ad5787142b575d90b8b28bfbf0 Mon Sep 17 00:00:00 2001 From: chengyongru Date: Mon, 13 Apr 2026 11:30:54 +0800 Subject: [PATCH 13/19] fix(provider): recover trailing assistant message as user to prevent empty request When a subagent result is injected with current_role="assistant", _enforce_role_alternation drops the trailing assistant message, leaving only the system prompt. Providers like Zhipu/GLM reject such requests with error 1214 ("messages parameter invalid"). Now the last popped assistant message is recovered as a user message when no user/tool messages remain. --- nanobot/providers/base.py | 16 +++++++- .../test_enforce_role_alternation.py | 41 +++++++++++++++++++ 2 files changed, 56 insertions(+), 1 deletion(-) diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py index 8ce2b9a7a..759d880a8 100644 --- a/nanobot/providers/base.py +++ b/nanobot/providers/base.py @@ -392,8 +392,22 @@ class LLMProvider(ABC): else: merged.append(dict(msg)) + last_popped = None while merged and merged[-1].get("role") == "assistant": - merged.pop() + last_popped = merged.pop() + + # If removing trailing assistant messages left only system messages, + # the request would be invalid for most providers (e.g. Zhipu/GLM + # error 1214). Recover by converting the last popped assistant + # message to a user message so the LLM can still see the content. + if ( + merged + and last_popped is not None + and not any(m.get("role") in ("user", "tool") for m in merged) + ): + recovered = dict(last_popped) + recovered["role"] = "user" + merged.append(recovered) return merged diff --git a/tests/providers/test_enforce_role_alternation.py b/tests/providers/test_enforce_role_alternation.py index aef57f474..333c5d04e 100644 --- a/tests/providers/test_enforce_role_alternation.py +++ b/tests/providers/test_enforce_role_alternation.py @@ -131,6 +131,47 @@ class TestEnforceRoleAlternation: assert msgs[0] == original_first assert len(msgs) == 2 + def test_trailing_assistant_recovered_as_user_when_only_system_remains(self): + """Subagent result injected as assistant message must not be silently dropped. + + When build_messages(current_role="assistant") produces [system, assistant], + _enforce_role_alternation would drop the assistant, leaving only [system]. + Most providers (e.g. Zhipu/GLM error 1214) reject such requests. + The trailing assistant should be recovered as a user message instead. + """ + msgs = [ + {"role": "system", "content": "You are helpful."}, + {"role": "assistant", "content": "Subagent completed successfully."}, + ] + result = LLMProvider._enforce_role_alternation(msgs) + assert len(result) == 2 + assert result[0]["role"] == "system" + assert result[1]["role"] == "user" + assert "Subagent completed successfully." in result[1]["content"] + + def test_trailing_assistant_not_recovered_when_user_message_present(self): + """Recovery should NOT happen when a user message already exists.""" + msgs = [ + {"role": "system", "content": "You are helpful."}, + {"role": "user", "content": "Hi"}, + {"role": "assistant", "content": "Hello!"}, + ] + result = LLMProvider._enforce_role_alternation(msgs) + assert len(result) == 2 + assert result[-1]["role"] == "user" + + def test_trailing_assistant_recovered_with_tool_result_preceding(self): + """When only [system, tool, assistant] remains, recovery is not needed + because tool messages are valid non-system content.""" + msgs = [ + {"role": "system", "content": "You are helpful."}, + {"role": "tool", "content": "result", "tool_call_id": "1"}, + {"role": "assistant", "content": "Done."}, + ] + result = LLMProvider._enforce_role_alternation(msgs) + assert len(result) == 2 + assert result[-1]["role"] == "tool" + def test_only_assistant_messages(self): msgs = [ {"role": "assistant", "content": "A"}, From 85c7996766d19f1a709325cb601dc3a41d4b2a87 Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Mon, 13 Apr 2026 04:12:52 +0000 Subject: [PATCH 14/19] docs(api): clarify cross-channel message delivery --- README.md | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/README.md b/README.md index b376d0991..ebe88938d 100644 --- a/README.md +++ b/README.md @@ -1858,6 +1858,19 @@ By default, the API binds to `127.0.0.1:8900`. You can change this in `config.js - Single-message input: each request must contain exactly one `user` message - Fixed model: omit `model`, or pass the same model shown by `/v1/models` - No streaming: `stream=true` is not supported +- API requests run in the synthetic `api` channel, so the `message` tool does **not** automatically deliver to Telegram/Discord/etc. To proactively send to another chat, call `message` with an explicit `channel` and `chat_id` for an enabled channel. + +Example tool call for cross-channel delivery from an API session: + +```json +{ + "content": "Build finished successfully.", + "channel": "telegram", + "chat_id": "123456789" +} +``` + +If `channel` points to a channel that is not enabled in your config, nanobot will queue the outbound event but no platform delivery will occur. ### Endpoints From d33bf22e91bbc703be0075570261ea229804a7a9 Mon Sep 17 00:00:00 2001 From: haosenwang1018 Date: Mon, 13 Apr 2026 04:27:00 +0000 Subject: [PATCH 15/19] docs(provider): clarify responses api routing --- README.md | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/README.md b/README.md index ebe88938d..f593e26ec 100644 --- a/README.md +++ b/README.md @@ -1053,6 +1053,30 @@ Connects directly to any OpenAI-compatible endpoint — LM Studio, llama.cpp, To ``` > For local servers that don't require a key, set `apiKey` to any non-empty string (e.g. `"no-key"`). +> +> `custom` is the right choice for providers that expose an OpenAI-compatible **chat completions** API. It does **not** force third-party endpoints onto the OpenAI/Azure **Responses API**. +> +> If your proxy or gateway is specifically Responses-API-compatible, use the `azure_openai` provider shape instead and point `apiBase` at that endpoint: +> +> ```json +> { +> "providers": { +> "azure_openai": { +> "apiKey": "your-api-key", +> "apiBase": "https://api.your-provider.com", +> "defaultModel": "your-model-name" +> } +> }, +> "agents": { +> "defaults": { +> "provider": "azure_openai", +> "model": "your-model-name" +> } +> } +> } +> ``` +> +> In short: **chat-completions-compatible endpoint → `custom`**; **Responses-compatible endpoint → `azure_openai`**. From 3c06db7e4e7338c4944ef96f37dd35c9fe0349d6 Mon Sep 17 00:00:00 2001 From: chengyongru Date: Mon, 13 Apr 2026 16:03:15 +0800 Subject: [PATCH 16/19] fix(log): remove noisy no-op logs from auto-compact Remove two debug log lines that fire on every idle channel check: - "scheduling archival" (logged before knowing if there's work) - "skipping, no un-consolidated messages" (the common no-op path) The meaningful "archived" info log (only on real work) is preserved. --- nanobot/agent/autocompact.py | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/nanobot/agent/autocompact.py b/nanobot/agent/autocompact.py index ce70337cd..eabd86155 100644 --- a/nanobot/agent/autocompact.py +++ b/nanobot/agent/autocompact.py @@ -70,7 +70,6 @@ class AutoCompact: continue if self._is_expired(info.get("updated_at"), now): self._archiving.add(key) - logger.debug("Auto-compact: scheduling archival for {} (idle > {} min)", key, self._ttl) schedule_background(self._archive(key)) async def _archive(self, key: str) -> None: @@ -79,7 +78,6 @@ class AutoCompact: session = self.sessions.get_or_create(key) archive_msgs, kept_msgs = self._split_unconsolidated(session) if not archive_msgs and not kept_msgs: - logger.debug("Auto-compact: skipping {}, no un-consolidated messages", key) session.updated_at = datetime.now() self.sessions.save(session) return @@ -95,13 +93,14 @@ class AutoCompact: session.last_consolidated = 0 session.updated_at = datetime.now() self.sessions.save(session) - logger.info( - "Auto-compact: archived {} (archived={}, kept={}, summary={})", - key, - len(archive_msgs), - len(kept_msgs), - bool(summary), - ) + if archive_msgs: + logger.info( + "Auto-compact: archived {} (archived={}, kept={}, summary={})", + key, + len(archive_msgs), + len(kept_msgs), + bool(summary), + ) except Exception: logger.exception("Auto-compact: failed for {}", key) finally: From d849a3fa060825ff02b73f74dc14d3ab97735b33 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Mon, 13 Apr 2026 23:33:25 +0800 Subject: [PATCH 17/19] fix(agent): drain injection queue on error/edge-case exit paths When the agent runner exits due to LLM error, tool error, empty response, or max_iterations, it breaks out of the iteration loop without draining the pending injection queue. This causes leftover messages to be re-published as independent inbound messages, resulting in duplicate or confusing replies to the user. Extract the injection drain logic into a `_try_drain_injections` helper and call it before each break in the error/edge-case paths. If injections are found, continue the loop instead of breaking. For max_iterations (where the loop is exhausted), drain injections to prevent re-publish without continuing. --- nanobot/agent/runner.py | 107 ++++++++++++----- tests/agent/test_runner.py | 233 +++++++++++++++++++++++++++++++++++++ 2 files changed, 314 insertions(+), 26 deletions(-) diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index e92d864f2..5cb7b4f0e 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -134,6 +134,36 @@ class AgentRunner: continue messages.append(injection) + async def _try_drain_injections( + self, + spec: AgentRunSpec, + messages: list[dict[str, Any]], + assistant_message: dict[str, Any] | None, + injection_cycles: int, + *, + phase: str = "after error", + ) -> tuple[bool, int]: + """Drain pending injections. Returns (should_continue, updated_cycles). + + If injections are found and we haven't exceeded _MAX_INJECTION_CYCLES, + append them to *messages* and return (True, cycles+1) so the caller + continues the iteration loop. Otherwise return (False, cycles). + """ + if injection_cycles >= _MAX_INJECTION_CYCLES: + return False, injection_cycles + injections = await self._drain_injections(spec) + if not injections: + return False, injection_cycles + injection_cycles += 1 + if assistant_message is not None: + messages.append(assistant_message) + self._append_injected_messages(messages, injections) + logger.info( + "Injected {} follow-up message(s) {} ({}/{})", + len(injections), phase, injection_cycles, _MAX_INJECTION_CYCLES, + ) + return True, injection_cycles + async def _drain_injections(self, spec: AgentRunSpec) -> list[dict[str, Any]]: """Drain pending user messages via the injection callback. @@ -287,6 +317,13 @@ class AgentRunner: context.error = error context.stop_reason = stop_reason await hook.after_iteration(context) + should_continue, injection_cycles = await self._try_drain_injections( + spec, messages, None, injection_cycles, + phase="after tool error", + ) + if should_continue: + had_injections = True + continue break await self._emit_checkpoint( spec, @@ -379,36 +416,31 @@ class AgentRunner: # Check for mid-turn injections BEFORE signaling stream end. # If injections are found we keep the stream alive (resuming=True) # so streaming channels don't prematurely finalize the card. - _injected_after_final = False - if injection_cycles < _MAX_INJECTION_CYCLES: - injections = await self._drain_injections(spec) - if injections: - had_injections = True - injection_cycles += 1 - _injected_after_final = True - if assistant_message is not None: - messages.append(assistant_message) - await self._emit_checkpoint( - spec, - { - "phase": "final_response", - "iteration": iteration, - "model": spec.model, - "assistant_message": assistant_message, - "completed_tool_results": [], - "pending_tool_calls": [], - }, - ) - self._append_injected_messages(messages, injections) - logger.info( - "Injected {} follow-up message(s) after final response ({}/{})", - len(injections), injection_cycles, _MAX_INJECTION_CYCLES, + should_continue, injection_cycles = await self._try_drain_injections( + spec, messages, assistant_message, injection_cycles, + phase="after final response", + ) + if should_continue: + had_injections = True + # Emit checkpoint for the assistant message that was appended + # by _try_drain_injections, then keep the stream alive. + if assistant_message is not None: + await self._emit_checkpoint( + spec, + { + "phase": "final_response", + "iteration": iteration, + "model": spec.model, + "assistant_message": assistant_message, + "completed_tool_results": [], + "pending_tool_calls": [], + }, ) if hook.wants_streaming(): - await hook.on_stream_end(context, resuming=_injected_after_final) + await hook.on_stream_end(context, resuming=should_continue) - if _injected_after_final: + if should_continue: await hook.after_iteration(context) continue @@ -421,6 +453,13 @@ class AgentRunner: context.error = error context.stop_reason = stop_reason await hook.after_iteration(context) + should_continue, injection_cycles = await self._try_drain_injections( + spec, messages, None, injection_cycles, + phase="after LLM error", + ) + if should_continue: + had_injections = True + continue break if is_blank_text(clean): final_content = EMPTY_FINAL_RESPONSE_MESSAGE @@ -431,6 +470,13 @@ class AgentRunner: context.error = error context.stop_reason = stop_reason await hook.after_iteration(context) + should_continue, injection_cycles = await self._try_drain_injections( + spec, messages, None, injection_cycles, + phase="after empty response", + ) + if should_continue: + had_injections = True + continue break messages.append(assistant_message or build_assistant_message( @@ -467,6 +513,15 @@ class AgentRunner: max_iterations=spec.max_iterations, ) self._append_final_message(messages, final_content) + # Drain any remaining injections so they are appended to the + # conversation history instead of being re-published as + # independent inbound messages by _dispatch's finally block. + # We ignore should_continue here because the for-loop has already + # exhausted all iterations. + _, injection_cycles = await self._try_drain_injections( + spec, messages, None, injection_cycles, + phase="after max_iterations", + ) return AgentRunResult( final_content=final_content, diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index a62457aa8..4a943165c 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -2410,3 +2410,236 @@ async def test_dispatch_republishes_leftover_queue_messages(tmp_path): contents = [m.content for m in msgs] assert "leftover-1" in contents assert "leftover-2" in contents + + +@pytest.mark.asyncio +async def test_drain_injections_on_fatal_tool_error(): + """Pending injections should be drained even when a fatal tool error occurs.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return LLMResponse( + content="", + tool_calls=[ToolCallRequest(id="c1", name="exec", arguments={"cmd": "bad"})], + usage={}, + ) + # Second call: respond normally to the injected follow-up + return LLMResponse(content="reply to follow-up", tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(side_effect=RuntimeError("tool exploded")) + + injection_queue = asyncio.Queue() + + async def inject_cb(): + items = [] + while not injection_queue.empty(): + items.append(await injection_queue.get()) + return items + + await injection_queue.put( + InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after error") + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "hello"}], + tools=tools, + model="test-model", + max_iterations=5, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + fail_on_tool_error=True, + injection_callback=inject_cb, + )) + + assert result.had_injections is True + assert result.final_content == "reply to follow-up" + # The injection should be in the messages history + injected = [ + m for m in result.messages + if m.get("role") == "user" and m.get("content") == "follow-up after error" + ] + assert len(injected) == 1 + + +@pytest.mark.asyncio +async def test_drain_injections_on_llm_error(): + """Pending injections should be drained when the LLM returns an error finish_reason.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return LLMResponse( + content=None, + tool_calls=[], + finish_reason="error", + usage={}, + ) + # Second call: respond normally to the injected follow-up + return LLMResponse(content="recovered answer", tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + injection_queue = asyncio.Queue() + + async def inject_cb(): + items = [] + while not injection_queue.empty(): + items.append(await injection_queue.get()) + return items + + await injection_queue.put( + InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after LLM error") + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "previous response"}, + {"role": "user", "content": "trigger error"}, + ], + tools=tools, + model="test-model", + max_iterations=5, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + injection_callback=inject_cb, + )) + + assert result.had_injections is True + assert result.final_content == "recovered answer" + injected = [ + m for m in result.messages + if m.get("role") == "user" and "follow-up after LLM error" in str(m.get("content", "")) + ] + assert len(injected) == 1 + + +@pytest.mark.asyncio +async def test_drain_injections_on_empty_final_response(): + """Pending injections should be drained when the runner exits due to empty response.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner, _MAX_EMPTY_RETRIES + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + if call_count["n"] <= _MAX_EMPTY_RETRIES + 1: + return LLMResponse(content="", tool_calls=[], usage={}) + # After retries exhausted + injection drain, respond normally + return LLMResponse(content="answer after empty", tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + injection_queue = asyncio.Queue() + + async def inject_cb(): + items = [] + while not injection_queue.empty(): + items.append(await injection_queue.get()) + return items + + await injection_queue.put( + InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after empty") + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "previous response"}, + {"role": "user", "content": "trigger empty"}, + ], + tools=tools, + model="test-model", + max_iterations=10, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + injection_callback=inject_cb, + )) + + assert result.had_injections is True + assert result.final_content == "answer after empty" + injected = [ + m for m in result.messages + if m.get("role") == "user" and "follow-up after empty" in str(m.get("content", "")) + ] + assert len(injected) == 1 + + +@pytest.mark.asyncio +async def test_drain_injections_on_max_iterations(): + """Pending injections should be drained when the runner hits max_iterations. + + Unlike other error paths, max_iterations cannot continue the loop, so + injections are appended to messages but not processed by the LLM. + The key point is they are consumed from the queue to prevent re-publish. + """ + from nanobot.agent.runner import AgentRunSpec, AgentRunner + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + return LLMResponse( + content="", + tool_calls=[ToolCallRequest(id=f"c{call_count['n']}", name="read_file", arguments={"path": "x"})], + usage={}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(return_value="file content") + + injection_queue = asyncio.Queue() + + async def inject_cb(): + items = [] + while not injection_queue.empty(): + items.append(await injection_queue.get()) + return items + + await injection_queue.put( + InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after max iters") + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "hello"}], + tools=tools, + model="test-model", + max_iterations=2, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + injection_callback=inject_cb, + )) + + assert result.stop_reason == "max_iterations" + # The injection was consumed from the queue (preventing re-publish) + assert injection_queue.empty() + # The injection message is appended to conversation history + injected = [ + m for m in result.messages + if m.get("role") == "user" and m.get("content") == "follow-up after max iters" + ] + assert len(injected) == 1 From a1e1eed2f13c19e9dcec2fa912103dc967b8daec Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Mon, 13 Apr 2026 23:51:23 +0800 Subject: [PATCH 18/19] refactor(runner): consolidate all injection drain paths and deduplicate tests - Migrate "after tools" inline drain to use _try_drain_injections, completing the refactoring (all 6 drain sites now use the helper). - Move checkpoint emission into _try_drain_injections via optional iteration parameter, eliminating the leaky split between helper and caller for the final-response path. - Extract _make_injection_callback() test helper to replace 7 identical inject_cb function bodies. - Add test_injection_cycle_cap_on_error_path to verify the cycle cap is enforced on error exit paths. --- nanobot/agent/runner.py | 49 ++++++++--------- tests/agent/test_runner.py | 109 +++++++++++++++++++++++-------------- 2 files changed, 90 insertions(+), 68 deletions(-) diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 5cb7b4f0e..20226aed6 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -142,12 +142,14 @@ class AgentRunner: injection_cycles: int, *, phase: str = "after error", + iteration: int | None = None, ) -> tuple[bool, int]: """Drain pending injections. Returns (should_continue, updated_cycles). If injections are found and we haven't exceeded _MAX_INJECTION_CYCLES, - append them to *messages* and return (True, cycles+1) so the caller - continues the iteration loop. Otherwise return (False, cycles). + append them to *messages* (and emit a checkpoint if *assistant_message* + and *iteration* are both provided) and return (True, cycles+1) so the + caller continues the iteration loop. Otherwise return (False, cycles). """ if injection_cycles >= _MAX_INJECTION_CYCLES: return False, injection_cycles @@ -157,6 +159,18 @@ class AgentRunner: injection_cycles += 1 if assistant_message is not None: messages.append(assistant_message) + if iteration is not None: + await self._emit_checkpoint( + spec, + { + "phase": "final_response", + "iteration": iteration, + "model": spec.model, + "assistant_message": assistant_message, + "completed_tool_results": [], + "pending_tool_calls": [], + }, + ) self._append_injected_messages(messages, injections) logger.info( "Injected {} follow-up message(s) {} ({}/{})", @@ -339,16 +353,12 @@ class AgentRunner: empty_content_retries = 0 length_recovery_count = 0 # Checkpoint 1: drain injections after tools, before next LLM call - if injection_cycles < _MAX_INJECTION_CYCLES: - injections = await self._drain_injections(spec) - if injections: - had_injections = True - injection_cycles += 1 - self._append_injected_messages(messages, injections) - logger.info( - "Injected {} follow-up message(s) after tool execution ({}/{})", - len(injections), injection_cycles, _MAX_INJECTION_CYCLES, - ) + _drained, injection_cycles = await self._try_drain_injections( + spec, messages, None, injection_cycles, + phase="after tool execution", + ) + if _drained: + had_injections = True await hook.after_iteration(context) continue @@ -419,23 +429,10 @@ class AgentRunner: should_continue, injection_cycles = await self._try_drain_injections( spec, messages, assistant_message, injection_cycles, phase="after final response", + iteration=iteration, ) if should_continue: had_injections = True - # Emit checkpoint for the assistant message that was appended - # by _try_drain_injections, then keep the stream alive. - if assistant_message is not None: - await self._emit_checkpoint( - spec, - { - "phase": "final_response", - "iteration": iteration, - "model": spec.model, - "assistant_message": assistant_message, - "completed_tool_results": [], - "pending_tool_calls": [], - }, - ) if hook.wants_streaming(): await hook.on_stream_end(context, resuming=should_continue) diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 4a943165c..53cd07e88 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -18,6 +18,16 @@ from nanobot.providers.base import LLMResponse, ToolCallRequest _MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars +def _make_injection_callback(queue: asyncio.Queue): + """Return an async callback that drains *queue* into a list of dicts.""" + async def inject_cb(): + items = [] + while not queue.empty(): + items.append(await queue.get()) + return items + return inject_cb + + def _make_loop(tmp_path): from nanobot.agent.loop import AgentLoop from nanobot.bus.queue import MessageBus @@ -1888,12 +1898,7 @@ async def test_checkpoint1_injects_after_tool_execution(): tools.execute = AsyncMock(return_value="file content") injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) # Put a follow-up message in the queue before the run starts await injection_queue.put( @@ -1951,12 +1956,7 @@ async def test_checkpoint2_injects_after_final_response_with_resuming_stream(): tools.get_definitions.return_value = [] injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) # Inject a follow-up that arrives during the first response await injection_queue.put( @@ -2005,12 +2005,7 @@ async def test_checkpoint2_preserves_final_response_in_history_before_followup() tools.get_definitions.return_value = [] injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) await injection_queue.put( InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up question") @@ -2438,12 +2433,7 @@ async def test_drain_injections_on_fatal_tool_error(): tools.execute = AsyncMock(side_effect=RuntimeError("tool exploded")) injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) await injection_queue.put( InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after error") @@ -2496,12 +2486,7 @@ async def test_drain_injections_on_llm_error(): tools.get_definitions.return_value = [] injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) await injection_queue.put( InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after LLM error") @@ -2551,12 +2536,7 @@ async def test_drain_injections_on_empty_final_response(): tools.get_definitions.return_value = [] injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) await injection_queue.put( InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after empty") @@ -2613,12 +2593,7 @@ async def test_drain_injections_on_max_iterations(): tools.execute = AsyncMock(return_value="file content") injection_queue = asyncio.Queue() - - async def inject_cb(): - items = [] - while not injection_queue.empty(): - items.append(await injection_queue.get()) - return items + inject_cb = _make_injection_callback(injection_queue) await injection_queue.put( InboundMessage(channel="cli", sender_id="u", chat_id="c", content="follow-up after max iters") @@ -2643,3 +2618,53 @@ async def test_drain_injections_on_max_iterations(): if m.get("role") == "user" and m.get("content") == "follow-up after max iters" ] assert len(injected) == 1 + + +@pytest.mark.asyncio +async def test_injection_cycle_cap_on_error_path(): + """Injection cycles should be capped even when every iteration hits an LLM error.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner, _MAX_INJECTION_CYCLES + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + return LLMResponse( + content=None, + tool_calls=[], + finish_reason="error", + usage={}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + drain_count = {"n": 0} + + async def inject_cb(): + drain_count["n"] += 1 + if drain_count["n"] <= _MAX_INJECTION_CYCLES: + return [InboundMessage(channel="cli", sender_id="u", chat_id="c", content=f"msg-{drain_count['n']}")] + return [] + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[ + {"role": "user", "content": "hello"}, + {"role": "assistant", "content": "previous"}, + {"role": "user", "content": "trigger error"}, + ], + tools=tools, + model="test-model", + max_iterations=20, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + injection_callback=inject_cb, + )) + + assert result.had_injections is True + # Should cap: _MAX_INJECTION_CYCLES drained rounds + 1 final round that breaks + assert call_count["n"] == _MAX_INJECTION_CYCLES + 1 + assert drain_count["n"] == _MAX_INJECTION_CYCLES From a38bc637bdaaf1ce3e3090ba2d32afdbf79029f5 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 13 Apr 2026 16:28:35 +0000 Subject: [PATCH 19/19] fix(runner): preserve injection flag after max-iteration drain Keep late follow-up injections observable when they are drained during max-iteration shutdown so loop-level response suppression still makes the right decision. Made-with: Cursor --- nanobot/agent/runner.py | 4 ++- tests/agent/test_runner.py | 64 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 20226aed6..592af9de2 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -515,10 +515,12 @@ class AgentRunner: # independent inbound messages by _dispatch's finally block. # We ignore should_continue here because the for-loop has already # exhausted all iterations. - _, injection_cycles = await self._try_drain_injections( + drained_after_max_iterations, injection_cycles = await self._try_drain_injections( spec, messages, None, injection_cycles, phase="after max_iterations", ) + if drained_after_max_iterations: + had_injections = True return AgentRunResult( final_content=final_content, diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 53cd07e88..74025d779 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -2610,6 +2610,7 @@ async def test_drain_injections_on_max_iterations(): )) assert result.stop_reason == "max_iterations" + assert result.had_injections is True # The injection was consumed from the queue (preventing re-publish) assert injection_queue.empty() # The injection message is appended to conversation history @@ -2620,6 +2621,69 @@ async def test_drain_injections_on_max_iterations(): assert len(injected) == 1 +@pytest.mark.asyncio +async def test_drain_injections_set_flag_when_followup_arrives_after_last_iteration(): + """Late follow-ups drained in max_iterations should still flip had_injections.""" + from nanobot.agent.hook import AgentHook + from nanobot.agent.runner import AgentRunSpec, AgentRunner + from nanobot.bus.events import InboundMessage + + provider = MagicMock() + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + return LLMResponse( + content="", + tool_calls=[ToolCallRequest(id=f"c{call_count['n']}", name="read_file", arguments={"path": "x"})], + usage={}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(return_value="file content") + + injection_queue = asyncio.Queue() + inject_cb = _make_injection_callback(injection_queue) + + class InjectOnLastAfterIterationHook(AgentHook): + def __init__(self) -> None: + self.after_iteration_calls = 0 + + async def after_iteration(self, context) -> None: + self.after_iteration_calls += 1 + if self.after_iteration_calls == 2: + await injection_queue.put( + InboundMessage( + channel="cli", + sender_id="u", + chat_id="c", + content="late follow-up after max iters", + ) + ) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "hello"}], + tools=tools, + model="test-model", + max_iterations=2, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + injection_callback=inject_cb, + hook=InjectOnLastAfterIterationHook(), + )) + + assert result.stop_reason == "max_iterations" + assert result.had_injections is True + assert injection_queue.empty() + injected = [ + m for m in result.messages + if m.get("role") == "user" and m.get("content") == "late follow-up after max iters" + ] + assert len(injected) == 1 + + @pytest.mark.asyncio async def test_injection_cycle_cap_on_error_path(): """Injection cycles should be capped even when every iteration hits an LLM error."""