mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
Replace the asyncio.Semaphore queueing approach with a simple count check in SpawnTool.execute(). When the concurrency limit is reached, the tool returns an error string so the agent can perceive the reason and adjust its behavior instead of silently queueing. - Remove max_concurrent_subagents parameter threading through AgentLoop, commands.py, and nanobot.py - SubagentManager reads the limit directly from AgentDefaults - SpawnTool checks get_running_count() before calling spawn() - Simplify tests to verify rejection behavior
131 lines
4.2 KiB
Python
131 lines
4.2 KiB
Python
"""High-level programmatic interface to nanobot."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
from nanobot.agent.hook import AgentHook, SDKCaptureHook
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
|
|
@dataclass(slots=True)
|
|
class RunResult:
|
|
"""Result of a single agent run."""
|
|
|
|
content: str
|
|
tools_used: list[str]
|
|
messages: list[dict[str, Any]]
|
|
|
|
|
|
class Nanobot:
|
|
"""Programmatic facade for running the nanobot agent.
|
|
|
|
Usage::
|
|
|
|
bot = Nanobot.from_config()
|
|
result = await bot.run("Summarize this repo", hooks=[MyHook()])
|
|
print(result.content)
|
|
"""
|
|
|
|
def __init__(self, loop: AgentLoop) -> None:
|
|
self._loop = loop
|
|
|
|
@classmethod
|
|
def from_config(
|
|
cls,
|
|
config_path: str | Path | None = None,
|
|
*,
|
|
workspace: str | Path | None = None,
|
|
) -> Nanobot:
|
|
"""Create a Nanobot instance from a config file.
|
|
|
|
Args:
|
|
config_path: Path to ``config.json``. Defaults to
|
|
``~/.nanobot/config.json``.
|
|
workspace: Override the workspace directory from config.
|
|
"""
|
|
from nanobot.config.loader import load_config, resolve_config_env_vars
|
|
from nanobot.config.schema import Config
|
|
|
|
resolved: Path | None = None
|
|
if config_path is not None:
|
|
resolved = Path(config_path).expanduser().resolve()
|
|
if not resolved.exists():
|
|
raise FileNotFoundError(f"Config not found: {resolved}")
|
|
|
|
config: Config = resolve_config_env_vars(load_config(resolved))
|
|
if workspace is not None:
|
|
config.agents.defaults.workspace = str(
|
|
Path(workspace).expanduser().resolve()
|
|
)
|
|
|
|
provider = _make_provider(config)
|
|
bus = MessageBus()
|
|
defaults = config.agents.defaults
|
|
|
|
loop = AgentLoop(
|
|
bus=bus,
|
|
provider=provider,
|
|
workspace=config.workspace_path,
|
|
model=defaults.model,
|
|
max_iterations=defaults.max_tool_iterations,
|
|
context_window_tokens=defaults.context_window_tokens,
|
|
context_block_limit=defaults.context_block_limit,
|
|
max_tool_result_chars=defaults.max_tool_result_chars,
|
|
provider_retry_mode=defaults.provider_retry_mode,
|
|
web_config=config.tools.web,
|
|
exec_config=config.tools.exec,
|
|
restrict_to_workspace=config.tools.restrict_to_workspace,
|
|
mcp_servers=config.tools.mcp_servers,
|
|
timezone=defaults.timezone,
|
|
unified_session=defaults.unified_session,
|
|
disabled_skills=defaults.disabled_skills,
|
|
session_ttl_minutes=defaults.session_ttl_minutes,
|
|
consolidation_ratio=defaults.consolidation_ratio,
|
|
tools_config=config.tools,
|
|
)
|
|
return cls(loop)
|
|
|
|
async def run(
|
|
self,
|
|
message: str,
|
|
*,
|
|
session_key: str = "sdk:default",
|
|
hooks: list[AgentHook] | None = None,
|
|
) -> RunResult:
|
|
"""Run the agent once and return the result.
|
|
|
|
Args:
|
|
message: The user message to process.
|
|
session_key: Session identifier for conversation isolation.
|
|
Different keys get independent history.
|
|
hooks: Optional lifecycle hooks for this run.
|
|
"""
|
|
capture = SDKCaptureHook()
|
|
prev = self._loop._extra_hooks
|
|
base_hooks = list(hooks) if hooks is not None else list(prev or [])
|
|
self._loop._extra_hooks = [capture, *base_hooks]
|
|
try:
|
|
response = await self._loop.process_direct(
|
|
message, session_key=session_key,
|
|
)
|
|
finally:
|
|
self._loop._extra_hooks = prev
|
|
|
|
content = (response.content if response else None) or ""
|
|
return RunResult(
|
|
content=content,
|
|
tools_used=capture.tools_used,
|
|
messages=capture.messages,
|
|
)
|
|
|
|
|
|
def _make_provider(config: Any) -> Any:
|
|
"""Create the LLM provider from config (extracted from CLI)."""
|
|
from nanobot.providers.factory import make_provider
|
|
|
|
return make_provider(config)
|