feat(p2p): add peer-to-peer task coordination and mailbox system

This commit is contained in:
chengyongru 2026-04-30 15:11:27 +08:00 committed by chengyongru
parent b815aa8c0e
commit 80e103aae3
12 changed files with 913 additions and 3 deletions

View File

@ -39,6 +39,7 @@ class ContextBuilder:
skill_names: list[str] | None = None,
channel: str | None = None,
session_summary: str | None = None,
session_key: str | None = None,
) -> str:
"""Build the system prompt from identity, bootstrap files, memory, and skills."""
parts = [self._get_identity(channel=channel)]
@ -73,8 +74,29 @@ class ContextBuilder:
if session_summary:
parts.append(f"[Archived Context Summary]\n\n{session_summary}")
# Inject P2P collaboration hint for task-scoped sessions
if session_key and session_key.startswith("task:"):
parts.append(self._p2p_collaboration_hint())
return "\n\n---\n\n".join(parts)
@staticmethod
def _p2p_collaboration_hint() -> str:
return (
"# Multi-Agent Collaboration\n\n"
"You are part of a decentralized agent network. You can:\n"
"- Use `broadcast_task` to announce subtasks and collect BIDs\n"
"- Use `dispatch_task` to assign tasks to specific agents\n"
"- Use `poll_task_result` to check task status\n"
"- Use `report_user` to deliver final results to the user\n"
"- Use `finalize_task` to terminate tasks\n\n"
"Rules:\n"
"- Never block waiting for results. Dispatch and continue.\n"
"- If a task times out, decide whether to retry, failover, or report partial.\n"
"- Respect the user's INTERRUPT messages — they have highest priority.\n"
"- You are currently in a task-scoped session; focus on the delegated task."
)
def _get_identity(self, channel: str | None = None) -> str:
"""Get the core identity section."""
workspace_path = str(self.workspace.expanduser().resolve())
@ -154,6 +176,7 @@ class ContextBuilder:
sender_id: str | None = None,
session_summary: str | None = None,
session_metadata: Mapping[str, Any] | None = None,
session_key: str | None = None,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
extra = goal_state_runtime_lines(session_metadata)
@ -175,7 +198,7 @@ class ContextBuilder:
else:
merged = user_content + [{"type": "text", "text": runtime_ctx}]
messages = [
{"role": "system", "content": self.build_system_prompt(skill_names, channel=channel, session_summary=session_summary)},
{"role": "system", "content": self.build_system_prompt(skill_names, channel=channel, session_summary=session_summary, session_key=session_key)},
*history,
]
if messages[-1].get("role") == current_role:

View File

@ -24,6 +24,14 @@ from nanobot.agent.runner import _MAX_INJECTIONS_PER_TURN, AgentRunner, AgentRun
from nanobot.agent.subagent import SubagentManager
from nanobot.agent.tools.file_state import FileStateStore, bind_file_states, reset_file_states
from nanobot.agent.tools.message import MessageTool
from nanobot.agent.tools.p2p import (
BroadcastTaskTool,
CheckAggregationTool,
DispatchTaskTool,
FinalizeTaskTool,
PollTaskResultTool,
ReportUserTool,
)
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.self import MyTool
from nanobot.bus.events import InboundMessage, OutboundMessage
@ -185,6 +193,7 @@ class AgentLoop:
model_preset: str | None = None,
preset_snapshot_loader: preset_helpers.PresetSnapshotLoader | None = None,
runtime_model_publisher: Callable[[str, str | None], None] | None = None,
p2p_shell: Any | None = None,
):
from nanobot.config.schema import ToolsConfig
@ -192,6 +201,7 @@ class AgentLoop:
defaults = AgentDefaults()
self.bus = bus
self.channels_config = channels_config
self.p2p_shell = p2p_shell
self.provider = provider
self._provider_snapshot_loader = provider_snapshot_loader
self._preset_snapshot_loader = preset_snapshot_loader
@ -463,6 +473,22 @@ class AgentLoop:
)
registered.append("my")
# Register P2P tools if enabled
if self.p2p_shell:
self.tools.register(DispatchTaskTool(shell=self.p2p_shell))
self.tools.register(PollTaskResultTool(shell=self.p2p_shell))
self.tools.register(BroadcastTaskTool(shell=self.p2p_shell))
self.tools.register(CheckAggregationTool(shell=self.p2p_shell))
self.tools.register(
ReportUserTool(
send_callback=self.bus.publish_outbound,
default_channel=getattr(self.channels_config, "default_channel", ""),
default_chat_id=getattr(self.channels_config, "default_chat_id", ""),
)
)
self.tools.register(FinalizeTaskTool(shell=self.p2p_shell, session_manager=self.sessions))
registered.append("p2p")
logger.info("Registered {} tools: {}", len(registered), registered)
async def _connect_mcp(self) -> None:

328
nanobot/agent/tools/p2p.py Normal file
View File

@ -0,0 +1,328 @@
"""P2P tools for inter-agent task dispatch and coordination."""
from __future__ import annotations
from typing import Any, Awaitable, Callable
from nanobot.agent.tools.base import Tool
from nanobot.bus.events import OutboundMessage
class DispatchTaskTool(Tool):
"""Asynchronously dispatch a task to another agent. Non-blocking."""
def __init__(self, shell: "P2PShell"):
self._shell = shell
@property
def name(self) -> str:
return "dispatch_task"
@property
def description(self) -> str:
return (
"Dispatch a task to a specific target agent. Returns immediately with a receipt. "
"The target agent will process the task independently. Use poll_task_result later to check completion. "
"Do NOT block waiting for results."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"to": {"type": "string", "description": "Target agent ID"},
"task_description": {"type": "string", "description": "Clear description of the task"},
"parent_task_id": {"type": "string", "description": "Parent task ID for ancestry tracking"},
"deadline_seconds": {"type": "integer", "default": 300, "description": "Task deadline in seconds"},
"allow_redelegation": {"type": "boolean", "default": True, "description": "Whether the target may re-delegate"},
},
"required": ["to", "task_description"],
}
async def execute(
self,
to: str,
task_description: str,
parent_task_id: str | None = None,
deadline_seconds: int = 300,
allow_redelegation: bool = True,
**kwargs: Any,
) -> str:
result = self._shell.dispatch(
to=to,
parent_task_id=parent_task_id,
description=task_description,
deadline_seconds=deadline_seconds,
allow_redelegation=allow_redelegation,
)
if result.get("status") == "rejected":
return f"Error: dispatch rejected — {result.get('reason', 'unknown')}"
if result.get("status") == "circuit_open":
failover = result.get("failover_to")
return f"Error: circuit open for {to}. Failover candidate: {failover or 'none'}"
return (
f"Dispatched to {to}. Task ID: {result.get('task_id')}. "
f"Depth: {result.get('depth', 0)}."
)
class PollTaskResultTool(Tool):
"""Poll the status of a previously dispatched task."""
def __init__(self, shell: "P2PShell"):
self._shell = shell
@property
def name(self) -> str:
return "poll_task_result"
@property
def description(self) -> str:
return (
"Check the current status of a task you previously dispatched. "
"Returns completed, pending, timeout, failed, or not_found. "
"Call this proactively — do not wait for automatic notifications."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task ID returned by dispatch_task"},
},
"required": ["task_id"],
}
async def execute(self, task_id: str, **kwargs: Any) -> str:
result = self._shell.poll(task_id)
status = result.get("status")
if status == "not_found":
return f"Task {task_id} not found."
if status == "pending":
return f"Task {task_id} is pending (elapsed {result.get('elapsed', '?')}s)."
if status == "timeout":
return f"Task {task_id} timed out after {result.get('elapsed', '?')}s."
if status in ("completed", "failed", "aborted"):
from_agent = result.get("from", "unknown")
content = result.get("result", "")
preview = content[:500] + "..." if len(content) > 500 else content
return f"Task {task_id} is {status} (from {from_agent}).\n\n{preview}"
return f"Task {task_id} status: {status}"
class BroadcastTaskTool(Tool):
"""Broadcast subtasks to discover capable agents."""
def __init__(self, shell: "P2PShell"):
self._shell = shell
@property
def name(self) -> str:
return "broadcast_task"
@property
def description(self) -> str:
return (
"Announce subtasks to the agent network to collect BIDs. "
"Returns immediately. Use check_aggregation later to see which agents responded. "
"Each subtask should include a capability hint for matching."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Your task identifier"},
"subtasks": {
"type": "array",
"items": {
"type": "object",
"properties": {
"subtask_id": {"type": "string"},
"description": {"type": "string"},
"capability": {"type": "string", "description": "Required capability, e.g. 'web_search'"},
"budget_seconds": {"type": "integer", "default": 300},
},
"required": ["subtask_id", "description", "capability"],
},
},
"aggregation_timeout": {"type": "integer", "default": 30, "description": "Seconds to wait for BIDs"},
},
"required": ["task_id", "subtasks"],
}
async def execute(
self,
task_id: str,
subtasks: list[dict[str, Any]],
aggregation_timeout: int = 30,
**kwargs: Any,
) -> str:
result = self._shell.broadcast(task_id, subtasks, aggregation_timeout)
invited = result.get("invited", 0)
return f"Broadcast opened for {task_id}. Invited {invited} agent(s). Use check_aggregation to collect BIDs."
class CheckAggregationTool(Tool):
"""Check the status of a broadcast aggregation window."""
def __init__(self, shell: "P2PShell"):
self._shell = shell
@property
def name(self) -> str:
return "check_aggregation"
@property
def description(self) -> str:
return (
"Check whether a previously broadcast task has collected enough BIDs or timed out. "
"Returns the list of responding agents and their bids, or a pending status with counts."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"task_id": {"type": "string", "description": "Task ID used in broadcast_task"},
},
"required": ["task_id"],
}
async def execute(self, task_id: str, **kwargs: Any) -> str:
result = self._shell.check_aggregation(task_id)
status = result.get("status")
if status == "no_window":
return f"No broadcast window found for {task_id}."
if status == "pending":
received = result.get("received", 0)
expected = result.get("expected", "?")
remaining = result.get("seconds_remaining", 0)
return (
f"Aggregation pending for {task_id}: "
f"{received}/{expected} received, {remaining}s remaining."
)
if status == "closed":
entries = result.get("entries", [])
lines = [f"Aggregation closed for {task_id} ({result.get('reason', '')}):", ""]
for e in entries:
agent = e.get("from", "unknown")
sub = e.get("subtask_id", "")
lines.append(f"- {agent} bid for {sub}")
return "\n".join(lines)
return f"Unknown aggregation status for {task_id}: {status}"
class ReportUserTool(Tool):
"""Deliver a final answer to the user."""
def __init__(
self,
send_callback: Callable[[OutboundMessage], Awaitable[None]] | None = None,
default_channel: str = "",
default_chat_id: str = "",
):
self._send_callback = send_callback
self._default_channel = default_channel
self._default_chat_id = default_chat_id
@property
def name(self) -> str:
return "report_user"
@property
def description(self) -> str:
return (
"Report the final answer to the user. Use this when you have gathered enough results. "
"Status 'partial' means some subtasks are incomplete — list them in pending_items."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"final_answer": {"type": "string", "description": "Complete answer for the user"},
"status": {"type": "string", "enum": ["success", "partial", "failed"]},
"pending_items": {
"type": "array",
"items": {"type": "string"},
"description": "Incomplete items when status is partial",
},
"task_summary": {"type": "string", "description": "Optional brief summary"},
},
"required": ["final_answer", "status"],
}
async def execute(
self,
final_answer: str,
status: str,
pending_items: list[str] | None = None,
task_summary: str = "",
**kwargs: Any,
) -> str:
if not self._send_callback:
return "Error: report_user not configured (no send callback)"
parts = [final_answer]
if pending_items:
parts.append(f"\n\nPending items:\n" + "\n".join(f"- {i}" for i in pending_items))
if task_summary:
parts.append(f"\n\nSummary: {task_summary}")
content = "\n".join(parts)
msg = OutboundMessage(
channel=self._default_channel,
chat_id=self._default_chat_id,
content=content,
)
await self._send_callback(msg)
return f"Reported to user (status={status})."
class FinalizeTaskTool(Tool):
"""Force-finalize a task and close its sessions."""
def __init__(self, shell: "P2PShell", session_manager: "SessionManager | None" = None):
self._shell = shell
self._session_manager = session_manager
@property
def name(self) -> str:
return "finalize_task"
@property
def description(self) -> str:
return (
"Terminate a task and all its subtasks. Use when the user says 'stop', "
"or when a task is fundamentally blocked. outcome can be completed, failed, or aborted."
)
@property
def parameters(self) -> dict[str, Any]:
return {
"type": "object",
"properties": {
"task_id": {"type": "string"},
"outcome": {"type": "string", "enum": ["completed", "failed", "aborted"]},
"reason": {"type": "string", "description": "Why the task was finalized"},
},
"required": ["task_id", "outcome"],
}
async def execute(
self,
task_id: str,
outcome: str,
reason: str = "",
**kwargs: Any,
) -> str:
self._shell.finalize(task_id, outcome, reason)
if self._session_manager:
self._session_manager.finalize_task_session(task_id)
return f"Task {task_id} finalized with outcome={outcome}."

View File

@ -75,6 +75,7 @@ class SafeFileHistory(FileHistory):
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner
from nanobot.config.paths import get_workspace_path, is_default_workspace
from nanobot.config.schema import Config
from nanobot.p2p.shell import P2PShell
from nanobot.utils.helpers import sync_workspace_templates
from nanobot.utils.restart import (
consume_restart_notice_from_env,
@ -92,6 +93,17 @@ app = typer.Typer(
console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
def _resolve_p2p(config: Config) -> P2PShell | None:
"""Resolve P2P config and create the stateless P2P shell."""
mb_cfg = config.mailbox
if not mb_cfg.enabled:
return None
return P2PShell(
agent_id=mb_cfg.agent_id,
mailboxes_root=mb_cfg.mailboxes_root,
)
# ---------------------------------------------------------------------------
# CLI input: prompt_toolkit for editing, paste, history, and display
# ---------------------------------------------------------------------------
@ -581,10 +593,13 @@ def serve(
sync_workspace_templates(runtime_config.workspace_path)
bus = MessageBus()
session_manager = SessionManager(runtime_config.workspace_path)
p2p_shell = _resolve_p2p(runtime_config)
try:
agent_loop = AgentLoop.from_config(
runtime_config, bus,
session_manager=session_manager,
p2p_shell=p2p_shell,
image_generation_provider_configs={
"openrouter": runtime_config.providers.openrouter,
"aihubmix": runtime_config.providers.aihubmix,
@ -690,6 +705,8 @@ def _run_gateway(
cron_store_path = config.workspace_path / "cron" / "jobs.json"
cron = CronService(cron_store_path)
p2p_shell = _resolve_p2p(config)
# Create agent with cron service
agent = AgentLoop.from_config(
config, bus,
@ -709,6 +726,7 @@ def _run_gateway(
preset,
),
provider_signature=provider_snapshot.signature,
p2p_shell=p2p_shell,
)
from nanobot.agent.loop import UNIFIED_SESSION_KEY
@ -921,6 +939,8 @@ def _run_gateway(
interval_s=hb_cfg.interval_s,
enabled=hb_cfg.enabled,
timezone=config.agents.defaults.timezone,
p2p_shell=p2p_shell,
bus=bus,
)
if channels.enabled_channels:
@ -1083,6 +1103,8 @@ def agent(
cron_store_path = config.workspace_path / "cron" / "jobs.json"
cron = CronService(cron_store_path)
p2p_shell = _resolve_p2p(config)
if logs:
logger.enable("nanobot")
else:
@ -1092,6 +1114,7 @@ def agent(
agent_loop = AgentLoop.from_config(
config, bus,
cron_service=cron,
p2p_shell=p2p_shell,
)
except ValueError as exc:
console.print(f"[red]Error: {exc}[/red]")
@ -1289,6 +1312,7 @@ def agent(
console.print("\nGoodbye!")
break
finally:
pass
agent_loop.stop()
outbound_task.cancel()
await asyncio.gather(bus_task, outbound_task, return_exceptions=True)

View File

@ -282,6 +282,19 @@ class ToolsConfig(Base):
ssrf_whitelist: list[str] = Field(default_factory=list) # CIDR ranges to exempt from SSRF blocking (e.g. ["100.64.0.0/10"] for Tailscale)
class P2PConfig(Base):
"""P2P collaboration network configuration."""
enabled: bool = False
agent_id: str = ""
description: str = ""
capabilities: list[str] = Field(default_factory=list)
allow_from: list[str] = Field(default_factory=lambda: ["*"])
max_concurrent_tasks: int = 3
poll_interval: float = 5.0
mailboxes_root: str = "~/.nanobot/mailboxes"
class Config(BaseSettings):
"""Root configuration for nanobot."""
@ -295,6 +308,7 @@ class Config(BaseSettings):
default_factory=dict,
validation_alias=AliasChoices("modelPresets", "model_presets"),
)
mailbox: P2PConfig = Field(default_factory=P2PConfig)
@model_validator(mode="after")
def _validate_model_preset(self) -> "Config":

View File

@ -60,6 +60,8 @@ class HeartbeatService:
interval_s: int = 30 * 60,
enabled: bool = True,
timezone: str | None = None,
p2p_shell: Any | None = None,
bus: Any | None = None,
):
self.workspace = workspace
self.provider = provider
@ -69,8 +71,11 @@ class HeartbeatService:
self.interval_s = interval_s
self.enabled = enabled
self.timezone = timezone
self.p2p_shell = p2p_shell
self.bus = bus
self._running = False
self._task: asyncio.Task | None = None
self._last_inbox_scan: float = 0.0
@property
def heartbeat_file(self) -> Path:
@ -185,6 +190,32 @@ class HeartbeatService:
"""Execute a single heartbeat tick."""
from nanobot.utils.evaluator import evaluate_response
# --- P2P inbox scan ---
if self.p2p_shell and self.bus:
try:
new_msgs = self.p2p_shell.scan_new_inbox(since=self._last_inbox_scan)
if new_msgs:
self._last_inbox_scan = time.time()
from nanobot.bus.events import InboundMessage
for msg in new_msgs:
await self.bus.publish_inbound(
InboundMessage(
channel="p2p",
sender_id=msg.get("from", "unknown"),
chat_id=msg.get("task_id", ""),
content=msg.get("payload", {}).get("description", ""),
metadata={"p2p_msg": msg},
)
)
logger.info(
"Heartbeat: injected P2P task {} from {}",
msg.get("task_id", ""),
msg.get("from", "unknown"),
)
except Exception:
logger.exception("Heartbeat P2P scan failed")
# --- Legacy heartbeat file check ---
content = self._read_heartbeat_file()
if not content:
logger.debug("Heartbeat: HEARTBEAT.md missing or empty")

5
nanobot/p2p/__init__.py Normal file
View File

@ -0,0 +1,5 @@
"""P2P inter-agent coordination layer."""
from nanobot.p2p.shell import P2PShell
__all__ = ["P2PShell"]

426
nanobot/p2p/shell.py Normal file
View File

@ -0,0 +1,426 @@
"""P2P shell: filesystem-backed inter-agent coordination.
All state is stored in the mailbox filesystem; this class is stateless.
Restarting the gateway restores all task state by scanning files.
"""
from __future__ import annotations
import json
import os
import time
from pathlib import Path
from typing import Any, Literal
from loguru import logger
class P2PShell:
"""Stateless P2P coordination shell backed by the mailbox filesystem."""
def __init__(self, agent_id: str, mailboxes_root: str):
self.agent_id = agent_id
self.root = Path(mailboxes_root).expanduser()
self.inbox = self.root / agent_id / "inbox"
self.processed = self.root / agent_id / "processed"
self.links_dir = self.root / "_links"
self.windows_dir = self.root / "_windows"
for d in (self.inbox, self.processed, self.links_dir, self.windows_dir):
d.mkdir(parents=True, exist_ok=True)
# ------------------------------------------------------------------
# Discovery
# ------------------------------------------------------------------
def discover(self, capability: str, top_k: int = 3) -> list[dict[str, Any]]:
"""Read _registry.json and return candidates matching capability."""
registry = self._load_json(self.root / "_registry.json", default={})
candidates: list[dict[str, Any]] = []
for aid, info in registry.items():
if aid == self.agent_id:
continue
caps = info.get("capabilities", [])
if capability.lower() in " ".join(caps).lower():
candidates.append({"agent_id": aid, **info})
# Sort: idle first, then by current task load
candidates.sort(key=lambda x: (x.get("status") != "idle", x.get("current_tasks", 0)))
return candidates[:top_k]
def heartbeat(self, description: str, capabilities: list[str]) -> None:
"""Write self state into the shared _registry.json."""
registry = self._load_json(self.root / "_registry.json", default={})
registry[self.agent_id] = {
"description": description,
"capabilities": capabilities,
"status": "idle",
"last_heartbeat": int(time.time()),
"endpoint": "",
}
self._atomic_write(self.root / "_registry.json", registry)
# ------------------------------------------------------------------
# Task dispatch
# ------------------------------------------------------------------
def dispatch(
self,
to: str,
parent_task_id: str | None,
description: str,
deadline_seconds: int = 300,
allow_redelegation: bool = True,
) -> dict[str, Any]:
"""Write a task into the target agent's inbox and return a receipt."""
task_id = (
f"{parent_task_id}.{int(time.time())}"
if parent_task_id
else f"root_{int(time.time())}"
)
depth = self._get_depth(parent_task_id) if parent_task_id else 0
if depth >= 3:
return {"status": "rejected", "reason": "max_depth_exceeded"}
if parent_task_id and self._is_ancestor(to, parent_task_id):
return {"status": "rejected", "reason": "ancestry_loop"}
if not self._circuit_allow(to):
failover = self._find_failover(to)
return {"status": "circuit_open", "failover_to": failover}
target_inbox = self.root / to / "inbox"
target_inbox.mkdir(parents=True, exist_ok=True)
if list(target_inbox.glob(f"task_{task_id}_from_{self.agent_id}_*.json")):
return {"status": "dispatched", "task_id": task_id, "note": "cached"}
ancestry = (
(self._get_ancestry(parent_task_id) + [self.agent_id])
if parent_task_id
else [self.agent_id]
)
msg: dict[str, Any] = {
"version": "p2p/v1",
"type": "task_dispatch",
"from": self.agent_id,
"to": to,
"task_id": task_id,
"ancestry": ancestry,
"depth": depth + 1,
"payload": {
"description": description,
"allow_redelegation": allow_redelegation,
},
"deadline": int(time.time()) + deadline_seconds,
"timestamp": int(time.time()),
}
path = target_inbox / f"task_{task_id}_from_{self.agent_id}_{os.urandom(4).hex()}.json"
self._atomic_write(path, msg)
logger.info("P2P dispatch: {} -> {} (task_id={})", self.agent_id, to, task_id)
return {"status": "dispatched", "task_id": task_id, "depth": depth + 1}
def poll(self, task_id: str) -> dict[str, Any]:
"""Scan inbox/processed and return task status."""
# Check processed results first
results = list(self.processed.glob(f"result_{task_id}_from_*.json"))
if results:
data = self._load_json(results[0])
payload = data.get("payload", {})
return {
"status": payload.get("outcome", "completed"),
"result": payload.get("content", ""),
"from": data["from"],
}
# Check inbox for results (not yet moved to processed)
inbox_results = list(self.inbox.glob(f"result_{task_id}_from_*.json"))
if inbox_results:
data = self._load_json(inbox_results[0])
payload = data.get("payload", {})
return {
"status": payload.get("outcome", "completed"),
"result": payload.get("content", ""),
"from": data["from"],
}
# Check inbox for pending task dispatches
pending = list(self.inbox.glob(f"task_{task_id}_from_*.json"))
if pending:
data = self._load_json(pending[0])
deadline = data.get("deadline", 0)
elapsed = int(time.time() - data["timestamp"])
if time.time() > deadline:
return {"status": "timeout", "elapsed": elapsed}
return {"status": "pending", "elapsed": elapsed}
return {"status": "not_found"}
# ------------------------------------------------------------------
# Aggregation (broadcast + check)
# ------------------------------------------------------------------
def broadcast(
self,
task_id: str,
subtasks: list[dict[str, Any]],
aggregation_timeout: int = 30,
) -> dict[str, Any]:
"""Write bid requests to candidate agents and create a window descriptor."""
targets: list[tuple[str, str]] = [] # (subtask_id, agent_id)
for sub in subtasks:
caps = sub.get("capability", "")
found = self.discover(caps, top_k=3)
targets.extend([(sub["subtask_id"], a["agent_id"]) for a in found])
for subtask_id, target in targets:
msg: dict[str, Any] = {
"version": "p2p/v1",
"type": "bid_request",
"from": self.agent_id,
"to": target,
"task_id": task_id,
"subtask_id": subtask_id,
"payload": sub,
"deadline": int(time.time()) + aggregation_timeout,
"timestamp": int(time.time()),
}
target_inbox = self.root / target / "inbox"
target_inbox.mkdir(parents=True, exist_ok=True)
path = target_inbox / f"bid_{task_id}_{subtask_id}_from_{self.agent_id}.json"
self._atomic_write(path, msg)
window: dict[str, Any] = {
"task_id": task_id,
"mode": "bid",
"expected": len(targets),
"deadline": int(time.time()) + aggregation_timeout,
"created_at": int(time.time()),
}
self._atomic_write(self.windows_dir / f"{task_id}.json", window)
logger.info(
"P2P broadcast: {} invited {} agents for task_id={}",
self.agent_id,
len(targets),
task_id,
)
return {"status": "bidding_opened", "task_id": task_id, "invited": len(targets)}
def check_aggregation(self, task_id: str) -> dict[str, Any]:
"""Lazily check aggregation status by scanning files."""
window_path = self.windows_dir / f"{task_id}.json"
if not window_path.exists():
return {"status": "no_window"}
window = self._load_json(window_path)
mode = window.get("mode", "bid")
deadline = window.get("deadline", 0)
pattern = f"{mode}_{task_id}_*_from_*.json"
entries: list[dict[str, Any]] = []
for f in self.inbox.glob(pattern):
data = self._load_json(f)
entries.append(
{
"from": data.get("from", ""),
"subtask_id": data.get("subtask_id", ""),
"payload": data.get("payload", {}),
}
)
is_timeout = time.time() > deadline
is_full = window.get("expected") and len(entries) >= window["expected"]
if is_timeout or is_full:
self._atomic_write(
self.processed / f"window_{task_id}.json",
{**window, "closed_at": int(time.time()), "received": len(entries)},
)
window_path.unlink(missing_ok=True)
return {
"status": "closed",
"mode": mode,
"entries": entries,
"reason": "timeout" if is_timeout else "full",
}
return {
"status": "pending",
"received": len(entries),
"expected": window.get("expected"),
"seconds_remaining": max(0, deadline - int(time.time())),
}
# ------------------------------------------------------------------
# Result reporting
# ------------------------------------------------------------------
def report_result(
self,
to: str,
task_id: str,
outcome: Literal["completed", "failed", "aborted"],
content: str,
callback: dict[str, Any] | None = None,
) -> None:
"""Worker calls this to write a result into the manager's inbox."""
msg: dict[str, Any] = {
"version": "p2p/v1",
"type": "result",
"from": self.agent_id,
"to": to,
"task_id": task_id,
"payload": {"outcome": outcome, "content": content},
"timestamp": int(time.time()),
}
if callback:
msg["callback"] = callback
target_inbox = self.root / to / "inbox"
target_inbox.mkdir(parents=True, exist_ok=True)
path = target_inbox / f"result_{task_id}_from_{self.agent_id}_{os.urandom(4).hex()}.json"
self._atomic_write(path, msg)
logger.info("P2P result: {} -> {} (task_id={}, outcome={})", self.agent_id, to, task_id, outcome)
# ------------------------------------------------------------------
# Finalization
# ------------------------------------------------------------------
def finalize(self, task_id: str, outcome: str, reason: str = "") -> None:
"""Move all task files from inbox to processed and mark outcome."""
for src in list(self.inbox.glob(f"*{task_id}*")):
data = self._load_json(src)
data.setdefault("payload", {})
data["payload"]["outcome"] = outcome
data["payload"]["reason"] = reason
dst = self.processed / src.name
self._atomic_write(dst, data)
src.unlink(missing_ok=True)
logger.info("P2P finalize: task_id={} outcome={}", task_id, outcome)
# ------------------------------------------------------------------
# Circuit breaker
# ------------------------------------------------------------------
def _circuit_allow(self, to: str) -> bool:
link = self._load_json(
self.links_dir / f"{to}.json",
default={"failures": 0, "last_failure": 0, "open": False},
)
if not link.get("open"):
return True
backoff = 300 * (2 ** max(0, link.get("failures", 0) - 3))
if time.time() - link.get("last_failure", 0) > backoff:
link["open"] = False
self._atomic_write(self.links_dir / f"{to}.json", link)
return True
return False
def record_failure(self, to: str) -> None:
link = self._load_json(
self.links_dir / f"{to}.json",
default={"failures": 0, "last_failure": 0, "open": False},
)
link["failures"] = link.get("failures", 0) + 1
link["last_failure"] = int(time.time())
if link["failures"] >= 3:
link["open"] = True
self._atomic_write(self.links_dir / f"{to}.json", link)
def record_success(self, to: str) -> None:
link = self._load_json(
self.links_dir / f"{to}.json",
default={"failures": 0, "last_failure": 0, "open": False},
)
link["failures"] = 0
link["open"] = False
self._atomic_write(self.links_dir / f"{to}.json", link)
# ------------------------------------------------------------------
# Inbox scanning (for HeartbeatService)
# ------------------------------------------------------------------
def scan_inbox(self) -> list[dict[str, Any]]:
"""Return all task_dispatch messages currently in inbox."""
messages: list[dict[str, Any]] = []
for f in sorted(self.inbox.glob("task_*_from_*.json"), key=lambda p: p.stat().st_mtime):
data = self._load_json(f)
# Skip expired tasks
if time.time() > data.get("deadline", 0):
continue
data["_filename"] = f.name
messages.append(data)
return messages
def scan_new_inbox(self, since: float | None = None) -> list[dict[str, Any]]:
"""Return inbox messages newer than the given timestamp."""
messages: list[dict[str, Any]] = []
for f in self.inbox.glob("task_*_from_*.json"):
mtime = f.stat().st_mtime
if since is not None and mtime <= since:
continue
data = self._load_json(f)
if time.time() > data.get("deadline", 0):
continue
data["_filename"] = f.name
data["_mtime"] = mtime
messages.append(data)
return sorted(messages, key=lambda x: x.get("_mtime", 0))
def mark_processed(self, filename: str) -> None:
"""Move a single inbox file to processed."""
src = self.inbox / filename
if not src.exists():
return
dst = self.processed / filename
try:
import shutil
shutil.move(str(src), str(dst))
except Exception:
logger.warning("Failed to mark processed: {}", filename)
# ------------------------------------------------------------------
# Helpers
# ------------------------------------------------------------------
def _load_json(self, path: Path, default: Any | None = None) -> Any:
if not path.exists():
return default if default is not None else {}
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
def _atomic_write(self, path: Path, data: dict[str, Any]) -> None:
tmp = path.with_suffix(".tmp")
with open(tmp, "w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
tmp.rename(path)
def _get_depth(self, task_id: str) -> int:
return task_id.count(".")
def _is_ancestor(self, agent_id: str, parent_task_id: str) -> bool:
for f in list(self.processed.glob(f"*{parent_task_id}*")) + list(
self.inbox.glob(f"*{parent_task_id}*")
):
data = self._load_json(f)
if agent_id in data.get("ancestry", []):
return True
return False
def _get_ancestry(self, task_id: str) -> list[str]:
for f in list(self.processed.glob(f"*{task_id}*")) + list(
self.inbox.glob(f"*{task_id}*")
):
data = self._load_json(f)
return data.get("ancestry", [])
return []
def _find_failover(self, to: str) -> str | None:
registry = self._load_json(self.root / "_registry.json", default={})
target_caps = registry.get(to, {}).get("capabilities", [])
for aid, info in registry.items():
if aid == to:
continue
if any(c in info.get("capabilities", []) for c in target_caps):
return aid
return None

View File

@ -8,7 +8,7 @@ from contextlib import suppress
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path
from typing import Any
from typing import Any, Literal
from loguru import logger
@ -581,6 +581,36 @@ class SessionManager:
return self._session_payload(repaired)
return None
def get_or_create_task_session(
self,
base_key: str,
task_id: str,
role: Literal["manager", "worker"] = "worker",
) -> Session:
"""Get or create an isolated session for a specific task.
Key format: task:{base_key}:{task_id}:{role}
Example: task:slack:C123:root_qml:manager
"""
task_key = f"task:{base_key}:{task_id}:{role}"
return self.get_or_create(task_key)
def list_task_sessions(self, base_key: str) -> list[Session]:
"""List all task-scoped sessions for a given base key."""
prefix = f"task:{base_key}:"
return [
session for key, session in self._cache.items()
if key.startswith(prefix)
]
def finalize_task_session(self, task_id: str) -> None:
"""Mark a task session as finalized (read-only) by setting metadata."""
prefix = f"task:"
for key, session in list(self._cache.items()):
if f":{task_id}:" in key and key.startswith(prefix):
session.metadata["finalized"] = True
self.save(session)
def list_sessions(self) -> list[dict[str, Any]]:
"""
List all sessions.

View File

@ -1,6 +1,6 @@
---
name: create-instance
description: "Create a new nanobot instance with separate config and workspace. Use when the user wants to set up a new bot, create a new instance for a different channel, persona, or purpose. Triggers on: create instance, new bot, set up bot, add bot, create telegram/discord/feishu/slack/wechat/wecom/dingtalk/qq/email/matrix/msteams/whatsapp bot, multi-instance setup."
description: "Create a new nanobot instance with separate config and workspace. Use when the user wants to set up a new bot, create a new instance for a different channel, persona, or purpose. Triggers on: create instance, new bot, set up bot, add bot, create telegram/discord/feishu/slack/wechat/wecom/dingtalk/qq/email/matrix/msteams/whatsapp bot, multi-instance setup, inter-agent communication."
---
# Create Instance

View File

@ -192,3 +192,4 @@ Built-in WebSocket channel for programmatic access.
- `port` — Listen port (default: 8765)
- `allow_from` — Allowed origins (default: `["*"]`)
- `streaming` — Enable streaming (default: true)

View File

@ -68,6 +68,7 @@ def _patch_config(
channel: str,
workspace: Path,
model: str | None,
name: str | None = None,
inherit_config_path: Path | None = None,
) -> dict:
"""Patch the generated config: enable channel, set workspace, optionally set model."""
@ -227,6 +228,7 @@ def main() -> None:
channel=args.channel,
workspace=workspace,
model=args.model,
name=name,
inherit_config_path=inherit_path,
)