diff --git a/docs/configuration.md b/docs/configuration.md index 5cfdcda4d..0e4ab2bca 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1268,7 +1268,7 @@ Inline fallback object: Use inline objects only when a fallback is not worth naming as a reusable preset. `fallbackModels` belongs under `agents.defaults`, not inside individual `modelPresets` entries. -Failover only runs when the primary provider returns a retryable model/provider error before any answer text has been streamed. Typical fallback cases include timeouts, connection errors, 5xx server errors, 429 rate limits, overloads, and quota/balance exhaustion. It does not run for malformed requests, authentication/permission errors, content filtering/refusals, or context-length/message-format errors. +Failover normally runs when the primary provider returns a retryable model/provider error before any answer text has been streamed. Stream-stall timeouts are the recovery exception: if the provider already emitted partial answer text and then stalls, nanobot closes the current stream segment and retries/fails over in a new segment. Typical fallback cases include timeouts, connection errors, 5xx server errors, 429 rate limits, overloads, and quota/balance exhaustion. It does not run for malformed requests, authentication/permission errors, content filtering/refusals, or context-length/message-format errors. If fallback candidates use smaller `contextWindowTokens` values, nanobot builds context using the smallest window in the active chain so every candidate can receive the same prompt. @@ -1727,6 +1727,7 @@ For API keys, tokens, and other secrets, see [Environment Variables for Secrets] | `tools.exec.sandbox` | `""` | Sandbox backend for shell commands. Set to `"bwrap"` to wrap exec calls in a [bubblewrap](https://github.com/containers/bubblewrap) sandbox — the process can only see the workspace (read-write) and media directory (read-only); config files and API keys are hidden. Automatically enables `restrictToWorkspace` for file tools. **Linux only** — requires `bwrap` installed (`apt install bubblewrap`; pre-installed in the Docker image). Not available on macOS or Windows (bwrap depends on Linux kernel namespaces). | | `tools.exec.enable` | `true` | When `false`, the shell `exec` tool is not registered at all. Use this to completely disable shell command execution. | | `tools.exec.timeout` | `60` | Default hard timeout in seconds for shell commands. Config values may exceed the per-call tool cap; set `0` to disable the hard timeout for trusted long-running commands. | +| `tools.exec.pathPrepend` | `""` | Extra directories to prepend to `PATH` when running shell commands. Use this when configured tools should win executable lookup precedence, such as a Python virtual environment's `bin` or `Scripts` directory. | | `tools.exec.pathAppend` | `""` | Extra directories to append to `PATH` when running shell commands (e.g. `/usr/sbin` for `ufw`). | | `tools.ssrfWhitelist` | `[]` | CIDR ranges exempted from the shared SSRF guard used by web fetches and HTTP/SSE MCP connections. Prefer exact host CIDRs such as `192.168.1.50/32`; broad ranges increase SSRF exposure. | | `channels.*.allowFrom` | omitted | Access control per channel. Omit to use pairing-only mode; set `["*"]` to allow everyone; or list specific user IDs. See [Pairing](#pairing) for details. | diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index d89f0c927..a81b973e9 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -70,6 +70,8 @@ class ContextBuilder: session_summary: str | None = None, workspace: Path | None = None, include_memory_recent_history: bool = True, + session_key: str | None = None, + unified_session: bool = False, ) -> str: """Build the system prompt from identity, bootstrap files, memory, and skills.""" root = workspace or self.workspace @@ -96,7 +98,11 @@ class ContextBuilder: parts.append(render_template("agent/skills_section.md", skills_summary=skills_summary)) if include_memory_recent_history: - entries = self.memory.read_unprocessed_history(since_cursor=self.memory.get_last_dream_cursor()) + entries = self.memory.read_recent_history_for_prompt( + since_cursor=self.memory.get_last_dream_cursor(), + session_key=session_key, + unified_session=unified_session, + ) if entries: capped = entries[-self._MAX_RECENT_HISTORY:] history_text = "\n".join( @@ -196,6 +202,8 @@ class ContextBuilder: inbound_message: Any | None = None, skip_runtime_lines: bool = False, include_memory_recent_history: bool = True, + session_key: str | None = None, + unified_session: bool = False, ) -> list[dict[str, Any]]: """Build the complete message list for an LLM call.""" root = workspace or self.workspace @@ -232,6 +240,8 @@ class ContextBuilder: session_summary=session_summary, workspace=root, include_memory_recent_history=include_memory_recent_history, + session_key=session_key, + unified_session=unified_session, ), }, *history, diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b1bde811c..3431237fa 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -9,6 +9,7 @@ import time from contextlib import AsyncExitStack, nullcontext, suppress from dataclasses import dataclass, field from enum import Enum, auto +from functools import partial from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable @@ -314,6 +315,7 @@ class AgentLoop: get_tool_definitions=self.tools.get_definitions, max_completion_tokens=provider.generation.max_tokens, consolidation_ratio=consolidation_ratio, + unified_session=unified_session, ) self.auto_compact = AutoCompact( sessions=self.sessions, @@ -610,6 +612,8 @@ class AgentLoop: runtime_state=self, inbound_message=msg, include_memory_recent_history=include_memory_recent_history, + session_key=session.key, + unified_session=self._unified_session, ) async def _dispatch_command_inline( @@ -1150,6 +1154,8 @@ class AgentLoop: runtime_state=self, inbound_message=msg, skip_runtime_lines=is_subagent, + session_key=key, + unified_session=self._unified_session, ) t_wall = time.time() final_content, _, all_msgs, stop_reason, _ = await self._run_agent_loop( @@ -1163,7 +1169,9 @@ class AgentLoop: latency_ms = max(0, int((wall_done - t_wall) * 1000)) self._save_turn(session, all_msgs, 1 + len(history), turn_latency_ms=latency_ms) self._runtime_events().record_turn_latency(key, latency_ms) - session.enforce_file_cap(on_archive=self.context.memory.raw_archive) + session.enforce_file_cap( + on_archive=partial(self.context.memory.raw_archive, session_key=key) + ) self._clear_runtime_checkpoint(session) self.sessions.save(session) self._schedule_background( @@ -1487,7 +1495,9 @@ class AgentLoop: ctx.turn_latency_ms, ) if not ctx.ephemeral: - ctx.session.enforce_file_cap(on_archive=self.context.memory.raw_archive) + ctx.session.enforce_file_cap( + on_archive=partial(self.context.memory.raw_archive, session_key=ctx.session_key) + ) self._schedule_background( self.consolidator.maybe_consolidate_by_tokens( ctx.session, diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 5aedb511a..9ba60bb31 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -41,6 +41,8 @@ class MemoryStore: """Pure file I/O for memory files: MEMORY.md, history.jsonl, SOUL.md, USER.md.""" _DEFAULT_MAX_HISTORY = 1000 + _INTERNAL_HISTORY_SESSION_PREFIXES = ("cron:", "dream:") + _INTERNAL_HISTORY_SESSION_KEYS = {"heartbeat"} _LEGACY_ENTRY_START_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}[^\]]*)\]\s*") _LEGACY_TIMESTAMP_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2})\]\s*") _LEGACY_RAW_MESSAGE_RE = re.compile( @@ -232,7 +234,13 @@ class MemoryStore: # -- history.jsonl — append-only, JSONL format --------------------------- - def append_history(self, entry: str, *, max_chars: int | None = None) -> int: + def append_history( + self, + entry: str, + *, + max_chars: int | None = None, + session_key: str | None = None, + ) -> int: """Append *entry* to history.jsonl and return its auto-incrementing cursor. Entries are passed through `strip_think` to drop template-level leaks @@ -272,6 +280,8 @@ class MemoryStore: cursor, ) record = {"cursor": cursor, "timestamp": ts, "content": content} + if session_key: + record["session_key"] = session_key with open(self.history_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") self._cursor_file.write_text(str(cursor), encoding="utf-8") @@ -322,6 +332,36 @@ class MemoryStore: """Return history entries with a valid cursor > *since_cursor*.""" return [e for e, c in self._iter_valid_entries() if c > since_cursor] + @classmethod + def _is_internal_history_session(cls, session_key: str | None) -> bool: + if not session_key: + return False + return ( + session_key in cls._INTERNAL_HISTORY_SESSION_KEYS + or session_key.startswith(cls._INTERNAL_HISTORY_SESSION_PREFIXES) + ) + + def read_recent_history_for_prompt( + self, + since_cursor: int, + *, + session_key: str | None, + unified_session: bool = False, + ) -> list[dict[str, Any]]: + """Return unprocessed history entries safe to inject into a turn prompt.""" + entries = self.read_unprocessed_history(since_cursor=since_cursor) + if session_key is None: + return entries + if not unified_session: + return [e for e in entries if e.get("session_key") == session_key] + + return [ + entry + for entry in entries + if (entry_session := entry.get("session_key")) == session_key + or not self._is_internal_history_session(entry_session) + ] + def compact_history(self) -> None: """Drop oldest entries if the file exceeds *max_history_entries*.""" if self.max_history_entries <= 0: @@ -489,13 +529,20 @@ class MemoryStore: ) return "\n".join(lines) - def raw_archive(self, messages: list[dict], *, max_chars: int | None = None) -> None: + def raw_archive( + self, + messages: list[dict], + *, + max_chars: int | None = None, + session_key: str | None = None, + ) -> None: """Fallback: dump raw messages to history.jsonl without LLM summarization.""" limit = max_chars if max_chars is not None else _RAW_ARCHIVE_MAX_CHARS formatted = truncate_text(self._format_messages(messages), limit) self.append_history( f"[RAW] {len(messages)} messages\n" - f"{formatted}" + f"{formatted}", + session_key=session_key, ) logger.warning( "Memory consolidation degraded: raw-archived {} messages", len(messages) @@ -570,6 +617,7 @@ class Consolidator: get_tool_definitions: Callable[[], list[dict[str, Any]]], max_completion_tokens: int = 4096, consolidation_ratio: float = 0.5, + unified_session: bool = False, ): self.store = store self.provider = provider @@ -578,6 +626,7 @@ class Consolidator: self.context_window_tokens = context_window_tokens self.max_completion_tokens = max_completion_tokens self.consolidation_ratio = consolidation_ratio + self.unified_session = unified_session self._build_messages = build_messages self._get_tool_definitions = get_tool_definitions self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = ( @@ -685,7 +734,7 @@ class Consolidator: len(chunk), replay_max_messages, ) - summary = await self.archive(chunk) + summary = await self.archive(chunk, session_key=session.key) session.last_consolidated = end_idx self.sessions.save(session) return summary @@ -716,6 +765,8 @@ class Consolidator: sender_id=None, session_summary=summary, session_metadata=session.metadata, + session_key=session.key, + unified_session=self.unified_session, ) return estimate_prompt_tokens_chain( self.provider, @@ -743,7 +794,12 @@ class Consolidator: except Exception: return truncate_text(text, budget * 4) - async def archive(self, messages: list[dict]) -> str | None: + async def archive( + self, + messages: list[dict], + *, + session_key: str | None = None, + ) -> str | None: """Summarize messages via LLM and append to history.jsonl. Returns the summary text on success, None if nothing to archive. @@ -771,11 +827,15 @@ class Consolidator: if response.finish_reason == "error": raise RuntimeError(f"LLM returned error: {response.content}") summary = response.content or "[no summary]" - self.store.append_history(summary, max_chars=_ARCHIVE_SUMMARY_MAX_CHARS) + self.store.append_history( + summary, + max_chars=_ARCHIVE_SUMMARY_MAX_CHARS, + session_key=session_key, + ) return summary except Exception: logger.warning("Consolidation LLM call failed, raw-dumping to history") - self.store.raw_archive(messages) + self.store.raw_archive(messages, session_key=session_key) return None async def maybe_consolidate_by_tokens( @@ -858,7 +918,7 @@ class Consolidator: source, len(chunk), ) - summary = await self.archive(chunk) + summary = await self.archive(chunk, session_key=session.key) # Advance the cursor either way: on success the chunk was # summarized; on failure archive() already raw-archived it as # a breadcrumb. Re-archiving the same chunk on the next call @@ -930,7 +990,7 @@ class Consolidator: last_active = session.updated_at summary: str | None = "" if archive_msgs: - summary = await self.archive(archive_msgs) + summary = await self.archive(archive_msgs, session_key=session_key) if summary and summary != "(nothing)": session.metadata["_last_summary"] = { diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 5c9ff6e2d..53f6554ab 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -754,11 +754,15 @@ class AgentRunner: context.streamed_reasoning = True await hook.emit_reasoning(delta) + async def _stream_recover() -> None: + await hook.on_stream_end(context, resuming=True) + coro = self.provider.chat_stream_with_retry( **kwargs, on_content_delta=_stream, on_thinking_delta=_thinking, on_tool_call_delta=_tool_call_delta if live_file_edits is not None else None, + on_stream_recover=_stream_recover, ) elif wants_progress_streaming: stream_buf = "" diff --git a/nanobot/agent/tools/shell.py b/nanobot/agent/tools/shell.py index 0ecfadc00..b4960e8e0 100644 --- a/nanobot/agent/tools/shell.py +++ b/nanobot/agent/tools/shell.py @@ -55,6 +55,7 @@ class ExecToolConfig(Base): """Shell exec tool configuration.""" enable: bool = True timeout: int = Field(default=60, ge=0) # Hard timeout (s); 0 = no limit. Not capped by the per-call max. + path_prepend: str = "" path_append: str = "" sandbox: str = "" allowed_env_keys: list[str] = Field(default_factory=list) @@ -150,6 +151,7 @@ class ExecTool(Tool): restrict_to_workspace=ctx.config.restrict_to_workspace, webui_allow_local_service_access=ctx.config.webui_allow_local_service_access, sandbox=cfg.sandbox, + path_prepend=cfg.path_prepend, path_append=cfg.path_append, allowed_env_keys=cfg.allowed_env_keys, allow_patterns=cfg.allow_patterns, @@ -166,6 +168,7 @@ class ExecTool(Tool): webui_allow_local_service_access: bool = True, allow_local_preview_access: bool | None = None, sandbox: str = "", + path_prepend: str = "", path_append: str = "", allowed_env_keys: list[str] | None = None, session_manager: Any | None = None, @@ -197,6 +200,7 @@ class ExecTool(Tool): if allow_local_preview_access is not None: webui_allow_local_service_access = allow_local_preview_access self.webui_allow_local_service_access = webui_allow_local_service_access + self.path_prepend = path_prepend self.path_append = path_append self.allowed_env_keys = allowed_env_keys or [] self._session_manager = session_manager or DEFAULT_EXEC_SESSION_MANAGER @@ -411,12 +415,11 @@ class ExecTool(Tool): effective_timeout = self._resolve_timeout(timeout) env = self._build_env() - if self.path_append: + if self.path_prepend or self.path_append: if _IS_WINDOWS: - env["PATH"] = env.get("PATH", "") + os.pathsep + self.path_append + env["PATH"] = self._compose_path(env.get("PATH", "")) else: - env["NANOBOT_PATH_APPEND"] = self.path_append - command = f'export PATH="$PATH{os.pathsep}$NANOBOT_PATH_APPEND"; {command}' + command = self._wrap_path_export(command, env) shell_program, shell_error = self._resolve_shell(shell) if shell_error: @@ -431,6 +434,28 @@ class ExecTool(Tool): login=True if login is None else login, ) + def _compose_path(self, current_path: str) -> str: + parts = [] + if self.path_prepend: + parts.append(self.path_prepend) + if current_path: + parts.append(current_path) + if self.path_append: + parts.append(self.path_append) + return os.pathsep.join(parts) + + def _wrap_path_export(self, command: str, env: dict[str, str]) -> str: + segments = [] + if self.path_prepend: + env["NANOBOT_PATH_PREPEND"] = self.path_prepend + segments.append("$NANOBOT_PATH_PREPEND") + segments.append("$PATH") + if self.path_append: + env["NANOBOT_PATH_APPEND"] = self.path_append + segments.append("$NANOBOT_PATH_APPEND") + path_expr = os.pathsep.join(segments) + return f'export PATH="{path_expr}"; {command}' + @staticmethod async def _spawn( command: str, cwd: str, env: dict[str, str], diff --git a/nanobot/channels/feishu.py b/nanobot/channels/feishu.py index 060ba2bb5..381554347 100644 --- a/nanobot/channels/feishu.py +++ b/nanobot/channels/feishu.py @@ -1,5 +1,7 @@ """Feishu/Lark channel implementation using lark-oapi SDK with WebSocket long connection.""" +from __future__ import annotations + import asyncio import importlib.util import json @@ -11,10 +13,8 @@ import uuid from collections import OrderedDict from contextlib import suppress from dataclasses import dataclass -from typing import Any, Literal +from typing import TYPE_CHECKING, Any, Literal -from lark_oapi.api.im.v1.model import MentionEvent, P2ImMessageReceiveV1 -from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN from pydantic import Field from nanobot.bus.events import OutboundMessage @@ -25,8 +25,42 @@ from nanobot.config.schema import Base from nanobot.utils.helpers import safe_filename from nanobot.utils.logging_bridge import redirect_lib_logging +if TYPE_CHECKING: + from lark_oapi.api.im.v1.model import MentionEvent, P2ImMessageReceiveV1 + FEISHU_AVAILABLE = importlib.util.find_spec("lark_oapi") is not None + +def _load_lark_runtime() -> tuple[Any, str, str]: + """Import the heavy Feishu SDK lazily. + + lark_oapi imports a large generated API surface at module import time, so + keep it out of channel discovery and constructor paths. + """ + import sys + + ws_client_already_imported = "lark_oapi.ws.client" in sys.modules + import lark_oapi as lark + import lark_oapi.ws.client as lark_ws_client + from lark_oapi.core.const import FEISHU_DOMAIN, LARK_DOMAIN + + if ( + not ws_client_already_imported + and threading.current_thread() is not threading.main_thread() + ): + import_loop = getattr(lark_ws_client, "loop", None) + if ( + import_loop is not None + and not import_loop.is_running() + and not import_loop.is_closed() + ): + import_loop.close() + lark_ws_client.loop = None + with suppress(Exception): + asyncio.set_event_loop(None) + + return lark, FEISHU_DOMAIN, LARK_DOMAIN + # Message type display mapping MSG_TYPE_MAP = { "image": "[image]", @@ -297,13 +331,11 @@ class FeishuChannel(BaseChannel): return FeishuConfig().model_dump(by_alias=True) def __init__(self, config: Any, bus: MessageBus): - import lark_oapi as lark - if isinstance(config, dict): config = FeishuConfig.model_validate(config) super().__init__(config, bus) self.config: FeishuConfig = config - self._client: lark.Client = None + self._client: Any = None self._ws_client: Any = None self._ws_thread: threading.Thread | None = None self._processed_message_ids: OrderedDict[str, None] = OrderedDict() # Ordered dedup cache @@ -329,7 +361,7 @@ class FeishuChannel(BaseChannel): self.logger.error("app_id and app_secret not configured") return - import lark_oapi as lark + lark, feishu_domain, lark_domain = await asyncio.to_thread(_load_lark_runtime) redirect_lib_logging("Lark") @@ -337,7 +369,7 @@ class FeishuChannel(BaseChannel): self._loop = asyncio.get_running_loop() # Create Lark client for sending messages - domain = LARK_DOMAIN if self.config.domain == "lark" else FEISHU_DOMAIN + domain = lark_domain if self.config.domain == "lark" else feishu_domain self._client = ( lark.Client.builder() .app_id(self.config.app_id) @@ -397,6 +429,7 @@ class FeishuChannel(BaseChannel): import lark_oapi.ws.client as _lark_ws_client + previous_loop = getattr(_lark_ws_client, "loop", None) ws_loop = asyncio.new_event_loop() asyncio.set_event_loop(ws_loop) # Patch the module-level loop used by lark's ws Client.start() @@ -410,6 +443,10 @@ class FeishuChannel(BaseChannel): if self._running: time.sleep(5) finally: + if getattr(_lark_ws_client, "loop", None) is ws_loop: + _lark_ws_client.loop = previous_loop + with suppress(Exception): + asyncio.set_event_loop(None) ws_loop.close() self._ws_thread = threading.Thread(target=run_ws, daemon=True) diff --git a/nanobot/command/builtin.py b/nanobot/command/builtin.py index 10eb995cf..6280e2dfe 100644 --- a/nanobot/command/builtin.py +++ b/nanobot/command/builtin.py @@ -212,7 +212,7 @@ async def cmd_new(ctx: CommandContext) -> OutboundMessage: loop.sessions.save(session) loop.sessions.invalidate(session.key) if snapshot: - loop._schedule_background(loop.consolidator.archive(snapshot)) + loop._schedule_background(loop.consolidator.archive(snapshot, session_key=ctx.key)) return OutboundMessage( channel=ctx.msg.channel, chat_id=ctx.msg.chat_id, content="New session started.", diff --git a/nanobot/config/loader.py b/nanobot/config/loader.py index 545cd0bdc..0fd1aa4c5 100644 --- a/nanobot/config/loader.py +++ b/nanobot/config/loader.py @@ -7,7 +7,6 @@ from pathlib import Path from typing import Any import pydantic -from loguru import logger from pydantic import BaseModel from nanobot.config.schema import Config, _resolve_tool_config_refs @@ -55,8 +54,7 @@ def load_config(config_path: Path | None = None) -> Config: data = _migrate_config(data) config = Config.model_validate(data) except (json.JSONDecodeError, ValueError, pydantic.ValidationError) as e: - logger.warning("Failed to load config from {}: {}", path, e) - logger.warning("Using default configuration.") + raise ValueError(f"Failed to load config from {path}: {e}") from e _apply_ssrf_whitelist(config) return config diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py index 4a692b424..802ac314a 100644 --- a/nanobot/providers/base.py +++ b/nanobot/providers/base.py @@ -631,6 +631,7 @@ class LLMProvider(ABC): on_content_delta: Callable[[str], Awaitable[None]] | None = None, on_thinking_delta: Callable[[str], Awaitable[None]] | None = None, on_tool_call_delta: Callable[[dict[str, Any]], Awaitable[None]] | None = None, + on_stream_recover: Callable[[], Awaitable[None]] | None = None, retry_mode: str = "standard", on_retry_wait: Callable[[str], Awaitable[None]] | None = None, ) -> LLMResponse: @@ -651,6 +652,12 @@ class LLMProvider(ABC): if on_content_delta: await on_content_delta(text) + async def _recover_stream() -> None: + nonlocal has_streamed_content + if on_stream_recover: + await on_stream_recover() + has_streamed_content = False + kw: dict[str, Any] = dict( messages=messages, tools=tools, model=model, max_tokens=max_tokens, temperature=temperature, @@ -659,6 +666,8 @@ class LLMProvider(ABC): on_thinking_delta=on_thinking_delta, on_tool_call_delta=on_tool_call_delta, ) + if on_stream_recover and getattr(self, "supports_stream_recover_callback", False): + kw["on_stream_recover"] = _recover_stream return await self._run_with_retry( self._safe_chat_stream, kw, @@ -666,6 +675,7 @@ class LLMProvider(ABC): retry_mode=retry_mode, on_retry_wait=on_retry_wait, should_retry_guard=lambda: not has_streamed_content, + on_stream_recover=_recover_stream if on_stream_recover else None, ) async def chat_with_retry( @@ -813,6 +823,7 @@ class LLMProvider(ABC): retry_mode: str, on_retry_wait: Callable[[str], Awaitable[None]] | None, should_retry_guard: Callable[[], bool] | None = None, + on_stream_recover: Callable[[], Awaitable[None]] | None = None, ) -> LLMResponse: attempt = 0 delays = list(self._CHAT_RETRY_DELAYS) @@ -827,10 +838,29 @@ class LLMProvider(ABC): return response last_response = response if should_retry_guard is not None and not should_retry_guard(): - logger.warning( - "LLM stream failed after content was emitted; skipping retry" - ) - return response + is_timeout = (response.error_kind or "").lower() == "timeout" + if is_timeout: + if on_stream_recover: + logger.warning( + "LLM stream stalled after content was emitted; " + "starting a new stream segment and retrying" + ) + await on_stream_recover() + else: + logger.warning( + "LLM stream stalled after content was emitted; " + "suppressing delta callbacks and retrying" + ) + kw.setdefault("on_content_delta", None) + kw["on_content_delta"] = None + kw["on_thinking_delta"] = None + kw["on_tool_call_delta"] = None + should_retry_guard = None + else: + logger.warning( + "LLM stream failed after content was emitted; skipping retry" + ) + return response error_key = ((response.content or "").strip().lower() or None) if error_key and error_key == last_error_key: identical_error_count += 1 diff --git a/nanobot/providers/fallback_provider.py b/nanobot/providers/fallback_provider.py index c082c2361..b0c01afae 100644 --- a/nanobot/providers/fallback_provider.py +++ b/nanobot/providers/fallback_provider.py @@ -58,19 +58,24 @@ _FALLBACK_ERROR_TOKENS = ( class FallbackProvider(LLMProvider): """Wrap a primary provider and transparently failover to fallback models. - When the primary model returns an error and no content has been streamed yet, - the wrapper tries each fallback model in order. Each fallback model may - reside on a different provider — a factory callable creates the underlying - provider on-the-fly. + When the primary model returns a fallbackable error before content has been + streamed, the wrapper tries each fallback model in order. Streamed timeout + errors are the recovery exception: the caller may close the current stream + segment, then the wrapper continues failover with later deltas in a new + segment. Each fallback model may reside on a different provider — a factory + callable creates the underlying provider on-the-fly. Key design: - Failover is request-scoped (the wrapper itself is stateless between turns). - - Skipped when content was already streamed to avoid duplicate output. + - Skipped when content was already streamed to avoid duplicate output, + except timeout recovery can resume in a new stream segment. - Recursive failover is prevented by the factory returning plain providers. - Primary provider is circuit-broken after repeated failures to avoid wasting requests on a known-bad endpoint. """ + supports_stream_recover_callback = True + def __init__( self, primary: LLMProvider, @@ -116,6 +121,7 @@ class FallbackProvider(LLMProvider): ) async def chat_stream(self, **kwargs: Any) -> LLMResponse: + on_stream_recover = kwargs.pop("on_stream_recover", None) if not self._has_fallbacks: return await self._primary.chat_stream(**kwargs) @@ -130,7 +136,10 @@ class FallbackProvider(LLMProvider): kwargs["on_content_delta"] = _tracking_delta return await self._try_with_fallback( - lambda p, kw: p.chat_stream(**kw), kwargs, has_streamed=has_streamed + lambda p, kw: p.chat_stream(**kw), + kwargs, + has_streamed=has_streamed, + on_stream_recover=on_stream_recover, ) async def _try_with_fallback( @@ -138,6 +147,7 @@ class FallbackProvider(LLMProvider): call: Callable[[LLMProvider, dict[str, Any]], Awaitable[LLMResponse]], kwargs: dict[str, Any], has_streamed: list[bool] | None, + on_stream_recover: Callable[[], Awaitable[None]] | None = None, ) -> LLMResponse: primary_model = kwargs.get("model") or self._primary.get_default_model() @@ -149,10 +159,23 @@ class FallbackProvider(LLMProvider): return response if has_streamed is not None and has_streamed[0]: - logger.warning( - "Primary model error but content already streamed; skipping failover" - ) - return response + is_timeout = (response.error_kind or "").lower() == "timeout" + if is_timeout: + logger.warning( + "Primary model '{}' stream stalled after content was emitted; " + "attempting failover anyway", + primary_model, + ) + has_streamed[0] = False + if on_stream_recover: + await on_stream_recover() + else: + kwargs["on_content_delta"] = None + else: + logger.warning( + "Primary model error but content already streamed; skipping failover" + ) + return response if not self._should_fallback(response): logger.warning( @@ -177,7 +200,20 @@ class FallbackProvider(LLMProvider): for idx, fallback in enumerate(self._fallback_presets): fallback_model = fallback.model if has_streamed is not None and has_streamed[0]: - break + is_timeout = ( + last_response is not None + and (last_response.error_kind or "").lower() == "timeout" + ) + if is_timeout and on_stream_recover: + logger.warning( + "Fallback model '{}' stream stalled after content was emitted; " + "starting a new stream segment and trying next fallback", + self._fallback_presets[idx - 1].model if idx > 0 else primary_model, + ) + has_streamed[0] = False + await on_stream_recover() + else: + break if idx == 0 and primary_skipped: logger.info( "Primary model '{}' circuit open, trying fallback '{}'", diff --git a/nanobot/webui/settings_api.py b/nanobot/webui/settings_api.py index cbd5e4e13..6dcf89f7c 100644 --- a/nanobot/webui/settings_api.py +++ b/nanobot/webui/settings_api.py @@ -15,6 +15,7 @@ from zoneinfo import ZoneInfo import httpx +from nanobot import __version__ from nanobot.audio.transcription import resolve_transcription_config from nanobot.audio.transcription_registry import ( resolve_transcription_provider, @@ -37,6 +38,13 @@ from nanobot.webui.workspaces import ( QueryParams = dict[str, list[str]] RuntimeSurface = Literal["browser", "native"] + +def _version_payload() -> dict[str, Any]: + """Return version info for the settings payload.""" + return { + "current": __version__, + } + _RUNTIME_CAPABILITIES = { "can_restart_engine": False, "can_pick_folder": False, @@ -801,9 +809,11 @@ def settings_payload( "mcp_server_count": len(config.tools.mcp_servers), "exec_enabled": exec_config.enable, "exec_sandbox": exec_config.sandbox or None, + "exec_path_prepend_set": bool(exec_config.path_prepend), "exec_path_append_set": bool(exec_config.path_append), }, "requires_restart": requires_restart, + "version": _version_payload(), } return decorate_settings_payload( payload, diff --git a/nanobot/webui/settings_routes.py b/nanobot/webui/settings_routes.py index b8dbb4b73..017652331 100644 --- a/nanobot/webui/settings_routes.py +++ b/nanobot/webui/settings_routes.py @@ -36,6 +36,7 @@ from nanobot.webui.settings_api import ( update_transcription_settings, update_web_search_settings, ) +from nanobot.webui.version_check import check_for_update QueryParams = dict[str, list[str]] @@ -117,6 +118,8 @@ class WebUISettingsRouter: return await self._handle_settings_cli_apps_action(request, "test") if path == "/api/settings/mcp-presets": return await self._handle_settings_mcp_presets(request) + if path == "/api/settings/version-check": + return await self._handle_settings_version_check(request) mcp_action = _MCP_PRESET_ACTIONS_BY_PATH.get(path) if mcp_action is not None: return await self._handle_settings_mcp_presets(request, mcp_action) @@ -347,3 +350,15 @@ class WebUISettingsRouter: if action is None: return self._json_response(payload) return self._json_response(self._with_restart_state(payload, section="runtime")) + + async def _handle_settings_version_check(self, request: WsRequest) -> Response: + if not self._authorized(request): + return self._unauthorized() + try: + update_info = await asyncio.to_thread(check_for_update) + except Exception: + self.logger.exception("version check failed") + return self._error_response(500, "version check failed") + return self._json_response({ + "updateAvailable": update_info, + }) diff --git a/nanobot/webui/version_check.py b/nanobot/webui/version_check.py new file mode 100644 index 000000000..6db45c630 --- /dev/null +++ b/nanobot/webui/version_check.py @@ -0,0 +1,51 @@ +"""On-demand version checker for nanobot-ai releases. + +Checks PyPI for newer versions when explicitly requested (no background polling). +""" + +from __future__ import annotations + +import logging +import time +from typing import Any + +import httpx + +from nanobot import __version__ + +logger = logging.getLogger(__name__) + +_PYPI_URL = "https://pypi.org/pypi/nanobot-ai/json" +_CACHE_TTL_S = 300 # 5 minutes cache to avoid hammering PyPI + +_cache: tuple[float, str | None] = (0.0, None) + + +def check_for_update() -> dict[str, Any] | None: + """Check PyPI for a newer version. Returns update info dict or None if up-to-date. + + Uses a short cache to avoid repeated requests within the TTL window. + This is a blocking call — invoke from a thread or background task. + """ + global _cache + now = time.monotonic() + cached_at, cached_val = _cache + if now - cached_at < _CACHE_TTL_S and cached_val is not None: + latest = cached_val + else: + try: + resp = httpx.get(_PYPI_URL, timeout=5.0, follow_redirects=True) + resp.raise_for_status() + latest = resp.json().get("info", {}).get("version") + except Exception: + logger.debug("PyPI version check failed", exc_info=True) + return None + _cache = (now, latest) + + if not latest or latest == __version__: + return None + return { + "currentVersion": __version__, + "latestVersion": latest, + "pypiUrl": "https://pypi.org/project/nanobot-ai/", + } diff --git a/tests/agent/test_consolidate_offset.py b/tests/agent/test_consolidate_offset.py index c4b0e9ea8..74e796144 100644 --- a/tests/agent/test_consolidate_offset.py +++ b/tests/agent/test_consolidate_offset.py @@ -519,8 +519,9 @@ class TestNewCommandArchival: call_count = 0 - async def _failing_summarize(_messages) -> bool: + async def _failing_summarize(_messages, *, session_key=None) -> bool: nonlocal call_count + assert session_key == "cli:test" call_count += 1 return False @@ -551,10 +552,12 @@ class TestNewCommandArchival: loop.sessions.save(session) archived_count = -1 + archived_session_key = None - async def _fake_summarize(messages) -> bool: - nonlocal archived_count + async def _fake_summarize(messages, *, session_key=None) -> bool: + nonlocal archived_count, archived_session_key archived_count = len(messages) + archived_session_key = session_key return True loop.consolidator.archive = _fake_summarize # type: ignore[method-assign] @@ -567,6 +570,7 @@ class TestNewCommandArchival: await loop.close_mcp() assert archived_count == 3 + assert archived_session_key == "cli:test" @pytest.mark.asyncio async def test_new_clears_session_and_responds(self, tmp_path: Path) -> None: @@ -579,7 +583,8 @@ class TestNewCommandArchival: session.add_message("assistant", f"resp{i}") loop.sessions.save(session) - async def _ok_summarize(_messages) -> bool: + async def _ok_summarize(_messages, *, session_key=None) -> bool: + assert session_key == "cli:test" return True loop.consolidator.archive = _ok_summarize # type: ignore[method-assign] @@ -606,7 +611,8 @@ class TestNewCommandArchival: archived = asyncio.Event() release_archive = asyncio.Event() - async def _slow_summarize(_messages) -> bool: + async def _slow_summarize(_messages, *, session_key=None) -> bool: + assert session_key == "cli:test" await release_archive.wait() archived.set() return True diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py index 028bcbedc..61ad0109b 100644 --- a/tests/agent/test_consolidator.py +++ b/tests/agent/test_consolidator.py @@ -63,6 +63,23 @@ class TestConsolidatorSummarize: entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 + async def test_summarize_appends_session_key_to_history( + self, + consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.return_value = MagicMock( + content="User fixed a bug in the auth module.", + finish_reason="stop", + ) + messages = [{"role": "user", "content": "fix the auth bug"}] + + await consolidator.archive(messages, session_key="telegram:chat-1") + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "telegram:chat-1" + async def test_summarize_raw_dumps_on_llm_failure(self, consolidator, mock_provider, store): """On LLM failure, raw-dump messages to HISTORY.md.""" mock_provider.chat_with_retry.side_effect = Exception("API error") @@ -73,6 +90,20 @@ class TestConsolidatorSummarize: assert len(entries) == 1 assert "[RAW]" in entries[0]["content"] + async def test_raw_dump_fallback_appends_session_key( + self, + consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.side_effect = Exception("API error") + messages = [{"role": "user", "content": "hello"}] + + await consolidator.archive(messages, session_key="slack:chat-2") + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "slack:chat-2" + async def test_summarize_skips_empty_messages(self, consolidator): result = await consolidator.archive([]) assert result is None @@ -370,6 +401,27 @@ class TestCompactIdleSession: assert meta["text"] == "Summary of old conversation." assert "last_active" in meta + @pytest.mark.asyncio + async def test_idle_compact_writes_session_key_to_history( + self, + real_consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.return_value = MagicMock( + content="Summary of old conversation.", finish_reason="stop" + ) + session = real_consolidator.sessions.get_or_create("cli:test") + for i in range(10): + session.add_message("user", f"user msg {i}") + session.add_message("assistant", f"assistant msg {i}") + real_consolidator.sessions.save(session) + + await real_consolidator.compact_idle_session("cli:test", max_suffix=4) + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "cli:test" + @pytest.mark.asyncio async def test_empty_session_refreshes_timestamp(self, real_consolidator): """Empty session with old updated_at → refreshed after call, returns ''.""" @@ -640,6 +692,12 @@ class TestRawArchiveTruncation: assert len(entries) == 1 assert "hello" in entries[0]["content"] + def test_raw_archive_preserves_session_key(self, store): + messages = [{"role": "user", "content": "hello"}] + store.raw_archive(messages, session_key="websocket:chat-1") + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "websocket:chat-1" + def test_raw_archive_custom_max_chars(self, store): """max_chars parameter should override default limit.""" messages = [{"role": "user", "content": "a" * 200}] diff --git a/tests/agent/test_context_prompt_cache.py b/tests/agent/test_context_prompt_cache.py index bbafd4890..ac3a83bf4 100644 --- a/tests/agent/test_context_prompt_cache.py +++ b/tests/agent/test_context_prompt_cache.py @@ -2,11 +2,11 @@ from __future__ import annotations +import datetime as datetime_module import re from datetime import datetime as real_datetime from importlib.resources import files as pkg_files from pathlib import Path -import datetime as datetime_module from nanobot.agent.context import ContextBuilder @@ -156,6 +156,58 @@ def test_unprocessed_history_injected_into_system_prompt(tmp_path) -> None: assert re.search(r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}\]", prompt) +def test_recent_history_injection_is_session_scoped(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("legacy entry without session") + builder.memory.append_history("telegram history", session_key="telegram:chat-1") + builder.memory.append_history("slack history", session_key="slack:chat-2") + + prompt = builder.build_system_prompt(session_key="telegram:chat-1") + + assert "# Recent History" in prompt + assert "telegram history" in prompt + assert "slack history" not in prompt + assert "legacy entry without session" not in prompt + + +def test_recent_history_injection_unified_excludes_cron_internals(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("unified user history", session_key="unified:default") + builder.memory.append_history("channel user history", session_key="telegram:chat-1") + builder.memory.append_history("cron internal history", session_key="cron:job-1") + + prompt = builder.build_system_prompt( + session_key="unified:default", + unified_session=True, + ) + + assert "unified user history" in prompt + assert "channel user history" in prompt + assert "cron internal history" not in prompt + + +def test_cron_recent_history_can_see_own_history_and_unified_context(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("unified user history", session_key="unified:default") + builder.memory.append_history("own cron history", session_key="cron:job-1") + builder.memory.append_history("other cron history", session_key="cron:job-2") + + prompt = builder.build_system_prompt( + session_key="cron:job-1", + unified_session=True, + ) + + assert "unified user history" in prompt + assert "own cron history" in prompt + assert "other cron history" not in prompt + + def test_recent_history_capped_at_max(tmp_path) -> None: """Only the most recent _MAX_RECENT_HISTORY entries are injected.""" workspace = _make_workspace(tmp_path) @@ -201,7 +253,7 @@ def test_partial_dream_processing_shows_only_remainder(tmp_path) -> None: workspace = _make_workspace(tmp_path) builder = ContextBuilder(workspace) - c1 = builder.memory.append_history("old conversation about Python") + builder.memory.append_history("old conversation about Python") c2 = builder.memory.append_history("old conversation about Rust") builder.memory.append_history("recent question about Docker") builder.memory.append_history("recent question about K8s") diff --git a/tests/agent/test_loop_consolidation_tokens.py b/tests/agent/test_loop_consolidation_tokens.py index 3228bd6dd..3c1f6fcbb 100644 --- a/tests/agent/test_loop_consolidation_tokens.py +++ b/tests/agent/test_loop_consolidation_tokens.py @@ -219,8 +219,11 @@ async def test_preflight_consolidation_before_llm_call(tmp_path, monkeypatch) -> loop = _make_loop(tmp_path, estimated_tokens=0, context_window_tokens=200) - async def track_consolidate(messages): + archived_session_keys: list[str | None] = [] + + async def track_consolidate(messages, *, session_key=None): order.append("consolidate") + archived_session_keys.append(session_key) return True loop.consolidator.archive = track_consolidate # type: ignore[method-assign] @@ -251,3 +254,4 @@ async def test_preflight_consolidation_before_llm_call(tmp_path, monkeypatch) -> assert "consolidate" in order assert "llm" in order assert order.index("consolidate") < order.index("llm") + assert archived_session_keys == ["cli:test"] diff --git a/tests/agent/test_loop_progress.py b/tests/agent/test_loop_progress.py index bbac2e6af..19473cc7f 100644 --- a/tests/agent/test_loop_progress.py +++ b/tests/agent/test_loop_progress.py @@ -492,6 +492,61 @@ class TestToolEventProgress: assert turn_end_msgs[0].content == "" provider.chat_with_retry.assert_not_awaited() + @pytest.mark.asyncio + async def test_stream_timeout_recovery_continues_in_new_segment( + self, + tmp_path: Path, + ) -> None: + """Recovered streaming output should use a new stream segment.""" + bus = MessageBus() + provider = MagicMock() + provider.supports_progress_deltas = True + provider.get_default_model.return_value = "openai-codex/gpt-5.5" + + async def chat_stream_with_retry(*, on_content_delta, on_stream_recover, **kwargs): + await on_content_delta("partial") + await on_stream_recover() + await on_content_delta("full retry response") + return LLMResponse(content="full retry response", tool_calls=[]) + + provider.chat_stream_with_retry = chat_stream_with_retry + provider.chat_with_retry = AsyncMock() + loop = AgentLoop(bus=bus, provider=provider, workspace=tmp_path, model="openai-codex/gpt-5.5") + _attach_webui_runtime_events(loop, bus) + loop.tools.get_definitions = MagicMock(return_value=[]) + loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign] + + await loop._dispatch(InboundMessage( + channel="websocket", + sender_id="u1", + chat_id="chat1", + content="say hello", + metadata={"_wants_stream": True}, + )) + + outbound = [] + while bus.outbound_size > 0: + outbound.append(await bus.consume_outbound()) + + deltas = [m for m in outbound if m.metadata.get("_stream_delta")] + stream_end = [m for m in outbound if m.metadata.get("_stream_end")] + final = [ + m for m in outbound + if not m.metadata.get("_stream_delta") + and not m.metadata.get("_stream_end") + and not m.metadata.get("_turn_end") + and not m.metadata.get("_goal_status") + ] + + assert [m.content for m in deltas] == ["partial", "full retry response"] + assert [m.metadata.get("_resuming") for m in stream_end] == [True, False] + assert deltas[0].metadata.get("_stream_id") == stream_end[0].metadata.get("_stream_id") + assert deltas[1].metadata.get("_stream_id") == stream_end[1].metadata.get("_stream_id") + assert deltas[0].metadata.get("_stream_id") != deltas[1].metadata.get("_stream_id") + assert final[-1].content == "full retry response" + assert final[-1].metadata.get("_streamed") is True + provider.chat_with_retry.assert_not_awaited() + @pytest.mark.asyncio async def test_streamed_progress_is_not_repeated_before_tool_execution( self, diff --git a/tests/agent/test_memory_store.py b/tests/agent/test_memory_store.py index fda60b7c5..a9b5d1003 100644 --- a/tests/agent/test_memory_store.py +++ b/tests/agent/test_memory_store.py @@ -58,6 +58,12 @@ class TestHistoryWithCursor: data = json.loads(content) assert data["cursor"] == 1 + def test_append_history_includes_session_key_when_provided(self, store): + store.append_history("event 1", session_key="telegram:chat-1") + content = store.read_file(store.history_file) + data = json.loads(content) + assert data["session_key"] == "telegram:chat-1" + def test_cursor_persists_across_appends(self, store): store.append_history("event 1") store.append_history("event 2") @@ -106,6 +112,54 @@ class TestHistoryWithCursor: entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 + def test_prompt_history_filters_to_current_session(self, store): + store.append_history("legacy entry without session") + store.append_history("telegram entry", session_key="telegram:chat-1") + store.append_history("slack entry", session_key="slack:chat-2") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="telegram:chat-1", + ) + + assert [e["content"] for e in entries] == ["telegram entry"] + assert [e["content"] for e in store.read_unprocessed_history(0)] == [ + "legacy entry without session", + "telegram entry", + "slack entry", + ] + + def test_unified_prompt_history_excludes_internal_cron_sessions(self, store): + store.append_history("legacy entry without session") + store.append_history("unified entry", session_key="unified:default") + store.append_history("telegram entry", session_key="telegram:chat-1") + store.append_history("cron internal entry", session_key="cron:job-1") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="unified:default", + unified_session=True, + ) + + assert [e["content"] for e in entries] == [ + "legacy entry without session", + "unified entry", + "telegram entry", + ] + + def test_unified_cron_prompt_history_includes_own_cron_entry(self, store): + store.append_history("unified entry", session_key="unified:default") + store.append_history("other cron entry", session_key="cron:job-2") + store.append_history("own cron entry", session_key="cron:job-1") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="cron:job-1", + unified_session=True, + ) + + assert [e["content"] for e in entries] == ["unified entry", "own cron entry"] + def test_read_unprocessed_skips_entries_without_cursor(self, store): """Regression: entries missing the cursor key should be silently skipped.""" store.history_file.write_text( diff --git a/tests/agent/test_runner_fallback.py b/tests/agent/test_runner_fallback.py index a7a6f7c30..d7e536c0c 100644 --- a/tests/agent/test_runner_fallback.py +++ b/tests/agent/test_runner_fallback.py @@ -287,7 +287,7 @@ class TestFallbackOnPrimaryError: class TestNoFallbackWhenContentStreamed: @pytest.mark.asyncio - async def test(self) -> None: + async def test_non_timeout_error_skips_failover(self) -> None: primary = _FakeProvider("primary", _error_response()) factory = MagicMock() fb = FallbackProvider( @@ -303,12 +303,46 @@ class TestNoFallbackWhenContentStreamed: messages=[{"role": "user", "content": "hi"}], on_content_delta=_delta, ) - # Primary returns error but content was "streamed" (FakeProvider calls delta) - # so failover should be skipped assert result.finish_reason == "error" factory.assert_not_called() +class TestFallbackOnStreamStalledAfterContent: + @pytest.mark.asyncio + async def test_timeout_with_streamed_content_falls_back(self) -> None: + primary = _FakeProvider( + "primary", + _make_response("stream stalled", finish_reason="error", error_kind="timeout"), + ) + fallback = _FakeProvider("fallback", _make_response("fallback ok")) + factory = MagicMock(return_value=fallback) + fb = FallbackProvider( + primary=primary, + fallback_presets=[_fallback("fallback-a")], + provider_factory=factory, + ) + + streamed: list[str] = [] + recoveries: list[str] = [] + + async def _delta(text: str) -> None: + streamed.append(text) + + async def _recover() -> None: + recoveries.append("recover") + + result = await fb.chat_stream( + messages=[{"role": "user", "content": "hi"}], + on_content_delta=_delta, + on_stream_recover=_recover, + ) + assert result.finish_reason == "stop" + assert result.content == "fallback ok" + factory.assert_called_once_with(_fallback("fallback-a")) + assert streamed == ["stream stalled", "fallback ok"] + assert recoveries == ["recover"] + + class TestFailoverOnTransientError: @pytest.mark.asyncio async def test_rate_limit(self) -> None: diff --git a/tests/channels/test_feishu_lazy_import.py b/tests/channels/test_feishu_lazy_import.py new file mode 100644 index 000000000..d43c39ebb --- /dev/null +++ b/tests/channels/test_feishu_lazy_import.py @@ -0,0 +1,46 @@ +import subprocess +import sys + + +def _run_import_probe(source: str) -> str: + proc = subprocess.run( + [sys.executable, "-c", source], + check=True, + capture_output=True, + text=True, + ) + return proc.stdout.strip() + + +def test_feishu_module_import_does_not_import_lark_oapi(): + out = _run_import_probe( + "import sys; import nanobot.channels.feishu; print('lark_oapi' in sys.modules)" + ) + + assert out == "False" + + +def test_feishu_channel_constructor_does_not_import_lark_oapi(): + out = _run_import_probe( + "import sys; " + "from nanobot.bus.queue import MessageBus; " + "from nanobot.channels.feishu import FeishuChannel; " + "FeishuChannel({'enabled': True}, MessageBus()); " + "print('lark_oapi' in sys.modules)" + ) + + assert out == "False" + + +def test_lark_runtime_thread_import_clears_sdk_import_loop(): + out = _run_import_probe( + "import asyncio\n" + "from nanobot.channels.feishu import _load_lark_runtime\n" + "async def main():\n" + " await asyncio.to_thread(_load_lark_runtime)\n" + " import lark_oapi.ws.client as ws\n" + " print(getattr(ws, 'loop', 'sentinel') is None)\n" + "asyncio.run(main())" + ) + + assert out == "True" diff --git a/tests/config/test_config_load_errors.py b/tests/config/test_config_load_errors.py new file mode 100644 index 000000000..1f52f578e --- /dev/null +++ b/tests/config/test_config_load_errors.py @@ -0,0 +1,30 @@ +import json + +import pytest + +from nanobot.config.loader import load_config + + +def test_load_config_missing_file_uses_defaults(tmp_path) -> None: + config = load_config(tmp_path / "missing.json") + + assert config.agents.defaults.model + + +def test_load_config_invalid_json_fails_fast(tmp_path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text("{broken json", encoding="utf-8") + + with pytest.raises(ValueError, match="Failed to load config"): + load_config(config_path) + + +def test_load_config_invalid_schema_fails_fast(tmp_path) -> None: + config_path = tmp_path / "config.json" + config_path.write_text( + json.dumps({"tools": {"exec": {"timeout": -1}}}), + encoding="utf-8", + ) + + with pytest.raises(ValueError, match="Failed to load config"): + load_config(config_path) diff --git a/tests/providers/test_provider_retry.py b/tests/providers/test_provider_retry.py index 6fc2137df..9483fee9b 100644 --- a/tests/providers/test_provider_retry.py +++ b/tests/providers/test_provider_retry.py @@ -163,6 +163,85 @@ async def test_chat_stream_with_retry_does_not_retry_after_emitting_content(monk assert delays == [] +@pytest.mark.asyncio +async def test_chat_stream_with_retry_retries_timeout_after_emitting_content(monkeypatch) -> None: + first = LLMResponse( + content="Error calling LLM: stream stalled for more than 30 seconds", + finish_reason="error", + error_kind="timeout", + ) + first._test_stream_delta = "partial" # type: ignore[attr-defined] + provider = ScriptedProvider([ + first, + LLMResponse(content="full retry response"), + ]) + deltas: list[str] = [] + delays: list[int] = [] + + async def _fake_sleep(delay: int) -> None: + delays.append(delay) + + async def _on_delta(delta: str) -> None: + deltas.append(delta) + + monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep) + + response = await provider.chat_stream_with_retry( + messages=[{"role": "user", "content": "hello"}], + on_content_delta=_on_delta, + ) + + assert response.content == "full retry response" + assert response.finish_reason == "stop" + assert provider.calls == 2 + assert deltas == ["partial"] + assert delays == [1] + assert provider.last_kwargs.get("on_content_delta") is None + + +@pytest.mark.asyncio +async def test_chat_stream_with_retry_retries_timeout_in_new_stream_segment( + monkeypatch, +) -> None: + first = LLMResponse( + content="Error calling LLM: stream stalled for more than 30 seconds", + finish_reason="error", + error_kind="timeout", + ) + first._test_stream_delta = "partial" # type: ignore[attr-defined] + second = LLMResponse(content="full retry response") + second._test_stream_delta = "full retry response" # type: ignore[attr-defined] + provider = ScriptedProvider([first, second]) + deltas: list[str] = [] + recoveries: list[str] = [] + delays: list[int] = [] + + async def _fake_sleep(delay: int) -> None: + delays.append(delay) + + async def _on_delta(delta: str) -> None: + deltas.append(delta) + + async def _on_stream_recover() -> None: + recoveries.append("recover") + + monkeypatch.setattr("nanobot.providers.base.asyncio.sleep", _fake_sleep) + + response = await provider.chat_stream_with_retry( + messages=[{"role": "user", "content": "hello"}], + on_content_delta=_on_delta, + on_stream_recover=_on_stream_recover, + ) + + assert response.content == "full retry response" + assert response.finish_reason == "stop" + assert provider.calls == 2 + assert deltas == ["partial", "full retry response"] + assert recoveries == ["recover"] + assert delays == [1] + assert provider.last_kwargs.get("on_content_delta") is not None + + @pytest.mark.asyncio async def test_chat_with_retry_uses_provider_generation_defaults() -> None: """When callers omit generation params, provider.generation defaults are used.""" diff --git a/tests/tools/test_exec_env.py b/tests/tools/test_exec_env.py index b9567f29d..1d749a078 100644 --- a/tests/tools/test_exec_env.py +++ b/tests/tools/test_exec_env.py @@ -45,6 +45,28 @@ async def test_exec_path_append_preserves_system_path(): assert "Exit code: 0" in result +@_UNIX_ONLY +@pytest.mark.asyncio +async def test_exec_path_prepend_takes_lookup_precedence(tmp_path): + """pathPrepend should win over pathAppend for executable lookup.""" + preferred = tmp_path / "preferred" + fallback = tmp_path / "fallback" + preferred.mkdir() + fallback.mkdir() + preferred_tool = preferred / "pathprobe" + fallback_tool = fallback / "pathprobe" + preferred_tool.write_text("#!/bin/sh\necho preferred\n", encoding="utf-8") + fallback_tool.write_text("#!/bin/sh\necho fallback\n", encoding="utf-8") + preferred_tool.chmod(0o755) + fallback_tool.chmod(0o755) + + tool = ExecTool(path_prepend=str(preferred), path_append=str(fallback)) + result = await tool.execute(command="pathprobe") + + assert "preferred" in result + assert "fallback" not in result + + @_UNIX_ONLY @pytest.mark.asyncio async def test_exec_allowed_env_keys_passthrough(monkeypatch): diff --git a/tests/tools/test_exec_platform.py b/tests/tools/test_exec_platform.py index e09838492..a72b06e36 100644 --- a/tests/tools/test_exec_platform.py +++ b/tests/tools/test_exec_platform.py @@ -202,6 +202,65 @@ class TestPathAppendPlatform: assert captured_env["NANOBOT_PATH_APPEND"] == "/opt/bin; echo INJECTED" assert "INJECTED" not in captured_cmd + @pytest.mark.asyncio + async def test_unix_path_prepend_uses_env_var_in_fixed_export(self): + """On Unix, path_prepend must not be interpolated into shell source.""" + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"ok", b"") + mock_proc.returncode = 0 + + captured_cmd = None + captured_env = {} + + async def capture_spawn(cmd, cwd, env, shell_program=None, login=True, *, stdin=None): + nonlocal captured_cmd + captured_cmd = cmd + captured_env.update(env) + return mock_proc + + with ( + patch("nanobot.agent.tools.shell._IS_WINDOWS", False), + patch("nanobot.agent.tools.shell.os.pathsep", ":"), + patch.object(ExecTool, "_spawn", side_effect=capture_spawn), + patch.object(ExecTool, "_guard_command", return_value=None), + ): + tool = ExecTool(path_prepend="/venv/bin; echo INJECTED") + await tool.execute(command="python --version") + + assert captured_cmd == 'export PATH="$NANOBOT_PATH_PREPEND:$PATH"; python --version' + assert captured_env["NANOBOT_PATH_PREPEND"] == "/venv/bin; echo INJECTED" + assert "INJECTED" not in captured_cmd + + @pytest.mark.asyncio + async def test_unix_path_prepend_and_append_order(self): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"ok", b"") + mock_proc.returncode = 0 + + captured_cmd = None + captured_env = {} + + async def capture_spawn(cmd, cwd, env, shell_program=None, login=True, *, stdin=None): + nonlocal captured_cmd + captured_cmd = cmd + captured_env.update(env) + return mock_proc + + with ( + patch("nanobot.agent.tools.shell._IS_WINDOWS", False), + patch("nanobot.agent.tools.shell.os.pathsep", ":"), + patch.object(ExecTool, "_spawn", side_effect=capture_spawn), + patch.object(ExecTool, "_guard_command", return_value=None), + ): + tool = ExecTool(path_prepend="/venv/bin", path_append="/usr/sbin") + await tool.execute(command="python --version") + + assert captured_cmd == ( + 'export PATH="$NANOBOT_PATH_PREPEND:$PATH:$NANOBOT_PATH_APPEND"; python --version' + ) + assert captured_env["NANOBOT_PATH_PREPEND"] == "/venv/bin" + assert captured_env["NANOBOT_PATH_APPEND"] == "/usr/sbin" + @pytest.mark.asyncio async def test_windows_modifies_env(self): """On Windows, path_append is appended to PATH in the env dict.""" @@ -226,6 +285,32 @@ class TestPathAppendPlatform: assert captured_env["PATH"].endswith(r";C:\tools\bin") + @pytest.mark.asyncio + async def test_windows_path_prepend_and_append_order(self): + mock_proc = AsyncMock() + mock_proc.communicate.return_value = (b"ok", b"") + mock_proc.returncode = 0 + + captured_env = {} + + async def capture_spawn(cmd, cwd, env, shell_program=None, login=True, *, stdin=None): + captured_env.update(env) + return mock_proc + + with ( + patch("nanobot.agent.tools.shell._IS_WINDOWS", True), + patch("nanobot.agent.tools.shell.os.pathsep", ";"), + patch.object(ExecTool, "_build_env", return_value={"PATH": r"C:\Windows\System32"}), + patch.object(ExecTool, "_spawn", side_effect=capture_spawn), + patch.object(ExecTool, "_guard_command", return_value=None), + ): + tool = ExecTool(path_prepend=r"C:\venv\Scripts", path_append=r"C:\tools\bin") + await tool.execute(command="python --version") + + assert captured_env["PATH"] == ( + r"C:\venv\Scripts;C:\Windows\System32;C:\tools\bin" + ) + # --------------------------------------------------------------------------- # sandbox diff --git a/tests/tools/test_tool_loader.py b/tests/tools/test_tool_loader.py index 4d6f128f1..7c6cd8727 100644 --- a/tests/tools/test_tool_loader.py +++ b/tests/tools/test_tool_loader.py @@ -244,6 +244,7 @@ def test_exec_tool_create(): mock_config.exec.enable = True mock_config.exec.timeout = 120 mock_config.exec.sandbox = "" + mock_config.exec.path_prepend = "/venv/bin" mock_config.exec.path_append = "" mock_config.exec.allowed_env_keys = [] mock_config.exec.allow_patterns = [] @@ -252,6 +253,7 @@ def test_exec_tool_create(): ctx = ToolContext(config=mock_config, workspace="/tmp") tool = ExecTool.create(ctx) assert isinstance(tool, ExecTool) + assert tool.path_prepend == "/venv/bin" def test_web_tools_config_cls(): @@ -360,7 +362,7 @@ def test_config_round_trip(): config_dict = { "tools": { "web": {"enable": True, "search": {"provider": "brave", "api_key": "test"}}, - "exec": {"enable": False, "timeout": 120}, + "exec": {"enable": False, "timeout": 120, "pathPrepend": "/venv/bin"}, "my": {"allowSet": True}, "imageGeneration": {"enabled": True, "provider": "openrouter"}, } @@ -370,8 +372,10 @@ def test_config_round_trip(): assert dumped["tools"]["my"]["allowSet"] is True assert dumped["tools"]["imageGeneration"]["enabled"] is True + assert dumped["tools"]["exec"]["pathPrepend"] == "/venv/bin" assert config.tools.exec.enable is False assert config.tools.exec.timeout == 120 + assert config.tools.exec.path_prepend == "/venv/bin" assert config.tools.web.search.provider == "brave" @@ -382,6 +386,7 @@ def test_config_defaults(): config = Config.model_validate({}) assert config.tools.exec.enable is True assert config.tools.exec.timeout == 60 + assert config.tools.exec.path_prepend == "" assert config.tools.web.enable is True assert config.tools.web.search.provider == "duckduckgo" assert config.tools.my.enable is True @@ -403,6 +408,7 @@ def test_loader_registers_same_tools_as_old_hardcoded(): mock_config.exec.enable = True mock_config.exec.timeout = 60 mock_config.exec.sandbox = "" + mock_config.exec.path_prepend = "" mock_config.exec.path_append = "" mock_config.exec.allowed_env_keys = [] mock_config.exec.allow_patterns = [] diff --git a/tests/webui/test_settings_api.py b/tests/webui/test_settings_api.py index 76518c576..8c3c5889f 100644 --- a/tests/webui/test_settings_api.py +++ b/tests/webui/test_settings_api.py @@ -244,6 +244,24 @@ def test_settings_payload_includes_network_safety_fields( assert payload["advanced"]["ssrf_whitelist_count"] == 1 +def test_settings_payload_includes_exec_path_flags( + tmp_path, + monkeypatch: pytest.MonkeyPatch, +) -> None: + config_path = tmp_path / "config.json" + config = Config() + config.tools.exec.path_prepend = "/venv/bin" + config.tools.exec.path_append = "/usr/sbin" + save_config(config, config_path) + monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path) + monkeypatch.setattr("nanobot.webui.workspaces.get_webui_dir", lambda: tmp_path / "webui") + + payload = settings_payload() + + assert payload["advanced"]["exec_path_prepend_set"] is True + assert payload["advanced"]["exec_path_append_set"] is True + + def test_settings_payload_includes_effective_transcription_config( tmp_path, monkeypatch: pytest.MonkeyPatch, diff --git a/webui/src/components/settings/SettingsView.tsx b/webui/src/components/settings/SettingsView.tsx index 0a6ebcf5a..b1ea148d5 100644 --- a/webui/src/components/settings/SettingsView.tsx +++ b/webui/src/components/settings/SettingsView.tsx @@ -10,6 +10,7 @@ import { } from "react"; import { Activity, + ArrowUpCircle, Bot, Brain, Check, @@ -22,6 +23,7 @@ import { Database, Eye, EyeOff, + ExternalLink, Gem, Globe2, Grid3X3, @@ -75,6 +77,7 @@ import { import { Input } from "@/components/ui/input"; import { Textarea } from "@/components/ui/textarea"; import { + checkVersion, createModelConfiguration, fetchSettings, fetchSettingsUsage, @@ -1852,6 +1855,104 @@ function OverviewSettings({ /> + +
+ {tx("settings.sections.about", "About")} + + + +
+ + ); +} + +function VersionCheckRow({ currentVersion }: { currentVersion?: string }) { + const { t } = useTranslation(); + const tx = (key: string, fallback: string) => t(key, { defaultValue: fallback }); + const { token } = useClient(); + const [checking, setChecking] = useState(false); + const [result, setResult] = useState< + | { type: "up-to-date" } + | { type: "update"; latestVersion: string; pypiUrl?: string } + | { type: "error"; message: string } + | null + >(null); + + const handleCheck = async () => { + setChecking(true); + setResult(null); + try { + const res = await checkVersion(token); + if (res.updateAvailable) { + setResult({ + type: "update", + latestVersion: res.updateAvailable.latestVersion, + pypiUrl: res.updateAvailable.pypiUrl, + }); + } else { + setResult({ type: "up-to-date" }); + } + } catch (err) { + setResult({ type: "error", message: (err as Error).message }); + } finally { + setChecking(false); + } + }; + + return ( +
+
+
+ {tx("settings.about.version", "Version")} +
+
+ {currentVersion ? `v${currentVersion}` : "nanobot"} +
+
+
+ + {result?.type === "up-to-date" ? ( + + + {tx("settings.about.upToDate", "You're up to date")} + + ) : null} + {result?.type === "update" ? ( + + + {tx("settings.about.updateAvailable", "Update available")}{result.latestVersion && ` v${result.latestVersion}`} + {result.pypiUrl ? ( + + PyPI + + + ) : null} + + ) : null} + {result?.type === "error" ? ( + {result.message} + ) : null} +
); } diff --git a/webui/src/lib/api.ts b/webui/src/lib/api.ts index 63a74e06e..9b8aa2551 100644 --- a/webui/src/lib/api.ts +++ b/webui/src/lib/api.ts @@ -244,6 +244,26 @@ export async function fetchSettingsUsage( ); } +export interface VersionCheckResult { + updateAvailable: { + currentVersion: string; + latestVersion: string; + pypiUrl?: string; + } | null; +} + +export async function checkVersion( + token: string, + base: string = "", +): Promise { + return request( + `${base}/api/settings/version-check`, + token, + undefined, + 10_000, + ); +} + export async function fetchWorkspaces( token: string, base: string = "", diff --git a/webui/src/lib/types.ts b/webui/src/lib/types.ts index ae21b98b3..3365b83f4 100644 --- a/webui/src/lib/types.ts +++ b/webui/src/lib/types.ts @@ -480,10 +480,14 @@ export interface SettingsPayload { mcp_server_count: number; exec_enabled: boolean; exec_sandbox?: string | null; + exec_path_prepend_set: boolean; exec_path_append_set: boolean; }; requires_restart: boolean; restart_required_sections?: Array<"runtime" | "browser" | "image">; + version?: { + current: string; + }; } export interface AppPackageRef { diff --git a/webui/src/tests/app-layout.test.tsx b/webui/src/tests/app-layout.test.tsx index 845efa8ab..3fa3e8124 100644 --- a/webui/src/tests/app-layout.test.tsx +++ b/webui/src/tests/app-layout.test.tsx @@ -125,6 +125,7 @@ function baseSettingsPayload() { mcp_server_count: 0, exec_enabled: true, exec_sandbox: null, + exec_path_prepend_set: false, exec_path_append_set: false, }, requires_restart: false, @@ -1023,6 +1024,7 @@ describe("App layout", () => { mcp_server_count: 0, exec_enabled: true, exec_sandbox: null, + exec_path_prepend_set: false, exec_path_append_set: false, }, requires_restart: false, @@ -1349,6 +1351,7 @@ describe("App layout", () => { mcp_server_count: 0, exec_enabled: true, exec_sandbox: null, + exec_path_prepend_set: false, exec_path_append_set: false, }, requires_restart: false, diff --git a/webui/src/tests/settings-view.test.tsx b/webui/src/tests/settings-view.test.tsx index 4987fb96c..15d0dbc54 100644 --- a/webui/src/tests/settings-view.test.tsx +++ b/webui/src/tests/settings-view.test.tsx @@ -93,6 +93,7 @@ function settingsPayload(): SettingsPayload { mcp_server_count: 0, exec_enabled: true, exec_sandbox: null, + exec_path_prepend_set: false, exec_path_append_set: false, }, requires_restart: false, diff --git a/webui/src/tests/thread-shell.test.tsx b/webui/src/tests/thread-shell.test.tsx index 5d026e767..700818f6e 100644 --- a/webui/src/tests/thread-shell.test.tsx +++ b/webui/src/tests/thread-shell.test.tsx @@ -212,6 +212,7 @@ function modelSettings(model: string, provider: string): SettingsPayload { mcp_server_count: 0, exec_enabled: true, exec_sandbox: null, + exec_path_prepend_set: false, exec_path_append_set: false, }, requires_restart: false,