diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 19ee935c4..edf45f24a 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -39,6 +39,7 @@ class ContextBuilder: skill_names: list[str] | None = None, channel: str | None = None, session_summary: str | None = None, + session_key: str | None = None, ) -> str: """Build the system prompt from identity, bootstrap files, memory, and skills.""" parts = [self._get_identity(channel=channel)] @@ -73,8 +74,29 @@ class ContextBuilder: if session_summary: parts.append(f"[Archived Context Summary]\n\n{session_summary}") + # Inject P2P collaboration hint for task-scoped sessions + if session_key and session_key.startswith("task:"): + parts.append(self._p2p_collaboration_hint()) + return "\n\n---\n\n".join(parts) + @staticmethod + def _p2p_collaboration_hint() -> str: + return ( + "# Multi-Agent Collaboration\n\n" + "You are part of a decentralized agent network. You can:\n" + "- Use `broadcast_task` to announce subtasks and collect BIDs\n" + "- Use `dispatch_task` to assign tasks to specific agents\n" + "- Use `poll_task_result` to check task status\n" + "- Use `report_user` to deliver final results to the user\n" + "- Use `finalize_task` to terminate tasks\n\n" + "Rules:\n" + "- Never block waiting for results. Dispatch and continue.\n" + "- If a task times out, decide whether to retry, failover, or report partial.\n" + "- Respect the user's INTERRUPT messages — they have highest priority.\n" + "- You are currently in a task-scoped session; focus on the delegated task." + ) + def _get_identity(self, channel: str | None = None) -> str: """Get the core identity section.""" workspace_path = str(self.workspace.expanduser().resolve()) @@ -154,6 +176,7 @@ class ContextBuilder: sender_id: str | None = None, session_summary: str | None = None, session_metadata: Mapping[str, Any] | None = None, + session_key: str | None = None, ) -> list[dict[str, Any]]: """Build the complete message list for an LLM call.""" extra = goal_state_runtime_lines(session_metadata) @@ -175,7 +198,7 @@ class ContextBuilder: else: merged = user_content + [{"type": "text", "text": runtime_ctx}] messages = [ - {"role": "system", "content": self.build_system_prompt(skill_names, channel=channel, session_summary=session_summary)}, + {"role": "system", "content": self.build_system_prompt(skill_names, channel=channel, session_summary=session_summary, session_key=session_key)}, *history, ] if messages[-1].get("role") == current_role: diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index bc807092e..b306bbd5f 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -24,6 +24,14 @@ from nanobot.agent.runner import _MAX_INJECTIONS_PER_TURN, AgentRunner, AgentRun from nanobot.agent.subagent import SubagentManager from nanobot.agent.tools.file_state import FileStateStore, bind_file_states, reset_file_states from nanobot.agent.tools.message import MessageTool +from nanobot.agent.tools.p2p import ( + BroadcastTaskTool, + CheckAggregationTool, + DispatchTaskTool, + FinalizeTaskTool, + PollTaskResultTool, + ReportUserTool, +) from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.self import MyTool from nanobot.bus.events import InboundMessage, OutboundMessage @@ -185,6 +193,7 @@ class AgentLoop: model_preset: str | None = None, preset_snapshot_loader: preset_helpers.PresetSnapshotLoader | None = None, runtime_model_publisher: Callable[[str, str | None], None] | None = None, + p2p_shell: Any | None = None, ): from nanobot.config.schema import ToolsConfig @@ -192,6 +201,7 @@ class AgentLoop: defaults = AgentDefaults() self.bus = bus self.channels_config = channels_config + self.p2p_shell = p2p_shell self.provider = provider self._provider_snapshot_loader = provider_snapshot_loader self._preset_snapshot_loader = preset_snapshot_loader @@ -463,6 +473,22 @@ class AgentLoop: ) registered.append("my") + # Register P2P tools if enabled + if self.p2p_shell: + self.tools.register(DispatchTaskTool(shell=self.p2p_shell)) + self.tools.register(PollTaskResultTool(shell=self.p2p_shell)) + self.tools.register(BroadcastTaskTool(shell=self.p2p_shell)) + self.tools.register(CheckAggregationTool(shell=self.p2p_shell)) + self.tools.register( + ReportUserTool( + send_callback=self.bus.publish_outbound, + default_channel=getattr(self.channels_config, "default_channel", ""), + default_chat_id=getattr(self.channels_config, "default_chat_id", ""), + ) + ) + self.tools.register(FinalizeTaskTool(shell=self.p2p_shell, session_manager=self.sessions)) + registered.append("p2p") + logger.info("Registered {} tools: {}", len(registered), registered) async def _connect_mcp(self) -> None: diff --git a/nanobot/agent/tools/p2p.py b/nanobot/agent/tools/p2p.py new file mode 100644 index 000000000..dd85b0a95 --- /dev/null +++ b/nanobot/agent/tools/p2p.py @@ -0,0 +1,328 @@ +"""P2P tools for inter-agent task dispatch and coordination.""" + +from __future__ import annotations + +from typing import Any, Awaitable, Callable + +from nanobot.agent.tools.base import Tool +from nanobot.bus.events import OutboundMessage + + +class DispatchTaskTool(Tool): + """Asynchronously dispatch a task to another agent. Non-blocking.""" + + def __init__(self, shell: "P2PShell"): + self._shell = shell + + @property + def name(self) -> str: + return "dispatch_task" + + @property + def description(self) -> str: + return ( + "Dispatch a task to a specific target agent. Returns immediately with a receipt. " + "The target agent will process the task independently. Use poll_task_result later to check completion. " + "Do NOT block waiting for results." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "to": {"type": "string", "description": "Target agent ID"}, + "task_description": {"type": "string", "description": "Clear description of the task"}, + "parent_task_id": {"type": "string", "description": "Parent task ID for ancestry tracking"}, + "deadline_seconds": {"type": "integer", "default": 300, "description": "Task deadline in seconds"}, + "allow_redelegation": {"type": "boolean", "default": True, "description": "Whether the target may re-delegate"}, + }, + "required": ["to", "task_description"], + } + + async def execute( + self, + to: str, + task_description: str, + parent_task_id: str | None = None, + deadline_seconds: int = 300, + allow_redelegation: bool = True, + **kwargs: Any, + ) -> str: + result = self._shell.dispatch( + to=to, + parent_task_id=parent_task_id, + description=task_description, + deadline_seconds=deadline_seconds, + allow_redelegation=allow_redelegation, + ) + if result.get("status") == "rejected": + return f"Error: dispatch rejected — {result.get('reason', 'unknown')}" + if result.get("status") == "circuit_open": + failover = result.get("failover_to") + return f"Error: circuit open for {to}. Failover candidate: {failover or 'none'}" + return ( + f"Dispatched to {to}. Task ID: {result.get('task_id')}. " + f"Depth: {result.get('depth', 0)}." + ) + + +class PollTaskResultTool(Tool): + """Poll the status of a previously dispatched task.""" + + def __init__(self, shell: "P2PShell"): + self._shell = shell + + @property + def name(self) -> str: + return "poll_task_result" + + @property + def description(self) -> str: + return ( + "Check the current status of a task you previously dispatched. " + "Returns completed, pending, timeout, failed, or not_found. " + "Call this proactively — do not wait for automatic notifications." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "Task ID returned by dispatch_task"}, + }, + "required": ["task_id"], + } + + async def execute(self, task_id: str, **kwargs: Any) -> str: + result = self._shell.poll(task_id) + status = result.get("status") + if status == "not_found": + return f"Task {task_id} not found." + if status == "pending": + return f"Task {task_id} is pending (elapsed {result.get('elapsed', '?')}s)." + if status == "timeout": + return f"Task {task_id} timed out after {result.get('elapsed', '?')}s." + if status in ("completed", "failed", "aborted"): + from_agent = result.get("from", "unknown") + content = result.get("result", "") + preview = content[:500] + "..." if len(content) > 500 else content + return f"Task {task_id} is {status} (from {from_agent}).\n\n{preview}" + return f"Task {task_id} status: {status}" + + +class BroadcastTaskTool(Tool): + """Broadcast subtasks to discover capable agents.""" + + def __init__(self, shell: "P2PShell"): + self._shell = shell + + @property + def name(self) -> str: + return "broadcast_task" + + @property + def description(self) -> str: + return ( + "Announce subtasks to the agent network to collect BIDs. " + "Returns immediately. Use check_aggregation later to see which agents responded. " + "Each subtask should include a capability hint for matching." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "Your task identifier"}, + "subtasks": { + "type": "array", + "items": { + "type": "object", + "properties": { + "subtask_id": {"type": "string"}, + "description": {"type": "string"}, + "capability": {"type": "string", "description": "Required capability, e.g. 'web_search'"}, + "budget_seconds": {"type": "integer", "default": 300}, + }, + "required": ["subtask_id", "description", "capability"], + }, + }, + "aggregation_timeout": {"type": "integer", "default": 30, "description": "Seconds to wait for BIDs"}, + }, + "required": ["task_id", "subtasks"], + } + + async def execute( + self, + task_id: str, + subtasks: list[dict[str, Any]], + aggregation_timeout: int = 30, + **kwargs: Any, + ) -> str: + result = self._shell.broadcast(task_id, subtasks, aggregation_timeout) + invited = result.get("invited", 0) + return f"Broadcast opened for {task_id}. Invited {invited} agent(s). Use check_aggregation to collect BIDs." + + +class CheckAggregationTool(Tool): + """Check the status of a broadcast aggregation window.""" + + def __init__(self, shell: "P2PShell"): + self._shell = shell + + @property + def name(self) -> str: + return "check_aggregation" + + @property + def description(self) -> str: + return ( + "Check whether a previously broadcast task has collected enough BIDs or timed out. " + "Returns the list of responding agents and their bids, or a pending status with counts." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "task_id": {"type": "string", "description": "Task ID used in broadcast_task"}, + }, + "required": ["task_id"], + } + + async def execute(self, task_id: str, **kwargs: Any) -> str: + result = self._shell.check_aggregation(task_id) + status = result.get("status") + if status == "no_window": + return f"No broadcast window found for {task_id}." + if status == "pending": + received = result.get("received", 0) + expected = result.get("expected", "?") + remaining = result.get("seconds_remaining", 0) + return ( + f"Aggregation pending for {task_id}: " + f"{received}/{expected} received, {remaining}s remaining." + ) + if status == "closed": + entries = result.get("entries", []) + lines = [f"Aggregation closed for {task_id} ({result.get('reason', '')}):", ""] + for e in entries: + agent = e.get("from", "unknown") + sub = e.get("subtask_id", "") + lines.append(f"- {agent} bid for {sub}") + return "\n".join(lines) + return f"Unknown aggregation status for {task_id}: {status}" + + +class ReportUserTool(Tool): + """Deliver a final answer to the user.""" + + def __init__( + self, + send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None, + default_channel: str = "", + default_chat_id: str = "", + ): + self._send_callback = send_callback + self._default_channel = default_channel + self._default_chat_id = default_chat_id + + @property + def name(self) -> str: + return "report_user" + + @property + def description(self) -> str: + return ( + "Report the final answer to the user. Use this when you have gathered enough results. " + "Status 'partial' means some subtasks are incomplete — list them in pending_items." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "final_answer": {"type": "string", "description": "Complete answer for the user"}, + "status": {"type": "string", "enum": ["success", "partial", "failed"]}, + "pending_items": { + "type": "array", + "items": {"type": "string"}, + "description": "Incomplete items when status is partial", + }, + "task_summary": {"type": "string", "description": "Optional brief summary"}, + }, + "required": ["final_answer", "status"], + } + + async def execute( + self, + final_answer: str, + status: str, + pending_items: list[str] | None = None, + task_summary: str = "", + **kwargs: Any, + ) -> str: + if not self._send_callback: + return "Error: report_user not configured (no send callback)" + + parts = [final_answer] + if pending_items: + parts.append(f"\n\nPending items:\n" + "\n".join(f"- {i}" for i in pending_items)) + if task_summary: + parts.append(f"\n\nSummary: {task_summary}") + + content = "\n".join(parts) + msg = OutboundMessage( + channel=self._default_channel, + chat_id=self._default_chat_id, + content=content, + ) + await self._send_callback(msg) + return f"Reported to user (status={status})." + + +class FinalizeTaskTool(Tool): + """Force-finalize a task and close its sessions.""" + + def __init__(self, shell: "P2PShell", session_manager: "SessionManager | None" = None): + self._shell = shell + self._session_manager = session_manager + + @property + def name(self) -> str: + return "finalize_task" + + @property + def description(self) -> str: + return ( + "Terminate a task and all its subtasks. Use when the user says 'stop', " + "or when a task is fundamentally blocked. outcome can be completed, failed, or aborted." + ) + + @property + def parameters(self) -> dict[str, Any]: + return { + "type": "object", + "properties": { + "task_id": {"type": "string"}, + "outcome": {"type": "string", "enum": ["completed", "failed", "aborted"]}, + "reason": {"type": "string", "description": "Why the task was finalized"}, + }, + "required": ["task_id", "outcome"], + } + + async def execute( + self, + task_id: str, + outcome: str, + reason: str = "", + **kwargs: Any, + ) -> str: + self._shell.finalize(task_id, outcome, reason) + if self._session_manager: + self._session_manager.finalize_task_session(task_id) + return f"Task {task_id} finalized with outcome={outcome}." diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index cc14f52c1..4376e2c53 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -75,6 +75,7 @@ class SafeFileHistory(FileHistory): from nanobot.cli.stream import StreamRenderer, ThinkingSpinner from nanobot.config.paths import get_workspace_path, is_default_workspace from nanobot.config.schema import Config +from nanobot.p2p.shell import P2PShell from nanobot.utils.helpers import sync_workspace_templates from nanobot.utils.restart import ( consume_restart_notice_from_env, @@ -92,6 +93,17 @@ app = typer.Typer( console = Console() EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"} + +def _resolve_p2p(config: Config) -> P2PShell | None: + """Resolve P2P config and create the stateless P2P shell.""" + mb_cfg = config.mailbox + if not mb_cfg.enabled: + return None + return P2PShell( + agent_id=mb_cfg.agent_id, + mailboxes_root=mb_cfg.mailboxes_root, + ) + # --------------------------------------------------------------------------- # CLI input: prompt_toolkit for editing, paste, history, and display # --------------------------------------------------------------------------- @@ -581,10 +593,13 @@ def serve( sync_workspace_templates(runtime_config.workspace_path) bus = MessageBus() session_manager = SessionManager(runtime_config.workspace_path) + p2p_shell = _resolve_p2p(runtime_config) + try: agent_loop = AgentLoop.from_config( runtime_config, bus, session_manager=session_manager, + p2p_shell=p2p_shell, image_generation_provider_configs={ "openrouter": runtime_config.providers.openrouter, "aihubmix": runtime_config.providers.aihubmix, @@ -690,6 +705,8 @@ def _run_gateway( cron_store_path = config.workspace_path / "cron" / "jobs.json" cron = CronService(cron_store_path) + p2p_shell = _resolve_p2p(config) + # Create agent with cron service agent = AgentLoop.from_config( config, bus, @@ -709,6 +726,7 @@ def _run_gateway( preset, ), provider_signature=provider_snapshot.signature, + p2p_shell=p2p_shell, ) from nanobot.agent.loop import UNIFIED_SESSION_KEY @@ -921,6 +939,8 @@ def _run_gateway( interval_s=hb_cfg.interval_s, enabled=hb_cfg.enabled, timezone=config.agents.defaults.timezone, + p2p_shell=p2p_shell, + bus=bus, ) if channels.enabled_channels: @@ -1083,6 +1103,8 @@ def agent( cron_store_path = config.workspace_path / "cron" / "jobs.json" cron = CronService(cron_store_path) + p2p_shell = _resolve_p2p(config) + if logs: logger.enable("nanobot") else: @@ -1092,6 +1114,7 @@ def agent( agent_loop = AgentLoop.from_config( config, bus, cron_service=cron, + p2p_shell=p2p_shell, ) except ValueError as exc: console.print(f"[red]Error: {exc}[/red]") @@ -1289,6 +1312,7 @@ def agent( console.print("\nGoodbye!") break finally: + pass agent_loop.stop() outbound_task.cancel() await asyncio.gather(bus_task, outbound_task, return_exceptions=True) diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 96f9014a9..7a21eb279 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -282,6 +282,19 @@ class ToolsConfig(Base): ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale) +class P2PConfig(Base): + """P2P collaboration network configuration.""" + + enabled: bool = False + agent_id: str = "" + description: str = "" + capabilities: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=lambda: ["*"]) + max_concurrent_tasks: int = 3 + poll_interval: float = 5.0 + mailboxes_root: str = "~/.nanobot/mailboxes" + + class Config(BaseSettings): """Root configuration for nanobot.""" @@ -295,6 +308,7 @@ class Config(BaseSettings): default_factory=dict, validation_alias=AliasChoices("modelPresets", "model_presets"), ) + mailbox: P2PConfig = Field(default_factory=P2PConfig) @model_validator(mode="after") def _validate_model_preset(self) -> "Config": diff --git a/nanobot/heartbeat/service.py b/nanobot/heartbeat/service.py index b41ee7a1e..d27ce990f 100644 --- a/nanobot/heartbeat/service.py +++ b/nanobot/heartbeat/service.py @@ -60,6 +60,8 @@ class HeartbeatService: interval_s: int = 30 * 60, enabled: bool = True, timezone: str | None = None, + p2p_shell: Any | None = None, + bus: Any | None = None, ): self.workspace = workspace self.provider = provider @@ -69,8 +71,11 @@ class HeartbeatService: self.interval_s = interval_s self.enabled = enabled self.timezone = timezone + self.p2p_shell = p2p_shell + self.bus = bus self._running = False self._task: asyncio.Task | None = None + self._last_inbox_scan: float = 0.0 @property def heartbeat_file(self) -> Path: @@ -185,6 +190,32 @@ class HeartbeatService: """Execute a single heartbeat tick.""" from nanobot.utils.evaluator import evaluate_response + # --- P2P inbox scan --- + if self.p2p_shell and self.bus: + try: + new_msgs = self.p2p_shell.scan_new_inbox(since=self._last_inbox_scan) + if new_msgs: + self._last_inbox_scan = time.time() + from nanobot.bus.events import InboundMessage + for msg in new_msgs: + await self.bus.publish_inbound( + InboundMessage( + channel="p2p", + sender_id=msg.get("from", "unknown"), + chat_id=msg.get("task_id", ""), + content=msg.get("payload", {}).get("description", ""), + metadata={"p2p_msg": msg}, + ) + ) + logger.info( + "Heartbeat: injected P2P task {} from {}", + msg.get("task_id", ""), + msg.get("from", "unknown"), + ) + except Exception: + logger.exception("Heartbeat P2P scan failed") + + # --- Legacy heartbeat file check --- content = self._read_heartbeat_file() if not content: logger.debug("Heartbeat: HEARTBEAT.md missing or empty") diff --git a/nanobot/p2p/__init__.py b/nanobot/p2p/__init__.py new file mode 100644 index 000000000..92c80047a --- /dev/null +++ b/nanobot/p2p/__init__.py @@ -0,0 +1,5 @@ +"""P2P inter-agent coordination layer.""" + +from nanobot.p2p.shell import P2PShell + +__all__ = ["P2PShell"] diff --git a/nanobot/p2p/shell.py b/nanobot/p2p/shell.py new file mode 100644 index 000000000..73644fe70 --- /dev/null +++ b/nanobot/p2p/shell.py @@ -0,0 +1,426 @@ +"""P2P shell: filesystem-backed inter-agent coordination. + +All state is stored in the mailbox filesystem; this class is stateless. +Restarting the gateway restores all task state by scanning files. +""" + +from __future__ import annotations + +import json +import os +import time +from pathlib import Path +from typing import Any, Literal + +from loguru import logger + + +class P2PShell: + """Stateless P2P coordination shell backed by the mailbox filesystem.""" + + def __init__(self, agent_id: str, mailboxes_root: str): + self.agent_id = agent_id + self.root = Path(mailboxes_root).expanduser() + self.inbox = self.root / agent_id / "inbox" + self.processed = self.root / agent_id / "processed" + self.links_dir = self.root / "_links" + self.windows_dir = self.root / "_windows" + + for d in (self.inbox, self.processed, self.links_dir, self.windows_dir): + d.mkdir(parents=True, exist_ok=True) + + # ------------------------------------------------------------------ + # Discovery + # ------------------------------------------------------------------ + + def discover(self, capability: str, top_k: int = 3) -> list[dict[str, Any]]: + """Read _registry.json and return candidates matching capability.""" + registry = self._load_json(self.root / "_registry.json", default={}) + candidates: list[dict[str, Any]] = [] + for aid, info in registry.items(): + if aid == self.agent_id: + continue + caps = info.get("capabilities", []) + if capability.lower() in " ".join(caps).lower(): + candidates.append({"agent_id": aid, **info}) + # Sort: idle first, then by current task load + candidates.sort(key=lambda x: (x.get("status") != "idle", x.get("current_tasks", 0))) + return candidates[:top_k] + + def heartbeat(self, description: str, capabilities: list[str]) -> None: + """Write self state into the shared _registry.json.""" + registry = self._load_json(self.root / "_registry.json", default={}) + registry[self.agent_id] = { + "description": description, + "capabilities": capabilities, + "status": "idle", + "last_heartbeat": int(time.time()), + "endpoint": "", + } + self._atomic_write(self.root / "_registry.json", registry) + + # ------------------------------------------------------------------ + # Task dispatch + # ------------------------------------------------------------------ + + def dispatch( + self, + to: str, + parent_task_id: str | None, + description: str, + deadline_seconds: int = 300, + allow_redelegation: bool = True, + ) -> dict[str, Any]: + """Write a task into the target agent's inbox and return a receipt.""" + task_id = ( + f"{parent_task_id}.{int(time.time())}" + if parent_task_id + else f"root_{int(time.time())}" + ) + + depth = self._get_depth(parent_task_id) if parent_task_id else 0 + if depth >= 3: + return {"status": "rejected", "reason": "max_depth_exceeded"} + + if parent_task_id and self._is_ancestor(to, parent_task_id): + return {"status": "rejected", "reason": "ancestry_loop"} + + if not self._circuit_allow(to): + failover = self._find_failover(to) + return {"status": "circuit_open", "failover_to": failover} + + target_inbox = self.root / to / "inbox" + target_inbox.mkdir(parents=True, exist_ok=True) + if list(target_inbox.glob(f"task_{task_id}_from_{self.agent_id}_*.json")): + return {"status": "dispatched", "task_id": task_id, "note": "cached"} + + ancestry = ( + (self._get_ancestry(parent_task_id) + [self.agent_id]) + if parent_task_id + else [self.agent_id] + ) + + msg: dict[str, Any] = { + "version": "p2p/v1", + "type": "task_dispatch", + "from": self.agent_id, + "to": to, + "task_id": task_id, + "ancestry": ancestry, + "depth": depth + 1, + "payload": { + "description": description, + "allow_redelegation": allow_redelegation, + }, + "deadline": int(time.time()) + deadline_seconds, + "timestamp": int(time.time()), + } + + path = target_inbox / f"task_{task_id}_from_{self.agent_id}_{os.urandom(4).hex()}.json" + self._atomic_write(path, msg) + logger.info("P2P dispatch: {} -> {} (task_id={})", self.agent_id, to, task_id) + return {"status": "dispatched", "task_id": task_id, "depth": depth + 1} + + def poll(self, task_id: str) -> dict[str, Any]: + """Scan inbox/processed and return task status.""" + # Check processed results first + results = list(self.processed.glob(f"result_{task_id}_from_*.json")) + if results: + data = self._load_json(results[0]) + payload = data.get("payload", {}) + return { + "status": payload.get("outcome", "completed"), + "result": payload.get("content", ""), + "from": data["from"], + } + + # Check inbox for results (not yet moved to processed) + inbox_results = list(self.inbox.glob(f"result_{task_id}_from_*.json")) + if inbox_results: + data = self._load_json(inbox_results[0]) + payload = data.get("payload", {}) + return { + "status": payload.get("outcome", "completed"), + "result": payload.get("content", ""), + "from": data["from"], + } + + # Check inbox for pending task dispatches + pending = list(self.inbox.glob(f"task_{task_id}_from_*.json")) + if pending: + data = self._load_json(pending[0]) + deadline = data.get("deadline", 0) + elapsed = int(time.time() - data["timestamp"]) + if time.time() > deadline: + return {"status": "timeout", "elapsed": elapsed} + return {"status": "pending", "elapsed": elapsed} + + return {"status": "not_found"} + + # ------------------------------------------------------------------ + # Aggregation (broadcast + check) + # ------------------------------------------------------------------ + + def broadcast( + self, + task_id: str, + subtasks: list[dict[str, Any]], + aggregation_timeout: int = 30, + ) -> dict[str, Any]: + """Write bid requests to candidate agents and create a window descriptor.""" + targets: list[tuple[str, str]] = [] # (subtask_id, agent_id) + for sub in subtasks: + caps = sub.get("capability", "") + found = self.discover(caps, top_k=3) + targets.extend([(sub["subtask_id"], a["agent_id"]) for a in found]) + + for subtask_id, target in targets: + msg: dict[str, Any] = { + "version": "p2p/v1", + "type": "bid_request", + "from": self.agent_id, + "to": target, + "task_id": task_id, + "subtask_id": subtask_id, + "payload": sub, + "deadline": int(time.time()) + aggregation_timeout, + "timestamp": int(time.time()), + } + target_inbox = self.root / target / "inbox" + target_inbox.mkdir(parents=True, exist_ok=True) + path = target_inbox / f"bid_{task_id}_{subtask_id}_from_{self.agent_id}.json" + self._atomic_write(path, msg) + + window: dict[str, Any] = { + "task_id": task_id, + "mode": "bid", + "expected": len(targets), + "deadline": int(time.time()) + aggregation_timeout, + "created_at": int(time.time()), + } + self._atomic_write(self.windows_dir / f"{task_id}.json", window) + logger.info( + "P2P broadcast: {} invited {} agents for task_id={}", + self.agent_id, + len(targets), + task_id, + ) + return {"status": "bidding_opened", "task_id": task_id, "invited": len(targets)} + + def check_aggregation(self, task_id: str) -> dict[str, Any]: + """Lazily check aggregation status by scanning files.""" + window_path = self.windows_dir / f"{task_id}.json" + if not window_path.exists(): + return {"status": "no_window"} + + window = self._load_json(window_path) + mode = window.get("mode", "bid") + deadline = window.get("deadline", 0) + + pattern = f"{mode}_{task_id}_*_from_*.json" + entries: list[dict[str, Any]] = [] + for f in self.inbox.glob(pattern): + data = self._load_json(f) + entries.append( + { + "from": data.get("from", ""), + "subtask_id": data.get("subtask_id", ""), + "payload": data.get("payload", {}), + } + ) + + is_timeout = time.time() > deadline + is_full = window.get("expected") and len(entries) >= window["expected"] + + if is_timeout or is_full: + self._atomic_write( + self.processed / f"window_{task_id}.json", + {**window, "closed_at": int(time.time()), "received": len(entries)}, + ) + window_path.unlink(missing_ok=True) + return { + "status": "closed", + "mode": mode, + "entries": entries, + "reason": "timeout" if is_timeout else "full", + } + + return { + "status": "pending", + "received": len(entries), + "expected": window.get("expected"), + "seconds_remaining": max(0, deadline - int(time.time())), + } + + # ------------------------------------------------------------------ + # Result reporting + # ------------------------------------------------------------------ + + def report_result( + self, + to: str, + task_id: str, + outcome: Literal["completed", "failed", "aborted"], + content: str, + callback: dict[str, Any] | None = None, + ) -> None: + """Worker calls this to write a result into the manager's inbox.""" + msg: dict[str, Any] = { + "version": "p2p/v1", + "type": "result", + "from": self.agent_id, + "to": to, + "task_id": task_id, + "payload": {"outcome": outcome, "content": content}, + "timestamp": int(time.time()), + } + if callback: + msg["callback"] = callback + target_inbox = self.root / to / "inbox" + target_inbox.mkdir(parents=True, exist_ok=True) + path = target_inbox / f"result_{task_id}_from_{self.agent_id}_{os.urandom(4).hex()}.json" + self._atomic_write(path, msg) + logger.info("P2P result: {} -> {} (task_id={}, outcome={})", self.agent_id, to, task_id, outcome) + + # ------------------------------------------------------------------ + # Finalization + # ------------------------------------------------------------------ + + def finalize(self, task_id: str, outcome: str, reason: str = "") -> None: + """Move all task files from inbox to processed and mark outcome.""" + for src in list(self.inbox.glob(f"*{task_id}*")): + data = self._load_json(src) + data.setdefault("payload", {}) + data["payload"]["outcome"] = outcome + data["payload"]["reason"] = reason + dst = self.processed / src.name + self._atomic_write(dst, data) + src.unlink(missing_ok=True) + logger.info("P2P finalize: task_id={} outcome={}", task_id, outcome) + + # ------------------------------------------------------------------ + # Circuit breaker + # ------------------------------------------------------------------ + + def _circuit_allow(self, to: str) -> bool: + link = self._load_json( + self.links_dir / f"{to}.json", + default={"failures": 0, "last_failure": 0, "open": False}, + ) + if not link.get("open"): + return True + backoff = 300 * (2 ** max(0, link.get("failures", 0) - 3)) + if time.time() - link.get("last_failure", 0) > backoff: + link["open"] = False + self._atomic_write(self.links_dir / f"{to}.json", link) + return True + return False + + def record_failure(self, to: str) -> None: + link = self._load_json( + self.links_dir / f"{to}.json", + default={"failures": 0, "last_failure": 0, "open": False}, + ) + link["failures"] = link.get("failures", 0) + 1 + link["last_failure"] = int(time.time()) + if link["failures"] >= 3: + link["open"] = True + self._atomic_write(self.links_dir / f"{to}.json", link) + + def record_success(self, to: str) -> None: + link = self._load_json( + self.links_dir / f"{to}.json", + default={"failures": 0, "last_failure": 0, "open": False}, + ) + link["failures"] = 0 + link["open"] = False + self._atomic_write(self.links_dir / f"{to}.json", link) + + # ------------------------------------------------------------------ + # Inbox scanning (for HeartbeatService) + # ------------------------------------------------------------------ + + def scan_inbox(self) -> list[dict[str, Any]]: + """Return all task_dispatch messages currently in inbox.""" + messages: list[dict[str, Any]] = [] + for f in sorted(self.inbox.glob("task_*_from_*.json"), key=lambda p: p.stat().st_mtime): + data = self._load_json(f) + # Skip expired tasks + if time.time() > data.get("deadline", 0): + continue + data["_filename"] = f.name + messages.append(data) + return messages + + def scan_new_inbox(self, since: float | None = None) -> list[dict[str, Any]]: + """Return inbox messages newer than the given timestamp.""" + messages: list[dict[str, Any]] = [] + for f in self.inbox.glob("task_*_from_*.json"): + mtime = f.stat().st_mtime + if since is not None and mtime <= since: + continue + data = self._load_json(f) + if time.time() > data.get("deadline", 0): + continue + data["_filename"] = f.name + data["_mtime"] = mtime + messages.append(data) + return sorted(messages, key=lambda x: x.get("_mtime", 0)) + + def mark_processed(self, filename: str) -> None: + """Move a single inbox file to processed.""" + src = self.inbox / filename + if not src.exists(): + return + dst = self.processed / filename + try: + import shutil + shutil.move(str(src), str(dst)) + except Exception: + logger.warning("Failed to mark processed: {}", filename) + + # ------------------------------------------------------------------ + # Helpers + # ------------------------------------------------------------------ + + def _load_json(self, path: Path, default: Any | None = None) -> Any: + if not path.exists(): + return default if default is not None else {} + with open(path, "r", encoding="utf-8") as f: + return json.load(f) + + def _atomic_write(self, path: Path, data: dict[str, Any]) -> None: + tmp = path.with_suffix(".tmp") + with open(tmp, "w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + tmp.rename(path) + + def _get_depth(self, task_id: str) -> int: + return task_id.count(".") + + def _is_ancestor(self, agent_id: str, parent_task_id: str) -> bool: + for f in list(self.processed.glob(f"*{parent_task_id}*")) + list( + self.inbox.glob(f"*{parent_task_id}*") + ): + data = self._load_json(f) + if agent_id in data.get("ancestry", []): + return True + return False + + def _get_ancestry(self, task_id: str) -> list[str]: + for f in list(self.processed.glob(f"*{task_id}*")) + list( + self.inbox.glob(f"*{task_id}*") + ): + data = self._load_json(f) + return data.get("ancestry", []) + return [] + + def _find_failover(self, to: str) -> str | None: + registry = self._load_json(self.root / "_registry.json", default={}) + target_caps = registry.get(to, {}).get("capabilities", []) + for aid, info in registry.items(): + if aid == to: + continue + if any(c in info.get("capabilities", []) for c in target_caps): + return aid + return None diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 269301104..177b1215b 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -8,7 +8,7 @@ from contextlib import suppress from dataclasses import dataclass, field from datetime import datetime from pathlib import Path -from typing import Any +from typing import Any, Literal from loguru import logger @@ -581,6 +581,36 @@ class SessionManager: return self._session_payload(repaired) return None + def get_or_create_task_session( + self, + base_key: str, + task_id: str, + role: Literal["manager", "worker"] = "worker", + ) -> Session: + """Get or create an isolated session for a specific task. + + Key format: task:{base_key}:{task_id}:{role} + Example: task:slack:C123:root_qml:manager + """ + task_key = f"task:{base_key}:{task_id}:{role}" + return self.get_or_create(task_key) + + def list_task_sessions(self, base_key: str) -> list[Session]: + """List all task-scoped sessions for a given base key.""" + prefix = f"task:{base_key}:" + return [ + session for key, session in self._cache.items() + if key.startswith(prefix) + ] + + def finalize_task_session(self, task_id: str) -> None: + """Mark a task session as finalized (read-only) by setting metadata.""" + prefix = f"task:" + for key, session in list(self._cache.items()): + if f":{task_id}:" in key and key.startswith(prefix): + session.metadata["finalized"] = True + self.save(session) + def list_sessions(self) -> list[dict[str, Any]]: """ List all sessions. diff --git a/nanobot/skills/create-instance/SKILL.md b/nanobot/skills/create-instance/SKILL.md index a88c324bb..1f25a71ba 100644 --- a/nanobot/skills/create-instance/SKILL.md +++ b/nanobot/skills/create-instance/SKILL.md @@ -1,6 +1,6 @@ --- name: create-instance -description: "Create a new nanobot instance with separate config and workspace. Use when the user wants to set up a new bot, create a new instance for a different channel, persona, or purpose. Triggers on: create instance, new bot, set up bot, add bot, create telegram/discord/feishu/slack/wechat/wecom/dingtalk/qq/email/matrix/msteams/whatsapp bot, multi-instance setup." +description: "Create a new nanobot instance with separate config and workspace. Use when the user wants to set up a new bot, create a new instance for a different channel, persona, or purpose. Triggers on: create instance, new bot, set up bot, add bot, create telegram/discord/feishu/slack/wechat/wecom/dingtalk/qq/email/matrix/msteams/whatsapp bot, multi-instance setup, inter-agent communication." --- # Create Instance diff --git a/nanobot/skills/create-instance/references/channels.md b/nanobot/skills/create-instance/references/channels.md index 0bcf6ef50..d1e7427c9 100644 --- a/nanobot/skills/create-instance/references/channels.md +++ b/nanobot/skills/create-instance/references/channels.md @@ -192,3 +192,4 @@ Built-in WebSocket channel for programmatic access. - `port` — Listen port (default: 8765) - `allow_from` — Allowed origins (default: `["*"]`) - `streaming` — Enable streaming (default: true) + diff --git a/nanobot/skills/create-instance/scripts/create_instance.py b/nanobot/skills/create-instance/scripts/create_instance.py index d40d04bf2..905daad66 100644 --- a/nanobot/skills/create-instance/scripts/create_instance.py +++ b/nanobot/skills/create-instance/scripts/create_instance.py @@ -68,6 +68,7 @@ def _patch_config( channel: str, workspace: Path, model: str | None, + name: str | None = None, inherit_config_path: Path | None = None, ) -> dict: """Patch the generated config: enable channel, set workspace, optionally set model.""" @@ -227,6 +228,7 @@ def main() -> None: channel=args.channel, workspace=workspace, model=args.model, + name=name, inherit_config_path=inherit_path, )