diff --git a/docs/superpowers/plans/2026-04-26-multi-agent-mailbox.md b/docs/superpowers/plans/2026-04-26-multi-agent-mailbox.md new file mode 100644 index 000000000..5d754f589 --- /dev/null +++ b/docs/superpowers/plans/2026-04-26-multi-agent-mailbox.md @@ -0,0 +1,1198 @@ +# Multi-Agent Mailbox Implementation Plan + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** Implement a file-system-based mailbox channel plugin for inter-agent communication. Zero modifications to existing code. + +**Architecture:** MailboxManager handles low-level file I/O (atomic writes, polling, registry). MailboxChannel is a standard nanobot channel plugin that polls inbox and injects messages into the bus. The LLM uses the existing `MessageTool` with `channel="mailbox"` and `chat_id` set to the target agent's ID (e.g. `chat_id="coder"`) to send messages to other agents — MailboxChannel.send() handles the rest. 
+ +**Tech Stack:** Python 3.12+, asyncio, pydantic, pytest + +--- + +## File Structure + +| File | Responsibility | +|------|---------------| +| `nanobot/channels/mailbox.py` | MailboxManager + MailboxConfig + MailboxChannel — single file, same as all other channels | +| `tests/channels/test_mailbox.py` | All tests: MailboxManager, MailboxChannel, integration | + +**Existing files modified: None** + +--- + +### Task 1: MailboxManager — File Operations (inside mailbox.py) + +**Files:** +- Create: `nanobot/channels/mailbox.py` +- Create: `tests/channels/test_mailbox.py` + +- [ ] **Step 1: Write failing tests for MailboxManager** + +Create `tests/channels/test_mailbox.py`: + +```python +"""Tests for MailboxManager (file operations) and MailboxChannel.""" + +import asyncio +import json +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.channels.mailbox import MailboxManager, MailboxChannel, MailboxConfig + + +# --- MailboxManager Tests --- + +@pytest.fixture +def root(tmp_path: Path) -> Path: + mailboxes = tmp_path / "mailboxes" + mailboxes.mkdir() + return mailboxes + + +@pytest.fixture +def mgr(root: Path) -> MailboxManager: + return MailboxManager(root) + + +class TestRegister: + def test_register_creates_agent_entry(self, mgr: MailboxManager, root: Path): + card = {"agent_id": "researcher", "description": "test agent"} + mgr.register("researcher", card) + registry = json.loads((root / "_registry.json").read_text()) + assert "researcher" in registry + assert registry["researcher"]["agent_id"] == "researcher" + + def test_register_creates_directories(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + assert (root / "coder" / "inbox").is_dir() + assert (root / "coder" / "processed").is_dir() + + def test_register_overwrite(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder", "status": "idle"}) + mgr.register("coder", {"agent_id": 
"coder", "status": "busy"}) + registry = json.loads((root / "_registry.json").read_text()) + assert registry["coder"]["status"] == "busy" + + +class TestHeartbeat: + def test_heartbeat_updates_timestamp(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + before = json.loads((root / "_registry.json").read_text())["coder"]["last_heartbeat"] + time.sleep(0.01) + mgr.heartbeat("coder") + after = json.loads((root / "_registry.json").read_text())["coder"]["last_heartbeat"] + assert after >= before + + +class TestUpdateStatus: + def test_update_status(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder", "status": "idle"}) + mgr.update_status("coder", "busy", current_tasks=["task_1"]) + registry = json.loads((root / "_registry.json").read_text()) + assert registry["coder"]["status"] == "busy" + assert registry["coder"]["current_tasks"] == ["task_1"] + + +class TestSendAndPoll: + def test_send_creates_message_file(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("researcher", {"agent_id": "researcher"}) + msg = {"type": "message", "content": {"parts": [{"type": "text", "text": "hello"}]}} + mgr.send("researcher", "coder", msg) + files = list((root / "coder" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["from"] == "researcher" + assert data["to"] == "coder" + + def test_poll_returns_new_messages_sorted(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("a1", {"agent_id": "a1"}) + mgr.register("a2", {"agent_id": "a2"}) + mgr.send("a1", "coder", {"type": "message", "content": {"parts": []}}) + time.sleep(0.01) + mgr.send("a2", "coder", {"type": "task", "content": {"parts": []}}) + messages = mgr.poll("coder") + assert len(messages) == 2 + assert messages[0]["from"] == "a1" + assert messages[1]["from"] == "a2" + + def 
test_mark_processed_moves_file(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("researcher", {"agent_id": "researcher"}) + mgr.send("researcher", "coder", {"type": "message", "content": {"parts": []}}) + messages = mgr.poll("coder") + mgr.mark_processed("coder", messages[0]["_filename"]) + assert len(list((root / "coder" / "inbox").glob("*.msg.json"))) == 0 + assert len(list((root / "coder" / "processed").glob("*.msg.json"))) == 1 + + +class TestListAndGetAgents: + def test_list_online_agents(self, mgr: MailboxManager, root: Path): + mgr.register("researcher", {"agent_id": "researcher", "status": "idle"}) + mgr.register("coder", {"agent_id": "coder", "status": "busy"}) + agents = mgr.list_online_agents() + ids = {a["agent_id"] for a in agents} + assert ids == {"researcher", "coder"} + + def test_get_agent_not_found(self, mgr: MailboxManager): + assert mgr.get_agent("nonexistent") is None +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd D:\Documents\GitHub\nanobot\.worktrees\n2n && uv run pytest tests/channels/test_mailbox.py -v` +Expected: FAIL with `ModuleNotFoundError` + +- [ ] **Step 3: Implement MailboxManager (first part of mailbox.py)** + +Create `nanobot/channels/mailbox.py` with just the MailboxManager class: + +```python +"""Mailbox channel for inter-agent communication via filesystem.""" + +from __future__ import annotations + +import asyncio +import json +import logging +import os +import time +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from pydantic import Field + +from nanobot.bus.events import InboundMessage, OutboundMessage +from nanobot.channels.base import BaseChannel +from nanobot.config.schema import Base + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# MailboxManager — low-level file operations +# 
--------------------------------------------------------------------------- + + +class MailboxManager: + """File-system operations for agent mailboxes.""" + + def __init__(self, mailboxes_root: Path) -> None: + self.root = Path(mailboxes_root) + self.root.mkdir(parents=True, exist_ok=True) + + # -- registry -- + + def _registry_path(self) -> Path: + return self.root / "_registry.json" + + def _read_registry(self) -> dict: + path = self._registry_path() + if not path.exists(): + return {} + return json.loads(path.read_text(encoding="utf-8")) + + def _write_registry(self, data: dict) -> None: + path = self._registry_path() + tmp = path.with_suffix(".tmp") + tmp.write_text(json.dumps(data, ensure_ascii=False, indent=2), encoding="utf-8") + os.replace(tmp, path) + + def _ensure_dirs(self, agent_id: str) -> None: + agent_dir = self.root / agent_id + (agent_dir / "inbox").mkdir(parents=True, exist_ok=True) + (agent_dir / "processed").mkdir(parents=True, exist_ok=True) + + def register(self, agent_id: str, card: dict) -> None: + self._ensure_dirs(agent_id) + registry = self._read_registry() + card["registered_at"] = card.get( + "registered_at", datetime.now(timezone.utc).isoformat() + ) + card["last_heartbeat"] = datetime.now(timezone.utc).isoformat() + registry[agent_id] = card + self._write_registry(registry) + + def heartbeat(self, agent_id: str) -> None: + registry = self._read_registry() + if agent_id in registry: + registry[agent_id]["last_heartbeat"] = datetime.now(timezone.utc).isoformat() + self._write_registry(registry) + + def update_status( + self, agent_id: str, status: str, current_tasks: list[str] | None = None + ) -> None: + registry = self._read_registry() + if agent_id in registry: + registry[agent_id]["status"] = status + if current_tasks is not None: + registry[agent_id]["current_tasks"] = current_tasks + self._write_registry(registry) + + # -- message I/O -- + + def send(self, from_id: str, to_id: str, msg: dict) -> None: + self._ensure_dirs(to_id) + 
ts = int(time.time() * 1000) + filename = f"{ts}_{from_id}.msg.json" + msg.setdefault("id", f"msg_{ts}_{from_id}") + msg.setdefault("from", from_id) + msg.setdefault("to", to_id) + msg.setdefault("timestamp", datetime.now(timezone.utc).isoformat()) + filepath = self.root / to_id / "inbox" / filename + tmp = filepath.with_suffix(".tmp") + tmp.write_text(json.dumps(msg, ensure_ascii=False), encoding="utf-8") + os.replace(tmp, filepath) + + def poll(self, agent_id: str) -> list[dict]: + inbox = self.root / agent_id / "inbox" + if not inbox.is_dir(): + return [] + messages = [] + for f in sorted(inbox.glob("*.msg.json")): + data = json.loads(f.read_text(encoding="utf-8")) + data["_filename"] = f.name + messages.append(data) + return messages + + def mark_processed(self, agent_id: str, filename: str) -> None: + src = self.root / agent_id / "inbox" / filename + dst = self.root / agent_id / "processed" / filename + if src.exists(): + os.replace(src, dst) + + # -- discovery -- + + def list_online_agents(self) -> list[dict]: + registry = self._read_registry() + return [card for card in registry.values() if card.get("status") != "offline"] + + def get_agent(self, agent_id: str) -> dict | None: + return self._read_registry().get(agent_id) + + +# --------------------------------------------------------------------------- +# MailboxConfig — channel configuration +# --------------------------------------------------------------------------- + + +class MailboxConfig(Base): + """Mailbox channel configuration.""" + + enabled: bool = False + agent_id: str = "" + description: str = "" + capabilities: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=lambda: ["*"]) + max_concurrent_tasks: int = 3 + poll_interval: float = 5.0 + mailboxes_root: str = "~/.nanobot/mailboxes" + + +# --------------------------------------------------------------------------- +# MailboxChannel — channel plugin +# 
--------------------------------------------------------------------------- + + +class MailboxChannel(BaseChannel): + """Channel plugin for inter-agent communication via filesystem mailboxes. + + The LLM sends messages to other agents using the existing MessageTool: + MessageTool(channel='mailbox', chat_id='TARGET_AGENT_ID', content='...') + + This produces an OutboundMessage routed to MailboxChannel.send(), + which writes to the target agent's mailbox. + """ + + name = "mailbox" + display_name = "Mailbox" + + def __init__(self, config: Any, bus: Any) -> None: + if isinstance(config, dict): + config = MailboxConfig.model_validate(config) + super().__init__(config, bus) + self.config: MailboxConfig = config + root = Path(self.config.mailboxes_root).expanduser() + self.manager = MailboxManager(root) + self._running = False + self._poll_task: asyncio.Task | None = None + + async def start(self) -> None: + self.manager.register(self.config.agent_id, self._build_card()) + self._running = True + self._poll_task = asyncio.create_task(self._poll_loop()) + + async def _poll_loop(self) -> None: + while self._running: + await self._poll_once() + self.manager.heartbeat(self.config.agent_id) + await asyncio.sleep(self.config.poll_interval) + + async def _poll_once(self) -> None: + messages = self.manager.poll(self.config.agent_id) + for msg in messages: + filename = msg.pop("_filename", "") + callback = msg.get("callback") + if callback and callback.get("channel") and callback.get("chat_id"): + await self._handle_callback_message(msg, callback) + else: + await self._handle_message( + sender_id=msg["from"], + chat_id=msg["from"], + content=self._extract_text(msg), + metadata=self._build_metadata(msg), + ) + if filename: + self.manager.mark_processed(self.config.agent_id, filename) + + async def _handle_callback_message(self, msg: dict, callback: dict) -> None: + if not self.is_allowed(msg["from"]): + logger.warning("Mailbox: access denied from %s", msg["from"]) + return + inbound = 
InboundMessage( + channel=callback["channel"], + sender_id=msg["from"], + chat_id=callback["chat_id"], + content=self._extract_text(msg), + metadata=self._build_metadata(msg), + session_key_override=callback.get("session_id"), + ) + await self.bus.publish_inbound(inbound) + + def _extract_text(self, msg: dict) -> str: + parts = msg.get("content", {}).get("parts", []) + texts = [p["text"] for p in parts if p.get("type") == "text" and "text" in p] + return "\n".join(texts) if texts else "" + + def _build_metadata(self, msg: dict) -> dict[str, Any]: + return { + "mailbox_type": msg.get("type", "message"), + "mailbox_task": msg.get("task"), + "mailbox_parts": msg.get("content", {}).get("parts"), + "mailbox_ttl": msg.get("ttl"), + "mailbox_trace": msg.get("trace"), + "reply_to": msg.get("reply_to"), + } + + async def send(self, msg: OutboundMessage) -> None: + target = msg.chat_id + meta = msg.metadata or {} + mailbox_msg: dict[str, Any] = { + "type": meta.get("mailbox_type", "message"), + "from": self.config.agent_id, + "to": target, + "content": {"parts": [{"type": "text", "text": msg.content}]}, + "ttl": meta.get("mailbox_ttl", 3), + "trace": [self.config.agent_id], + } + if meta.get("mailbox_task"): + mailbox_msg["task"] = meta["mailbox_task"] + if meta.get("mailbox_callback"): + mailbox_msg["callback"] = meta["mailbox_callback"] + self.manager.send(self.config.agent_id, target, mailbox_msg) + + async def stop(self) -> None: + if not self._running: + return + self._running = False + if self._poll_task: + self._poll_task.cancel() + try: + await self._poll_task + except asyncio.CancelledError: + pass + self._poll_task = None + self.manager.update_status(self.config.agent_id, "offline") + + def _build_card(self) -> dict: + return { + "agent_id": self.config.agent_id, + "description": self.config.description, + "capabilities": self.config.capabilities, + "status": "idle", + "allow_from": self.config.allow_from, + "max_concurrent_tasks": self.config.max_concurrent_tasks, 
+ "current_tasks": [], + "registered_at": datetime.now(timezone.utc).isoformat(), + "last_heartbeat": datetime.now(timezone.utc).isoformat(), + } + + @classmethod + def default_config(cls) -> dict[str, Any]: + return MailboxConfig().model_dump(by_alias=True) +``` + +- [ ] **Step 4: Run MailboxManager tests** + +Run: `cd D:\Documents\GitHub\nanobot\.worktrees\n2n && uv run pytest tests/channels/test_mailbox.py::TestRegister tests/channels/test_mailbox.py::TestHeartbeat tests/channels/test_mailbox.py::TestUpdateStatus tests/channels/test_mailbox.py::TestSendAndPoll tests/channels/test_mailbox.py::TestListAndGetAgents -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add nanobot/channels/mailbox.py tests/channels/test_mailbox.py +git commit -m "feat: add MailboxManager and MailboxChannel for inter-agent communication" +``` + +--- + +### Task 2: MailboxChannel — Channel Plugin Tests + +**Files:** +- Modify: `nanobot/channels/mailbox.py` (created in Task 1) +- Create: `tests/channels/test_mailbox_channel.py` + +- [ ] **Step 1: Write failing tests for MailboxChannel** + +Create `tests/channels/test_mailbox_channel.py`: + +```python +"""Tests for MailboxChannel.""" + +import asyncio +import json +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.channels.mailbox import MailboxChannel, MailboxConfig + + +@pytest.fixture +def bus(): + b = MagicMock() + b.publish_inbound = AsyncMock() + return b + + +@pytest.fixture +def tmp_root(tmp_path: Path) -> Path: + root = tmp_path / "mailboxes" + root.mkdir() + return root + + +def _make_channel(bus, tmp_root: Path, **overrides) -> MailboxChannel: + cfg = { + "enabled": True, + "agentId": "coder", + "description": "test coder agent", + "capabilities": ["code_write"], + "allowFrom": ["*"], + "maxConcurrentTasks": 3, + "pollInterval": 0.1, + "mailboxesRoot": str(tmp_root), + } + cfg.update(overrides) + return MailboxChannel(cfg, bus) + + +class TestConfig: + def 
test_default_config(self): + config = MailboxConfig() + data = config.model_dump(by_alias=True) + assert data["enabled"] is False + assert data["agentId"] == "" + assert data["allowFrom"] == ["*"] + + def test_config_from_dict(self): + cfg = MailboxConfig.model_validate({ + "enabled": True, + "agentId": "researcher", + "allowFrom": ["coder"], + "pollInterval": 10, + }) + assert cfg.agent_id == "researcher" + assert cfg.allow_from == ["coder"] + assert cfg.poll_interval == 10 + + +class TestChannelAttributes: + def test_name(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + assert ch.name == "mailbox" + + def test_display_name(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + assert ch.display_name == "Mailbox" + + +class TestStartAndStop: + def test_start_registers_agent(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + registry = json.loads((tmp_root / "_registry.json").read_text()) + assert "coder" in registry + assert registry["coder"]["description"] == "test coder agent" + asyncio.get_event_loop().run_until_complete(ch.stop()) + + def test_stop_marks_offline(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + asyncio.get_event_loop().run_until_complete(ch.stop()) + registry = json.loads((tmp_root / "_registry.json").read_text()) + assert registry["coder"]["status"] == "offline" + + def test_stop_idempotent(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.stop()) + asyncio.get_event_loop().run_until_complete(ch.stop()) + + +class TestPollAndInbound: + def test_poll_delivers_inbound_message(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("researcher", {"agent_id": "researcher"}) + 
mgr.send("researcher", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "hello from researcher"}]}, + }) + + asyncio.get_event_loop().run_until_complete(ch._poll_once()) + + bus.publish_inbound.assert_called_once() + call_args = bus.publish_inbound.call_args[0][0] + assert call_args.channel == "mailbox" + assert call_args.sender_id == "researcher" + assert "hello from researcher" in call_args.content + + asyncio.get_event_loop().run_until_complete(ch.stop()) + + def test_poll_with_callback_routes_to_original_session(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("researcher", {"agent_id": "researcher"}) + mgr.send("researcher", "coder", { + "type": "task_update", + "content": {"parts": [{"type": "text", "text": "task done"}]}, + "callback": { + "session_id": "feishu:user_123", + "channel": "feishu", + "chat_id": "user_123", + }, + }) + + asyncio.get_event_loop().run_until_complete(ch._poll_once()) + + bus.publish_inbound.assert_called_once() + call_args = bus.publish_inbound.call_args[0][0] + assert call_args.channel == "feishu" + assert call_args.session_key_override == "feishu:user_123" + assert call_args.chat_id == "user_123" + + asyncio.get_event_loop().run_until_complete(ch.stop()) + + def test_poll_respects_allow_from(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root, allowFrom=["researcher"]) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("stranger", {"agent_id": "stranger"}) + mgr.send("stranger", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "unauthorized"}]}, + }) + + asyncio.get_event_loop().run_until_complete(ch._poll_once()) + + bus.publish_inbound.assert_not_called() + + 
asyncio.get_event_loop().run_until_complete(ch.stop()) + + def test_poll_marks_processed(self, bus, tmp_root): + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("researcher", {"agent_id": "researcher"}) + mgr.send("researcher", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "hello"}]}, + }) + + asyncio.get_event_loop().run_until_complete(ch._poll_once()) + + inbox_files = list((tmp_root / "coder" / "inbox").glob("*.msg.json")) + processed_files = list((tmp_root / "coder" / "processed").glob("*.msg.json")) + assert len(inbox_files) == 0 + assert len(processed_files) == 1 + + asyncio.get_event_loop().run_until_complete(ch.stop()) + + +class TestSend: + def test_send_writes_to_target_mailbox(self, bus, tmp_root): + """MailboxChannel.send() is called by ChannelManager when OutboundMessage + has channel='mailbox'. 
It writes to the target agent's inbox.""" + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("researcher", {"agent_id": "researcher"}) + + from nanobot.bus.events import OutboundMessage + msg = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="task completed", + metadata={}, + ) + asyncio.get_event_loop().run_until_complete(ch.send(msg)) + + files = list((tmp_root / "researcher" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["from"] == "coder" + assert data["to"] == "researcher" + assert data["ttl"] == 3 + assert "coder" in data["trace"] + + asyncio.get_event_loop().run_until_complete(ch.stop()) + + def test_send_uses_existing_message_tool(self, bus, tmp_root): + """Verify the flow: LLM calls MessageTool(channel='mailbox', chat_id='researcher') + → OutboundMessage → ChannelManager → MailboxChannel.send()""" + ch = _make_channel(bus, tmp_root) + asyncio.get_event_loop().run_until_complete(ch.start()) + + from nanobot.channels.mailbox import MailboxManager + mgr = MailboxManager(tmp_root) + mgr.register("researcher", {"agent_id": "researcher"}) + + # Simulate what MessageTool produces + from nanobot.bus.events import OutboundMessage + outbound = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="please write a sort function", + ) + asyncio.get_event_loop().run_until_complete(ch.send(outbound)) + + files = list((tmp_root / "researcher" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert "sort function" in data["content"]["parts"][0]["text"] + + asyncio.get_event_loop().run_until_complete(ch.stop()) +``` + +- [ ] **Step 2: Run tests to verify they fail** + +Run: `cd D:\Documents\GitHub\nanobot\.worktrees\n2n && uv run pytest tests/channels/test_mailbox_channel.py 
-v` +Expected: FAIL with `ImportError` only if `MailboxConfig`/`MailboxChannel` are still missing from `nanobot/channels/mailbox.py` (the module itself already exists after Task 1, so `ModuleNotFoundError` is not expected); if the full Task 1 listing is in place, these tests already pass + +- [ ] **Step 3: Implement MailboxConfig and MailboxChannel** + +Merge the following into `nanobot/channels/mailbox.py` from Task 1 — keep the existing imports and the `MailboxManager` class; do not overwrite the file: + +```python +"""Mailbox channel for inter-agent communication via filesystem.""" + +from __future__ import annotations + +import asyncio +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from pydantic import Field + +from nanobot.channels.base import BaseChannel +# NOTE: MailboxManager is defined in this same module (Task 1); a module must not import from itself +from nanobot.bus.events import InboundMessage, OutboundMessage +from nanobot.config.schema import Base + +logger = logging.getLogger(__name__) + + +class MailboxConfig(Base): + """Mailbox channel configuration.""" + + enabled: bool = False + agent_id: str = "" + description: str = "" + capabilities: list[str] = Field(default_factory=list) + allow_from: list[str] = Field(default_factory=lambda: ["*"]) + max_concurrent_tasks: int = 3 + poll_interval: float = 5.0 + mailboxes_root: str = "~/.nanobot/mailboxes" + + +class MailboxChannel(BaseChannel): + """Channel plugin for inter-agent communication via filesystem mailboxes. + + The LLM sends messages to other agents using the existing MessageTool: + MessageTool(channel='mailbox', chat_id='TARGET_AGENT_ID', content='...') + + This produces an OutboundMessage routed to MailboxChannel.send(), + which writes to the target agent's mailbox. 
+ """ + + name = "mailbox" + display_name = "Mailbox" + + def __init__(self, config: Any, bus: Any) -> None: + if isinstance(config, dict): + config = MailboxConfig.model_validate(config) + super().__init__(config, bus) + self.config: MailboxConfig = config + root = Path(self.config.mailboxes_root).expanduser() + self.manager = MailboxManager(root) + self._running = False + self._poll_task: asyncio.Task | None = None + + async def start(self) -> None: + """Register agent and start poll loop.""" + self.manager.register(self.config.agent_id, self._build_card()) + self._running = True + self._poll_task = asyncio.create_task(self._poll_loop()) + + async def _poll_loop(self) -> None: + """Periodically scan inbox and inject messages into bus.""" + while self._running: + await self._poll_once() + self.manager.heartbeat(self.config.agent_id) + await asyncio.sleep(self.config.poll_interval) + + async def _poll_once(self) -> None: + """Single poll cycle: read inbox, inject to bus, mark processed.""" + messages = self.manager.poll(self.config.agent_id) + for msg in messages: + filename = msg.pop("_filename", "") + callback = msg.get("callback") + if callback and callback.get("channel") and callback.get("chat_id"): + await self._handle_callback_message(msg, callback) + else: + await self._handle_message( + sender_id=msg["from"], + chat_id=msg["from"], + content=self._extract_text(msg), + metadata=self._build_metadata(msg), + ) + if filename: + self.manager.mark_processed(self.config.agent_id, filename) + + async def _handle_callback_message(self, msg: dict, callback: dict) -> None: + """Route message to original session (e.g., Feishu) via callback info.""" + if not self.is_allowed(msg["from"]): + logger.warning("Mailbox: access denied from %s", msg["from"]) + return + inbound = InboundMessage( + channel=callback["channel"], + sender_id=msg["from"], + chat_id=callback["chat_id"], + content=self._extract_text(msg), + metadata=self._build_metadata(msg), + 
session_key_override=callback.get("session_id"), + ) + await self.bus.publish_inbound(inbound) + + def _extract_text(self, msg: dict) -> str: + """Extract plain text from message content parts.""" + parts = msg.get("content", {}).get("parts", []) + texts = [p["text"] for p in parts if p.get("type") == "text" and "text" in p] + return "\n".join(texts) if texts else "" + + def _build_metadata(self, msg: dict) -> dict[str, Any]: + """Build metadata dict from mailbox message.""" + return { + "mailbox_type": msg.get("type", "message"), + "mailbox_task": msg.get("task"), + "mailbox_parts": msg.get("content", {}).get("parts"), + "mailbox_ttl": msg.get("ttl"), + "mailbox_trace": msg.get("trace"), + "reply_to": msg.get("reply_to"), + } + + async def send(self, msg: OutboundMessage) -> None: + """Handle outbound message routed from ChannelManager. + + Called when LLM uses MessageTool with channel='mailbox'. + Writes message to the target agent's mailbox file. + Auto-adds TTL, trace, and callback info. 
+ """ + target = msg.chat_id + meta = msg.metadata or {} + mailbox_msg: dict[str, Any] = { + "type": meta.get("mailbox_type", "message"), + "from": self.config.agent_id, + "to": target, + "content": {"parts": [{"type": "text", "text": msg.content}]}, + "ttl": meta.get("mailbox_ttl", 3), + "trace": [self.config.agent_id], + } + if meta.get("mailbox_task"): + mailbox_msg["task"] = meta["mailbox_task"] + if meta.get("mailbox_callback"): + mailbox_msg["callback"] = meta["mailbox_callback"] + self.manager.send(self.config.agent_id, target, mailbox_msg) + + async def stop(self) -> None: + """Stop polling and mark offline.""" + if not self._running: + return + self._running = False + if self._poll_task: + self._poll_task.cancel() + try: + await self._poll_task + except asyncio.CancelledError: + pass + self._poll_task = None + self.manager.update_status(self.config.agent_id, "offline") + + def _build_card(self) -> dict: + """Build Agent Card for registry from mailbox config.""" + return { + "agent_id": self.config.agent_id, + "description": self.config.description, + "capabilities": self.config.capabilities, + "status": "idle", + "allow_from": self.config.allow_from, + "max_concurrent_tasks": self.config.max_concurrent_tasks, + "current_tasks": [], + "registered_at": datetime.now(timezone.utc).isoformat(), + "last_heartbeat": datetime.now(timezone.utc).isoformat(), + } + + @classmethod + def default_config(cls) -> dict[str, Any]: + return MailboxConfig().model_dump(by_alias=True) +``` + +- [ ] **Step 4: Run tests to verify they pass** + +Run: `cd D:\Documents\GitHub\nanobot\.worktrees\n2n && uv run pytest tests/channels/test_mailbox_channel.py -v` +Expected: All PASS + +- [ ] **Step 5: Commit** + +```bash +git add nanobot/channels/mailbox.py tests/channels/test_mailbox_channel.py +git commit -m "feat: add MailboxChannel plugin for inter-agent communication" +``` + +--- + +### Task 3: Integration Tests + +**Files:** +- Modify: `tests/channels/test_mailbox.py` — append integration 
tests + +- [ ] **Step 1: Append integration tests to test_mailbox.py** + +```python +"""Integration test: two agents communicating via mailbox.""" + +import asyncio +import json +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.channels.mailbox import MailboxChannel +from nanobot.channels.mailbox import MailboxManager +from nanobot.bus.events import OutboundMessage + + +@pytest.fixture +def root(tmp_path: Path) -> Path: + mailboxes = tmp_path / "mailboxes" + mailboxes.mkdir() + return mailboxes + + +@pytest.fixture +def bus_a(): + b = MagicMock() + b.publish_inbound = AsyncMock() + return b + + +@pytest.fixture +def bus_b(): + b = MagicMock() + b.publish_inbound = AsyncMock() + return b + + +def _make_channel(bus, root: Path, agent_id: str) -> MailboxChannel: + return MailboxChannel({ + "enabled": True, + "agentId": agent_id, + "description": f"{agent_id} agent", + "allowFrom": ["*"], + "maxConcurrentTasks": 3, + "pollInterval": 0.05, + "mailboxesRoot": str(root), + }, bus) + + +class TestTwoAgentCommunication: + @pytest.mark.asyncio + async def test_agent_a_sends_agent_b_receives(self, root: Path, bus_a, bus_b): + """Agent A uses MessageTool(channel='mailbox', chat_id='coder') → + MailboxChannel.send() writes to coder's inbox → + Agent B polls and receives the message.""" + ch_a = _make_channel(bus_a, root, "researcher") + ch_b = _make_channel(bus_b, root, "coder") + await ch_a.start() + await ch_b.start() + + # Simulate what MessageTool produces: OutboundMessage(channel='mailbox', chat_id='coder') + outbound = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="please write a sort function", + ) + await ch_a.send(outbound) + + # Agent B polls inbox + await ch_b._poll_once() + bus_b.publish_inbound.assert_called_once() + msg = bus_b.publish_inbound.call_args[0][0] + assert msg.channel == "mailbox" + assert msg.sender_id == "researcher" + assert "sort function" in msg.content + # Verify TTL 
and trace are auto-added + assert msg.metadata["mailbox_ttl"] == 3 + assert "researcher" in msg.metadata["mailbox_trace"] + + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + async def test_agent_b_sends_response_back(self, root: Path, bus_a, bus_b): + """Agent B responds via MessageTool(channel='mailbox', chat_id='researcher') → + Agent A receives the response.""" + ch_a = _make_channel(bus_a, root, "researcher") + ch_b = _make_channel(bus_b, root, "coder") + await ch_a.start() + await ch_b.start() + + # Agent B responds + response = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="sort function completed: sort_by_mtime()", + ) + await ch_b.send(response) + + # Agent A polls inbox + await ch_a._poll_once() + bus_a.publish_inbound.assert_called_once() + msg = bus_a.publish_inbound.call_args[0][0] + assert "sort function completed" in msg.content + assert msg.sender_id == "coder" + + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + async def test_callback_routes_to_original_feishu_session(self, root: Path, bus_a, bus_b): + """Agent A sends task with callback. Agent B responds with callback. 
+ Agent A's MailboxChannel routes the response to the original Feishu session.""" + ch_a = _make_channel(bus_a, root, "researcher") + ch_b = _make_channel(bus_b, root, "coder") + await ch_a.start() + await ch_b.start() + + # Agent A sends task with callback metadata (simulating LLM adding callback info) + task_outbound = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="do work", + metadata={ + "mailbox_callback": { + "channel": "feishu", + "chat_id": "user_123", + "session_id": "feishu:user_123", + }, + }, + ) + await ch_a.send(task_outbound) + + # Agent B receives task + await ch_b._poll_once() + task_msg = bus_b.publish_inbound.call_args[0][0] + + # Agent B responds with the callback carried forward + response_outbound = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="work done", + metadata={ + "mailbox_callback": { + "channel": "feishu", + "chat_id": "user_123", + "session_id": "feishu:user_123", + }, + }, + ) + await ch_b.send(response_outbound) + + # Agent A receives response — should route to feishu session + await ch_a._poll_once() + bus_a.publish_inbound.assert_called_once() + update = bus_a.publish_inbound.call_args[0][0] + assert update.channel == "feishu" + assert update.session_key_override == "feishu:user_123" + assert update.chat_id == "user_123" + + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + async def test_anti_loop_trace(self, root: Path, bus_a): + """Messages with trace info are preserved in metadata for LLM awareness.""" + ch_a = _make_channel(bus_a, root, "researcher") + await ch_a.start() + + mgr = MailboxManager(root) + mgr.register("coder", {"agent_id": "coder"}) + mgr.send("coder", "researcher", { + "type": "task", + "ttl": 1, + "trace": ["researcher", "coder"], + "content": {"parts": [{"type": "text", "text": "bounced message"}]}, + }) + + await ch_a._poll_once() + bus_a.publish_inbound.assert_called_once() + msg = bus_a.publish_inbound.call_args[0][0] + assert 
msg.metadata["mailbox_ttl"] == 1 + assert "researcher" in msg.metadata["mailbox_trace"] + assert "coder" in msg.metadata["mailbox_trace"] + + await ch_a.stop() + + @pytest.mark.asyncio + async def test_allow_from_blocks_unauthorized(self, root: Path, bus_a): + """Agent with allowFrom=['coder'] rejects messages from 'stranger'.""" + ch_a = MailboxChannel({ + "enabled": True, + "agentId": "researcher", + "allowFrom": ["coder"], + "pollInterval": 0.05, + "mailboxesRoot": str(root), + }, bus_a) + await ch_a.start() + + mgr = MailboxManager(root) + mgr.register("stranger", {"agent_id": "stranger"}) + mgr.send("stranger", "researcher", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "should be blocked"}]}, + }) + + await ch_a._poll_once() + bus_a.publish_inbound.assert_not_called() + + await ch_a.stop() + + @pytest.mark.asyncio + async def test_registry_discovery(self, root: Path, bus_a, bus_b): + """Both agents register and can discover each other via registry.""" + ch_a = _make_channel(bus_a, root, "researcher") + ch_b = _make_channel(bus_b, root, "coder") + await ch_a.start() + await ch_b.start() + + mgr = MailboxManager(root) + agents = mgr.list_online_agents() + ids = {a["agent_id"] for a in agents} + assert "researcher" in ids + assert "coder" in ids + + # Verify descriptions are registered + researcher = mgr.get_agent("researcher") + assert researcher["description"] == "researcher agent" + + await ch_a.stop() + await ch_b.stop() +``` + +- [ ] **Step 2: Run integration tests** + +Run: `cd D:\Documents\GitHub\nanobot\.worktrees\n2n && uv run pytest tests/channels/test_mailbox.py -v` +Expected: All PASS + +- [ ] **Step 3: Commit** + +```bash +git add tests/channels/test_mailbox.py +git commit -m "test: add mailbox integration tests" +``` + +--- + +## How It Works End-to-End + +``` +LLM calls: MessageTool(channel="mailbox", chat_id="coder", content="write sort") + ↓ +OutboundMessage(channel="mailbox", chat_id="coder") + ↓ 
+ChannelManager._dispatch_outbound() routes to MailboxChannel.send() + ↓ +MailboxChannel.send() writes to ~/.nanobot/mailboxes/coder/inbox/ + ↓ +Agent B's MailboxChannel._poll_once() reads inbox + ↓ +Converts to InboundMessage → bus.publish_inbound() + ↓ +AgentLoop processes as normal message +``` + +No new tools. No code modifications. Pure channel plugin. + +--- + +## Self-Review + +### Spec Coverage + +| Spec Requirement | Task | +|-----------------|------| +| File-system mailbox storage | Task 1 (MailboxManager) | +| Atomic writes (.tmp → rename) | Task 1 (MailboxManager.send) | +| Agent discovery / Agent Card | Task 2 (MailboxChannel._build_card) | +| Agent states (idle/busy/offline) | Task 2 (start/stop + update_status) | +| Message types (message/task) | Task 2 (MailboxChannel.send), Task 3 (integration) | +| TTL + trace anti-loop | Task 2 (send auto-adds), Task 3 (integration) | +| Callback routing to original session | Task 2 (_handle_callback_message), Task 3 (integration) | +| allowFrom access control | Task 2 (uses BaseChannel.is_allowed), Task 3 (integration) | +| MailboxConfig (all fields) | Task 2 (MailboxConfig) | +| Zero modifications to existing code | Confirmed: 1 new file (mailbox.py) + 1 test file | + +### Placeholder Scan + +No TBD/TODO/placeholder patterns. All steps contain complete code. 
+ +### Type Consistency + +- `MailboxManager(root: Path)` — consistent across all tasks +- `MailboxConfig.model_validate(config)` in `__init__` +- `InboundMessage(channel, sender_id, chat_id, content, metadata, session_key_override)` — matches dataclass +- `OutboundMessage(channel, chat_id, content, metadata)` — matches dataclass +- `MailboxChannel.name = "mailbox"` — matches config key and registry discovery diff --git a/docs/superpowers/specs/2026-04-26-multi-agent-mailbox-design.md b/docs/superpowers/specs/2026-04-26-multi-agent-mailbox-design.md new file mode 100644 index 000000000..a6d419883 --- /dev/null +++ b/docs/superpowers/specs/2026-04-26-multi-agent-mailbox-design.md @@ -0,0 +1,430 @@ +# Multi-Agent Mailbox Communication Design + +## Overview + +Design a simple, zero-side-effect mechanism for multiple independent nanobot agent instances to communicate with each other. The mechanism uses a file-system-based mailbox system implemented as a standard nanobot channel plugin, requiring no modifications to existing code. + +## Goals + +- Multiple independent agent processes can discover and communicate with each other +- Peer-to-peer (fully connected) topology +- Asynchronous event-driven messaging +- "Boss experience": agents auto-delegate, auto-report progress, user only interacts through their normal channel (e.g., Feishu) +- Zero side effects: only new files added, no modifications to existing code +- Incorporate concepts from Google's A2A protocol (Agent Cards, Task lifecycle, Message Parts) + +## Architecture + +### Storage: File-System Mailbox + +``` +~/.nanobot/mailboxes/ +├── _registry.json # Agent Cards (discovery) +├── researcher/ +│ ├── inbox/ # Pending messages +│ │ └── 1745659200_coder.msg.json +│ └── processed/ # Archived messages +│ └── 1745659000_coder.msg.json +└── coder/ + ├── inbox/ + └── processed/ +``` + +Global path `~/.nanobot/mailboxes/` is used because different agents may have different working directories. 
+ +### Message File Naming + +`{unix_timestamp_ms}_{from_agent_id}_{unique_suffix}.msg.json` (millisecond timestamp; the short random suffix keeps two messages sent by the same agent within the same millisecond from overwriting each other) + +Atomic writes: write to a `.tmp` file first, then `os.replace()` it into place to prevent reading half-written messages. + +### Agent Discovery (A2A Agent Card) + +Each agent registers itself in `_registry.json` on startup and updates `last_heartbeat` on every poll cycle. + +The registry contains two types of data: +- **Agent identity** (`description`, `capabilities`) — configured by the user in `channels.mailbox` config section. Optional; if not set, other agents can only see `agent_id` and `status`. +- **Runtime state** (`status`, `current_tasks`, `last_heartbeat`) — computed automatically by the mailbox channel. + +```json +{ + "researcher": { + "agent_id": "researcher", + "description": "负责信息检索和分析的 agent", + "capabilities": ["web_search", "code_analysis", "summarization"], + "status": "idle", + "allow_from": ["coder", "writer"], + "max_concurrent_tasks": 3, + "current_tasks": ["msg_1745659200_coder"], + "registered_at": "2026-04-26T10:00:00Z", + "last_heartbeat": "2026-04-26T10:05:00Z" + }, + "coder": { + "agent_id": "coder", + "description": "负责代码编写和修改的 agent", + "capabilities": ["code_write", "test_run", "review"], + "status": "idle", + "allow_from": ["*"], + "max_concurrent_tasks": 3, + "current_tasks": [], + "registered_at": "2026-04-26T10:00:30Z", + "last_heartbeat": "2026-04-26T10:05:30Z" + } +} +``` + +### Agent States + +| State | Accept New Tasks? | Description | +|-------|-------------------|-------------| +| `idle` | Yes | Available, accepts immediately | +| `busy` | If quota allows | `current_tasks` < `max_concurrent_tasks` then queue, otherwise reject | +| `offline` | Messages queue | Messages stay in inbox, processed when agent comes online | + +State transitions: +- Startup → `idle` +- Receive task + accept → `busy` +- All tasks completed → `idle` +- Heartbeat timeout → `offline` + +## Communication Protocol + +### Message Types + +1. **message** — Instant notification, Q&A, chat +2. 
**task** — Work request requiring execution and result return +3. **task_update** — Status update for a previously sent task + +### Task Lifecycle (A2A Task Concept) + +``` +pending → accepted → working → completed + \→ rejected + \→ failed +``` + +### Message Format + +```json +{ + "id": "msg_1745659200_researcher", + "from": "researcher", + "to": "coder", + "timestamp": "2026-04-26T10:00:00Z", + "type": "task | message | task_update", + "ttl": 3, + "trace": ["researcher"], + "task": { + "id": "original_task_id", + "state": "pending | accepted | working | completed | failed | rejected", + "deadline": "2026-04-26T11:00:00Z" + }, + "content": { + "parts": [ + {"type": "text", "text": "..."}, + {"type": "data", "data": {}}, + {"type": "file", "path": "/path/to/file"} + ] + }, + "callback": { + "session_id": "feishu:user_123", + "channel": "feishu" + }, + "reply_to": "replied_message_id | null", + "metadata": {} +} +``` + +Fields: +- `ttl` — Time-to-live hop count. Decremented on each relay. Default 3. At 0, agent must handle itself, cannot delegate. +- `trace` — List of agent_ids this message has passed through. Prevents circular routing. +- `task` — Present for `task` and `task_update` types. Contains lifecycle state and optional deadline. +- `content.parts` — Structured content (A2A Message Parts concept). Supports text, data, and file types. +- `callback` — Original session info from the initiating channel. Carried through the task lifecycle so results route back to the correct user conversation. + +### Task Acceptance Criteria + +All conditions must be met for an agent to accept a task: +1. Sender is in `allow_from` list (`"*"` = accept all) +2. `current_tasks` count < `max_concurrent_tasks` +3. `deadline` has not expired (if present) +4. 
LLM judges it has the capability to complete the task + +Decision outcomes: +- Accept → reply `task_update {state: "accepted"}`, add to `current_tasks` +- Reject → reply `task_update {state: "rejected"}` with reason + +### Anti-Loop Mechanism + +Two fields prevent infinite agent-to-agent conversations: + +1. **TTL**: Decremented on each relay. At 0, no further delegation allowed. Default = 3 (max 3 hops: A→B→C→D). +2. **Trace**: Append agent_id on each relay. Reject forwarding to any agent already in trace. + +| Scenario | Protection | +|----------|-----------| +| A↔B mutual ping | Trace: B sees A in trace, rejects | +| A→B→C→A cycle | Trace: C sees A in trace, rejects | +| A→B→C→D→... infinite chain | TTL: reaches 0, stops | + +### User Experience: End-to-End Flow + +The user never interacts with mailbox directly. They communicate through their normal channel (Feishu, WeChat, etc.). The mailbox is invisible infrastructure. + +``` +User (Feishu) Agent A (researcher) Agent B (coder) +│ │ │ +│ "帮我写排序函数" │ │ +│ ──────────────────→ │ │ +│ │ LLM decides to delegate │ +│ │ │ +│ "我让 coder 去处理, │ │ +│ 完成后通知你" │ │ +│ ←────────────────── │ │ +│ │ │ +│ ...time passes...│ │ +│ │ │ +│ │ task { │ +│ │ callback: { │ +│ │ session_id: "feishu:user_123", +│ │ channel: "feishu" │ +│ │ } │ +│ │ } │ +│ │ ──────────────────────→ │ +│ │ │ B processes... +│ │ │ +│ │ task_update {completed} │ +│ │ ←────────────────────── │ +│ │ │ +│ MailboxChannel polls task_update │ +│ Routes to session "feishu:user_123" │ +│ LLM sees result in original conversation context │ +│ │ │ +│ "排序函数已完成: │ │ +│ sort_by_mtime()..." │ │ +│ ←────────────────── │ │ +``` + +The `callback` field carries the original channel session info through the entire task lifecycle. When the task_update arrives at Agent A's mailbox, the MailboxChannel restores the original `session_id` and `channel`, so the AgentLoop processes it in the correct conversation context and the LLM naturally responds to the user via Feishu. 
+ +### Error Scenarios + +| Scenario | Handling | +|----------|----------| +| Target agent offline | Messages queue in inbox; processed when agent comes online | +| Heartbeat timeout | Registry marks agent `offline`; visible to other agents on next registry read | +| Agent crash during task | No `completed`/`failed` sent; sender can use `deadline` to detect timeout | +| Task rejected | Sender receives `rejected` with reason; decides next action | +| Task deadline expired | Receiver checks deadline on processing; rejects if expired | +| allow_from mismatch | Receiver discards message; optionally replies `rejected` with "unauthorized" | +| Registry concurrent write | Atomic writes (.tmp → rename); each agent only writes its own entry | + +## Implementation + +### New Components + +| Component | Type | Modifies Existing Code | +|-----------|------|----------------------| +| `MailboxManager` | Class inside the new file `nanobot/channels/mailbox.py` | No | +| `MailboxChannel` | New file (`nanobot/channels/mailbox.py`) | No | +| `MailboxConfig` | Pydantic model inside mailbox.py | No | +| Config section `channels.mailbox` | User config.json | No (additive) | + +### MailboxManager + +Low-level file operations with no nanobot dependencies: + +```python +class MailboxManager: + def __init__(self, mailboxes_root: Path): ... 
+ + def register(self, agent_id: str, card: dict) -> None: + """Atomic write to registry (.tmp → rename)""" + + def heartbeat(self, agent_id: str) -> None: + """Update last_heartbeat in registry""" + + def update_status(self, agent_id: str, status: str, current_tasks: list[str] | None = None) -> None: + """Update agent status and current task list""" + + def send(self, from_id: str, to_id: str, msg: dict) -> None: + """Atomic write to to_id/inbox/{timestamp}_{from_id}.msg.json""" + + def poll(self, agent_id: str) -> list[dict]: + """Scan inbox/, return new messages sorted by timestamp""" + + def mark_processed(self, agent_id: str, filename: str) -> None: + """Move from inbox/ to processed/""" + + def list_online_agents(self) -> list[dict]: + """Read all online agents from registry""" + + def get_agent(self, agent_id: str) -> dict | None: + """Read single agent info from registry""" +``` + +### MailboxChannel + +Follows existing channel pattern (like telegram.py, feishu.py): + +```python +class MailboxChannel(BaseChannel): + def __init__(self, config: Any, bus: MessageBus): + if isinstance(config, dict): + config = MailboxConfig.model_validate(config) + super().__init__(config, bus) + self.config: MailboxConfig = config + self.manager = MailboxManager(Path(self.config.mailboxes_root).expanduser()) + self._running = False + + def _build_card(self) -> dict: + """Build Agent Card from mailbox config. 
Identity fields are user-configured.""" + return { + "agent_id": self.config.agent_id, + "description": self.config.description or "", + "capabilities": self.config.capabilities or [], + "status": "idle", + "allow_from": self.config.allow_from, + "max_concurrent_tasks": self.config.max_concurrent_tasks, + "current_tasks": [], + "registered_at": datetime.now(timezone.utc).isoformat(), + "last_heartbeat": datetime.now(timezone.utc).isoformat(), + } + + async def start(self) -> None: + """Register agent + start poll loop""" + self.manager.register(self.config.agent_id, self._build_card()) + self._running = True + asyncio.create_task(self._poll_loop()) + + async def _poll_loop(self) -> None: + """Periodically scan inbox, inject new messages into bus""" + while self._running: + messages = self.manager.poll(self.config.agent_id) + for msg in messages: + inbound = self._to_inbound(msg) + await self.bus.publish_inbound(inbound) + self.manager.mark_processed(self.config.agent_id, msg["filename"]) + self.manager.heartbeat(self.config.agent_id) + await asyncio.sleep(self.config.poll_interval) + + async def send_message(self, to_id: str, msg: dict) -> None: + """Send message to another agent's mailbox""" + # Anti-loop checks + if to_id in msg.get("trace", []): + logger.warning(f"Rejecting circular route: {to_id} already in trace") + return + if msg.get("ttl", 0) <= 0: + logger.warning("TTL exhausted, cannot forward") + return + msg["ttl"] = msg.get("ttl", 3) - 1 + msg.setdefault("trace", []).append(self.config.agent_id) + self.manager.send(self.config.agent_id, to_id, msg) + + def _to_inbound(self, msg: dict) -> InboundMessage: + """Convert mailbox JSON to standard InboundMessage""" + callback = msg.get("callback", {}) + return InboundMessage( + channel=callback.get("channel", "mailbox"), + sender=msg["from"], + content=self._extract_text(msg), + session_id=callback.get("session_id") or f"mailbox:{msg['from']}", + metadata={ + "mailbox_type": msg["type"], + "mailbox_task": 
msg.get("task"), + "mailbox_parts": msg.get("content", {}).get("parts"), + "mailbox_ttl": msg.get("ttl"), + "mailbox_trace": msg.get("trace"), + "reply_to": msg.get("reply_to"), + }, + ) + + async def stop(self) -> None: + self._running = False + self.manager.update_status(self.config.agent_id, "offline") + + @classmethod + def default_config(cls) -> dict[str, Any]: + return MailboxConfig().model_dump(by_alias=True) +``` + +### MailboxConfig + +All settings are self-contained within the mailbox channel config. + +```python +class MailboxConfig(Base): + enabled: bool = False + agent_id: str = "" + description: str = "" # optional, for agent discovery + capabilities: list[str] = [] # optional, for agent discovery + allow_from: list[str] = Field(default_factory=lambda: ["*"]) + max_concurrent_tasks: int = 3 + poll_interval: float = 5.0 + mailboxes_root: str = "~/.nanobot/mailboxes" +``` + +### Configuration + +In `~/.nanobot/config.json`: + +```json +{ + "channels": { + "feishu": { + "enabled": true, + "appId": "...", + "appSecret": "..." 
+ }, + "mailbox": { + "enabled": true, + "agentId": "coder", + "description": "负责代码编写和修改的 agent", + "capabilities": ["code_write", "test_run", "review"], + "allowFrom": ["researcher"], + "maxConcurrentTasks": 3, + "pollInterval": 5, + "mailboxesRoot": "~/.nanobot/mailboxes" + } + } +} +``` + +Two agents running independently: + +```json +// Agent A config — researcher +{ + "channels": { + "mailbox": { + "enabled": true, + "agentId": "researcher", + "description": "负责信息检索和分析的 agent", + "capabilities": ["web_search", "code_analysis", "summarization"], + "allowFrom": ["*"] + } + } +} + +// Agent B config — coder +{ + "channels": { + "mailbox": { + "enabled": true, + "agentId": "coder", + "description": "负责代码编写和修改的 agent", + "capabilities": ["code_write", "test_run", "review"], + "allowFrom": ["researcher"] + } + } +} +``` + +## Design Principles + +- **Zero side effects**: Only new files, no modifications to existing code +- **Channel plugin pattern**: MailboxChannel follows the same interface as all other channels +- **Bus integration**: Mailbox messages become standard `InboundMessage` objects; AgentLoop is unaware of mailbox +- **Callback routing**: Original channel session is preserved through the task lifecycle for seamless user experience +- **Anti-loop by default**: TTL + trace prevents runaway agent conversations without configuration +- **Best-effort deadlines**: Optional `deadline` field for task timeout, not a blocking mechanism diff --git a/nanobot/channels/mailbox.py b/nanobot/channels/mailbox.py new file mode 100644 index 000000000..ab8f3b52c --- /dev/null +++ b/nanobot/channels/mailbox.py @@ -0,0 +1,291 @@ +"""Mailbox channel for inter-agent communication via filesystem.""" + +from __future__ import annotations + +import asyncio +import json +import os +import time +import uuid +from datetime import datetime, timezone +from pathlib import Path +from typing import Any + +from loguru import logger +from pydantic import Field + +from nanobot.bus.events 
# ---------------------------------------------------------------------------
# MailboxManager — low-level file operations
# ---------------------------------------------------------------------------


class MailboxManager:
    """File-system operations for agent mailboxes.

    Layout under ``root``::

        _registry.json          agent cards keyed by agent id
        <agent_id>/inbox/       pending ``*.msg.json`` files
        <agent_id>/processed/   handled messages and quarantined ``*.invalid`` files

    Every write goes to a uniquely named temp file that is then moved into
    place with ``os.replace``, so readers never observe a half-written file
    and two writers never share a temp path.

    Registry updates are read-modify-write without cross-process locking:
    simultaneous updates by different agents are last-writer-wins.
    """

    def __init__(self, mailboxes_root: Path) -> None:
        self.root = Path(mailboxes_root)
        self.root.mkdir(parents=True, exist_ok=True)

    # -- helpers --

    @staticmethod
    def _check_name(value: str, what: str = "agent_id") -> str:
        """Return *value* if it is usable as a single path component.

        Raises:
            ValueError: if *value* is empty, ``.``/``..``, or contains a path
                separator or NUL. Agent ids may originate from LLM output, so
                they must never be able to point outside the mailbox root.
        """
        if (
            not isinstance(value, str)
            or not value
            or value in {".", ".."}
            or any(ch in value for ch in ("/", "\\", "\0"))
        ):
            raise ValueError(f"invalid {what}: {value!r}")
        return value

    @staticmethod
    def _atomic_write(path: Path, text: str) -> None:
        """Write *text* to *path* via a unique temp file and ``os.replace``."""
        # The temp name ends in ".tmp", so it can never match the
        # "*.msg.json" glob used by poll().
        tmp = path.with_name(f".{path.name}.{uuid.uuid4().hex}.tmp")
        try:
            tmp.write_text(text, encoding="utf-8")
            os.replace(tmp, path)
        finally:
            # No-op after a successful replace; removes the stray temp file
            # when writing or renaming failed.
            tmp.unlink(missing_ok=True)

    def _quarantine(self, agent_id: str, path: Path) -> None:
        """Move an unparseable inbox file aside so it is not re-read forever."""
        target = self.root / agent_id / "processed" / f"{path.name}.invalid"
        try:
            target.parent.mkdir(parents=True, exist_ok=True)
            os.replace(path, target)
        except OSError:
            # Best effort: if the move fails the file is retried next poll.
            pass

    # -- registry --

    def _registry_path(self) -> Path:
        return self.root / "_registry.json"

    def _read_registry(self) -> dict:
        """Return the registry mapping, or ``{}`` if no registry exists yet."""
        try:
            raw = self._registry_path().read_text(encoding="utf-8")
        except FileNotFoundError:
            return {}
        return json.loads(raw)

    def _write_registry(self, data: dict) -> None:
        self._atomic_write(
            self._registry_path(),
            json.dumps(data, ensure_ascii=False, indent=2),
        )

    def _ensure_dirs(self, agent_id: str) -> None:
        agent_dir = self.root / self._check_name(agent_id)
        (agent_dir / "inbox").mkdir(parents=True, exist_ok=True)
        (agent_dir / "processed").mkdir(parents=True, exist_ok=True)

    def register(self, agent_id: str, card: dict) -> None:
        """Create the agent's mailbox directories and store its card.

        The caller's *card* is not mutated. ``registered_at`` is filled in
        only when absent; ``last_heartbeat`` is always refreshed.

        Raises:
            ValueError: if *agent_id* is not a valid single path component.
        """
        self._ensure_dirs(agent_id)
        now = datetime.now(timezone.utc).isoformat()
        entry = {**card}
        entry.setdefault("registered_at", now)
        entry["last_heartbeat"] = now
        registry = self._read_registry()
        registry[agent_id] = entry
        self._write_registry(registry)

    def heartbeat(self, agent_id: str) -> None:
        """Refresh ``last_heartbeat`` for a registered agent (no-op otherwise)."""
        registry = self._read_registry()
        if agent_id in registry:
            registry[agent_id]["last_heartbeat"] = datetime.now(timezone.utc).isoformat()
            self._write_registry(registry)

    def update_status(
        self, agent_id: str, status: str, current_tasks: list[str] | None = None
    ) -> None:
        """Set the status (and optionally the task list) of a registered agent."""
        registry = self._read_registry()
        if agent_id in registry:
            registry[agent_id]["status"] = status
            if current_tasks is not None:
                registry[agent_id]["current_tasks"] = current_tasks
            self._write_registry(registry)

    # -- message I/O --

    def send(self, from_id: str, to_id: str, msg: dict) -> None:
        """Atomically drop *msg* into ``to_id``'s inbox.

        ``id``, ``from``, ``to`` and ``timestamp`` are filled in when the
        message does not already carry them. The caller's dict is not mutated.

        Raises:
            ValueError: if either id is not a valid single path component.
        """
        self._check_name(from_id, "from_id")  # from_id becomes part of the file name
        self._ensure_dirs(to_id)
        msg = {**msg}
        ts = int(time.time() * 1000)
        unique = uuid.uuid4().hex[:8]
        filename = f"{ts}_{from_id}_{unique}.msg.json"
        msg.setdefault("id", f"msg_{ts}_{from_id}")
        msg.setdefault("from", from_id)
        msg.setdefault("to", to_id)
        msg.setdefault("timestamp", datetime.now(timezone.utc).isoformat())
        self._atomic_write(
            self.root / to_id / "inbox" / filename,
            json.dumps(msg, ensure_ascii=False),
        )

    def poll(self, agent_id: str) -> list[dict]:
        """Return all readable inbox messages, ordered by file name.

        Each returned dict carries the extra key ``_filename`` so the caller
        can pass it to :meth:`mark_processed`. Files that are not valid JSON
        objects are moved to ``processed/<name>.invalid`` instead of raising,
        so one corrupt file cannot block the inbox.

        Raises:
            ValueError: if *agent_id* is not a valid single path component.
        """
        inbox = self.root / self._check_name(agent_id) / "inbox"
        if not inbox.is_dir():
            return []
        messages: list[dict] = []
        for f in sorted(inbox.glob("*.msg.json")):
            try:
                data = json.loads(f.read_text(encoding="utf-8"))
            except ValueError:
                # JSONDecodeError and UnicodeDecodeError are both ValueErrors.
                self._quarantine(agent_id, f)
                continue
            except OSError:
                # Vanished between glob and read, or temporarily unreadable.
                continue
            if not isinstance(data, dict):
                self._quarantine(agent_id, f)
                continue
            data["_filename"] = f.name
            messages.append(data)
        return messages

    def mark_processed(self, agent_id: str, filename: str) -> None:
        """Move *filename* from the agent's inbox to its processed directory.

        Missing files and an empty *filename* are ignored.

        Raises:
            ValueError: if *agent_id* or a non-empty *filename* is not a
                valid single path component.
        """
        if not filename:
            # An empty name would resolve to the inbox directory itself.
            return
        self._check_name(agent_id)
        self._check_name(filename, "filename")
        src = self.root / agent_id / "inbox" / filename
        dst = self.root / agent_id / "processed" / filename
        try:
            os.replace(src, dst)
        except FileNotFoundError:
            pass

    # -- discovery --

    def list_online_agents(self) -> list[dict]:
        """Return the cards of all agents whose status is not ``"offline"``."""
        registry = self._read_registry()
        return [card for card in registry.values() if card.get("status") != "offline"]

    def get_agent(self, agent_id: str) -> dict | None:
        """Return the registry card for *agent_id*, or ``None`` if unknown."""
        return self._read_registry().get(agent_id)
# ---------------------------------------------------------------------------
# MailboxConfig — channel configuration
# ---------------------------------------------------------------------------


class MailboxConfig(Base):
    """Mailbox channel configuration."""

    enabled: bool = False
    # Id of this agent; also used as the name of its mailbox directory.
    agent_id: str = ""
    # Identity fields published in the agent card written to the registry.
    description: str = ""
    capabilities: list[str] = Field(default_factory=list)
    # Sender ids this agent accepts; published in the agent card.
    # NOTE(review): "*" is presumably treated as "accept all" by
    # BaseChannel.is_allowed — confirm against the base class.
    allow_from: list[str] = Field(default_factory=lambda: ["*"])
    max_concurrent_tasks: int = 3
    # Seconds slept between two inbox scans.
    poll_interval: float = 5.0
    mailboxes_root: str = "~/.nanobot/mailboxes"


# ---------------------------------------------------------------------------
# MailboxChannel — channel plugin
# ---------------------------------------------------------------------------


class MailboxChannel(BaseChannel):
    """Channel plugin for inter-agent communication via filesystem mailboxes.

    The LLM sends messages to other agents using the existing MessageTool:
        MessageTool(channel='mailbox', chat_id='<target_agent_id>', content='...')

    This produces an OutboundMessage routed to MailboxChannel.send(),
    which writes to the target agent's mailbox.
    """

    name = "mailbox"
    display_name = "Mailbox"

    # Hop budget used when an outbound message carries no usable TTL.
    _DEFAULT_TTL = 3

    def __init__(self, config: Any, bus: Any) -> None:
        if isinstance(config, dict):
            config = MailboxConfig.model_validate(config)
        super().__init__(config, bus)
        self.config: MailboxConfig = config
        root = Path(self.config.mailboxes_root).expanduser()
        self.manager = MailboxManager(root)
        self._running = False
        self._poll_task: asyncio.Task | None = None

    async def start(self) -> None:
        """Register this agent in the registry and start the poll loop."""
        if self._running:
            # A second start() must not spawn a second poll task.
            return
        self.manager.register(self.config.agent_id, self._build_card())
        self._running = True
        self._poll_task = asyncio.create_task(self._poll_loop())

    async def _poll_loop(self) -> None:
        """Scan the inbox and refresh the heartbeat until stopped."""
        while self._running:
            try:
                await self._poll_once()
                self.manager.heartbeat(self.config.agent_id)
            except Exception:
                # Without this guard one failing cycle would end the task and
                # silently stop all polling and heartbeats. Task cancellation
                # is a BaseException and still propagates.
                logger.exception("Mailbox poll cycle failed for {}", self.config.agent_id)
            await asyncio.sleep(self.config.poll_interval)

    async def _poll_once(self) -> None:
        """Deliver every pending inbox message to the bus, then archive it.

        A message is archived even when handling it fails, so a message that
        cannot be processed is not retried on every cycle.
        """
        messages = self.manager.poll(self.config.agent_id)
        for msg in messages:
            filename = msg.pop("_filename", "")
            try:
                callback = msg.get("callback")
                if (
                    isinstance(callback, dict)
                    and callback.get("channel")
                    and callback.get("chat_id")
                ):
                    await self._handle_callback_message(msg, callback)
                else:
                    await self._handle_message(
                        sender_id=msg["from"],
                        chat_id=msg["from"],
                        content=self._extract_text(msg),
                        metadata=self._build_metadata(msg),
                    )
            except Exception:
                logger.exception("Error processing mailbox message {}", filename)
            if filename:
                self.manager.mark_processed(self.config.agent_id, filename)

    async def _handle_callback_message(self, msg: dict, callback: dict) -> None:
        """Publish *msg* on the channel/chat named by its callback block."""
        if not self.is_allowed(msg["from"]):
            logger.warning("Mailbox: access denied from {}", msg["from"])
            return
        inbound = InboundMessage(
            channel=callback["channel"],
            sender_id=msg["from"],
            chat_id=callback["chat_id"],
            content=self._extract_text(msg),
            metadata=self._build_metadata(msg),
            session_key_override=callback.get("session_id"),
        )
        await self.bus.publish_inbound(inbound)

    @staticmethod
    def _content_parts(msg: dict) -> Any:
        """Return ``msg["content"]["parts"]``, or ``None`` if content is not a dict."""
        content = msg.get("content")
        return content.get("parts") if isinstance(content, dict) else None

    def _extract_text(self, msg: dict) -> str:
        """Join the text of all ``{"type": "text"}`` parts with newlines."""
        parts = self._content_parts(msg)
        if not isinstance(parts, list):
            return ""
        texts = [
            p["text"]
            for p in parts
            if isinstance(p, dict) and p.get("type") == "text" and isinstance(p.get("text"), str)
        ]
        return "\n".join(texts)

    def _build_metadata(self, msg: dict) -> dict[str, Any]:
        """Collect the mailbox-specific fields of *msg* for InboundMessage.metadata."""
        return {
            "mailbox_type": msg.get("type", "message"),
            "mailbox_task": msg.get("task"),
            "mailbox_parts": self._content_parts(msg),
            "mailbox_ttl": msg.get("ttl"),
            "mailbox_trace": msg.get("trace"),
            "reply_to": msg.get("reply_to"),
        }

    @classmethod
    def _coerce_ttl(cls, raw: Any) -> int:
        """Return *raw* as an int hop count, or the default if missing/invalid."""
        if raw is None:
            return cls._DEFAULT_TTL
        try:
            return int(raw)
        except (TypeError, ValueError):
            logger.warning("Mailbox: invalid TTL {!r}, using default", raw)
            return cls._DEFAULT_TTL

    async def send(self, msg: OutboundMessage) -> None:
        """Write *msg* to the mailbox of the agent named by ``msg.chat_id``.

        Reads the optional metadata keys ``mailbox_type``, ``mailbox_ttl``,
        ``mailbox_trace``, ``mailbox_task`` and ``mailbox_callback``. The
        message is dropped with a warning when the target is empty, already
        present in the trace, or the TTL is exhausted.
        """
        target = msg.chat_id
        if not target:
            logger.warning("Mailbox: refusing to send without a target agent id")
            return
        meta = msg.metadata or {}
        # Both keys may be present with value None: _build_metadata emits
        # exactly that for inbound messages that lacked the field.
        trace: list[str] = list(meta.get("mailbox_trace") or [])
        ttl = self._coerce_ttl(meta.get("mailbox_ttl"))

        # Anti-loop: check circular route
        if target in trace:
            logger.warning("Rejecting circular route: {} already in trace", target)
            return
        # Anti-loop: check TTL exhausted
        if ttl <= 0:
            logger.warning("TTL exhausted, cannot forward to {}", target)
            return

        trace.append(self.config.agent_id)
        ttl -= 1

        mailbox_msg: dict[str, Any] = {
            "type": meta.get("mailbox_type", "message"),
            "from": self.config.agent_id,
            "to": target,
            "content": {"parts": [{"type": "text", "text": msg.content}]},
            "ttl": ttl,
            "trace": trace,
        }
        if meta.get("mailbox_task"):
            mailbox_msg["task"] = meta["mailbox_task"]
        if meta.get("mailbox_callback"):
            mailbox_msg["callback"] = meta["mailbox_callback"]
        self.manager.send(self.config.agent_id, target, mailbox_msg)

    async def stop(self) -> None:
        """Cancel the poll task and mark this agent offline in the registry."""
        if not self._running:
            return
        self._running = False
        if self._poll_task:
            self._poll_task.cancel()
            try:
                await self._poll_task
            except asyncio.CancelledError:
                pass
            self._poll_task = None
        self.manager.update_status(self.config.agent_id, "offline")

    def _build_card(self) -> dict:
        """Build the agent card stored in the registry from the channel config."""
        now = datetime.now(timezone.utc).isoformat()
        return {
            "agent_id": self.config.agent_id,
            "description": self.config.description,
            "capabilities": self.config.capabilities,
            "status": "idle",
            "allow_from": self.config.allow_from,
            "max_concurrent_tasks": self.config.max_concurrent_tasks,
            "current_tasks": [],
            "registered_at": now,
            "last_heartbeat": now,
        }

    @classmethod
    def default_config(cls) -> dict[str, Any]:
        return MailboxConfig().model_dump(by_alias=True)
+ +Usage: + uv run python scripts/smoke_test_mailbox.py +""" + +import asyncio +import json +import shutil +import sys +import tempfile +import time +from pathlib import Path + +from nanobot.bus.events import OutboundMessage +from nanobot.channels.mailbox import MailboxChannel, MailboxManager + + +class BusMock: + """Mock bus that captures published InboundMessages.""" + + def __init__(self, name: str): + self.name = name + self.messages: list = [] + + async def publish_inbound(self, msg): + self.messages.append(msg) + + +def make_channel(bus: BusMock, root: Path, agent_id: str, **overrides) -> MailboxChannel: + cfg = { + "enabled": True, + "agentId": agent_id, + "description": f"{agent_id} agent for smoke test", + "capabilities": ["test"], + "allowFrom": ["*"], + "maxConcurrentTasks": 3, + "pollInterval": 0.05, + "mailboxesRoot": str(root), + } + cfg.update(overrides) + return MailboxChannel(cfg, bus) + + +def header(text: str) -> None: + print(f"\n{'='*60}") + print(f" {text}") + print(f"{'='*60}") + + +def ok(test_name: str) -> None: + print(f" [PASS] {test_name}") + + +def fail(test_name: str, detail: str = "") -> None: + print(f" [FAIL] {test_name}") + if detail: + print(f" {detail}") + + +async def run_smoke_tests() -> bool: + tmpdir = tempfile.mkdtemp(prefix="mailbox_smoke_") + root = Path(tmpdir) + all_passed = True + + try: + # --------------------------------------------------------------- + header("1. 
Agent Registration") + # --------------------------------------------------------------- + bus_a = BusMock("researcher") + bus_b = BusMock("coder") + ch_a = make_channel(bus_a, root, "researcher") + ch_b = make_channel(bus_b, root, "coder") + + await ch_a.start() + await ch_b.start() + + mgr = MailboxManager(root) + registry = mgr._read_registry() + + if "researcher" in registry and "coder" in registry: + ok("Both agents registered in _registry.json") + else: + fail("Agent registration", f"Got keys: {list(registry.keys())}") + all_passed = False + + agents = mgr.list_online_agents() + ids = {a["agent_id"] for a in agents} + if ids == {"researcher", "coder"}: + ok("list_online_agents returns both agents") + else: + fail("list_online_agents", f"Got: {ids}") + all_passed = False + + # --------------------------------------------------------------- + header("2. Agent A → Agent B messaging") + # --------------------------------------------------------------- + outbound = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="Please write a sort function for the project.", + ) + await ch_a.send(outbound) + + # Verify file in B's inbox + inbox_files = list((root / "coder" / "inbox").glob("*.msg.json")) + if len(inbox_files) == 1: + ok("Message file created in coder's inbox") + else: + fail("Message file", f"Expected 1 file, got {len(inbox_files)}") + all_passed = False + + # Verify message content + data = json.loads(inbox_files[0].read_text()) + if data["from"] == "researcher" and "sort function" in data["content"]["parts"][0]["text"]: + ok("Message content correct (from + text)") + else: + fail("Message content", f"Got: {data}") + all_passed = False + + # B polls and receives + await ch_b._poll_once() + if len(bus_b.messages) == 1: + msg = bus_b.messages[0] + if msg.sender_id == "researcher" and "sort function" in msg.content: + ok("Agent B received InboundMessage with correct content") + else: + fail("InboundMessage content", f"sender={msg.sender_id}, 
content={msg.content}") + all_passed = False + else: + fail("Agent B receive", f"Expected 1 message, got {len(bus_b.messages)}") + all_passed = False + + # Verify inbox processed + if len(list((root / "coder" / "inbox").glob("*.msg.json"))) == 0: + ok("Inbox cleared after processing") + else: + fail("Inbox not cleared") + all_passed = False + + # --------------------------------------------------------------- + header("3. Agent B → Agent A response") + # --------------------------------------------------------------- + bus_b.messages.clear() + + response = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="Sort function completed: sort_by_mtime() at utils.py:42", + ) + await ch_b.send(response) + await ch_a._poll_once() + + if len(bus_a.messages) == 1: + msg = bus_a.messages[0] + if msg.sender_id == "coder" and "sort_by_mtime" in msg.content: + ok("Agent A received response from coder") + else: + fail("Response content", f"sender={msg.sender_id}, content={msg.content}") + all_passed = False + else: + fail("Agent A response", f"Expected 1, got {len(bus_a.messages)}") + all_passed = False + + # --------------------------------------------------------------- + header("4. 
Callback routing (Feishu session)") + # --------------------------------------------------------------- + bus_a.messages.clear() + bus_b.messages.clear() + + # A sends task with callback + task_msg = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="Analyze the codebase", + metadata={ + "mailbox_callback": { + "channel": "feishu", + "chat_id": "user_feishu_123", + "session_id": "feishu:user_feishu_123", + }, + }, + ) + await ch_a.send(task_msg) + + # B receives + await ch_b._poll_once() + if len(bus_b.messages) == 1: + ok("Agent B received task with callback") + else: + fail("Task receive") + all_passed = False + + # B responds with callback forwarded + bus_b.messages.clear() + response_with_callback = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="Analysis complete: 3 modules, 1500 lines", + metadata={ + "mailbox_callback": { + "channel": "feishu", + "chat_id": "user_feishu_123", + "session_id": "feishu:user_feishu_123", + }, + }, + ) + await ch_b.send(response_with_callback) + await ch_a._poll_once() + + if len(bus_a.messages) == 1: + update = bus_a.messages[0] + if (update.channel == "feishu" + and update.chat_id == "user_feishu_123" + and update.session_key_override == "feishu:user_feishu_123"): + ok("Callback routes to Feishu session correctly") + else: + fail("Callback routing", f"channel={update.channel}, chat_id={update.chat_id}") + all_passed = False + else: + fail("Callback receive", f"Expected 1, got {len(bus_a.messages)}") + all_passed = False + + # --------------------------------------------------------------- + header("5. 
Anti-loop: TTL and trace") + # --------------------------------------------------------------- + bus_a.messages.clear() + + # Send a message, then check that ttl is decremented and trace populated + loop_outbound = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="test anti-loop", + metadata={"mailbox_ttl": 3, "mailbox_trace": []}, + ) + await ch_a.send(loop_outbound) + + inbox_files = list((root / "coder" / "inbox").glob("*.msg.json")) + latest = sorted(inbox_files)[-1] + data = json.loads(latest.read_text()) + if data["ttl"] == 2 and "researcher" in data["trace"]: + ok("TTL decremented (3→2) and trace populated") + else: + fail("Anti-loop", f"ttl={data['ttl']}, trace={data['trace']}") + all_passed = False + + # Test circular route rejection + circular_outbound = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="should be rejected", + metadata={"mailbox_ttl": 3, "mailbox_trace": ["researcher"]}, + ) + before_count = len(list((root / "researcher" / "inbox").glob("*.msg.json"))) + await ch_a.send(circular_outbound) + after_count = len(list((root / "researcher" / "inbox").glob("*.msg.json"))) + if after_count == before_count: + ok("Circular route rejected (trace contains target)") + else: + fail("Circular route", "Message was written despite circular trace") + all_passed = False + + # Test TTL=0 rejection + exhausted_outbound = OutboundMessage( + channel="mailbox", + chat_id="coder", + content="should be rejected", + metadata={"mailbox_ttl": 0, "mailbox_trace": []}, + ) + before_count = len(list((root / "coder" / "inbox").glob("*.msg.json"))) + await ch_a.send(exhausted_outbound) + after_count = len(list((root / "coder" / "inbox").glob("*.msg.json"))) + if after_count == before_count: + ok("TTL=0 rejected (exhausted hop count)") + else: + fail("TTL=0", "Message was written despite TTL=0") + all_passed = False + + # --------------------------------------------------------------- + header("6. 
allowFrom access control") + # --------------------------------------------------------------- + bus_c = BusMock("restricted") + ch_c = make_channel(bus_c, root, "restricted", allowFrom=["researcher"]) + await ch_c.start() + + # Stranger sends message — should be blocked + mgr.send("stranger", "restricted", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "should be blocked"}]}, + }) + await ch_c._poll_once() + + if len(bus_c.messages) == 0: + ok("Stranger blocked by allowFrom=['researcher']") + else: + fail("allowFrom", f"Got {len(bus_c.messages)} messages, expected 0") + all_passed = False + + # Researcher sends message — should pass + bus_c.messages.clear() + mgr.send("researcher", "restricted", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "should pass"}]}, + }) + await ch_c._poll_once() + + if len(bus_c.messages) == 1: + ok("Researcher allowed by allowFrom=['researcher']") + else: + fail("allowFrom allow", f"Got {len(bus_c.messages)} messages, expected 1") + all_passed = False + + await ch_c.stop() + + # --------------------------------------------------------------- + header("7. Heartbeat and status") + # --------------------------------------------------------------- + before_hb = mgr._read_registry()["researcher"]["last_heartbeat"] + time.sleep(0.02) + mgr.heartbeat("researcher") + after_hb = mgr._read_registry()["researcher"]["last_heartbeat"] + if after_hb > before_hb: + ok("Heartbeat updates timestamp") + else: + fail("Heartbeat") + all_passed = False + + # --------------------------------------------------------------- + header("8. 
Stop and offline") + # --------------------------------------------------------------- + await ch_a.stop() + await ch_b.stop() + + registry = mgr._read_registry() + if registry["researcher"]["status"] == "offline" and registry["coder"]["status"] == "offline": + ok("Both agents marked offline after stop()") + else: + fail("Offline status", f"researcher={registry['researcher']['status']}, coder={registry['coder']['status']}") + all_passed = False + + online = mgr.list_online_agents() + agent_ids = {a["agent_id"] for a in online} + if "researcher" not in agent_ids and "coder" not in agent_ids: + ok("Offline agents excluded from list_online_agents()") + else: + fail("Online list", f"Still shows: {agent_ids}") + all_passed = False + + # --------------------------------------------------------------- + # Summary + # --------------------------------------------------------------- + print(f"\n{'='*60}") + if all_passed: + print(" ALL SMOKE TESTS PASSED") + else: + print(" SOME TESTS FAILED — see above for details") + print(f"{'='*60}\n") + + return all_passed + + finally: + shutil.rmtree(tmpdir, ignore_errors=True) + + +if __name__ == "__main__": + passed = asyncio.run(run_smoke_tests()) + sys.exit(0 if passed else 1) diff --git a/tests/channels/test_mailbox.py b/tests/channels/test_mailbox.py new file mode 100644 index 000000000..4e42daffe --- /dev/null +++ b/tests/channels/test_mailbox.py @@ -0,0 +1,548 @@ +"""Tests for MailboxManager (file operations) and MailboxChannel.""" + +import asyncio +import json +import time +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from nanobot.bus.events import InboundMessage, OutboundMessage +from nanobot.channels.mailbox import MailboxManager, MailboxChannel, MailboxConfig + + +# --- MailboxManager Tests --- + +@pytest.fixture +def root(tmp_path: Path) -> Path: + mailboxes = tmp_path / "mailboxes" + mailboxes.mkdir() + return mailboxes + + +@pytest.fixture +def mgr(root: Path) -> 
MailboxManager: + return MailboxManager(root) + + +class TestRegister: + def test_register_creates_agent_entry(self, mgr: MailboxManager, root: Path): + card = {"agent_id": "researcher", "description": "test agent"} + mgr.register("researcher", card) + registry = json.loads((root / "_registry.json").read_text()) + assert "researcher" in registry + assert registry["researcher"]["agent_id"] == "researcher" + + def test_register_creates_directories(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + assert (root / "coder" / "inbox").is_dir() + assert (root / "coder" / "processed").is_dir() + + def test_register_overwrite(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder", "status": "idle"}) + mgr.register("coder", {"agent_id": "coder", "status": "busy"}) + registry = json.loads((root / "_registry.json").read_text()) + assert registry["coder"]["status"] == "busy" + + +class TestHeartbeat: + def test_heartbeat_updates_timestamp(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + before = json.loads((root / "_registry.json").read_text())["coder"]["last_heartbeat"] + time.sleep(0.01) + mgr.heartbeat("coder") + after = json.loads((root / "_registry.json").read_text())["coder"]["last_heartbeat"] + assert after >= before + + +class TestUpdateStatus: + def test_update_status(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder", "status": "idle"}) + mgr.update_status("coder", "busy", current_tasks=["task_1"]) + registry = json.loads((root / "_registry.json").read_text()) + assert registry["coder"]["status"] == "busy" + assert registry["coder"]["current_tasks"] == ["task_1"] + + +class TestSendAndPoll: + def test_send_creates_message_file(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("researcher", {"agent_id": "researcher"}) + msg = {"type": "message", "content": {"parts": [{"type": 
"text", "text": "hello"}]}} + mgr.send("researcher", "coder", msg) + files = list((root / "coder" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["from"] == "researcher" + assert data["to"] == "coder" + + def test_poll_returns_new_messages_sorted(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("a1", {"agent_id": "a1"}) + mgr.register("a2", {"agent_id": "a2"}) + mgr.send("a1", "coder", {"type": "message", "content": {"parts": []}}) + time.sleep(0.01) + mgr.send("a2", "coder", {"type": "task", "content": {"parts": []}}) + messages = mgr.poll("coder") + assert len(messages) == 2 + assert messages[0]["from"] == "a1" + assert messages[1]["from"] == "a2" + + def test_mark_processed_moves_file(self, mgr: MailboxManager, root: Path): + mgr.register("coder", {"agent_id": "coder"}) + mgr.register("researcher", {"agent_id": "researcher"}) + mgr.send("researcher", "coder", {"type": "message", "content": {"parts": []}}) + messages = mgr.poll("coder") + mgr.mark_processed("coder", messages[0]["_filename"]) + assert len(list((root / "coder" / "inbox").glob("*.msg.json"))) == 0 + assert len(list((root / "coder" / "processed").glob("*.msg.json"))) == 1 + + +class TestListAndGetAgents: + def test_list_online_agents(self, mgr: MailboxManager, root: Path): + mgr.register("researcher", {"agent_id": "researcher", "status": "idle"}) + mgr.register("coder", {"agent_id": "coder", "status": "busy"}) + agents = mgr.list_online_agents() + ids = {a["agent_id"] for a in agents} + assert ids == {"researcher", "coder"} + + def test_get_agent_not_found(self, mgr: MailboxManager): + assert mgr.get_agent("nonexistent") is None + + +# --- MailboxChannel & integration Tests --- + +def _make_channel(bus, root, agent_id, **overrides): + """Create a MailboxChannel with sensible defaults.""" + cfg = { + "enabled": True, + "agent_id": agent_id, + "description": f"Test agent {agent_id}", + 
"mailboxes_root": str(root), + "poll_interval": 0.05, + "allow_from": ["*"], + } + cfg.update(overrides) + return MailboxChannel(cfg, bus) + + +class TestConfig: + def test_default_config(self): + cfg = MailboxConfig() + assert cfg.enabled is False + assert cfg.agent_id == "" + assert cfg.allow_from == ["*"] + + def test_config_from_dict(self): + """Validate from dict with camelCase aliases.""" + cfg = MailboxConfig.model_validate({ + "enabled": True, + "agentId": "coder", + "allowFrom": ["researcher"], + "pollInterval": 2.0, + }) + assert cfg.enabled is True + assert cfg.agent_id == "coder" + assert cfg.allow_from == ["researcher"] + assert cfg.poll_interval == 2.0 + + +class TestChannelAttributes: + def test_name(self, root: Path): + bus = MagicMock() + ch = _make_channel(bus, root, "test") + assert ch.name == "mailbox" + + def test_display_name(self, root: Path): + bus = MagicMock() + ch = _make_channel(bus, root, "test") + assert ch.display_name == "Mailbox" + + +class TestStartAndStop: + @pytest.mark.asyncio + async def test_start_registers_agent(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder", description="A coding agent") + try: + await ch.start() + card = ch.manager.get_agent("coder") + assert card is not None + assert card["agent_id"] == "coder" + assert card["description"] == "A coding agent" + finally: + ch._running = False + if ch._poll_task: + ch._poll_task.cancel() + try: + await ch._poll_task + except asyncio.CancelledError: + pass + + @pytest.mark.asyncio + async def test_stop_marks_offline(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + await ch.stop() + card = ch.manager.get_agent("coder") + assert card["status"] == "offline" + + @pytest.mark.asyncio + async def test_stop_idempotent(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, 
"coder") + await ch.start() + await ch.stop() + # Calling stop() a second time should not raise + await ch.stop() + + +class TestPollAndInbound: + @pytest.mark.asyncio + async def test_poll_delivers_inbound_message(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + try: + # Simulate researcher sending a message to coder + ch.manager.send("researcher", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "Hello from researcher"}]}, + }) + # Manually trigger a poll + await ch._poll_once() + bus.publish_inbound.assert_awaited_once() + inbound: InboundMessage = bus.publish_inbound.call_args[0][0] + assert inbound.channel == "mailbox" + assert inbound.sender_id == "researcher" + assert inbound.chat_id == "researcher" + assert "Hello from researcher" in inbound.content + finally: + await ch.stop() + + @pytest.mark.asyncio + async def test_poll_with_callback_routes_to_original_session(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + try: + # Message with callback routing info + ch.manager.send("researcher", "coder", { + "type": "message", + "from": "researcher", + "content": {"parts": [{"type": "text", "text": "Task result"}]}, + "callback": { + "channel": "feishu", + "chat_id": "oc_abc123", + "session_id": "sess_xyz", + }, + }) + await ch._poll_once() + bus.publish_inbound.assert_awaited_once() + inbound: InboundMessage = bus.publish_inbound.call_args[0][0] + assert inbound.channel == "feishu" + assert inbound.chat_id == "oc_abc123" + assert inbound.session_key_override == "sess_xyz" + finally: + await ch.stop() + + @pytest.mark.asyncio + async def test_poll_respects_allow_from(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder", allow_from=["researcher"]) + await ch.start() + try: + # Message from a stranger 
should be blocked + ch.manager.send("stranger", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "Hi"}]}, + }) + await ch._poll_once() + bus.publish_inbound.assert_not_awaited() + finally: + await ch.stop() + + @pytest.mark.asyncio + async def test_poll_marks_processed(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + try: + ch.manager.send("researcher", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "hello"}]}, + }) + await ch._poll_once() + # Inbox should be empty + assert len(ch.manager.poll("coder")) == 0 + # Processed dir should have the file + processed = list((root / "coder" / "processed").glob("*.msg.json")) + assert len(processed) == 1 + finally: + await ch.stop() + + +class TestSend: + @pytest.mark.asyncio + async def test_send_writes_to_target_mailbox(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + try: + msg = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="Hello researcher", + ) + await ch.send(msg) + # Check target inbox + files = list((root / "researcher" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["from"] == "coder" + assert data["to"] == "researcher" + assert data["ttl"] == 2 # started at 3, decremented to 2 + assert data["trace"] == ["coder"] + finally: + await ch.stop() + + @pytest.mark.asyncio + async def test_send_uses_existing_message_tool(self, root: Path): + bus = MagicMock() + bus.publish_inbound = AsyncMock() + ch = _make_channel(bus, root, "coder") + await ch.start() + try: + # Simulate what MessageTool would produce — metadata carrying task info + msg = OutboundMessage( + channel="mailbox", + chat_id="researcher", + content="Here is the result", + metadata={ + "mailbox_task": "analyze_code", + "mailbox_type": 
"task_result", + }, + ) + await ch.send(msg) + files = list((root / "researcher" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + assert data["type"] == "task_result" + assert data["task"] == "analyze_code" + finally: + await ch.stop() + + +class TestTwoAgentCommunication: + @pytest.mark.asyncio + async def test_agent_a_sends_agent_b_receives(self, root: Path): + bus_a = MagicMock() + bus_a.publish_inbound = AsyncMock() + bus_b = MagicMock() + bus_b.publish_inbound = AsyncMock() + + ch_a = _make_channel(bus_a, root, "alpha") + ch_b = _make_channel(bus_b, root, "beta") + await ch_a.start() + await ch_b.start() + try: + # Alpha sends to Beta + out = OutboundMessage( + channel="mailbox", + chat_id="beta", + content="Hello Beta, this is Alpha", + ) + await ch_a.send(out) + + # Beta polls and receives + await ch_b._poll_once() + bus_b.publish_inbound.assert_awaited_once() + inbound: InboundMessage = bus_b.publish_inbound.call_args[0][0] + assert inbound.channel == "mailbox" + assert inbound.sender_id == "alpha" + assert "Hello Beta" in inbound.content + finally: + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + async def test_agent_b_sends_response_back(self, root: Path): + bus_a = MagicMock() + bus_a.publish_inbound = AsyncMock() + bus_b = MagicMock() + bus_b.publish_inbound = AsyncMock() + + ch_a = _make_channel(bus_a, root, "alpha") + ch_b = _make_channel(bus_b, root, "beta") + await ch_a.start() + await ch_b.start() + try: + # Beta sends to Alpha + out = OutboundMessage( + channel="mailbox", + chat_id="alpha", + content="Response from Beta", + ) + await ch_b.send(out) + + # Alpha polls and receives + await ch_a._poll_once() + bus_a.publish_inbound.assert_awaited_once() + inbound: InboundMessage = bus_a.publish_inbound.call_args[0][0] + assert inbound.sender_id == "beta" + assert "Response from Beta" in inbound.content + finally: + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + 
async def test_callback_routes_to_original_feishu_session(self, root: Path): + """A sends task with callback -> B receives -> B responds with callback -> A receives routed to feishu session.""" + bus_a = MagicMock() + bus_a.publish_inbound = AsyncMock() + bus_b = MagicMock() + bus_b.publish_inbound = AsyncMock() + + ch_a = _make_channel(bus_a, root, "alpha") + ch_b = _make_channel(bus_b, root, "beta") + await ch_a.start() + await ch_b.start() + try: + # Alpha sends a task to Beta with a callback to feishu + out = OutboundMessage( + channel="mailbox", + chat_id="beta", + content="Analyze this code", + metadata={ + "mailbox_task": "code_review", + "mailbox_callback": { + "channel": "feishu", + "chat_id": "oc_feishu_chat", + "session_id": "sess_feishu_123", + }, + }, + ) + await ch_a.send(out) + + # Beta polls and receives + await ch_b._poll_once() + bus_b.publish_inbound.assert_awaited_once() + inbound_b: InboundMessage = bus_b.publish_inbound.call_args[0][0] + assert "Analyze this code" in inbound_b.content + + # Beta responds, echoing the callback + out_b = OutboundMessage( + channel="mailbox", + chat_id="alpha", + content="Code review complete", + metadata={ + "mailbox_task": "code_review", + "mailbox_callback": { + "channel": "feishu", + "chat_id": "oc_feishu_chat", + "session_id": "sess_feishu_123", + }, + }, + ) + await ch_b.send(out_b) + + # Alpha polls — the callback routes to feishu channel + await ch_a._poll_once() + bus_a.publish_inbound.assert_awaited_once() + inbound_a: InboundMessage = bus_a.publish_inbound.call_args[0][0] + assert inbound_a.channel == "feishu" + assert inbound_a.chat_id == "oc_feishu_chat" + assert inbound_a.session_key_override == "sess_feishu_123" + assert "Code review complete" in inbound_a.content + finally: + await ch_a.stop() + await ch_b.stop() + + @pytest.mark.asyncio + async def test_anti_loop_trace(self, root: Path): + """Message with existing trace preserved in metadata.""" + bus_a = MagicMock() + bus_a.publish_inbound = 
AsyncMock() + + ch_a = _make_channel(bus_a, root, "alpha") + await ch_a.start() + try: + # Send with pre-existing trace + out = OutboundMessage( + channel="mailbox", + chat_id="beta", + content="Forwarding along", + metadata={ + "mailbox_trace": ["origin", "relay1"], + "mailbox_ttl": 5, + }, + ) + await ch_a.send(out) + files = list((root / "beta" / "inbox").glob("*.msg.json")) + assert len(files) == 1 + data = json.loads(files[0].read_text()) + # Trace should be: origin, relay1, alpha + assert data["trace"] == ["origin", "relay1", "alpha"] + assert data["ttl"] == 4 # decremented from 5 + finally: + await ch_a.stop() + + @pytest.mark.asyncio + async def test_allow_from_blocks_unauthorized(self, root: Path): + """allowFrom=['coder'] blocks stranger.""" + bus = MagicMock() + bus.publish_inbound = AsyncMock() + + ch = _make_channel(bus, root, "coder", allow_from=["coder"]) + await ch.start() + try: + # Stranger sends to coder + ch.manager.send("stranger", "coder", { + "type": "message", + "content": {"parts": [{"type": "text", "text": "Spam"}]}, + }) + await ch._poll_once() + bus.publish_inbound.assert_not_awaited() + finally: + await ch.stop() + + @pytest.mark.asyncio + async def test_registry_discovery(self, root: Path): + """Both agents register and can discover each other.""" + bus_a = MagicMock() + bus_a.publish_inbound = AsyncMock() + bus_b = MagicMock() + bus_b.publish_inbound = AsyncMock() + + ch_a = _make_channel(bus_a, root, "alpha", description="Agent Alpha") + ch_b = _make_channel(bus_b, root, "beta", description="Agent Beta") + await ch_a.start() + await ch_b.start() + try: + agents = ch_a.manager.list_online_agents() + ids = {a["agent_id"] for a in agents} + assert "alpha" in ids + assert "beta" in ids + + # Alpha can look up Beta's card + beta_card = ch_a.manager.get_agent("beta") + assert beta_card is not None + assert beta_card["description"] == "Agent Beta" + + # Beta can look up Alpha's card + alpha_card = ch_b.manager.get_agent("alpha") + assert 
alpha_card is not None + assert alpha_card["description"] == "Agent Alpha" + finally: + await ch_a.stop() + await ch_b.stop()