"""Tests for MCP connection lifecycle in AgentLoop.""" from __future__ import annotations import asyncio from contextlib import AsyncExitStack from typing import Any from unittest.mock import MagicMock import pytest from nanobot.agent.loop import AgentLoop from nanobot.agent.tools import mcp as mcp_runtime from nanobot.agent.tools.base import Tool from nanobot.bus.queue import MessageBus from nanobot.config.loader import load_config, save_config from nanobot.config.schema import MCPServerConfig class _FakeMcpTool(Tool): def __init__(self, name: str) -> None: self._name = name @property def name(self) -> str: return self._name @property def description(self) -> str: return "fake MCP tool" @property def parameters(self) -> dict[str, Any]: return {"type": "object", "properties": {}} async def execute(self, **_kwargs: Any) -> str: return "ok" def _make_loop(tmp_path, *, mcp_servers: dict | None = None) -> AgentLoop: bus = MessageBus() provider = MagicMock() provider.get_default_model.return_value = "test-model" provider.generation.max_tokens = 4096 return AgentLoop( bus=bus, provider=provider, workspace=tmp_path, model="test-model", mcp_servers=mcp_servers or {"test": object()}, ) @pytest.mark.asyncio async def test_connect_mcp_retries_when_no_servers_connect(tmp_path, monkeypatch: pytest.MonkeyPatch): loop = _make_loop(tmp_path) attempts = 0 async def _fake_connect(_servers, _registry): nonlocal attempts attempts += 1 return {} monkeypatch.setattr("nanobot.agent.tools.mcp.connect_mcp_servers", _fake_connect) await loop._connect_mcp() await loop._connect_mcp() assert attempts == 2 assert loop._mcp_connected is False assert loop._mcp_stacks == {} @pytest.mark.asyncio async def test_reload_mcp_servers_adds_and_removes_tools_without_restart( tmp_path, monkeypatch: pytest.MonkeyPatch, ): config_path = tmp_path / "config.json" monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path) config = load_config() config.tools.mcp_servers["browserbase"] = MCPServerConfig( type="stdio", command="browserbase-mcp", ) save_config(config) closed: list[str] = [] async def _mark_closed(name: str) -> None: closed.append(name) async def _fake_connect(servers, registry): stacks = {} for name in servers: registry.register(_FakeMcpTool(f"mcp_{name}_navigate")) stack = AsyncExitStack() await stack.__aenter__() stack.push_async_callback(_mark_closed, name) stacks[name] = stack return stacks monkeypatch.setattr("nanobot.agent.tools.mcp.connect_mcp_servers", _fake_connect) loop = _make_loop(tmp_path, mcp_servers={}) added = await mcp_runtime.reload_servers(loop, loop.tools) assert added["ok"] is True assert added["added"] == ["browserbase"] assert loop.tools.has("mcp_browserbase_navigate") assert "browserbase" in loop._mcp_stacks config = load_config() del config.tools.mcp_servers["browserbase"] save_config(config) removed = await mcp_runtime.reload_servers(loop, loop.tools) assert removed["ok"] is True assert removed["removed"] == ["browserbase"] assert not loop.tools.has("mcp_browserbase_navigate") assert "browserbase" not in loop._mcp_stacks assert closed == ["browserbase"] @pytest.mark.asyncio async def test_request_mcp_reload_reaches_runtime_control_without_restart( tmp_path, monkeypatch: pytest.MonkeyPatch, ): config_path = tmp_path / "config.json" monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path) config = load_config() config.tools.mcp_servers["browserbase"] = MCPServerConfig( type="stdio", command="browserbase-mcp", ) save_config(config) closed: list[str] = [] async def _mark_closed(name: str) -> None: closed.append(name) async def _fake_connect(servers, registry): stacks = {} for name in servers: registry.register(_FakeMcpTool(f"mcp_{name}_navigate")) stack = AsyncExitStack() await stack.__aenter__() stack.push_async_callback(_mark_closed, name) stacks[name] = stack return stacks monkeypatch.setattr("nanobot.agent.tools.mcp.connect_mcp_servers", _fake_connect) loop = _make_loop(tmp_path, mcp_servers={}) async def _handle_one_runtime_control() -> None: msg = await loop.bus.consume_inbound() handled = await mcp_runtime.handle_runtime_control(loop, msg, loop.tools) assert handled is True consumer = asyncio.create_task(_handle_one_runtime_control()) result = await mcp_runtime.request_mcp_reload(loop.bus, timeout=2.0) await consumer assert result["ok"] is True assert result["added"] == ["browserbase"] assert result["requires_restart"] is False assert loop.tools.has("mcp_browserbase_navigate") config = load_config() del config.tools.mcp_servers["browserbase"] save_config(config) consumer = asyncio.create_task(_handle_one_runtime_control()) result = await mcp_runtime.request_mcp_reload(loop.bus, timeout=2.0) await consumer assert result["ok"] is True assert result["removed"] == ["browserbase"] assert result["requires_restart"] is False assert not loop.tools.has("mcp_browserbase_navigate") assert closed == ["browserbase"] @pytest.mark.asyncio async def test_reload_mcp_servers_retries_configured_server_without_live_stack( tmp_path, monkeypatch: pytest.MonkeyPatch, ): config_path = tmp_path / "config.json" monkeypatch.setattr("nanobot.config.loader._current_config_path", config_path) config = load_config() config.tools.mcp_servers["browserbase"] = MCPServerConfig( type="stdio", command="browserbase-mcp", ) save_config(config) async def _fake_connect(servers, registry): stacks = {} for name in servers: registry.register(_FakeMcpTool(f"mcp_{name}_navigate")) stack = AsyncExitStack() await stack.__aenter__() stacks[name] = stack return stacks monkeypatch.setattr("nanobot.agent.tools.mcp.connect_mcp_servers", _fake_connect) loop = _make_loop(tmp_path, mcp_servers={"browserbase": config.tools.mcp_servers["browserbase"]}) result = await mcp_runtime.reload_servers(loop, loop.tools) assert result["ok"] is True assert result["added"] == [] assert result["changed"] == [] assert result["retried"] == ["browserbase"] assert loop.tools.has("mcp_browserbase_navigate") await loop.close_mcp()