feat: add Python SDK facade and per-session isolation

This commit is contained in:
Xubin Ren 2026-03-30 18:46:11 +00:00 committed by Xubin Ren
parent 842b8b255d
commit 7fad14802e
7 changed files with 515 additions and 23 deletions

View File

@ -115,6 +115,7 @@
- [Configuration](#-configuration)
- [Multiple Instances](#-multiple-instances)
- [CLI Reference](#-cli-reference)
- [Python SDK](#-python-sdk)
- [OpenAI-Compatible API](#-openai-compatible-api)
- [Docker](#-docker)
- [Linux Service](#-linux-service)
@ -1571,6 +1572,40 @@ The agent can also manage this file itself — ask it to "add a periodic task" a
</details>
## 🐍 Python SDK
Use nanobot as a library — no CLI, no gateway, just Python:
```python
from nanobot import Nanobot
bot = Nanobot.from_config()
result = await bot.run("Summarize the README")
print(result.content)
```
Each call carries a `session_key` for conversation isolation — different keys get independent history:
```python
await bot.run("hi", session_key="user-alice")
await bot.run("hi", session_key="task-42")
```
Add lifecycle hooks to observe or customize the agent:
```python
from nanobot.agent import AgentHook, AgentHookContext
class AuditHook(AgentHook):
async def before_execute_tools(self, ctx: AgentHookContext) -> None:
for tc in ctx.tool_calls:
print(f"[tool] {tc.name}")
result = await bot.run("Hello", hooks=[AuditHook()])
```
See [docs/PYTHON_SDK.md](docs/PYTHON_SDK.md) for the full SDK reference.
## 🔌 OpenAI-Compatible API
nanobot can expose a minimal OpenAI-compatible endpoint for local integrations:
@ -1580,11 +1615,11 @@ pip install "nanobot-ai[api]"
nanobot serve
```
By default, the API binds to `127.0.0.1:8900`.
By default, the API binds to `127.0.0.1:8900`. You can change this in `config.json`.
### Behavior
- Fixed session: all requests share the same nanobot session (`api:default`)
- Session isolation: pass `"session_id"` in the request body to isolate conversations; omit for a shared default session (`api:default`)
- Single-message input: each request must contain exactly one `user` message
- Fixed model: omit `model`, or pass the same model shown by `/v1/models`
- No streaming: `stream=true` is not supported
@ -1601,12 +1636,8 @@ By default, the API binds to `127.0.0.1:8900`.
curl http://127.0.0.1:8900/v1/chat/completions \
-H "Content-Type: application/json" \
-d '{
"messages": [
{
"role": "user",
"content": "hi"
}
]
"messages": [{"role": "user", "content": "hi"}],
"session_id": "my-session"
}'
```
@ -1618,9 +1649,8 @@ import requests
resp = requests.post(
"http://127.0.0.1:8900/v1/chat/completions",
json={
"messages": [
{"role": "user", "content": "hi"}
]
"messages": [{"role": "user", "content": "hi"}],
"session_id": "my-session", # optional: isolate conversation
},
timeout=120,
)
@ -1641,6 +1671,7 @@ client = OpenAI(
resp = client.chat.completions.create(
model="MiniMax-M2.7",
messages=[{"role": "user", "content": "hi"}],
extra_body={"session_id": "my-session"}, # optional: isolate conversation
)
print(resp.choices[0].message.content)
```

View File

@ -1,5 +1,6 @@
#!/bin/bash
# Count core agent lines (excluding channels/, cli/, api/, providers/ adapters)
# Count core agent lines (excluding channels/, cli/, api/, providers/ adapters,
# and the high-level Python SDK facade)
cd "$(dirname "$0")" || exit 1
echo "nanobot core agent line count"
@ -15,7 +16,7 @@ root=$(cat nanobot/__init__.py nanobot/__main__.py | wc -l)
printf " %-16s %5s lines\n" "(root)" "$root"
echo ""
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/api/*" ! -path "*/command/*" ! -path "*/providers/*" ! -path "*/skills/*" | xargs cat | wc -l)
total=$(find nanobot -name "*.py" ! -path "*/channels/*" ! -path "*/cli/*" ! -path "*/api/*" ! -path "*/command/*" ! -path "*/providers/*" ! -path "*/skills/*" ! -path "nanobot/nanobot.py" | xargs cat | wc -l)
echo " Core total: $total lines"
echo ""
echo " (excludes: channels/, cli/, api/, command/, providers/, skills/)"
echo " (excludes: channels/, cli/, api/, command/, providers/, skills/, nanobot.py)"

136
docs/PYTHON_SDK.md Normal file
View File

@ -0,0 +1,136 @@
# Python SDK
Use nanobot programmatically — load config, run the agent, get results.
## Quick Start
```python
import asyncio
from nanobot import Nanobot
async def main():
bot = Nanobot.from_config()
result = await bot.run("What time is it in Tokyo?")
print(result.content)
asyncio.run(main())
```
## API
### `Nanobot.from_config(config_path?, *, workspace?)`
Create a `Nanobot` from a config file.
| Param | Type | Default | Description |
|-------|------|---------|-------------|
| `config_path` | `str \| Path \| None` | `None` | Path to `config.json`. Defaults to `~/.nanobot/config.json`. |
| `workspace` | `str \| Path \| None` | `None` | Override workspace directory from config. |
Raises `FileNotFoundError` if an explicit path doesn't exist.
### `await bot.run(message, *, session_key?, hooks?)`
Run the agent once. Returns a `RunResult`.
| Param | Type | Default | Description |
|-------|------|---------|-------------|
| `message` | `str` | *(required)* | The user message to process. |
| `session_key` | `str` | `"sdk:default"` | Session identifier for conversation isolation. Different keys get independent history. |
| `hooks` | `list[AgentHook] \| None` | `None` | Lifecycle hooks for this run only. |
```python
# Isolated sessions — each user gets independent conversation history
await bot.run("hi", session_key="user-alice")
await bot.run("hi", session_key="user-bob")
```
### `RunResult`
| Field | Type | Description |
|-------|------|-------------|
| `content` | `str` | The agent's final text response. |
| `tools_used` | `list[str]` | Tool names invoked during the run. Currently always empty — reserved for a future release. |
| `messages` | `list[dict]` | Raw message history (for debugging). Currently always empty — reserved for a future release. |
## Hooks
Hooks let you observe or modify the agent loop without touching internals.
Subclass `AgentHook` and override any method:
| Method | When |
|--------|------|
| `before_iteration(ctx)` | Before each LLM call |
| `on_stream(ctx, delta)` | On each streamed token |
| `on_stream_end(ctx)` | When streaming finishes |
| `before_execute_tools(ctx)` | Before tool execution (inspect `ctx.tool_calls`) |
| `after_iteration(ctx, response)` | After each LLM response |
| `finalize_content(ctx, content)` | Transform final output text |
### Example: Audit Hook
```python
from nanobot.agent import AgentHook, AgentHookContext
class AuditHook(AgentHook):
def __init__(self):
self.calls = []
async def before_execute_tools(self, ctx: AgentHookContext) -> None:
for tc in ctx.tool_calls:
self.calls.append(tc.name)
print(f"[audit] {tc.name}({tc.arguments})")
hook = AuditHook()
result = await bot.run("List files in /tmp", hooks=[hook])
print(f"Tools used: {hook.calls}")
```
### Composing Hooks
Pass multiple hooks — they run in order, errors in one don't block others:
```python
result = await bot.run("hi", hooks=[AuditHook(), MetricsHook()])
```
Under the hood this uses `CompositeHook` for fan-out with error isolation.
### `finalize_content` Pipeline
Unlike the async methods (fan-out), `finalize_content` is a pipeline — each hook's output feeds the next:
```python
class Censor(AgentHook):
def finalize_content(self, ctx, content):
return content.replace("secret", "***") if content else content
```
## Full Example
```python
import asyncio
from nanobot import Nanobot
from nanobot.agent import AgentHook, AgentHookContext
class TimingHook(AgentHook):
async def before_iteration(self, ctx: AgentHookContext) -> None:
import time
ctx.metadata["_t0"] = time.time()
async def after_iteration(self, ctx, response) -> None:
import time
elapsed = time.time() - ctx.metadata.get("_t0", 0)
print(f"[timing] iteration took {elapsed:.2f}s")
async def main():
bot = Nanobot.from_config(workspace="/my/project")
result = await bot.run(
"Explain the main function",
hooks=[TimingHook()],
)
print(result.content)
asyncio.run(main())
```

View File

@ -4,3 +4,7 @@ nanobot - A lightweight AI agent framework
__version__ = "0.1.4.post6"
__logo__ = "🐈"

# Re-export the SDK facade so `from nanobot import Nanobot` works without
# callers needing to know the internal module layout.
from nanobot.nanobot import Nanobot, RunResult

# Public API of the package root.
__all__ = ["Nanobot", "RunResult"]

View File

@ -91,9 +91,12 @@ async def handle_chat_completions(request: web.Request) -> web.Response:
model_name: str = request.app.get("model_name", "nanobot")
if (requested_model := body.get("model")) and requested_model != model_name:
return _error_json(400, f"Only configured model '{model_name}' is available")
session_lock: asyncio.Lock = request.app["session_lock"]
logger.info("API request session_key={} content={}", API_SESSION_KEY, user_content[:80])
session_key = f"api:{body['session_id']}" if body.get("session_id") else API_SESSION_KEY
session_locks: dict[str, asyncio.Lock] = request.app["session_locks"]
session_lock = session_locks.setdefault(session_key, asyncio.Lock())
logger.info("API request session_key={} content={}", session_key, user_content[:80])
_FALLBACK = "I've completed processing but have no response to give."
@ -103,7 +106,7 @@ async def handle_chat_completions(request: web.Request) -> web.Response:
response = await asyncio.wait_for(
agent_loop.process_direct(
content=user_content,
session_key=API_SESSION_KEY,
session_key=session_key,
channel="api",
chat_id=API_CHAT_ID,
),
@ -114,12 +117,12 @@ async def handle_chat_completions(request: web.Request) -> web.Response:
if not response_text or not response_text.strip():
logger.warning(
"Empty response for session {}, retrying",
API_SESSION_KEY,
session_key,
)
retry_response = await asyncio.wait_for(
agent_loop.process_direct(
content=user_content,
session_key=API_SESSION_KEY,
session_key=session_key,
channel="api",
chat_id=API_CHAT_ID,
),
@ -129,17 +132,17 @@ async def handle_chat_completions(request: web.Request) -> web.Response:
if not response_text or not response_text.strip():
logger.warning(
"Empty response after retry for session {}, using fallback",
API_SESSION_KEY,
session_key,
)
response_text = _FALLBACK
except asyncio.TimeoutError:
return _error_json(504, f"Request timed out after {timeout_s}s")
except Exception:
logger.exception("Error processing request for session {}", API_SESSION_KEY)
logger.exception("Error processing request for session {}", session_key)
return _error_json(500, "Internal server error", err_type="server_error")
except Exception:
logger.exception("Unexpected API lock error for session {}", API_SESSION_KEY)
logger.exception("Unexpected API lock error for session {}", session_key)
return _error_json(500, "Internal server error", err_type="server_error")
return web.json_response(_chat_completion_response(response_text, model_name))
@ -182,7 +185,7 @@ def create_app(agent_loop, model_name: str = "nanobot", request_timeout: float =
app["agent_loop"] = agent_loop
app["model_name"] = model_name
app["request_timeout"] = request_timeout
app["session_lock"] = asyncio.Lock()
app["session_locks"] = {} # per-user locks, keyed by session_key
app.router.add_post("/v1/chat/completions", handle_chat_completions)
app.router.add_get("/v1/models", handle_models)

170
nanobot/nanobot.py Normal file
View File

@ -0,0 +1,170 @@
"""High-level programmatic interface to nanobot."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from nanobot.agent.hook import AgentHook
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
@dataclass(slots=True)
class RunResult:
    """Result of a single agent run.

    Returned by :meth:`Nanobot.run`.
    """

    # The agent's final text response; empty string when the agent
    # produced no reply.
    content: str
    # Tool names invoked during the run.
    # NOTE(review): Nanobot.run currently always fills this with [] —
    # confirm before relying on it being populated.
    tools_used: list[str]
    # Raw message history (for debugging).
    # NOTE(review): currently always [] as returned by Nanobot.run.
    messages: list[dict[str, Any]]
class Nanobot:
    """Programmatic facade for running the nanobot agent.

    Usage::

        bot = Nanobot.from_config()
        result = await bot.run("Summarize this repo", hooks=[MyHook()])
        print(result.content)
    """

    def __init__(self, loop: AgentLoop) -> None:
        # Fully-wired agent loop this facade delegates every run to.
        self._loop = loop

    @classmethod
    def from_config(
        cls,
        config_path: str | Path | None = None,
        *,
        workspace: str | Path | None = None,
    ) -> Nanobot:
        """Build a :class:`Nanobot` from a config file.

        Args:
            config_path: Path to ``config.json``; ``None`` uses the loader's
                default (``~/.nanobot/config.json``).
            workspace: Optional override for the workspace directory taken
                from the config.

        Raises:
            FileNotFoundError: If an explicit ``config_path`` does not exist.
        """
        from nanobot.config.loader import load_config
        from nanobot.config.schema import Config

        if config_path is None:
            resolved: Path | None = None
        else:
            resolved = Path(config_path).expanduser().resolve()
            if not resolved.exists():
                raise FileNotFoundError(f"Config not found: {resolved}")

        config: Config = load_config(resolved)
        if workspace is not None:
            # Config stores the workspace as a string; normalize first.
            config.agents.defaults.workspace = str(
                Path(workspace).expanduser().resolve()
            )

        provider = _make_provider(config)
        message_bus = MessageBus()
        defaults = config.agents.defaults
        agent_loop = AgentLoop(
            bus=message_bus,
            provider=provider,
            workspace=config.workspace_path,
            model=defaults.model,
            max_iterations=defaults.max_tool_iterations,
            context_window_tokens=defaults.context_window_tokens,
            web_search_config=config.tools.web.search,
            web_proxy=config.tools.web.proxy or None,
            exec_config=config.tools.exec,
            restrict_to_workspace=config.tools.restrict_to_workspace,
            mcp_servers=config.tools.mcp_servers,
            timezone=defaults.timezone,
        )
        return cls(agent_loop)

    async def run(
        self,
        message: str,
        *,
        session_key: str = "sdk:default",
        hooks: list[AgentHook] | None = None,
    ) -> RunResult:
        """Run the agent once and return the result.

        Args:
            message: The user message to process.
            session_key: Session identifier for conversation isolation;
                different keys get independent history.
            hooks: Optional lifecycle hooks installed for this call only.
        """
        # Temporarily swap the per-call hooks onto the loop and restore the
        # previous list afterwards (same object, so identity is preserved).
        # NOTE(review): this mutates shared loop state — concurrent run()
        # calls with different hooks on one instance would interfere;
        # confirm single-flight usage.
        saved_hooks = self._loop._extra_hooks
        if hooks is not None:
            self._loop._extra_hooks = list(hooks)
        try:
            response = await self._loop.process_direct(
                message, session_key=session_key,
            )
        finally:
            self._loop._extra_hooks = saved_hooks

        final_text = (response.content if response else None) or ""
        return RunResult(content=final_text, tools_used=[], messages=[])
def _make_provider(config: Any) -> Any:
    """Create the LLM provider from config (extracted from CLI).

    Resolves the configured model to a provider backend via the registry,
    validates that the required credentials are present for that backend,
    instantiates the matching provider class, and attaches generation
    settings from the agent defaults.

    Raises:
        ValueError: If the resolved backend requires credentials that are
            missing from the config.
    """
    from nanobot.providers.base import GenerationSettings
    from nanobot.providers.registry import find_by_name

    model = config.agents.defaults.model
    provider_name = config.get_provider_name(model)
    p = config.get_provider(model)
    # Unknown providers fall back to the generic OpenAI-compatible backend.
    spec = find_by_name(provider_name) if provider_name else None
    backend = spec.backend if spec else "openai_compat"

    # --- credential validation (before any provider is constructed) ---
    if backend == "azure_openai":
        if not p or not p.api_key or not p.api_base:
            raise ValueError("Azure OpenAI requires api_key and api_base in config.")
    elif backend == "openai_compat" and not model.startswith("bedrock/"):
        needs_key = not (p and p.api_key)
        # OAuth, local, and direct providers can work without an API key.
        exempt = spec and (spec.is_oauth or spec.is_local or spec.is_direct)
        if needs_key and not exempt:
            raise ValueError(f"No API key configured for provider '{provider_name}'.")

    # --- provider construction (imports are deferred so only the selected
    # backend's dependencies are loaded) ---
    if backend == "openai_codex":
        from nanobot.providers.openai_codex_provider import OpenAICodexProvider
        provider = OpenAICodexProvider(default_model=model)
    elif backend == "azure_openai":
        from nanobot.providers.azure_openai_provider import AzureOpenAIProvider
        provider = AzureOpenAIProvider(
            api_key=p.api_key, api_base=p.api_base, default_model=model
        )
    elif backend == "anthropic":
        from nanobot.providers.anthropic_provider import AnthropicProvider
        provider = AnthropicProvider(
            api_key=p.api_key if p else None,
            api_base=config.get_api_base(model),
            default_model=model,
            extra_headers=p.extra_headers if p else None,
        )
    else:
        # Default: any OpenAI-compatible endpoint (including the fallback
        # for providers without a registry spec).
        from nanobot.providers.openai_compat_provider import OpenAICompatProvider
        provider = OpenAICompatProvider(
            api_key=p.api_key if p else None,
            api_base=config.get_api_base(model),
            default_model=model,
            extra_headers=p.extra_headers if p else None,
            spec=spec,
        )

    # Apply generation settings from agent defaults to whichever provider
    # was built above.
    defaults = config.agents.defaults
    provider.generation = GenerationSettings(
        temperature=defaults.temperature,
        max_tokens=defaults.max_tokens,
        reasoning_effort=defaults.reasoning_effort,
    )
    return provider

View File

@ -0,0 +1,147 @@
"""Tests for the Nanobot programmatic facade."""
from __future__ import annotations
import json
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.nanobot import Nanobot, RunResult
def _write_config(tmp_path: Path, overrides: dict | None = None) -> Path:
    """Write a minimal nanobot config.json into *tmp_path* and return its path."""
    payload = {
        "providers": {"openrouter": {"apiKey": "sk-test-key"}},
        "agents": {"defaults": {"model": "openai/gpt-4.1"}},
    }
    if overrides:
        payload.update(overrides)
    target = tmp_path / "config.json"
    target.write_text(json.dumps(payload))
    return target
def test_from_config_missing_file():
    """An explicit config path that does not exist raises FileNotFoundError."""
    missing = "/nonexistent/config.json"
    with pytest.raises(FileNotFoundError):
        Nanobot.from_config(missing)
def test_from_config_creates_instance(tmp_path):
    """from_config wires up an agent loop using the requested workspace."""
    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    loop = bot._loop
    assert loop is not None
    assert loop.workspace == tmp_path
def test_from_config_default_path():
    """With no path given, the loader is invoked with None (its own default)."""
    from nanobot.config.schema import Config

    with patch("nanobot.config.loader.load_config") as fake_load:
        with patch("nanobot.nanobot._make_provider") as fake_make:
            fake_load.return_value = Config()
            provider = MagicMock()
            provider.get_default_model.return_value = "test"
            provider.generation.max_tokens = 4096
            fake_make.return_value = provider
            Nanobot.from_config()
    fake_load.assert_called_once_with(None)
@pytest.mark.asyncio
async def test_run_returns_result(tmp_path):
    """run() unwraps the loop's OutboundMessage into a RunResult."""
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    reply = OutboundMessage(channel="cli", chat_id="direct", content="Hello back!")
    bot._loop.process_direct = AsyncMock(return_value=reply)

    outcome = await bot.run("hi")

    assert isinstance(outcome, RunResult)
    assert outcome.content == "Hello back!"
    bot._loop.process_direct.assert_awaited_once_with("hi", session_key="sdk:default")
@pytest.mark.asyncio
async def test_run_with_hooks(tmp_path):
    """Per-call hooks are installed for the run and removed afterwards."""
    from nanobot.agent.hook import AgentHook, AgentHookContext
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)

    class TestHook(AgentHook):
        async def before_iteration(self, context: AgentHookContext) -> None:
            pass

    bot._loop.process_direct = AsyncMock(
        return_value=OutboundMessage(channel="cli", chat_id="direct", content="done")
    )

    outcome = await bot.run("hi", hooks=[TestHook()])

    assert outcome.content == "done"
    assert bot._loop._extra_hooks == []
@pytest.mark.asyncio
async def test_run_hooks_restored_on_error(tmp_path):
    """The original hook list object is restored even when the loop raises."""
    from nanobot.agent.hook import AgentHook

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    bot._loop.process_direct = AsyncMock(side_effect=RuntimeError("boom"))
    before = bot._loop._extra_hooks

    with pytest.raises(RuntimeError):
        await bot.run("hi", hooks=[AgentHook()])

    # Identity check: run() must put back the very same list, not a copy.
    assert bot._loop._extra_hooks is before
@pytest.mark.asyncio
async def test_run_none_response(tmp_path):
    """A None response from the loop maps to an empty content string."""
    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    bot._loop.process_direct = AsyncMock(return_value=None)
    outcome = await bot.run("hi")
    assert outcome.content == ""
def test_workspace_override(tmp_path):
    """An explicit workspace argument wins over the config value."""
    chosen = tmp_path / "custom_workspace"
    chosen.mkdir()
    bot = Nanobot.from_config(_write_config(tmp_path), workspace=chosen)
    assert bot._loop.workspace == chosen
@pytest.mark.asyncio
async def test_run_custom_session_key(tmp_path):
    """The session_key argument is forwarded verbatim to the agent loop."""
    from nanobot.bus.events import OutboundMessage

    bot = Nanobot.from_config(_write_config(tmp_path), workspace=tmp_path)
    bot._loop.process_direct = AsyncMock(
        return_value=OutboundMessage(channel="cli", chat_id="direct", content="ok")
    )
    await bot.run("hi", session_key="user-alice")
    bot._loop.process_direct.assert_awaited_once_with("hi", session_key="user-alice")
def test_import_from_top_level():
    """Nanobot and RunResult are re-exported from the package root."""
    from nanobot import Nanobot as TopBot, RunResult as TopResult

    assert TopBot is Nanobot
    assert TopResult is RunResult