"""Tests for the nanobot CLI commands and the config/provider helpers they use.

The CLI is driven through Typer's ``CliRunner``; heavyweight collaborators
(config loader, provider factory, message bus, cron service, agent loop) are
replaced with mocks or small fakes so each test observes only the wiring.
"""

import asyncio
import json
import re
import shutil
from pathlib import Path
from unittest.mock import AsyncMock, MagicMock, patch

import pytest
from typer.testing import CliRunner

from nanobot.bus.events import OutboundMessage
from nanobot.cli.commands import _make_provider, app
from nanobot.config.schema import Config
from nanobot.cron.types import CronJob, CronPayload
from nanobot.providers.openai_codex_provider import _strip_model_prefix
from nanobot.providers.registry import find_by_name

runner = CliRunner()

# Compiled once at import time instead of on every ``_strip_ansi`` call.
_ANSI_ESCAPE_RE = re.compile(r"\x1b\[[0-9;]*m")


class _StopGatewayError(RuntimeError):
    """Raised by fakes to abort the ``gateway`` command at a known point."""


@pytest.fixture
def mock_paths():
    """Mock config/workspace paths for test isolation.

    Patches the config path lookup, config load/save and the workspace path
    lookup so they operate on a scratch ``./test_onboard_data`` directory.

    Yields:
        A ``(config_file, workspace_dir, mock_ws)`` tuple, where ``mock_ws`` is
        the mock replacing ``nanobot.cli.commands.get_workspace_path``.
    """
    with (
        patch("nanobot.config.loader.get_config_path") as mock_cp,
        patch("nanobot.config.loader.save_config") as mock_sc,
        patch("nanobot.config.loader.load_config") as mock_lc,
        patch("nanobot.cli.commands.get_workspace_path") as mock_ws,
    ):
        # Start from an empty scratch directory even if a previous run left one.
        base_dir = Path("./test_onboard_data")
        if base_dir.exists():
            shutil.rmtree(base_dir)
        base_dir.mkdir()

        config_file = base_dir / "config.json"
        workspace_dir = base_dir / "workspace"

        mock_cp.return_value = config_file
        mock_ws.return_value = workspace_dir
        mock_lc.side_effect = lambda _config_path=None: Config()

        def _save_config(config: Config, config_path: Path | None = None):
            """Write *config* as JSON to *config_path* (default: scratch file)."""
            target = config_path or config_file
            target.parent.mkdir(parents=True, exist_ok=True)
            target.write_text(json.dumps(config.model_dump(by_alias=True)), encoding="utf-8")

        mock_sc.side_effect = _save_config

        yield config_file, workspace_dir, mock_ws

        if base_dir.exists():
            shutil.rmtree(base_dir)


def test_onboard_fresh_install(mock_paths):
    """No existing config — should create from scratch."""
    config_file, workspace_dir, mock_ws = mock_paths

    result = runner.invoke(app, ["onboard"])

    assert result.exit_code == 0
    assert "Created config" in result.stdout
    assert "Created workspace" in result.stdout
    assert "nanobot is ready" in result.stdout
    assert config_file.exists()
    assert (workspace_dir / "AGENTS.md").exists()
    assert (workspace_dir / "memory" / "MEMORY.md").exists()
    expected_workspace = Config().workspace_path
    assert mock_ws.call_args.args == (expected_workspace,)


def test_onboard_existing_config_refresh(mock_paths):
    """Config exists, user declines overwrite — should refresh (load-merge-save)."""
    config_file, workspace_dir, _ = mock_paths
    config_file.write_text('{"existing": true}')

    result = runner.invoke(app, ["onboard"], input="n\n")

    assert result.exit_code == 0
    assert "Config already exists" in result.stdout
    assert "existing values preserved" in result.stdout
    assert workspace_dir.exists()
    assert (workspace_dir / "AGENTS.md").exists()


def test_onboard_existing_config_overwrite(mock_paths):
    """Config exists, user confirms overwrite — should reset to defaults."""
    config_file, workspace_dir, _ = mock_paths
    config_file.write_text('{"existing": true}')

    result = runner.invoke(app, ["onboard"], input="y\n")

    assert result.exit_code == 0
    assert "Config already exists" in result.stdout
    assert "Config reset to defaults" in result.stdout
    assert workspace_dir.exists()


def test_onboard_existing_workspace_safe_create(mock_paths):
    """Workspace exists — should not recreate, but still add missing templates."""
    config_file, workspace_dir, _ = mock_paths
    workspace_dir.mkdir(parents=True)
    config_file.write_text("{}")

    result = runner.invoke(app, ["onboard"], input="n\n")

    assert result.exit_code == 0
    assert "Created workspace" not in result.stdout
    assert "Created AGENTS.md" in result.stdout
    assert (workspace_dir / "AGENTS.md").exists()


def _strip_ansi(text):
    """Remove ANSI escape codes from text."""
    return _ANSI_ESCAPE_RE.sub("", text)


def test_onboard_help_shows_workspace_and_config_options():
    """``onboard --help`` lists the workspace/config/wizard flags but not ``--dir``."""
    result = runner.invoke(app, ["onboard", "--help"])

    assert result.exit_code == 0
    stripped_output = _strip_ansi(result.stdout)
    assert "--workspace" in stripped_output
    assert "-w" in stripped_output
    assert "--config" in stripped_output
    assert "-c" in stripped_output
    assert "--wizard" in stripped_output
    assert "--dir" not in stripped_output


def test_onboard_interactive_discard_does_not_save_or_create_workspace(mock_paths, monkeypatch):
    """A wizard result with ``should_save=False`` leaves no config or workspace behind."""
    from nanobot.cli.onboard import OnboardResult

    config_file, workspace_dir, _ = mock_paths

    monkeypatch.setattr(
        "nanobot.cli.onboard.run_onboard",
        lambda initial_config: OnboardResult(config=initial_config, should_save=False),
    )

    result = runner.invoke(app, ["onboard", "--wizard"])

    assert result.exit_code == 0
    assert "No changes were saved" in result.stdout
    assert not config_file.exists()
    assert not workspace_dir.exists()


def test_onboard_uses_explicit_config_and_workspace_paths(tmp_path, monkeypatch):
    """``--config``/``--workspace`` decide where onboard writes and what it prints."""
    config_path = tmp_path / "instance" / "config.json"
    workspace_path = tmp_path / "workspace"

    monkeypatch.setattr("nanobot.channels.registry.discover_all", lambda: {})

    result = runner.invoke(
        app,
        ["onboard", "--config", str(config_path), "--workspace", str(workspace_path)],
    )

    assert result.exit_code == 0
    saved = Config.model_validate(json.loads(config_path.read_text(encoding="utf-8")))
    assert saved.workspace_path == workspace_path
    assert (workspace_path / "AGENTS.md").exists()
    stripped_output = _strip_ansi(result.stdout)
    # Long paths may be wrapped across lines in the output; compare without newlines.
    compact_output = stripped_output.replace("\n", "")
    resolved_config = str(config_path.resolve())
    assert resolved_config in compact_output
    assert f"--config {resolved_config}" in compact_output


def test_onboard_wizard_preserves_explicit_config_in_next_steps(tmp_path, monkeypatch):
    """After a saved wizard run, the suggested commands carry the explicit ``--config``."""
    from nanobot.cli.onboard import OnboardResult

    config_path = tmp_path / "instance" / "config.json"
    workspace_path = tmp_path / "workspace"

    monkeypatch.setattr(
        "nanobot.cli.onboard.run_onboard",
        lambda initial_config: OnboardResult(config=initial_config, should_save=True),
    )
    monkeypatch.setattr("nanobot.channels.registry.discover_all", lambda: {})

    result = runner.invoke(
        app,
        ["onboard", "--wizard", "--config", str(config_path), "--workspace", str(workspace_path)],
    )

    assert result.exit_code == 0
    stripped_output = _strip_ansi(result.stdout)
    # Long paths may be wrapped across lines in the output; compare without newlines.
    compact_output = stripped_output.replace("\n", "")
    resolved_config = str(config_path.resolve())
    assert f'nanobot agent -m "Hello!" --config {resolved_config}' in compact_output
    assert f"nanobot gateway --config {resolved_config}" in compact_output


def test_config_matches_github_copilot_codex_with_hyphen_prefix():
    """A ``github-copilot/`` model prefix resolves to the ``github_copilot`` provider."""
    config = Config()
    config.agents.defaults.model = "github-copilot/gpt-5.3-codex"

    assert config.get_provider_name() == "github_copilot"


def test_config_matches_openai_codex_with_hyphen_prefix():
    """An ``openai-codex/`` model prefix resolves to the ``openai_codex`` provider."""
    config = Config()
    config.agents.defaults.model = "openai-codex/gpt-5.1-codex"

    assert config.get_provider_name() == "openai_codex"


def test_config_dump_excludes_oauth_provider_blocks():
    """The aliased config dump has no ``openaiCodex``/``githubCopilot`` provider keys."""
    config = Config()

    providers = config.model_dump(by_alias=True)["providers"]

    assert "openaiCodex" not in providers
    assert "githubCopilot" not in providers


def test_config_matches_explicit_ollama_prefix_without_api_key():
    """An ``ollama/`` model prefix selects ollama and its localhost API base."""
    config = Config()
    config.agents.defaults.model = "ollama/llama3.2"

    assert config.get_provider_name() == "ollama"
    assert config.get_api_base() == "http://localhost:11434/v1"


def test_config_explicit_ollama_provider_uses_default_localhost_api_base():
    """Setting the provider to ``ollama`` explicitly yields the localhost API base."""
    config = Config()
    config.agents.defaults.provider = "ollama"
    config.agents.defaults.model = "llama3.2"

    assert config.get_provider_name() == "ollama"
    assert config.get_api_base() == "http://localhost:11434/v1"


def test_config_accepts_camel_case_explicit_provider_name_for_coding_plan():
    """A camelCase provider name is accepted and normalised to snake_case."""
    config = Config.model_validate(
        {
            "agents": {
                "defaults": {
                    "provider": "volcengineCodingPlan",
                    "model": "doubao-1-5-pro",
                }
            },
            "providers": {
                "volcengineCodingPlan": {
                    "apiKey": "test-key",
                }
            },
        }
    )

    assert config.get_provider_name() == "volcengine_coding_plan"
    assert config.get_api_base() == "https://ark.cn-beijing.volces.com/api/coding/v3"


def test_find_by_name_accepts_camel_case_and_hyphen_aliases():
    """``find_by_name`` resolves camelCase and hyphenated spellings."""
    coding_plan = find_by_name("volcengineCodingPlan")
    assert coding_plan is not None
    assert coding_plan.name == "volcengine_coding_plan"

    copilot = find_by_name("github-copilot")
    assert copilot is not None
    assert copilot.name == "github_copilot"


def test_config_auto_detects_ollama_from_local_api_base():
    """With provider ``auto``, a configured ollama API base selects ollama."""
    config = Config.model_validate(
        {
            "agents": {"defaults": {"provider": "auto", "model": "llama3.2"}},
            "providers": {"ollama": {"apiBase": "http://localhost:11434/v1"}},
        }
    )

    assert config.get_provider_name() == "ollama"
    assert config.get_api_base() == "http://localhost:11434/v1"


def test_config_prefers_ollama_over_vllm_when_both_local_providers_configured():
    """With both local providers configured, ``auto`` picks ollama."""
    config = Config.model_validate(
        {
            "agents": {"defaults": {"provider": "auto", "model": "llama3.2"}},
            "providers": {
                "vllm": {"apiBase": "http://localhost:8000"},
                "ollama": {"apiBase": "http://localhost:11434/v1"},
            },
        }
    )

    assert config.get_provider_name() == "ollama"
    assert config.get_api_base() == "http://localhost:11434/v1"


def test_config_falls_back_to_vllm_when_ollama_not_configured():
    """With only vllm configured, ``auto`` picks vllm and its API base."""
    config = Config.model_validate(
        {
            "agents": {"defaults": {"provider": "auto", "model": "llama3.2"}},
            "providers": {
                "vllm": {"apiBase": "http://localhost:8000"},
            },
        }
    )

    assert config.get_provider_name() == "vllm"
    assert config.get_api_base() == "http://localhost:8000"


def test_openai_compat_provider_passes_model_through():
    """``OpenAICompatProvider`` reports the default model unchanged."""
    from nanobot.providers.openai_compat_provider import OpenAICompatProvider

    with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"):
        provider = OpenAICompatProvider(default_model="github-copilot/gpt-5.3-codex")

    assert provider.get_default_model() == "github-copilot/gpt-5.3-codex"


def test_make_provider_uses_github_copilot_backend():
    """``_make_provider`` builds a ``GitHubCopilotProvider`` for ``github-copilot``."""
    config = Config.model_validate(
        {
            "agents": {
                "defaults": {
                    "provider": "github-copilot",
                    "model": "github-copilot/gpt-4.1",
                }
            }
        }
    )

    with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"):
        provider = _make_provider(config)

    assert provider.__class__.__name__ == "GitHubCopilotProvider"


def test_github_copilot_provider_strips_prefixed_model_name():
    """The ``github-copilot/`` prefix is removed from the model in request kwargs."""
    from nanobot.providers.github_copilot_provider import GitHubCopilotProvider

    # The client class stays patched for the whole interaction with the provider.
    with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI"):
        provider = GitHubCopilotProvider(default_model="github-copilot/gpt-5.1")

        kwargs = provider._build_kwargs(
            messages=[{"role": "user", "content": "hi"}],
            tools=None,
            model="github-copilot/gpt-5.1",
            max_tokens=16,
            temperature=0.1,
            reasoning_effort=None,
            tool_choice=None,
        )

    assert kwargs["model"] == "gpt-5.1"


@pytest.mark.asyncio
async def test_github_copilot_provider_refreshes_client_api_key_before_chat():
    """``chat`` fetches a Copilot access token and installs it as the client API key."""
    from nanobot.providers.github_copilot_provider import GitHubCopilotProvider

    mock_client = MagicMock()
    mock_client.api_key = "no-key"
    mock_client.chat.completions.create = AsyncMock(
        return_value={
            "choices": [{"message": {"content": "ok"}, "finish_reason": "stop"}],
            "usage": {"prompt_tokens": 1, "completion_tokens": 1, "total_tokens": 2},
        }
    )

    # The client class stays patched for the whole interaction with the provider.
    with patch(
        "nanobot.providers.openai_compat_provider.AsyncOpenAI",
        return_value=mock_client,
    ):
        provider = GitHubCopilotProvider(default_model="github-copilot/gpt-5.1")
        provider._get_copilot_access_token = AsyncMock(return_value="copilot-access-token")

        response = await provider.chat(
            messages=[{"role": "user", "content": "hi"}],
            model="github-copilot/gpt-5.1",
            max_tokens=16,
            temperature=0.1,
        )

    assert response.content == "ok"
    assert provider._client.api_key == "copilot-access-token"
    provider._get_copilot_access_token.assert_awaited_once()
    mock_client.chat.completions.create.assert_awaited_once()


def test_openai_codex_strip_prefix_supports_hyphen_and_underscore():
    """Both ``openai-codex/`` and ``openai_codex/`` prefixes are stripped."""
    assert _strip_model_prefix("openai-codex/gpt-5.1-codex") == "gpt-5.1-codex"
    assert _strip_model_prefix("openai_codex/gpt-5.1-codex") == "gpt-5.1-codex"


def test_make_provider_passes_extra_headers_to_custom_provider():
    """Custom-provider key, base URL and extra headers reach the OpenAI client."""
    config = Config.model_validate(
        {
            "agents": {"defaults": {"provider": "custom", "model": "gpt-4o-mini"}},
            "providers": {
                "custom": {
                    "apiKey": "test-key",
                    "apiBase": "https://example.com/v1",
                    "extraHeaders": {
                        "APP-Code": "demo-app",
                        "x-session-affinity": "sticky-session",
                    },
                }
            },
        }
    )

    with patch("nanobot.providers.openai_compat_provider.AsyncOpenAI") as mock_async_openai:
        _make_provider(config)

    kwargs = mock_async_openai.call_args.kwargs
    assert kwargs["api_key"] == "test-key"
    assert kwargs["base_url"] == "https://example.com/v1"
    assert kwargs["default_headers"]["APP-Code"] == "demo-app"
    assert kwargs["default_headers"]["x-session-affinity"] == "sticky-session"


@pytest.fixture
def mock_agent_runtime(tmp_path):
    """Mock agent command dependencies for focused CLI tests.

    Yields:
        A dict exposing the shared ``config`` plus the mocks tests assert on:
        ``load_config``, ``sync_templates``, ``agent_loop_cls``, ``agent_loop``
        and ``print_response``.
    """
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "default-workspace")

    with (
        patch("nanobot.config.loader.load_config", return_value=config) as mock_load_config,
        patch("nanobot.config.loader.resolve_config_env_vars", side_effect=lambda c: c),
        patch("nanobot.cli.commands.sync_workspace_templates") as mock_sync_templates,
        patch("nanobot.cli.commands._make_provider", return_value=object()),
        patch("nanobot.cli.commands._print_agent_response") as mock_print_response,
        patch("nanobot.bus.queue.MessageBus"),
        patch("nanobot.cron.service.CronService"),
        patch("nanobot.agent.loop.AgentLoop") as mock_agent_loop_cls,
    ):
        agent_loop = MagicMock()
        agent_loop.channels_config = None
        agent_loop.process_direct = AsyncMock(
            return_value=OutboundMessage(
                channel="cli", chat_id="direct", content="mock-response"
            ),
        )
        agent_loop.close_mcp = AsyncMock(return_value=None)
        mock_agent_loop_cls.return_value = agent_loop

        yield {
            "config": config,
            "load_config": mock_load_config,
            "sync_templates": mock_sync_templates,
            "agent_loop_cls": mock_agent_loop_cls,
            "agent_loop": agent_loop,
            "print_response": mock_print_response,
        }


class _DirectReplyAgentLoop:
    """Minimal ``AgentLoop`` stand-in that answers every direct message with "ok"."""

    def __init__(self, *args, **kwargs) -> None:
        pass

    async def process_direct(self, *_args, **_kwargs):
        return OutboundMessage(channel="cli", chat_id="direct", content="ok")

    async def close_mcp(self) -> None:
        return None


def _write_instance_config(tmp_path: Path) -> Path:
    """Create ``<tmp_path>/instance/config.json`` containing ``{}`` and return its path."""
    config_file = tmp_path / "instance" / "config.json"
    config_file.parent.mkdir(parents=True)
    config_file.write_text("{}")
    return config_file


def test_agent_help_shows_workspace_and_config_options():
    """``agent --help`` lists the workspace and config flags."""
    result = runner.invoke(app, ["agent", "--help"])

    assert result.exit_code == 0
    stripped_output = _strip_ansi(result.stdout)
    assert "--workspace" in stripped_output
    assert "-w" in stripped_output
    assert "--config" in stripped_output
    assert "-c" in stripped_output


def test_agent_uses_default_config_when_no_workspace_or_config_flags(mock_agent_runtime):
    """Without flags, the agent loads the default config and uses its workspace."""
    result = runner.invoke(app, ["agent", "-m", "hello"])

    assert result.exit_code == 0
    assert mock_agent_runtime["load_config"].call_args.args == (None,)
    assert mock_agent_runtime["sync_templates"].call_args.args == (
        mock_agent_runtime["config"].workspace_path,
    )
    assert mock_agent_runtime["agent_loop_cls"].call_args.kwargs["workspace"] == (
        mock_agent_runtime["config"].workspace_path
    )
    mock_agent_runtime["agent_loop"].process_direct.assert_awaited_once()
    mock_agent_runtime["print_response"].assert_called_once_with(
        "mock-response",
        render_markdown=True,
        metadata={},
    )


def test_agent_uses_explicit_config_path(mock_agent_runtime, tmp_path: Path):
    """``-c`` makes the agent load the resolved explicit config path."""
    config_path = tmp_path / "agent-config.json"
    config_path.write_text("{}")

    result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_path)])

    assert result.exit_code == 0
    assert mock_agent_runtime["load_config"].call_args.args == (config_path.resolve(),)


def test_agent_config_sets_active_path(monkeypatch, tmp_path: Path) -> None:
    """``-c`` passes the resolved config path to ``set_config_path``."""
    config_file = _write_instance_config(tmp_path)

    config = Config()
    seen: dict[str, Path] = {}

    monkeypatch.setattr(
        "nanobot.config.loader.set_config_path",
        lambda path: seen.__setitem__("config_path", path),
    )
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
    monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
    monkeypatch.setattr("nanobot.cron.service.CronService", lambda _store: object())
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _DirectReplyAgentLoop)
    monkeypatch.setattr(
        "nanobot.cli.commands._print_agent_response",
        lambda *_args, **_kwargs: None,
    )

    result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])

    assert result.exit_code == 0
    assert seen["config_path"] == config_file.resolve()


def test_agent_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
    """The agent's cron store is ``<workspace>/cron/jobs.json``."""
    config_file = _write_instance_config(tmp_path)

    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "agent-workspace")
    seen: dict[str, Path] = {}

    monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
    monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())

    class _FakeCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path

    monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _DirectReplyAgentLoop)
    monkeypatch.setattr(
        "nanobot.cli.commands._print_agent_response",
        lambda *_args, **_kwargs: None,
    )

    result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])

    assert result.exit_code == 0
    assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json"


def test_agent_workspace_override_does_not_migrate_legacy_cron(
    monkeypatch, tmp_path: Path
) -> None:
    """With ``-w``, the legacy cron file stays put and nothing is copied over."""
    config_file = _write_instance_config(tmp_path)

    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    legacy_file = legacy_dir / "jobs.json"
    legacy_file.write_text('{"jobs": []}')

    override = tmp_path / "override-workspace"
    config = Config()
    seen: dict[str, Path] = {}

    monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
    monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
    monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)

    class _FakeCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path

    monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _DirectReplyAgentLoop)
    monkeypatch.setattr(
        "nanobot.cli.commands._print_agent_response",
        lambda *_args, **_kwargs: None,
    )

    result = runner.invoke(
        app,
        ["agent", "-m", "hello", "-c", str(config_file), "-w", str(override)],
    )

    assert result.exit_code == 0
    assert seen["cron_store"] == override / "cron" / "jobs.json"
    assert legacy_file.exists()
    assert not (override / "cron" / "jobs.json").exists()


def test_agent_custom_config_workspace_does_not_migrate_legacy_cron(
    monkeypatch, tmp_path: Path
) -> None:
    """With a custom workspace in the config, the legacy cron file is left untouched."""
    config_file = _write_instance_config(tmp_path)

    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    legacy_file = legacy_dir / "jobs.json"
    legacy_file.write_text('{"jobs": []}')

    custom_workspace = tmp_path / "custom-workspace"
    config = Config()
    config.agents.defaults.workspace = str(custom_workspace)
    seen: dict[str, Path] = {}

    monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: object())
    monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: object())
    monkeypatch.setattr("nanobot.config.paths.get_cron_dir", lambda: legacy_dir)

    class _FakeCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path

    monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _DirectReplyAgentLoop)
    monkeypatch.setattr(
        "nanobot.cli.commands._print_agent_response",
        lambda *_args, **_kwargs: None,
    )

    result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])

    assert result.exit_code == 0
    assert seen["cron_store"] == custom_workspace / "cron" / "jobs.json"
    assert legacy_file.exists()
    assert not (custom_workspace / "cron" / "jobs.json").exists()


def test_agent_overrides_workspace_path(mock_agent_runtime):
    """``-w`` rewrites the config workspace and is used for templates and the loop."""
    workspace_path = Path("/tmp/agent-workspace")

    result = runner.invoke(app, ["agent", "-m", "hello", "-w", str(workspace_path)])

    assert result.exit_code == 0
    assert mock_agent_runtime["config"].agents.defaults.workspace == str(workspace_path)
    assert mock_agent_runtime["sync_templates"].call_args.args == (workspace_path,)
    assert mock_agent_runtime["agent_loop_cls"].call_args.kwargs["workspace"] == workspace_path


def test_agent_workspace_override_wins_over_config_workspace(mock_agent_runtime, tmp_path: Path):
    """When both ``-c`` and ``-w`` are given, the ``-w`` workspace is the one used."""
    config_path = tmp_path / "agent-config.json"
    config_path.write_text("{}")
    workspace_path = Path("/tmp/agent-workspace")

    result = runner.invoke(
        app,
        ["agent", "-m", "hello", "-c", str(config_path), "-w", str(workspace_path)],
    )

    assert result.exit_code == 0
    assert mock_agent_runtime["load_config"].call_args.args == (config_path.resolve(),)
    assert mock_agent_runtime["config"].agents.defaults.workspace == str(workspace_path)
    assert mock_agent_runtime["sync_templates"].call_args.args == (workspace_path,)
    assert mock_agent_runtime["agent_loop_cls"].call_args.kwargs["workspace"] == workspace_path


def test_agent_hints_about_deprecated_memory_window(mock_agent_runtime, tmp_path):
    """A config file containing ``memoryWindow`` triggers a "no longer used" hint."""
    config_file = tmp_path / "config.json"
    config_file.write_text(json.dumps({"agents": {"defaults": {"memoryWindow": 42}}}))

    result = runner.invoke(app, ["agent", "-m", "hello", "-c", str(config_file)])

    assert result.exit_code == 0
    assert "memoryWindow" in result.stdout
    assert "no longer used" in result.stdout


def test_heartbeat_retains_recent_messages_by_default():
    """The default heartbeat config keeps the 8 most recent messages."""
    config = Config()

    assert config.gateway.heartbeat.keep_recent_messages == 8


def _stop_gateway_provider(_config) -> object:
    """Provider factory stand-in that aborts the command with ``_StopGatewayError``."""
    raise _StopGatewayError("stop")


def _patch_cli_command_runtime(
    monkeypatch,
    config: Config,
    *,
    set_config_path=None,
    sync_templates=None,
    make_provider=None,
    message_bus=None,
    session_manager=None,
    cron_service=None,
    get_cron_dir=None,
) -> None:
    """Patch the collaborators the CLI commands look up at runtime.

    ``load_config`` always returns *config* and ``resolve_config_env_vars`` is
    the identity function. ``set_config_path``, ``sync_templates`` and
    ``make_provider`` fall back to no-op stand-ins when not supplied; the
    remaining hooks are patched only when a replacement is given.
    """
    monkeypatch.setattr(
        "nanobot.config.loader.set_config_path",
        set_config_path or (lambda _path: None),
    )
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.config.loader.resolve_config_env_vars", lambda c: c)
    monkeypatch.setattr(
        "nanobot.cli.commands.sync_workspace_templates",
        sync_templates or (lambda _path: None),
    )
    monkeypatch.setattr(
        "nanobot.cli.commands._make_provider",
        make_provider or (lambda _config: object()),
    )

    if message_bus is not None:
        monkeypatch.setattr("nanobot.bus.queue.MessageBus", message_bus)
    if session_manager is not None:
        monkeypatch.setattr("nanobot.session.manager.SessionManager", session_manager)
    if cron_service is not None:
        monkeypatch.setattr("nanobot.cron.service.CronService", cron_service)
    if get_cron_dir is not None:
        monkeypatch.setattr("nanobot.config.paths.get_cron_dir", get_cron_dir)


def _patch_serve_runtime(monkeypatch, config: Config, seen: dict[str, object]) -> None:
    """Patch the ``serve`` command's runtime and record what it receives in *seen*.

    Skips the calling test when ``aiohttp`` is not installed. The fakes store
    the agent workspace, the ``create_app`` arguments and the ``run_app``
    host/port under the matching keys of *seen*.
    """
    pytest.importorskip("aiohttp")

    class _FakeApiApp:
        def __init__(self) -> None:
            self.on_startup: list[object] = []
            self.on_cleanup: list[object] = []

    class _FakeAgentLoop:
        def __init__(self, **kwargs) -> None:
            seen["workspace"] = kwargs["workspace"]

        async def _connect_mcp(self) -> None:
            return None

        async def close_mcp(self) -> None:
            return None

    def _fake_create_app(agent_loop, model_name: str, request_timeout: float):
        seen["agent_loop"] = agent_loop
        seen["model_name"] = model_name
        seen["request_timeout"] = request_timeout
        return _FakeApiApp()

    # The ``print`` parameter name is part of the patched call signature; keep it.
    def _fake_run_app(api_app, host: str, port: int, print):
        seen["api_app"] = api_app
        seen["host"] = host
        seen["port"] = port

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        message_bus=lambda: object(),
        session_manager=lambda _workspace: object(),
    )
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _FakeAgentLoop)
    monkeypatch.setattr("nanobot.api.server.create_app", _fake_create_app)
    monkeypatch.setattr("aiohttp.web.run_app", _fake_run_app)


def test_gateway_uses_workspace_from_config_by_default(monkeypatch, tmp_path: Path) -> None:
    """Without ``--workspace``, the gateway syncs templates into the config workspace."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "config-workspace")
    seen: dict[str, Path] = {}

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        set_config_path=lambda path: seen.__setitem__("config_path", path),
        sync_templates=lambda path: seen.__setitem__("workspace", path),
        make_provider=_stop_gateway_provider,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file)])

    assert isinstance(result.exception, _StopGatewayError)
    assert seen["config_path"] == config_file.resolve()
    assert seen["workspace"] == Path(config.agents.defaults.workspace)


def test_gateway_workspace_option_overrides_config(monkeypatch, tmp_path: Path) -> None:
    """``--workspace`` replaces the workspace configured in the config file."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "config-workspace")
    override = tmp_path / "override-workspace"
    seen: dict[str, Path] = {}

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        sync_templates=lambda path: seen.__setitem__("workspace", path),
        make_provider=_stop_gateway_provider,
    )

    result = runner.invoke(
        app,
        ["gateway", "--config", str(config_file), "--workspace", str(override)],
    )

    assert isinstance(result.exception, _StopGatewayError)
    assert seen["workspace"] == override
    assert config.workspace_path == override


def test_gateway_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path: Path) -> None:
    """The gateway's cron store is ``<workspace>/cron/jobs.json``."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "config-workspace")
    seen: dict[str, Path] = {}

    class _StopCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path
            raise _StopGatewayError("stop")

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        message_bus=lambda: object(),
        session_manager=lambda _workspace: object(),
        cron_service=_StopCron,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file)])

    assert isinstance(result.exception, _StopGatewayError)
    assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json"


def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
    monkeypatch, tmp_path: Path
) -> None:
    """The cron ``on_job`` hook evaluates the reply with task context, then publishes it."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "config-workspace")
    provider = object()
    bus = MagicMock()
    bus.publish_outbound = AsyncMock()
    seen: dict[str, object] = {}

    monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
    monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
    monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
    monkeypatch.setattr("nanobot.cli.commands._make_provider", lambda _config: provider)
    monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus)
    monkeypatch.setattr(
        "nanobot.session.manager.SessionManager",
        lambda _workspace: object(),
    )

    class _FakeCron:
        def __init__(self, _store_path: Path) -> None:
            self.on_job = None
            seen["cron"] = self

    class _FakeAgentLoop:
        def __init__(self, *args, **kwargs) -> None:
            self.model = "test-model"
            self.tools = {}

        async def process_direct(self, *_args, **_kwargs):
            return OutboundMessage(
                channel="telegram",
                chat_id="user-1",
                content="Time to stretch.",
            )

        async def close_mcp(self) -> None:
            return None

        async def run(self) -> None:
            return None

        def stop(self) -> None:
            return None

    class _StopAfterCronSetup:
        def __init__(self, *_args, **_kwargs) -> None:
            raise _StopGatewayError("stop")

    async def _capture_evaluate_response(
        response: str,
        task_context: str,
        provider_arg: object,
        model: str,
    ) -> bool:
        seen["response"] = response
        seen["task_context"] = task_context
        seen["provider"] = provider_arg
        seen["model"] = model
        return True

    monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
    monkeypatch.setattr("nanobot.agent.loop.AgentLoop", _FakeAgentLoop)
    monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup)
    monkeypatch.setattr(
        "nanobot.utils.evaluator.evaluate_response",
        _capture_evaluate_response,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file)])

    assert isinstance(result.exception, _StopGatewayError)
    cron = seen["cron"]
    assert isinstance(cron, _FakeCron)
    assert cron.on_job is not None

    job = CronJob(
        id="cron-1",
        name="stretch",
        payload=CronPayload(
            message="Remind me to stretch.",
            deliver=True,
            channel="telegram",
            to="user-1",
        ),
    )

    response = asyncio.run(cron.on_job(job))

    assert response == "Time to stretch."
    assert seen["response"] == "Time to stretch."
    assert seen["provider"] is provider
    assert seen["model"] == "test-model"
    assert seen["task_context"] == (
        "[Scheduled Task] Timer finished.\n\n"
        "Task 'stretch' has been triggered.\n"
        "Scheduled instruction: Remind me to stretch."
    )
    bus.publish_outbound.assert_awaited_once_with(
        OutboundMessage(
            channel="telegram",
            chat_id="user-1",
            content="Time to stretch.",
        )
    )


def test_gateway_workspace_override_does_not_migrate_legacy_cron(
    monkeypatch, tmp_path: Path
) -> None:
    """With ``--workspace``, the legacy cron file stays put and nothing is copied over."""
    config_file = _write_instance_config(tmp_path)
    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    legacy_file = legacy_dir / "jobs.json"
    legacy_file.write_text('{"jobs": []}')

    override = tmp_path / "override-workspace"
    config = Config()
    seen: dict[str, Path] = {}

    class _StopCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path
            raise _StopGatewayError("stop")

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        message_bus=lambda: object(),
        session_manager=lambda _workspace: object(),
        cron_service=_StopCron,
        get_cron_dir=lambda: legacy_dir,
    )

    result = runner.invoke(
        app,
        ["gateway", "--config", str(config_file), "--workspace", str(override)],
    )

    assert isinstance(result.exception, _StopGatewayError)
    assert seen["cron_store"] == override / "cron" / "jobs.json"
    assert legacy_file.exists()
    assert not (override / "cron" / "jobs.json").exists()


def test_gateway_custom_config_workspace_does_not_migrate_legacy_cron(
    monkeypatch, tmp_path: Path
) -> None:
    """With a custom workspace in the config, the legacy cron file is left untouched."""
    config_file = _write_instance_config(tmp_path)
    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    legacy_file = legacy_dir / "jobs.json"
    legacy_file.write_text('{"jobs": []}')

    custom_workspace = tmp_path / "custom-workspace"
    config = Config()
    config.agents.defaults.workspace = str(custom_workspace)
    seen: dict[str, Path] = {}

    class _StopCron:
        def __init__(self, store_path: Path) -> None:
            seen["cron_store"] = store_path
            raise _StopGatewayError("stop")

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        message_bus=lambda: object(),
        session_manager=lambda _workspace: object(),
        cron_service=_StopCron,
        get_cron_dir=lambda: legacy_dir,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file)])

    assert isinstance(result.exception, _StopGatewayError)
    assert seen["cron_store"] == custom_workspace / "cron" / "jobs.json"
    assert legacy_file.exists()
    assert not (custom_workspace / "cron" / "jobs.json").exists()


def test_migrate_cron_store_moves_legacy_file(tmp_path: Path) -> None:
    """Legacy global jobs.json is moved into the workspace on first run."""
    from nanobot.cli.commands import _migrate_cron_store

    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    legacy_file = legacy_dir / "jobs.json"
    legacy_file.write_text('{"jobs": []}')

    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "workspace")
    workspace_cron = config.workspace_path / "cron" / "jobs.json"

    with patch("nanobot.config.paths.get_cron_dir", return_value=legacy_dir):
        _migrate_cron_store(config)

    assert workspace_cron.exists()
    assert workspace_cron.read_text() == '{"jobs": []}'
    assert not legacy_file.exists()


def test_migrate_cron_store_skips_when_workspace_file_exists(tmp_path: Path) -> None:
    """Migration does not overwrite an existing workspace cron store."""
    from nanobot.cli.commands import _migrate_cron_store

    legacy_dir = tmp_path / "global" / "cron"
    legacy_dir.mkdir(parents=True)
    (legacy_dir / "jobs.json").write_text('{"old": true}')

    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "workspace")
    workspace_cron = config.workspace_path / "cron" / "jobs.json"
    workspace_cron.parent.mkdir(parents=True)
    workspace_cron.write_text('{"new": true}')

    with patch("nanobot.config.paths.get_cron_dir", return_value=legacy_dir):
        _migrate_cron_store(config)

    assert workspace_cron.read_text() == '{"new": true}'


def test_gateway_uses_configured_port_when_cli_flag_is_missing(monkeypatch, tmp_path: Path) -> None:
    """Without ``--port``, the gateway output mentions the configured port."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.gateway.port = 18791

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        make_provider=_stop_gateway_provider,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file)])

    assert isinstance(result.exception, _StopGatewayError)
    assert "port 18791" in result.stdout


def test_gateway_cli_port_overrides_configured_port(monkeypatch, tmp_path: Path) -> None:
    """``--port`` takes precedence over the configured gateway port."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.gateway.port = 18791

    _patch_cli_command_runtime(
        monkeypatch,
        config,
        make_provider=_stop_gateway_provider,
    )

    result = runner.invoke(app, ["gateway", "--config", str(config_file), "--port", "18792"])

    assert isinstance(result.exception, _StopGatewayError)
    assert "port 18792" in result.stdout


def test_serve_uses_api_config_defaults_and_workspace_override(
    monkeypatch, tmp_path: Path
) -> None:
    """``serve`` takes host/port/timeout from config and the workspace from the CLI."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.agents.defaults.workspace = str(tmp_path / "config-workspace")
    config.api.host = "127.0.0.2"
    config.api.port = 18900
    config.api.timeout = 45.0
    override_workspace = tmp_path / "override-workspace"
    seen: dict[str, object] = {}

    _patch_serve_runtime(monkeypatch, config, seen)

    result = runner.invoke(
        app,
        ["serve", "--config", str(config_file), "--workspace", str(override_workspace)],
    )

    assert result.exit_code == 0
    assert seen["workspace"] == override_workspace
    assert seen["host"] == "127.0.0.2"
    assert seen["port"] == 18900
    assert seen["request_timeout"] == 45.0


def test_serve_cli_options_override_api_config(monkeypatch, tmp_path: Path) -> None:
    """``--host``/``--port``/``--timeout`` take precedence over the API config."""
    config_file = _write_instance_config(tmp_path)
    config = Config()
    config.api.host = "127.0.0.2"
    config.api.port = 18900
    config.api.timeout = 45.0
    seen: dict[str, object] = {}

    _patch_serve_runtime(monkeypatch, config, seen)

    result = runner.invoke(
        app,
        [
            "serve",
            "--config",
            str(config_file),
            "--host",
            "127.0.0.1",
            "--port",
            "18901",
            "--timeout",
            "46",
        ],
    )

    assert result.exit_code == 0
    assert seen["host"] == "127.0.0.1"
    assert seen["port"] == 18901
    assert seen["request_timeout"] == 46.0


def test_channels_login_requires_channel_name() -> None:
    """``channels login`` without a channel name exits with code 2."""
    result = runner.invoke(app, ["channels", "login"])

    assert result.exit_code == 2