From e79b9f4a831ab265639cfc95dbbbb5a6152d5cfc Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 22 Mar 2026 02:38:34 +0000 Subject: [PATCH 1/9] feat(agent): add streaming groundwork for future TUI Preserve the provider and agent-loop streaming primitives plus the CLI experiment scaffolding so this work can be resumed later without blocking urgent bug fixes on main. Made-with: Cursor --- nanobot/agent/loop.py | 65 +++++++--- nanobot/cli/commands.py | 39 ++++-- nanobot/providers/base.py | 85 ++++++++++++ nanobot/providers/litellm_provider.py | 164 +++++++++++++++--------- tests/test_loop_consolidation_tokens.py | 5 +- 5 files changed, 268 insertions(+), 90 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b8d1647f0..093f0e204 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -212,8 +212,16 @@ class AgentLoop: self, initial_messages: list[dict], on_progress: Callable[..., Awaitable[None]] | None = None, + on_stream: Callable[[str], Awaitable[None]] | None = None, + on_stream_end: Callable[..., Awaitable[None]] | None = None, ) -> tuple[str | None, list[str], list[dict]]: - """Run the agent iteration loop.""" + """Run the agent iteration loop. + + *on_stream*: called with each content delta during streaming. + *on_stream_end(resuming)*: called when a streaming session finishes. + ``resuming=True`` means tool calls follow (spinner should restart); + ``resuming=False`` means this is the final response. + """ messages = initial_messages iteration = 0 final_content = None @@ -224,11 +232,20 @@ class AgentLoop: tool_defs = self.tools.get_definitions() - response = await self.provider.chat_with_retry( - messages=messages, - tools=tool_defs, - model=self.model, - ) + if on_stream: + response = await self.provider.chat_stream_with_retry( + messages=messages, + tools=tool_defs, + model=self.model, + on_content_delta=on_stream, + ) + else: + response = await self.provider.chat_with_retry( + messages=messages, + tools=tool_defs, + model=self.model, + ) + usage = response.usage or {} self._last_usage = { "prompt_tokens": int(usage.get("prompt_tokens", 0) or 0), @@ -236,10 +253,14 @@ class AgentLoop: } if response.has_tool_calls: + if on_stream and on_stream_end: + await on_stream_end(resuming=True) + if on_progress: - thought = self._strip_think(response.content) - if thought: - await on_progress(thought) + if not on_stream: + thought = self._strip_think(response.content) + if thought: + await on_progress(thought) tool_hint = self._tool_hint(response.tool_calls) tool_hint = self._strip_think(tool_hint) await on_progress(tool_hint, tool_hint=True) @@ -263,9 +284,10 @@ class AgentLoop: messages, tool_call.id, tool_call.name, result ) else: + if on_stream and on_stream_end: + await on_stream_end(resuming=False) + clean = self._strip_think(response.content) - # Don't persist error responses to session history — they can - # poison the context and cause permanent 400 loops (#1303). if response.finish_reason == "error": logger.error("LLM returned error: {}", (clean or "")[:200]) final_content = clean or "Sorry, I encountered an error calling the AI model." @@ -400,6 +422,8 @@ class AgentLoop: msg: InboundMessage, session_key: str | None = None, on_progress: Callable[[str], Awaitable[None]] | None = None, + on_stream: Callable[[str], Awaitable[None]] | None = None, + on_stream_end: Callable[..., Awaitable[None]] | None = None, ) -> OutboundMessage | None: """Process a single inbound message and return the response.""" # System messages: parse origin from chat_id ("channel:chat_id") @@ -412,7 +436,6 @@ class AgentLoop: await self.memory_consolidator.maybe_consolidate_by_tokens(session) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) history = session.get_history(max_messages=0) - # Subagent results should be assistant role, other system messages use user role current_role = "assistant" if msg.sender_id == "subagent" else "user" messages = self.context.build_messages( history=history, @@ -486,7 +509,10 @@ class AgentLoop: )) final_content, _, all_msgs = await self._run_agent_loop( - initial_messages, on_progress=on_progress or _bus_progress, + initial_messages, + on_progress=on_progress or _bus_progress, + on_stream=on_stream, + on_stream_end=on_stream_end, ) if final_content is None: @@ -501,9 +527,13 @@ class AgentLoop: preview = final_content[:120] + "..." if len(final_content) > 120 else final_content logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview) + + meta = dict(msg.metadata or {}) + if on_stream is not None: + meta["_streamed"] = True return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, content=final_content, - metadata=msg.metadata or {}, + metadata=meta, ) @staticmethod @@ -592,8 +622,13 @@ class AgentLoop: channel: str = "cli", chat_id: str = "direct", on_progress: Callable[[str], Awaitable[None]] | None = None, + on_stream: Callable[[str], Awaitable[None]] | None = None, + on_stream_end: Callable[..., Awaitable[None]] | None = None, ) -> OutboundMessage | None: """Process a message directly and return the outbound payload.""" await self._connect_mcp() msg = InboundMessage(channel=channel, sender_id="user", chat_id=chat_id, content=content) - return await self._process_message(msg, session_key=session_key, on_progress=on_progress) + return await self._process_message( + msg, session_key=session_key, on_progress=on_progress, + on_stream=on_stream, on_stream_end=on_stream_end, + ) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index ea06acb86..7639b3de8 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -207,6 +207,10 @@ class _ThinkingSpinner: self._active = False if self._spinner: self._spinner.stop() + # Force-clear the spinner line: Rich Live's transient cleanup + # occasionally loses a race with its own render thread. + console.file.write("\033[2K\r") + console.file.flush() return False @contextmanager @@ -214,6 +218,8 @@ class _ThinkingSpinner: """Temporarily stop spinner while printing progress.""" if self._spinner and self._active: self._spinner.stop() + console.file.write("\033[2K\r") + console.file.flush() try: yield finally: @@ -770,16 +776,25 @@ def agent( async def run_once(): nonlocal _thinking _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: + + with _thinking or nullcontext(): response = await agent_loop.process_direct( - message, session_id, on_progress=_cli_progress, + message, session_id, + on_progress=_cli_progress, ) - _thinking = None - _print_agent_response( - response.content if response else "", - render_markdown=markdown, - metadata=response.metadata if response else None, - ) + + if _thinking: + _thinking.__exit__(None, None, None) + _thinking = None + + if response and response.content: + _print_agent_response( + response.content, + render_markdown=markdown, + metadata=response.metadata, + ) + else: + console.print() await agent_loop.close_mcp() asyncio.run(run_once()) @@ -820,6 +835,7 @@ def agent( while True: try: msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + if msg.metadata.get("_progress"): is_tool_hint = msg.metadata.get("_tool_hint", False) ch = agent_loop.channels_config @@ -834,6 +850,7 @@ def agent( if msg.content: turn_response.append((msg.content, dict(msg.metadata or {}))) turn_done.set() + elif msg.content: await _print_interactive_response( msg.content, @@ -872,11 +889,7 @@ def agent( content=user_input, )) - nonlocal _thinking - _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: - await turn_done.wait() - _thinking = None + await turn_done.wait() if turn_response: content, meta = turn_response[0] diff --git a/nanobot/providers/base.py b/nanobot/providers/base.py index 8f9b2ba8c..046458dec 100644 --- a/nanobot/providers/base.py +++ b/nanobot/providers/base.py @@ -3,6 +3,7 @@ import asyncio import json from abc import ABC, abstractmethod +from collections.abc import Awaitable, Callable from dataclasses import dataclass, field from typing import Any @@ -223,6 +224,90 @@ class LLMProvider(ABC): except Exception as exc: return LLMResponse(content=f"Error calling LLM: {exc}", finish_reason="error") + async def chat_stream( + self, + messages: list[dict[str, Any]], + tools: list[dict[str, Any]] | None = None, + model: str | None = None, + max_tokens: int = 4096, + temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + """Stream a chat completion, calling *on_content_delta* for each text chunk. + + Returns the same ``LLMResponse`` as :meth:`chat`. The default + implementation falls back to a non-streaming call and delivers the + full content as a single delta. Providers that support native + streaming should override this method. + """ + response = await self.chat( + messages=messages, tools=tools, model=model, + max_tokens=max_tokens, temperature=temperature, + reasoning_effort=reasoning_effort, tool_choice=tool_choice, + ) + if on_content_delta and response.content: + await on_content_delta(response.content) + return response + + async def _safe_chat_stream(self, **kwargs: Any) -> LLMResponse: + """Call chat_stream() and convert unexpected exceptions to error responses.""" + try: + return await self.chat_stream(**kwargs) + except asyncio.CancelledError: + raise + except Exception as exc: + return LLMResponse(content=f"Error calling LLM: {exc}", finish_reason="error") + + async def chat_stream_with_retry( + self, + messages: list[dict[str, Any]], + tools: list[dict[str, Any]] | None = None, + model: str | None = None, + max_tokens: object = _SENTINEL, + temperature: object = _SENTINEL, + reasoning_effort: object = _SENTINEL, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + """Call chat_stream() with retry on transient provider failures.""" + if max_tokens is self._SENTINEL: + max_tokens = self.generation.max_tokens + if temperature is self._SENTINEL: + temperature = self.generation.temperature + if reasoning_effort is self._SENTINEL: + reasoning_effort = self.generation.reasoning_effort + + kw: dict[str, Any] = dict( + messages=messages, tools=tools, model=model, + max_tokens=max_tokens, temperature=temperature, + reasoning_effort=reasoning_effort, tool_choice=tool_choice, + on_content_delta=on_content_delta, + ) + + for attempt, delay in enumerate(self._CHAT_RETRY_DELAYS, start=1): + response = await self._safe_chat_stream(**kw) + + if response.finish_reason != "error": + return response + + if not self._is_transient_error(response.content): + stripped = self._strip_image_content(messages) + if stripped is not None: + logger.warning("Non-transient LLM error with image content, retrying without images") + return await self._safe_chat_stream(**{**kw, "messages": stripped}) + return response + + logger.warning( + "LLM transient error (attempt {}/{}), retrying in {}s: {}", + attempt, len(self._CHAT_RETRY_DELAYS), delay, + (response.content or "")[:120].lower(), + ) + await asyncio.sleep(delay) + + return await self._safe_chat_stream(**kw) + async def chat_with_retry( self, messages: list[dict[str, Any]], diff --git a/nanobot/providers/litellm_provider.py b/nanobot/providers/litellm_provider.py index 20c3d2527..9aa0ba680 100644 --- a/nanobot/providers/litellm_provider.py +++ b/nanobot/providers/litellm_provider.py @@ -4,6 +4,7 @@ import hashlib import os import secrets import string +from collections.abc import Awaitable, Callable from typing import Any import json_repair @@ -223,6 +224,64 @@ class LiteLLMProvider(LLMProvider): clean["tool_call_id"] = map_id(clean["tool_call_id"]) return sanitized + def _build_chat_kwargs( + self, + messages: list[dict[str, Any]], + tools: list[dict[str, Any]] | None, + model: str | None, + max_tokens: int, + temperature: float, + reasoning_effort: str | None, + tool_choice: str | dict[str, Any] | None, + ) -> tuple[dict[str, Any], str]: + """Build the kwargs dict for ``acompletion``. + + Returns ``(kwargs, original_model)`` so callers can reuse the + original model string for downstream logic. + """ + original_model = model or self.default_model + resolved = self._resolve_model(original_model) + extra_msg_keys = self._extra_msg_keys(original_model, resolved) + + if self._supports_cache_control(original_model): + messages, tools = self._apply_cache_control(messages, tools) + + max_tokens = max(1, max_tokens) + + kwargs: dict[str, Any] = { + "model": resolved, + "messages": self._sanitize_messages( + self._sanitize_empty_content(messages), extra_keys=extra_msg_keys, + ), + "max_tokens": max_tokens, + "temperature": temperature, + } + + if self._gateway: + kwargs.update(self._gateway.litellm_kwargs) + + self._apply_model_overrides(resolved, kwargs) + + if self._langsmith_enabled: + kwargs.setdefault("callbacks", []).append("langsmith") + + if self.api_key: + kwargs["api_key"] = self.api_key + if self.api_base: + kwargs["api_base"] = self.api_base + if self.extra_headers: + kwargs["extra_headers"] = self.extra_headers + + if reasoning_effort: + kwargs["reasoning_effort"] = reasoning_effort + kwargs["drop_params"] = True + + if tools: + kwargs["tools"] = tools + kwargs["tool_choice"] = tool_choice or "auto" + + return kwargs, original_model + async def chat( self, messages: list[dict[str, Any]], @@ -233,71 +292,54 @@ class LiteLLMProvider(LLMProvider): reasoning_effort: str | None = None, tool_choice: str | dict[str, Any] | None = None, ) -> LLMResponse: - """ - Send a chat completion request via LiteLLM. - - Args: - messages: List of message dicts with 'role' and 'content'. - tools: Optional list of tool definitions in OpenAI format. - model: Model identifier (e.g., 'anthropic/claude-sonnet-4-5'). - max_tokens: Maximum tokens in response. - temperature: Sampling temperature. - - Returns: - LLMResponse with content and/or tool calls. - """ - original_model = model or self.default_model - model = self._resolve_model(original_model) - extra_msg_keys = self._extra_msg_keys(original_model, model) - - if self._supports_cache_control(original_model): - messages, tools = self._apply_cache_control(messages, tools) - - # Clamp max_tokens to at least 1 — negative or zero values cause - # LiteLLM to reject the request with "max_tokens must be at least 1". - max_tokens = max(1, max_tokens) - - kwargs: dict[str, Any] = { - "model": model, - "messages": self._sanitize_messages(self._sanitize_empty_content(messages), extra_keys=extra_msg_keys), - "max_tokens": max_tokens, - "temperature": temperature, - } - - if self._gateway: - kwargs.update(self._gateway.litellm_kwargs) - - # Apply model-specific overrides (e.g. kimi-k2.5 temperature) - self._apply_model_overrides(model, kwargs) - - if self._langsmith_enabled: - kwargs.setdefault("callbacks", []).append("langsmith") - - # Pass api_key directly — more reliable than env vars alone - if self.api_key: - kwargs["api_key"] = self.api_key - - # Pass api_base for custom endpoints - if self.api_base: - kwargs["api_base"] = self.api_base - - # Pass extra headers (e.g. APP-Code for AiHubMix) - if self.extra_headers: - kwargs["extra_headers"] = self.extra_headers - - if reasoning_effort: - kwargs["reasoning_effort"] = reasoning_effort - kwargs["drop_params"] = True - - if tools: - kwargs["tools"] = tools - kwargs["tool_choice"] = tool_choice or "auto" - + """Send a chat completion request via LiteLLM.""" + kwargs, _ = self._build_chat_kwargs( + messages, tools, model, max_tokens, temperature, + reasoning_effort, tool_choice, + ) try: response = await acompletion(**kwargs) return self._parse_response(response) except Exception as e: - # Return error as content for graceful handling + return LLMResponse( + content=f"Error calling LLM: {str(e)}", + finish_reason="error", + ) + + async def chat_stream( + self, + messages: list[dict[str, Any]], + tools: list[dict[str, Any]] | None = None, + model: str | None = None, + max_tokens: int = 4096, + temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + """Stream a chat completion via LiteLLM, forwarding text deltas.""" + kwargs, _ = self._build_chat_kwargs( + messages, tools, model, max_tokens, temperature, + reasoning_effort, tool_choice, + ) + kwargs["stream"] = True + + try: + stream = await acompletion(**kwargs) + chunks: list[Any] = [] + async for chunk in stream: + chunks.append(chunk) + if on_content_delta: + delta = chunk.choices[0].delta if chunk.choices else None + text = getattr(delta, "content", None) if delta else None + if text: + await on_content_delta(text) + + full_response = litellm.stream_chunk_builder( + chunks, messages=kwargs["messages"], + ) + return self._parse_response(full_response) + except Exception as e: return LLMResponse( content=f"Error calling LLM: {str(e)}", finish_reason="error", diff --git a/tests/test_loop_consolidation_tokens.py b/tests/test_loop_consolidation_tokens.py index b0f3dda53..87d8d29f3 100644 --- a/tests/test_loop_consolidation_tokens.py +++ b/tests/test_loop_consolidation_tokens.py @@ -12,7 +12,9 @@ def _make_loop(tmp_path, *, estimated_tokens: int, context_window_tokens: int) - provider = MagicMock() provider.get_default_model.return_value = "test-model" provider.estimate_prompt_tokens.return_value = (estimated_tokens, "test-counter") - provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + _response = LLMResponse(content="ok", tool_calls=[]) + provider.chat_with_retry = AsyncMock(return_value=_response) + provider.chat_stream_with_retry = AsyncMock(return_value=_response) loop = AgentLoop( bus=MessageBus(), @@ -167,6 +169,7 @@ async def test_preflight_consolidation_before_llm_call(tmp_path, monkeypatch) -> order.append("llm") return LLMResponse(content="ok", tool_calls=[]) loop.provider.chat_with_retry = track_llm + loop.provider.chat_stream_with_retry = track_llm session = loop.sessions.get_or_create("cli:test") session.messages = [ From bd621df57f7b4ab4122d57bf04d797eb1e523690 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 22 Mar 2026 15:34:15 +0000 Subject: [PATCH 2/9] feat: add streaming channel support with automatic fallback Provider layer: add chat_stream / chat_stream_with_retry to all providers (base fallback, litellm, custom, azure, codex). Refactor shared kwargs building in each provider. Channel layer: BaseChannel gains send_delta (no-op) and supports_streaming (checks config + method override). ChannelManager routes _stream_delta / _stream_end to send_delta, skips _streamed final messages. AgentLoop._dispatch builds bus-backed on_stream/on_stream_end callbacks when _wants_stream metadata is set. Non-streaming path unchanged. CLI: clean up spinner ANSI workarounds, simplify commands.py flow. Made-with: Cursor --- nanobot/agent/loop.py | 18 +++- nanobot/channels/base.py | 17 ++- nanobot/channels/manager.py | 7 +- nanobot/cli/commands.py | 39 +++---- nanobot/config/schema.py | 1 + nanobot/providers/azure_openai_provider.py | 96 +++++++++++++++++ nanobot/providers/custom_provider.py | 116 ++++++++++++++++----- nanobot/providers/openai_codex_provider.py | 115 ++++++++++---------- 8 files changed, 300 insertions(+), 109 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 093f0e204..1bbb7cfa7 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -376,7 +376,23 @@ class AgentLoop: """Process a message under the global lock.""" async with self._processing_lock: try: - response = await self._process_message(msg) + on_stream = on_stream_end = None + if msg.metadata.get("_wants_stream"): + async def on_stream(delta: str) -> None: + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content=delta, metadata={"_stream_delta": True}, + )) + + async def on_stream_end(*, resuming: bool = False) -> None: + await self.bus.publish_outbound(OutboundMessage( + channel=msg.channel, chat_id=msg.chat_id, + content="", metadata={"_stream_end": True, "_resuming": resuming}, + )) + + response = await self._process_message( + msg, on_stream=on_stream, on_stream_end=on_stream_end, + ) if response is not None: await self.bus.publish_outbound(response) elif msg.channel == "cli": diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index 81f0751c0..49be3901f 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -76,6 +76,17 @@ class BaseChannel(ABC): """ pass + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + """Deliver a streaming text chunk. Override in subclass to enable streaming.""" + pass + + @property + def supports_streaming(self) -> bool: + """True when config enables streaming AND this subclass implements send_delta.""" + cfg = self.config + streaming = cfg.get("streaming", False) if isinstance(cfg, dict) else getattr(cfg, "streaming", False) + return bool(streaming) and type(self).send_delta is not BaseChannel.send_delta + def is_allowed(self, sender_id: str) -> bool: """Check if *sender_id* is permitted. Empty list → deny all; ``"*"`` → allow all.""" allow_list = getattr(self.config, "allow_from", []) @@ -116,13 +127,17 @@ class BaseChannel(ABC): ) return + meta = metadata or {} + if self.supports_streaming: + meta = {**meta, "_wants_stream": True} + msg = InboundMessage( channel=self.name, sender_id=str(sender_id), chat_id=str(chat_id), content=content, media=media or [], - metadata=metadata or {}, + metadata=meta, session_key_override=session_key, ) diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index 3820c10df..3a53b6307 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -130,7 +130,12 @@ class ChannelManager: channel = self.channels.get(msg.channel) if channel: try: - await channel.send(msg) + if msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): + await channel.send_delta(msg.chat_id, msg.content, msg.metadata) + elif msg.metadata.get("_streamed"): + pass + else: + await channel.send(msg) except Exception as e: logger.error("Error sending to {}: {}", msg.channel, e) else: diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 7639b3de8..ea06acb86 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -207,10 +207,6 @@ class _ThinkingSpinner: self._active = False if self._spinner: self._spinner.stop() - # Force-clear the spinner line: Rich Live's transient cleanup - # occasionally loses a race with its own render thread. - console.file.write("\033[2K\r") - console.file.flush() return False @contextmanager @@ -218,8 +214,6 @@ class _ThinkingSpinner: """Temporarily stop spinner while printing progress.""" if self._spinner and self._active: self._spinner.stop() - console.file.write("\033[2K\r") - console.file.flush() try: yield finally: @@ -776,25 +770,16 @@ def agent( async def run_once(): nonlocal _thinking _thinking = _ThinkingSpinner(enabled=not logs) - - with _thinking or nullcontext(): + with _thinking: response = await agent_loop.process_direct( - message, session_id, - on_progress=_cli_progress, + message, session_id, on_progress=_cli_progress, ) - - if _thinking: - _thinking.__exit__(None, None, None) - _thinking = None - - if response and response.content: - _print_agent_response( - response.content, - render_markdown=markdown, - metadata=response.metadata, - ) - else: - console.print() + _thinking = None + _print_agent_response( + response.content if response else "", + render_markdown=markdown, + metadata=response.metadata if response else None, + ) await agent_loop.close_mcp() asyncio.run(run_once()) @@ -835,7 +820,6 @@ def agent( while True: try: msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) - if msg.metadata.get("_progress"): is_tool_hint = msg.metadata.get("_tool_hint", False) ch = agent_loop.channels_config @@ -850,7 +834,6 @@ def agent( if msg.content: turn_response.append((msg.content, dict(msg.metadata or {}))) turn_done.set() - elif msg.content: await _print_interactive_response( msg.content, @@ -889,7 +872,11 @@ def agent( content=user_input, )) - await turn_done.wait() + nonlocal _thinking + _thinking = _ThinkingSpinner(enabled=not logs) + with _thinking: + await turn_done.wait() + _thinking = None if turn_response: content, meta = turn_response[0] diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index c88443377..5937b2e35 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -18,6 +18,7 @@ class ChannelsConfig(Base): Built-in and plugin channel configs are stored as extra fields (dicts). Each channel parses its own config in __init__. + Per-channel "streaming": true enables streaming output (requires send_delta impl). """ model_config = ConfigDict(extra="allow") diff --git a/nanobot/providers/azure_openai_provider.py b/nanobot/providers/azure_openai_provider.py index 05fbac4c1..d71dae917 100644 --- a/nanobot/providers/azure_openai_provider.py +++ b/nanobot/providers/azure_openai_provider.py @@ -2,7 +2,9 @@ from __future__ import annotations +import json import uuid +from collections.abc import Awaitable, Callable from typing import Any from urllib.parse import urljoin @@ -208,6 +210,100 @@ class AzureOpenAIProvider(LLMProvider): finish_reason="error", ) + async def chat_stream( + self, + messages: list[dict[str, Any]], + tools: list[dict[str, Any]] | None = None, + model: str | None = None, + max_tokens: int = 4096, + temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + """Stream a chat completion via Azure OpenAI SSE.""" + deployment_name = model or self.default_model + url = self._build_chat_url(deployment_name) + headers = self._build_headers() + payload = self._prepare_request_payload( + deployment_name, messages, tools, max_tokens, temperature, + reasoning_effort, tool_choice=tool_choice, + ) + payload["stream"] = True + + try: + async with httpx.AsyncClient(timeout=60.0, verify=True) as client: + async with client.stream("POST", url, headers=headers, json=payload) as response: + if response.status_code != 200: + text = await response.aread() + return LLMResponse( + content=f"Azure OpenAI API Error {response.status_code}: {text.decode('utf-8', 'ignore')}", + finish_reason="error", + ) + return await self._consume_stream(response, on_content_delta) + except Exception as e: + return LLMResponse(content=f"Error calling Azure OpenAI: {repr(e)}", finish_reason="error") + + async def _consume_stream( + self, + response: httpx.Response, + on_content_delta: Callable[[str], Awaitable[None]] | None, + ) -> LLMResponse: + """Parse Azure OpenAI SSE stream into an LLMResponse.""" + content_parts: list[str] = [] + tool_call_buffers: dict[int, dict[str, str]] = {} + finish_reason = "stop" + + async for line in response.aiter_lines(): + if not line.startswith("data: "): + continue + data = line[6:].strip() + if data == "[DONE]": + break + try: + chunk = json.loads(data) + except Exception: + continue + + choices = chunk.get("choices") or [] + if not choices: + continue + choice = choices[0] + if choice.get("finish_reason"): + finish_reason = choice["finish_reason"] + delta = choice.get("delta") or {} + + text = delta.get("content") + if text: + content_parts.append(text) + if on_content_delta: + await on_content_delta(text) + + for tc in delta.get("tool_calls") or []: + idx = tc.get("index", 0) + buf = tool_call_buffers.setdefault(idx, {"id": "", "name": "", "arguments": ""}) + if tc.get("id"): + buf["id"] = tc["id"] + fn = tc.get("function") or {} + if fn.get("name"): + buf["name"] = fn["name"] + if fn.get("arguments"): + buf["arguments"] += fn["arguments"] + + tool_calls = [ + ToolCallRequest( + id=buf["id"], name=buf["name"], + arguments=json_repair.loads(buf["arguments"]) if buf["arguments"] else {}, + ) + for buf in tool_call_buffers.values() + ] + + return LLMResponse( + content="".join(content_parts) or None, + tool_calls=tool_calls, + finish_reason=finish_reason, + ) + def get_default_model(self) -> str: """Get the default model (also used as default deployment name).""" return self.default_model \ No newline at end of file diff --git a/nanobot/providers/custom_provider.py b/nanobot/providers/custom_provider.py index 3daa0cc77..a47dae7cd 100644 --- a/nanobot/providers/custom_provider.py +++ b/nanobot/providers/custom_provider.py @@ -3,6 +3,7 @@ from __future__ import annotations import uuid +from collections.abc import Awaitable, Callable from typing import Any import json_repair @@ -22,22 +23,20 @@ class CustomProvider(LLMProvider): ): super().__init__(api_key, api_base) self.default_model = default_model - # Keep affinity stable for this provider instance to improve backend cache locality, - # while still letting users attach provider-specific headers for custom gateways. - default_headers = { - "x-session-affinity": uuid.uuid4().hex, - **(extra_headers or {}), - } self._client = AsyncOpenAI( api_key=api_key, base_url=api_base, - default_headers=default_headers, + default_headers={ + "x-session-affinity": uuid.uuid4().hex, + **(extra_headers or {}), + }, ) - async def chat(self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, - model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, - reasoning_effort: str | None = None, - tool_choice: str | dict[str, Any] | None = None) -> LLMResponse: + def _build_kwargs( + self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None, + model: str | None, max_tokens: int, temperature: float, + reasoning_effort: str | None, tool_choice: str | dict[str, Any] | None, + ) -> dict[str, Any]: kwargs: dict[str, Any] = { "model": model or self.default_model, "messages": self._sanitize_empty_content(messages), @@ -48,37 +47,106 @@ class CustomProvider(LLMProvider): kwargs["reasoning_effort"] = reasoning_effort if tools: kwargs.update(tools=tools, tool_choice=tool_choice or "auto") + return kwargs + + def _handle_error(self, e: Exception) -> LLMResponse: + body = getattr(e, "doc", None) or getattr(getattr(e, "response", None), "text", None) + msg = f"Error: {body.strip()[:500]}" if body and body.strip() else f"Error: {e}" + return LLMResponse(content=msg, finish_reason="error") + + async def chat(self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, + model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None) -> LLMResponse: + kwargs = self._build_kwargs(messages, tools, model, max_tokens, temperature, reasoning_effort, tool_choice) try: return self._parse(await self._client.chat.completions.create(**kwargs)) except Exception as e: - # JSONDecodeError.doc / APIError.response.text may carry the raw body - # (e.g. "unsupported model: xxx") which is far more useful than the - # generic "Expecting value …" message. Truncate to avoid huge HTML pages. - body = getattr(e, "doc", None) or getattr(getattr(e, "response", None), "text", None) - if body and body.strip(): - return LLMResponse(content=f"Error: {body.strip()[:500]}", finish_reason="error") - return LLMResponse(content=f"Error: {e}", finish_reason="error") + return self._handle_error(e) + + async def chat_stream( + self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, + model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + kwargs = self._build_kwargs(messages, tools, model, max_tokens, temperature, reasoning_effort, tool_choice) + kwargs["stream"] = True + try: + stream = await self._client.chat.completions.create(**kwargs) + chunks: list[Any] = [] + async for chunk in stream: + chunks.append(chunk) + if on_content_delta and chunk.choices: + text = getattr(chunk.choices[0].delta, "content", None) + if text: + await on_content_delta(text) + return self._parse_chunks(chunks) + except Exception as e: + return self._handle_error(e) def _parse(self, response: Any) -> LLMResponse: if not response.choices: return LLMResponse( - content="Error: API returned empty choices. This may indicate a temporary service issue or an invalid model response.", - finish_reason="error" + content="Error: API returned empty choices.", + finish_reason="error", ) choice = response.choices[0] msg = choice.message tool_calls = [ - ToolCallRequest(id=tc.id, name=tc.function.name, - arguments=json_repair.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else tc.function.arguments) + ToolCallRequest( + id=tc.id, name=tc.function.name, + arguments=json_repair.loads(tc.function.arguments) if isinstance(tc.function.arguments, str) else tc.function.arguments, + ) for tc in (msg.tool_calls or []) ] u = response.usage return LLMResponse( - content=msg.content, tool_calls=tool_calls, finish_reason=choice.finish_reason or "stop", + content=msg.content, tool_calls=tool_calls, + finish_reason=choice.finish_reason or "stop", usage={"prompt_tokens": u.prompt_tokens, "completion_tokens": u.completion_tokens, "total_tokens": u.total_tokens} if u else {}, reasoning_content=getattr(msg, "reasoning_content", None) or None, ) + def _parse_chunks(self, chunks: list[Any]) -> LLMResponse: + """Reassemble streamed chunks into a single LLMResponse.""" + content_parts: list[str] = [] + tc_bufs: dict[int, dict[str, str]] = {} + finish_reason = "stop" + usage: dict[str, int] = {} + + for chunk in chunks: + if not chunk.choices: + if hasattr(chunk, "usage") and chunk.usage: + u = chunk.usage + usage = {"prompt_tokens": u.prompt_tokens or 0, "completion_tokens": u.completion_tokens or 0, + "total_tokens": u.total_tokens or 0} + continue + choice = chunk.choices[0] + if choice.finish_reason: + finish_reason = choice.finish_reason + delta = choice.delta + if delta and delta.content: + content_parts.append(delta.content) + for tc in (delta.tool_calls or []) if delta else []: + buf = tc_bufs.setdefault(tc.index, {"id": "", "name": "", "arguments": ""}) + if tc.id: + buf["id"] = tc.id + if tc.function and tc.function.name: + buf["name"] = tc.function.name + if tc.function and tc.function.arguments: + buf["arguments"] += tc.function.arguments + + return LLMResponse( + content="".join(content_parts) or None, + tool_calls=[ + ToolCallRequest(id=b["id"], name=b["name"], arguments=json_repair.loads(b["arguments"]) if b["arguments"] else {}) + for b in tc_bufs.values() + ], + finish_reason=finish_reason, + usage=usage, + ) + def get_default_model(self) -> str: return self.default_model - diff --git a/nanobot/providers/openai_codex_provider.py b/nanobot/providers/openai_codex_provider.py index c8f21553c..1c6bc7075 100644 --- a/nanobot/providers/openai_codex_provider.py +++ b/nanobot/providers/openai_codex_provider.py @@ -5,6 +5,7 @@ from __future__ import annotations import asyncio import hashlib import json +from collections.abc import Awaitable, Callable from typing import Any, AsyncGenerator import httpx @@ -24,16 +25,16 @@ class OpenAICodexProvider(LLMProvider): super().__init__(api_key=None, api_base=None) self.default_model = default_model - async def chat( + async def _call_codex( self, messages: list[dict[str, Any]], - tools: list[dict[str, Any]] | None = None, - model: str | None = None, - max_tokens: int = 4096, - temperature: float = 0.7, - reasoning_effort: str | None = None, - tool_choice: str | dict[str, Any] | None = None, + tools: list[dict[str, Any]] | None, + model: str | None, + reasoning_effort: str | None, + tool_choice: str | dict[str, Any] | None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, ) -> LLMResponse: + """Shared request logic for both chat() and chat_stream().""" model = model or self.default_model system_prompt, input_items = _convert_messages(messages) @@ -52,33 +53,45 @@ class OpenAICodexProvider(LLMProvider): "tool_choice": tool_choice or "auto", "parallel_tool_calls": True, } - if reasoning_effort: body["reasoning"] = {"effort": reasoning_effort} - if tools: body["tools"] = _convert_tools(tools) - url = DEFAULT_CODEX_URL - try: try: - content, tool_calls, finish_reason = await _request_codex(url, headers, body, verify=True) + content, tool_calls, finish_reason = await _request_codex( + DEFAULT_CODEX_URL, headers, body, verify=True, + on_content_delta=on_content_delta, + ) except Exception as e: if "CERTIFICATE_VERIFY_FAILED" not in str(e): raise - logger.warning("SSL certificate verification failed for Codex API; retrying with verify=False") - content, tool_calls, finish_reason = await _request_codex(url, headers, body, verify=False) - return LLMResponse( - content=content, - tool_calls=tool_calls, - finish_reason=finish_reason, - ) + logger.warning("SSL verification failed for Codex API; retrying with verify=False") + content, tool_calls, finish_reason = await _request_codex( + DEFAULT_CODEX_URL, headers, body, verify=False, + on_content_delta=on_content_delta, + ) + return LLMResponse(content=content, tool_calls=tool_calls, finish_reason=finish_reason) except Exception as e: - return LLMResponse( - content=f"Error calling Codex: {str(e)}", - finish_reason="error", - ) + return LLMResponse(content=f"Error calling Codex: {e}", finish_reason="error") + + async def chat( + self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, + model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + ) -> LLMResponse: + return await self._call_codex(messages, tools, model, reasoning_effort, tool_choice) + + async def chat_stream( + self, messages: list[dict[str, Any]], tools: list[dict[str, Any]] | None = None, + model: str | None = None, max_tokens: int = 4096, temperature: float = 0.7, + reasoning_effort: str | None = None, + tool_choice: str | dict[str, Any] | None = None, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, + ) -> LLMResponse: + return await self._call_codex(messages, tools, model, reasoning_effort, tool_choice, on_content_delta) def get_default_model(self) -> str: return self.default_model @@ -107,13 +120,14 @@ async def _request_codex( headers: dict[str, str], body: dict[str, Any], verify: bool, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, ) -> tuple[str, list[ToolCallRequest], str]: async with httpx.AsyncClient(timeout=60.0, verify=verify) as client: async with client.stream("POST", url, headers=headers, json=body) as response: if response.status_code != 200: text = await response.aread() raise RuntimeError(_friendly_error(response.status_code, text.decode("utf-8", "ignore"))) - return await _consume_sse(response) + return await _consume_sse(response, on_content_delta) def _convert_tools(tools: list[dict[str, Any]]) -> list[dict[str, Any]]: @@ -151,45 +165,28 @@ def _convert_messages(messages: list[dict[str, Any]]) -> tuple[str, list[dict[st continue if role == "assistant": - # Handle text first. if isinstance(content, str) and content: - input_items.append( - { - "type": "message", - "role": "assistant", - "content": [{"type": "output_text", "text": content}], - "status": "completed", - "id": f"msg_{idx}", - } - ) - # Then handle tool calls. + input_items.append({ + "type": "message", "role": "assistant", + "content": [{"type": "output_text", "text": content}], + "status": "completed", "id": f"msg_{idx}", + }) for tool_call in msg.get("tool_calls", []) or []: fn = tool_call.get("function") or {} call_id, item_id = _split_tool_call_id(tool_call.get("id")) - call_id = call_id or f"call_{idx}" - item_id = item_id or f"fc_{idx}" - input_items.append( - { - "type": "function_call", - "id": item_id, - "call_id": call_id, - "name": fn.get("name"), - "arguments": fn.get("arguments") or "{}", - } - ) + input_items.append({ + "type": "function_call", + "id": item_id or f"fc_{idx}", + "call_id": call_id or f"call_{idx}", + "name": fn.get("name"), + "arguments": fn.get("arguments") or "{}", + }) continue if role == "tool": call_id, _ = _split_tool_call_id(msg.get("tool_call_id")) output_text = content if isinstance(content, str) else json.dumps(content, ensure_ascii=False) - input_items.append( - { - "type": "function_call_output", - "call_id": call_id, - "output": output_text, - } - ) - continue + input_items.append({"type": "function_call_output", "call_id": call_id, "output": output_text}) return system_prompt, input_items @@ -247,7 +244,10 @@ async def _iter_sse(response: httpx.Response) -> AsyncGenerator[dict[str, Any], buffer.append(line) -async def _consume_sse(response: httpx.Response) -> tuple[str, list[ToolCallRequest], str]: +async def _consume_sse( + response: httpx.Response, + on_content_delta: Callable[[str], Awaitable[None]] | None = None, +) -> tuple[str, list[ToolCallRequest], str]: content = "" tool_calls: list[ToolCallRequest] = [] tool_call_buffers: dict[str, dict[str, Any]] = {} @@ -267,7 +267,10 @@ async def _consume_sse(response: httpx.Response) -> tuple[str, list[ToolCallRequ "arguments": item.get("arguments") or "", } elif event_type == "response.output_text.delta": - content += event.get("delta") or "" + delta_text = event.get("delta") or "" + content += delta_text + if on_content_delta and delta_text: + await on_content_delta(delta_text) elif event_type == "response.function_call_arguments.delta": call_id = event.get("call_id") if call_id and call_id in tool_call_buffers: From f2e1cb3662d76f3594eeee20c5bd586ece54cbad Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 22 Mar 2026 16:47:57 +0000 Subject: [PATCH 3/9] feat(cli): extract streaming renderer to stream.py with Rich Live Move ThinkingSpinner and StreamRenderer into a dedicated module to keep commands.py focused on orchestration. Uses Rich Live with manual refresh (auto_refresh=False) and ellipsis overflow for stable streaming output. Made-with: Cursor --- nanobot/cli/commands.py | 95 +++++++++++++---------------- nanobot/cli/stream.py | 128 ++++++++++++++++++++++++++++++++++++++++ tests/test_cli_input.py | 26 ++++---- 3 files changed, 184 insertions(+), 65 deletions(-) create mode 100644 nanobot/cli/stream.py diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index ea06acb86..b915ce9b2 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -33,6 +33,7 @@ from rich.table import Table from rich.text import Text from nanobot import __logo__, __version__ +from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.config.paths import get_workspace_path from nanobot.config.schema import Config from nanobot.utils.helpers import sync_workspace_templates @@ -188,46 +189,13 @@ async def _print_interactive_response( await run_in_terminal(_write) -class _ThinkingSpinner: - """Spinner wrapper with pause support for clean progress output.""" - - def __init__(self, enabled: bool): - self._spinner = console.status( - "[dim]nanobot is thinking...[/dim]", spinner="dots" - ) if enabled else None - self._active = False - - def __enter__(self): - if self._spinner: - self._spinner.start() - self._active = True - return self - - def __exit__(self, *exc): - self._active = False - if self._spinner: - self._spinner.stop() - return False - - @contextmanager - def pause(self): - """Temporarily stop spinner while printing progress.""" - if self._spinner and self._active: - self._spinner.stop() - try: - yield - finally: - if self._spinner and self._active: - self._spinner.start() - - -def _print_cli_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None: +def _print_cli_progress_line(text: str, thinking: ThinkingSpinner | None) -> None: """Print a CLI progress line, pausing the spinner if needed.""" with thinking.pause() if thinking else nullcontext(): console.print(f" [dim]↳ {text}[/dim]") -async def _print_interactive_progress_line(text: str, thinking: _ThinkingSpinner | None) -> None: +async def _print_interactive_progress_line(text: str, thinking: ThinkingSpinner | None) -> None: """Print an interactive progress line, pausing the spinner if needed.""" with thinking.pause() if thinking else nullcontext(): await _print_interactive_line(text) @@ -755,7 +723,7 @@ def agent( ) # Shared reference for progress callbacks - _thinking: _ThinkingSpinner | None = None + _thinking: ThinkingSpinner | None = None async def _cli_progress(content: str, *, tool_hint: bool = False) -> None: ch = agent_loop.channels_config @@ -768,18 +736,19 @@ def agent( if message: # Single message mode — direct call, no bus needed async def run_once(): - nonlocal _thinking - _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: - response = await agent_loop.process_direct( - message, session_id, on_progress=_cli_progress, - ) - _thinking = None - _print_agent_response( - response.content if response else "", - render_markdown=markdown, - metadata=response.metadata if response else None, + renderer = StreamRenderer(render_markdown=markdown) + response = await agent_loop.process_direct( + message, session_id, + on_progress=_cli_progress, + on_stream=renderer.on_delta, + on_stream_end=renderer.on_end, ) + if not renderer.streamed: + _print_agent_response( + response.content if response else "", + render_markdown=markdown, + metadata=response.metadata if response else None, + ) await agent_loop.close_mcp() asyncio.run(run_once()) @@ -815,11 +784,27 @@ def agent( turn_done = asyncio.Event() turn_done.set() turn_response: list[tuple[str, dict]] = [] + renderer: StreamRenderer | None = None async def _consume_outbound(): while True: try: msg = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0) + + if msg.metadata.get("_stream_delta"): + if renderer: + await renderer.on_delta(msg.content) + continue + if msg.metadata.get("_stream_end"): + if renderer: + await renderer.on_end( + resuming=msg.metadata.get("_resuming", False), + ) + continue + if msg.metadata.get("_streamed"): + turn_done.set() + continue + if msg.metadata.get("_progress"): is_tool_hint = msg.metadata.get("_tool_hint", False) ch = agent_loop.channels_config @@ -829,8 +814,9 @@ def agent( pass else: await _print_interactive_progress_line(msg.content, _thinking) + continue - elif not turn_done.is_set(): + if not turn_done.is_set(): if msg.content: turn_response.append((msg.content, dict(msg.metadata or {}))) turn_done.set() @@ -864,23 +850,24 @@ def agent( turn_done.clear() turn_response.clear() + renderer = StreamRenderer(render_markdown=markdown) await bus.publish_inbound(InboundMessage( channel=cli_channel, sender_id="user", chat_id=cli_chat_id, content=user_input, + metadata={"_wants_stream": True}, )) - nonlocal _thinking - _thinking = _ThinkingSpinner(enabled=not logs) - with _thinking: - await turn_done.wait() - _thinking = None + await turn_done.wait() if turn_response: content, meta = turn_response[0] - _print_agent_response(content, render_markdown=markdown, metadata=meta) + if content and not meta.get("_streamed"): + _print_agent_response( + content, render_markdown=markdown, metadata=meta, + ) except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py new file mode 100644 index 000000000..3ee28fe6e --- /dev/null +++ b/nanobot/cli/stream.py @@ -0,0 +1,128 @@ +"""Streaming renderer for CLI output. + +Uses Rich Live with auto_refresh=False for stable, flicker-free +markdown rendering during streaming. Ellipsis mode handles overflow. +""" + +from __future__ import annotations + +import re +import sys +import time +from typing import Any + +from rich.console import Console +from rich.live import Live +from rich.markdown import Markdown +from rich.text import Text + +from nanobot import __logo__ + + +def _make_console() -> Console: + return Console(file=sys.stdout) + + +class ThinkingSpinner: + """Spinner that shows 'nanobot is thinking...' with pause support.""" + + def __init__(self, console: Console | None = None): + c = console or _make_console() + self._spinner = c.status("[dim]nanobot is thinking...[/dim]", spinner="dots") + self._active = False + + def __enter__(self): + self._spinner.start() + self._active = True + return self + + def __exit__(self, *exc): + self._active = False + self._spinner.stop() + return False + + def pause(self): + """Context manager: temporarily stop spinner for clean output.""" + from contextlib import contextmanager + + @contextmanager + def _ctx(): + if self._spinner and self._active: + self._spinner.stop() + try: + yield + finally: + if self._spinner and self._active: + self._spinner.start() + + return _ctx() + + +class StreamRenderer: + """Rich Live streaming with markdown. auto_refresh=False avoids render races. + + Flow per round: + spinner -> first visible delta -> header + Live renders -> + on_end -> Live stops (content stays on screen) + """ + + def __init__(self, render_markdown: bool = True, show_spinner: bool = True): + self._md = render_markdown + self._show_spinner = show_spinner + self._buf = "" + self._live: Live | None = None + self._t = 0.0 + self.streamed = False + self._spinner: ThinkingSpinner | None = None + self._start_spinner() + + @staticmethod + def _clean(text: str) -> str: + text = re.sub(r"[\s\S]*?", "", text) + text = re.sub(r"[\s\S]*$", "", text) + return text.strip() + + def _render(self): + clean = self._clean(self._buf) + return Markdown(clean) if self._md and clean else Text(clean or "") + + def _start_spinner(self) -> None: + if self._show_spinner: + self._spinner = ThinkingSpinner() + self._spinner.__enter__() + + def _stop_spinner(self) -> None: + if self._spinner: + self._spinner.__exit__(None, None, None) + self._spinner = None + + async def on_delta(self, delta: str) -> None: + self.streamed = True + self._buf += delta + if self._live is None: + if not self._clean(self._buf): + return + self._stop_spinner() + c = _make_console() + c.print() + c.print(f"[cyan]{__logo__} nanobot[/cyan]") + self._live = Live(self._render(), console=c, auto_refresh=False) + self._live.start() + now = time.monotonic() + if "\n" in delta or (now - self._t) > 0.05: + self._live.update(self._render()) + self._live.refresh() + self._t = now + + async def on_end(self, *, resuming: bool = False) -> None: + if self._live: + self._live.update(self._render()) + self._live.refresh() + self._live.stop() + self._live = None + self._stop_spinner() + if resuming: + self._buf = "" + self._start_spinner() + else: + _make_console().print() diff --git a/tests/test_cli_input.py b/tests/test_cli_input.py index 2fc974853..142dc7260 100644 --- a/tests/test_cli_input.py +++ b/tests/test_cli_input.py @@ -5,6 +5,7 @@ import pytest from prompt_toolkit.formatted_text import HTML from nanobot.cli import commands +from nanobot.cli import stream as stream_mod @pytest.fixture @@ -62,12 +63,13 @@ def test_init_prompt_session_creates_session(): def test_thinking_spinner_pause_stops_and_restarts(): """Pause should stop the active spinner and restart it afterward.""" spinner = MagicMock() + mock_console = MagicMock() + mock_console.status.return_value = spinner - with patch.object(commands.console, "status", return_value=spinner): - thinking = commands._ThinkingSpinner(enabled=True) - with thinking: - with thinking.pause(): - pass + thinking = stream_mod.ThinkingSpinner(console=mock_console) + with thinking: + with thinking.pause(): + pass assert spinner.method_calls == [ call.start(), @@ -83,10 +85,11 @@ def test_print_cli_progress_line_pauses_spinner_before_printing(): spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") + mock_console = MagicMock() + mock_console.status.return_value = spinner - with patch.object(commands.console, "status", return_value=spinner), \ - patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")): - thinking = commands._ThinkingSpinner(enabled=True) + with patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")): + thinking = stream_mod.ThinkingSpinner(console=mock_console) with thinking: commands._print_cli_progress_line("tool running", thinking) @@ -100,13 +103,14 @@ async def test_print_interactive_progress_line_pauses_spinner_before_printing(): spinner = MagicMock() spinner.start.side_effect = lambda: order.append("start") spinner.stop.side_effect = lambda: order.append("stop") + mock_console = MagicMock() + mock_console.status.return_value = spinner async def fake_print(_text: str) -> None: order.append("print") - with patch.object(commands.console, "status", return_value=spinner), \ - patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print): - thinking = commands._ThinkingSpinner(enabled=True) + with patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print): + thinking = stream_mod.ThinkingSpinner(console=mock_console) with thinking: await commands._print_interactive_progress_line("tool running", thinking) From 9d5e511a6e69a2735f65a7959350c991f2d5bd4b Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sun, 22 Mar 2026 17:33:09 +0000 Subject: [PATCH 4/9] feat(streaming): centralize think-tag filtering and add Telegram streaming - Add strip_think() to helpers.py as single source of truth - Filter deltas in agent loop before dispatching to consumers - Implement send_delta in TelegramChannel with progressive edit_message_text - Remove duplicate think filtering from CLI stream.py and telegram.py - Remove legacy fake streaming (send_message_draft) from Telegram - Default Telegram streaming to true - Update CHANNEL_PLUGIN_GUIDE.md with streaming documentation Made-with: Cursor --- docs/CHANNEL_PLUGIN_GUIDE.md | 100 +++++++++++++++++++++++++++++++++- nanobot/agent/loop.py | 22 +++++++- nanobot/channels/telegram.py | 103 +++++++++++++++++++++++++---------- nanobot/cli/stream.py | 15 ++--- nanobot/utils/helpers.py | 7 +++ 5 files changed, 204 insertions(+), 43 deletions(-) diff --git a/docs/CHANNEL_PLUGIN_GUIDE.md b/docs/CHANNEL_PLUGIN_GUIDE.md index a23ea07bb..575cad699 100644 --- a/docs/CHANNEL_PLUGIN_GUIDE.md +++ b/docs/CHANNEL_PLUGIN_GUIDE.md @@ -182,12 +182,19 @@ The agent receives the message and processes it. Replies arrive in your `send()` | Method / Property | Description | |-------------------|-------------| -| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. | +| `_handle_message(sender_id, chat_id, content, media?, metadata?, session_key?)` | **Call this when you receive a message.** Checks `is_allowed()`, then publishes to the bus. Automatically sets `_wants_stream` if `supports_streaming` is true. | | `is_allowed(sender_id)` | Checks against `config["allowFrom"]`; `"*"` allows all, `[]` denies all. | | `default_config()` (classmethod) | Returns default config dict for `nanobot onboard`. Override to declare your fields. | | `transcribe_audio(file_path)` | Transcribes audio via Groq Whisper (if configured). | +| `supports_streaming` (property) | `True` when config has `"streaming": true` **and** subclass overrides `send_delta()`. | | `is_running` | Returns `self._running`. | +### Optional (streaming) + +| Method | Description | +|--------|-------------| +| `async send_delta(chat_id, delta, metadata?)` | Override to receive streaming chunks. See [Streaming Support](#streaming-support) for details. | + ### Message Types ```python @@ -201,6 +208,97 @@ class OutboundMessage: # "message_id" for reply threading ``` +## Streaming Support + +Channels can opt into real-time streaming — the agent sends content token-by-token instead of one final message. This is entirely optional; channels work fine without it. + +### How It Works + +When **both** conditions are met, the agent streams content through your channel: + +1. Config has `"streaming": true` +2. Your subclass overrides `send_delta()` + +If either is missing, the agent falls back to the normal one-shot `send()` path. + +### Implementing `send_delta` + +Override `send_delta` to handle two types of calls: + +```python +async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + meta = metadata or {} + + if meta.get("_stream_end"): + # Streaming finished — do final formatting, cleanup, etc. + return + + # Regular delta — append text, update the message on screen + # delta contains a small chunk of text (a few tokens) +``` + +**Metadata flags:** + +| Flag | Meaning | +|------|---------| +| `_stream_delta: True` | A content chunk (delta contains the new text) | +| `_stream_end: True` | Streaming finished (delta is empty) | +| `_resuming: True` | More streaming rounds coming (e.g. tool call then another response) | + +### Example: Webhook with Streaming + +```python +class WebhookChannel(BaseChannel): + name = "webhook" + display_name = "Webhook" + + def __init__(self, config, bus): + super().__init__(config, bus) + self._buffers: dict[str, str] = {} + + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + meta = metadata or {} + if meta.get("_stream_end"): + text = self._buffers.pop(chat_id, "") + # Final delivery — format and send the complete message + await self._deliver(chat_id, text, final=True) + return + + self._buffers.setdefault(chat_id, "") + self._buffers[chat_id] += delta + # Incremental update — push partial text to the client + await self._deliver(chat_id, self._buffers[chat_id], final=False) + + async def send(self, msg: OutboundMessage) -> None: + # Non-streaming path — unchanged + await self._deliver(msg.chat_id, msg.content, final=True) +``` + +### Config + +Enable streaming per channel: + +```json +{ + "channels": { + "webhook": { + "enabled": true, + "streaming": true, + "allowFrom": ["*"] + } + } +} +``` + +When `streaming` is `false` (default) or omitted, only `send()` is called — no streaming overhead. + +### BaseChannel Streaming API + +| Method / Property | Description | +|-------------------|-------------| +| `async send_delta(chat_id, delta, metadata?)` | Override to handle streaming chunks. No-op by default. | +| `supports_streaming` (property) | Returns `True` when config has `streaming: true` **and** subclass overrides `send_delta`. | + ## Config Your channel receives config as a plain `dict`. Access fields with `.get()`: diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 1bbb7cfa7..6cf2ec328 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -173,7 +173,8 @@ class AgentLoop: """Remove blocks that some models embed in content.""" if not text: return None - return re.sub(r"[\s\S]*?", "", text).strip() or None + from nanobot.utils.helpers import strip_think + return strip_think(text) or None @staticmethod def _tool_hint(tool_calls: list) -> str: @@ -227,6 +228,21 @@ class AgentLoop: final_content = None tools_used: list[str] = [] + # Wrap on_stream with stateful think-tag filter so downstream + # consumers (CLI, channels) never see blocks. + _raw_stream = on_stream + _stream_buf = "" + + async def _filtered_stream(delta: str) -> None: + nonlocal _stream_buf + from nanobot.utils.helpers import strip_think + prev_clean = strip_think(_stream_buf) + _stream_buf += delta + new_clean = strip_think(_stream_buf) + incremental = new_clean[len(prev_clean):] + if incremental and _raw_stream: + await _raw_stream(incremental) + while iteration < self.max_iterations: iteration += 1 @@ -237,7 +253,7 @@ class AgentLoop: messages=messages, tools=tool_defs, model=self.model, - on_content_delta=on_stream, + on_content_delta=_filtered_stream, ) else: response = await self.provider.chat_with_retry( @@ -255,6 +271,7 @@ class AgentLoop: if response.has_tool_calls: if on_stream and on_stream_end: await on_stream_end(resuming=True) + _stream_buf = "" if on_progress: if not on_stream: @@ -286,6 +303,7 @@ class AgentLoop: else: if on_stream and on_stream_end: await on_stream_end(resuming=False) + _stream_buf = "" clean = self._strip_think(response.content) if response.finish_reason == "error": diff --git a/nanobot/channels/telegram.py b/nanobot/channels/telegram.py index fc2e47da4..850e09c0f 100644 --- a/nanobot/channels/telegram.py +++ b/nanobot/channels/telegram.py @@ -6,6 +6,7 @@ import asyncio import re import time import unicodedata +from dataclasses import dataclass, field from typing import Any, Literal from loguru import logger @@ -156,6 +157,14 @@ _SEND_MAX_RETRIES = 3 _SEND_RETRY_BASE_DELAY = 0.5 # seconds, doubled each retry +@dataclass +class _StreamBuf: + """Per-chat streaming accumulator for progressive message editing.""" + text: str = "" + message_id: int | None = None + last_edit: float = 0.0 + + class TelegramConfig(Base): """Telegram channel configuration.""" @@ -167,6 +176,7 @@ class TelegramConfig(Base): group_policy: Literal["open", "mention"] = "mention" connection_pool_size: int = 32 pool_timeout: float = 5.0 + streaming: bool = True class TelegramChannel(BaseChannel): @@ -193,6 +203,8 @@ class TelegramChannel(BaseChannel): def default_config(cls) -> dict[str, Any]: return TelegramConfig().model_dump(by_alias=True) + _STREAM_EDIT_INTERVAL = 0.6 # min seconds between edit_message_text calls + def __init__(self, config: Any, bus: MessageBus): if isinstance(config, dict): config = TelegramConfig.model_validate(config) @@ -206,6 +218,7 @@ class TelegramChannel(BaseChannel): self._message_threads: dict[tuple[str, int], int] = {} self._bot_user_id: int | None = None self._bot_username: str | None = None + self._stream_bufs: dict[str, _StreamBuf] = {} # chat_id -> streaming state def is_allowed(self, sender_id: str) -> bool: """Preserve Telegram's legacy id|username allowlist matching.""" @@ -416,14 +429,8 @@ class TelegramChannel(BaseChannel): # Send text content if msg.content and msg.content != "[empty message]": - is_progress = msg.metadata.get("_progress", False) - for chunk in split_message(msg.content, TELEGRAM_MAX_MESSAGE_LEN): - # Final response: simulate streaming via draft, then persist. - if not is_progress: - await self._send_with_streaming(chat_id, chunk, reply_params, thread_kwargs) - else: - await self._send_text(chat_id, chunk, reply_params, thread_kwargs) + await self._send_text(chat_id, chunk, reply_params, thread_kwargs) async def _call_with_retry(self, fn, *args, **kwargs): """Call an async Telegram API function with retry on pool/network timeout.""" @@ -469,29 +476,67 @@ class TelegramChannel(BaseChannel): except Exception as e2: logger.error("Error sending Telegram message: {}", e2) - async def _send_with_streaming( - self, - chat_id: int, - text: str, - reply_params=None, - thread_kwargs: dict | None = None, - ) -> None: - """Simulate streaming via send_message_draft, then persist with send_message.""" - draft_id = int(time.time() * 1000) % (2**31) - try: - step = max(len(text) // 8, 40) - for i in range(step, len(text), step): - await self._app.bot.send_message_draft( - chat_id=chat_id, draft_id=draft_id, text=text[:i], + async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None: + """Progressive message editing: send on first delta, edit on subsequent ones.""" + if not self._app: + return + meta = metadata or {} + int_chat_id = int(chat_id) + + if meta.get("_stream_end"): + buf = self._stream_bufs.pop(chat_id, None) + if not buf or not buf.message_id or not buf.text: + return + self._stop_typing(chat_id) + try: + html = _markdown_to_telegram_html(buf.text) + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=html, parse_mode="HTML", ) - await asyncio.sleep(0.04) - await self._app.bot.send_message_draft( - chat_id=chat_id, draft_id=draft_id, text=text, - ) - await asyncio.sleep(0.15) - except Exception: - pass - await self._send_text(chat_id, text, reply_params, thread_kwargs) + except Exception as e: + logger.debug("Final stream edit failed (HTML), trying plain: {}", e) + try: + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=buf.text, + ) + except Exception: + pass + return + + buf = self._stream_bufs.get(chat_id) + if buf is None: + buf = _StreamBuf() + self._stream_bufs[chat_id] = buf + buf.text += delta + + if not buf.text.strip(): + return + + now = time.monotonic() + if buf.message_id is None: + try: + sent = await self._call_with_retry( + self._app.bot.send_message, + chat_id=int_chat_id, text=buf.text, + ) + buf.message_id = sent.message_id + buf.last_edit = now + except Exception as e: + logger.warning("Stream initial send failed: {}", e) + elif (now - buf.last_edit) >= self._STREAM_EDIT_INTERVAL: + try: + await self._call_with_retry( + self._app.bot.edit_message_text, + chat_id=int_chat_id, message_id=buf.message_id, + text=buf.text, + ) + buf.last_edit = now + except Exception: + pass async def _on_start(self, update: Update, context: ContextTypes.DEFAULT_TYPE) -> None: """Handle /start command.""" diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py index 3ee28fe6e..161d53082 100644 --- a/nanobot/cli/stream.py +++ b/nanobot/cli/stream.py @@ -6,10 +6,8 @@ markdown rendering during streaming. Ellipsis mode handles overflow. from __future__ import annotations -import re import sys import time -from typing import Any from rich.console import Console from rich.live import Live @@ -61,6 +59,8 @@ class ThinkingSpinner: class StreamRenderer: """Rich Live streaming with markdown. auto_refresh=False avoids render races. + Deltas arrive pre-filtered (no tags) from the agent loop. + Flow per round: spinner -> first visible delta -> header + Live renders -> on_end -> Live stops (content stays on screen) @@ -76,15 +76,8 @@ class StreamRenderer: self._spinner: ThinkingSpinner | None = None self._start_spinner() - @staticmethod - def _clean(text: str) -> str: - text = re.sub(r"[\s\S]*?", "", text) - text = re.sub(r"[\s\S]*$", "", text) - return text.strip() - def _render(self): - clean = self._clean(self._buf) - return Markdown(clean) if self._md and clean else Text(clean or "") + return Markdown(self._buf) if self._md and self._buf else Text(self._buf or "") def _start_spinner(self) -> None: if self._show_spinner: @@ -100,7 +93,7 @@ class StreamRenderer: self.streamed = True self._buf += delta if self._live is None: - if not self._clean(self._buf): + if not self._buf.strip(): return self._stop_spinner() c = _make_console() diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index f89b95681..f265870dd 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -11,6 +11,13 @@ from typing import Any import tiktoken +def strip_think(text: str) -> str: + """Remove blocks and any unclosed trailing tag.""" + text = re.sub(r"[\s\S]*?", "", text) + text = re.sub(r"[\s\S]*$", "", text) + return text.strip() + + def detect_image_mime(data: bytes) -> str | None: """Detect image MIME type from magic bytes, ignoring file extension.""" if data[:8] == b"\x89PNG\r\n\x1a\n": From 78783400317281f8e3ee6680de056a6526f2f90e Mon Sep 17 00:00:00 2001 From: Matt von Rohr Date: Mon, 16 Mar 2026 08:13:43 +0100 Subject: [PATCH 5/9] feat(providers): add Mistral AI provider Register Mistral as a first-class provider with LiteLLM routing, MISTRAL_API_KEY env var, and https://api.mistral.ai/v1 default base. Includes schema field, registry entry, and tests. --- nanobot/config/schema.py | 1 + nanobot/providers/registry.py | 17 +++++++++++++++++ tests/test_mistral_provider.py | 22 ++++++++++++++++++++++ 3 files changed, 40 insertions(+) create mode 100644 tests/test_mistral_provider.py diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 5937b2e35..9c841ca9c 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -73,6 +73,7 @@ class ProvidersConfig(Base): gemini: ProviderConfig = Field(default_factory=ProviderConfig) moonshot: ProviderConfig = Field(default_factory=ProviderConfig) minimax: ProviderConfig = Field(default_factory=ProviderConfig) + mistral: ProviderConfig = Field(default_factory=ProviderConfig) aihubmix: ProviderConfig = Field(default_factory=ProviderConfig) # AiHubMix API gateway siliconflow: ProviderConfig = Field(default_factory=ProviderConfig) # SiliconFlow (硅基流动) volcengine: ProviderConfig = Field(default_factory=ProviderConfig) # VolcEngine (火山引擎) diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 42c1d24df..825653ff0 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -399,6 +399,23 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( strip_model_prefix=False, model_overrides=(), ), + # Mistral AI: OpenAI-compatible API at api.mistral.ai/v1. + ProviderSpec( + name="mistral", + keywords=("mistral",), + env_key="MISTRAL_API_KEY", + display_name="Mistral", + litellm_prefix="mistral", # mistral-large-latest → mistral/mistral-large-latest + skip_prefixes=("mistral/",), # avoid double-prefix + env_extras=(), + is_gateway=False, + is_local=False, + detect_by_key_prefix="", + detect_by_base_keyword="", + default_api_base="https://api.mistral.ai/v1", + strip_model_prefix=False, + model_overrides=(), + ), # === Local deployment (matched by config key, NOT by api_base) ========= # vLLM / any OpenAI-compatible local server. # Detected when config key is "vllm" (provider_name="vllm"). diff --git a/tests/test_mistral_provider.py b/tests/test_mistral_provider.py new file mode 100644 index 000000000..401122178 --- /dev/null +++ b/tests/test_mistral_provider.py @@ -0,0 +1,22 @@ +"""Tests for the Mistral provider registration.""" + +from nanobot.config.schema import ProvidersConfig +from nanobot.providers.registry import PROVIDERS + + +def test_mistral_config_field_exists(): + """ProvidersConfig should have a mistral field.""" + config = ProvidersConfig() + assert hasattr(config, "mistral") + + +def test_mistral_provider_in_registry(): + """Mistral should be registered in the provider registry.""" + specs = {s.name: s for s in PROVIDERS} + assert "mistral" in specs + + mistral = specs["mistral"] + assert mistral.env_key == "MISTRAL_API_KEY" + assert mistral.litellm_prefix == "mistral" + assert mistral.default_api_base == "https://api.mistral.ai/v1" + assert "mistral/" in mistral.skip_prefixes From f64ae3b900df63018a385bb0b0f51453f7a555b6 Mon Sep 17 00:00:00 2001 From: Desmond Sow Date: Wed, 18 Mar 2026 15:02:47 +0800 Subject: [PATCH 6/9] feat(provider): add OpenVINO Model Server provider (#2193) add OpenVINO Model Server provider --- README.md | 76 +++++++++++++++++++++++++++++++++++ nanobot/cli/commands.py | 8 ++++ nanobot/config/schema.py | 1 + nanobot/providers/registry.py | 11 +++++ 4 files changed, 96 insertions(+) diff --git a/README.md b/README.md index 64ae157db..52d45046a 100644 --- a/README.md +++ b/README.md @@ -803,6 +803,7 @@ Config file: `~/.nanobot/config.json` | `moonshot` | LLM (Moonshot/Kimi) | [platform.moonshot.cn](https://platform.moonshot.cn) | | `zhipu` | LLM (Zhipu GLM) | [open.bigmodel.cn](https://open.bigmodel.cn) | | `ollama` | LLM (local, Ollama) | — | +| `ovms` | LLM (local, OpenVINO Model Server) | [docs.openvino.ai](https://docs.openvino.ai/2026/model-server/ovms_docs_llm_quickstart.html) | | `vllm` | LLM (local, any OpenAI-compatible server) | — | | `openai_codex` | LLM (Codex, OAuth) | `nanobot provider login openai-codex` | | `github_copilot` | LLM (GitHub Copilot, OAuth) | `nanobot provider login github-copilot` | @@ -938,6 +939,81 @@ ollama run llama3.2 +
+OpenVINO Model Server (local / OpenAI-compatible) + +Run LLMs locally on Intel GPUs using [OpenVINO Model Server](https://docs.openvino.ai/2026/model-server/ovms_docs_llm_quickstart.html). OVMS exposes an OpenAI-compatible API at `/v3`. + +> Requires Docker and an Intel GPU with driver access (`/dev/dri`). + +**1. Pull the model** (example): + +```bash +mkdir -p ov/models && cd ov + +docker run -d \ + --rm \ + --user $(id -u):$(id -g) \ + -v $(pwd)/models:/models \ + openvino/model_server:latest-gpu \ + --pull \ + --model_name openai/gpt-oss-20b \ + --model_repository_path /models \ + --source_model OpenVINO/gpt-oss-20b-int4-ov \ + --task text_generation \ + --tool_parser gptoss \ + --reasoning_parser gptoss \ + --enable_prefix_caching true \ + --target_device GPU +``` + +> This downloads the model weights. Wait for the container to finish before proceeding. + +**2. Start the server** (example): + +```bash +docker run -d \ + --rm \ + --name ovms \ + --user $(id -u):$(id -g) \ + -p 8000:8000 \ + -v $(pwd)/models:/models \ + --device /dev/dri \ + --group-add=$(stat -c "%g" /dev/dri/render* | head -n 1) \ + openvino/model_server:latest-gpu \ + --rest_port 8000 \ + --model_name openai/gpt-oss-20b \ + --model_repository_path /models \ + --source_model OpenVINO/gpt-oss-20b-int4-ov \ + --task text_generation \ + --tool_parser gptoss \ + --reasoning_parser gptoss \ + --enable_prefix_caching true \ + --target_device GPU +``` + +**3. Add to config** (partial — merge into `~/.nanobot/config.json`): + +```json +{ + "providers": { + "ovms": { + "apiBase": "http://localhost:8000/v3" + } + }, + "agents": { + "defaults": { + "provider": "ovms", + "model": "openai/gpt-oss-20b" + } + } +} +``` + +> OVMS is a local server — no API key required. Supports tool calling (`--tool_parser gptoss`), reasoning (`--reasoning_parser gptoss`), and streaming. +> See the [official OVMS docs](https://docs.openvino.ai/2026/model-server/ovms_docs_llm_quickstart.html) for more details. +
+
vLLM (local / OpenAI-compatible) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index b915ce9b2..db348ed90 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -409,6 +409,14 @@ def _make_provider(config: Config): api_base=p.api_base, default_model=model, ) + # OpenVINO Model Server: direct OpenAI-compatible endpoint at /v3 + elif provider_name == "ovms": + from nanobot.providers.custom_provider import CustomProvider + provider = CustomProvider( + api_key=p.api_key if p else "no-key", + api_base=config.get_api_base(model) or "http://localhost:8000/v3", + default_model=model, + ) else: from nanobot.providers.litellm_provider import LiteLLMProvider from nanobot.providers.registry import find_by_name diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 9c841ca9c..58ead15e1 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -70,6 +70,7 @@ class ProvidersConfig(Base): dashscope: ProviderConfig = Field(default_factory=ProviderConfig) vllm: ProviderConfig = Field(default_factory=ProviderConfig) ollama: ProviderConfig = Field(default_factory=ProviderConfig) # Ollama local models + ovms: ProviderConfig = Field(default_factory=ProviderConfig) # OpenVINO Model Server (OVMS) gemini: ProviderConfig = Field(default_factory=ProviderConfig) moonshot: ProviderConfig = Field(default_factory=ProviderConfig) minimax: ProviderConfig = Field(default_factory=ProviderConfig) diff --git a/nanobot/providers/registry.py b/nanobot/providers/registry.py index 825653ff0..9cc430b88 100644 --- a/nanobot/providers/registry.py +++ b/nanobot/providers/registry.py @@ -452,6 +452,17 @@ PROVIDERS: tuple[ProviderSpec, ...] = ( strip_model_prefix=False, model_overrides=(), ), + # === OpenVINO Model Server (direct, local, OpenAI-compatible at /v3) === + ProviderSpec( + name="ovms", + keywords=("openvino", "ovms"), + env_key="", + display_name="OpenVINO Model Server", + litellm_prefix="", + is_direct=True, + is_local=True, + default_api_base="http://localhost:8000/v3", + ), # === Auxiliary (not a primary LLM provider) ============================ # Groq: mainly used for Whisper voice transcription, also usable for LLM. # Needs "groq/" prefix for LiteLLM routing. Placed last — it rarely wins fallback. From a46803cbd7078fa18bd6dbed842045a822352a65 Mon Sep 17 00:00:00 2001 From: chengyongru Date: Wed, 18 Mar 2026 15:38:03 +0800 Subject: [PATCH 7/9] docs(provider): add mistral intro --- README.md | 1 + 1 file changed, 1 insertion(+) diff --git a/README.md b/README.md index 52d45046a..062abbbfc 100644 --- a/README.md +++ b/README.md @@ -803,6 +803,7 @@ Config file: `~/.nanobot/config.json` | `moonshot` | LLM (Moonshot/Kimi) | [platform.moonshot.cn](https://platform.moonshot.cn) | | `zhipu` | LLM (Zhipu GLM) | [open.bigmodel.cn](https://open.bigmodel.cn) | | `ollama` | LLM (local, Ollama) | — | +| `mistral` | LLM | [docs.mistral.ai](https://docs.mistral.ai/) | | `ovms` | LLM (local, OpenVINO Model Server) | [docs.openvino.ai](https://docs.openvino.ai/2026/model-server/ovms_docs_llm_quickstart.html) | | `vllm` | LLM (local, any OpenAI-compatible server) | — | | `openai_codex` | LLM (Codex, OAuth) | `nanobot provider login openai-codex` | From 8f5c2d1a062dc85eb9d5521167df7b642fbb9bc3 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 23 Mar 2026 03:27:13 +0000 Subject: [PATCH 8/9] fix(cli): stop spinner after non-streaming interactive replies --- nanobot/cli/commands.py | 5 +++++ nanobot/cli/stream.py | 7 +++++++ 2 files changed, 12 insertions(+) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index db348ed90..d0ec145d8 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -752,6 +752,7 @@ def agent( on_stream_end=renderer.on_end, ) if not renderer.streamed: + await renderer.close() _print_agent_response( response.content if response else "", render_markdown=markdown, @@ -873,9 +874,13 @@ def agent( if turn_response: content, meta = turn_response[0] if content and not meta.get("_streamed"): + if renderer: + await renderer.close() _print_agent_response( content, render_markdown=markdown, metadata=meta, ) + elif renderer and not renderer.streamed: + await renderer.close() except KeyboardInterrupt: _restore_terminal() console.print("\nGoodbye!") diff --git a/nanobot/cli/stream.py b/nanobot/cli/stream.py index 161d53082..16586ecd0 100644 --- a/nanobot/cli/stream.py +++ b/nanobot/cli/stream.py @@ -119,3 +119,10 @@ class StreamRenderer: self._start_spinner() else: _make_console().print() + + async def close(self) -> None: + """Stop spinner/live without rendering a final streamed round.""" + if self._live: + self._live.stop() + self._live = None + self._stop_spinner() From aba0b83a77eed0c2ba7536b4c7df35c6a4f8d8d9 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Mon, 23 Mar 2026 03:48:12 +0000 Subject: [PATCH 9/9] fix(memory): reserve completion headroom for consolidation Trigger token consolidation before prompt usage reaches the full context window so response tokens and tokenizer estimation drift still fit safely within the model budget. Made-with: Cursor --- nanobot/agent/loop.py | 1 + nanobot/agent/memory.py | 15 ++++++++++++--- tests/test_loop_consolidation_tokens.py | 3 +++ 3 files changed, 16 insertions(+), 3 deletions(-) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 6cf2ec328..a892d3d7e 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -115,6 +115,7 @@ class AgentLoop: context_window_tokens=context_window_tokens, build_messages=self.context.build_messages, get_tool_definitions=self.tools.get_definitions, + max_completion_tokens=provider.generation.max_tokens, ) self._register_default_tools() diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 5fdfa7a06..aa2de9290 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -224,6 +224,8 @@ class MemoryConsolidator: _MAX_CONSOLIDATION_ROUNDS = 5 + _SAFETY_BUFFER = 1024 # extra headroom for tokenizer estimation drift + def __init__( self, workspace: Path, @@ -233,12 +235,14 @@ class MemoryConsolidator: context_window_tokens: int, build_messages: Callable[..., list[dict[str, Any]]], get_tool_definitions: Callable[[], list[dict[str, Any]]], + max_completion_tokens: int = 4096, ): self.store = MemoryStore(workspace) self.provider = provider self.model = model self.sessions = sessions self.context_window_tokens = context_window_tokens + self.max_completion_tokens = max_completion_tokens self._build_messages = build_messages self._get_tool_definitions = get_tool_definitions self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = weakref.WeakValueDictionary() @@ -300,17 +304,22 @@ class MemoryConsolidator: return True async def maybe_consolidate_by_tokens(self, session: Session) -> None: - """Loop: archive old messages until prompt fits within half the context window.""" + """Loop: archive old messages until prompt fits within safe budget. + + The budget reserves space for completion tokens and a safety buffer + so the LLM request never exceeds the context window. + """ if not session.messages or self.context_window_tokens <= 0: return lock = self.get_lock(session.key) async with lock: - target = self.context_window_tokens // 2 + budget = self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER + target = budget // 2 estimated, source = self.estimate_session_prompt_tokens(session) if estimated <= 0: return - if estimated < self.context_window_tokens: + if estimated < budget: logger.debug( "Token consolidation idle {}: {}/{} via {}", session.key, diff --git a/tests/test_loop_consolidation_tokens.py b/tests/test_loop_consolidation_tokens.py index 87d8d29f3..2f9c2dea7 100644 --- a/tests/test_loop_consolidation_tokens.py +++ b/tests/test_loop_consolidation_tokens.py @@ -9,8 +9,10 @@ from nanobot.providers.base import LLMResponse def _make_loop(tmp_path, *, estimated_tokens: int, context_window_tokens: int) -> AgentLoop: + from nanobot.providers.base import GenerationSettings provider = MagicMock() provider.get_default_model.return_value = "test-model" + provider.generation = GenerationSettings(max_tokens=0) provider.estimate_prompt_tokens.return_value = (estimated_tokens, "test-counter") _response = LLMResponse(content="ok", tool_calls=[]) provider.chat_with_retry = AsyncMock(return_value=_response) @@ -24,6 +26,7 @@ def _make_loop(tmp_path, *, estimated_tokens: int, context_window_tokens: int) - context_window_tokens=context_window_tokens, ) loop.tools.get_definitions = MagicMock(return_value=[]) + loop.memory_consolidator._SAFETY_BUFFER = 0 return loop