diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 3e71cb3ba..a248556b1 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1189,6 +1189,17 @@ def _run_gateway( ) return response + def _is_bound_cron_job(job: CronJob) -> bool: + payload = job.payload + if payload.kind != "agent_turn" or not payload.session_key: + return False + return not ( + payload.deliver + or payload.channel + or payload.to + or payload.channel_meta + ) + async def _deliver_to_channel( msg: OutboundMessage, *, record: bool = False, session_key: str | None = None, ) -> None: @@ -1349,7 +1360,7 @@ def _run_gateway( logger.info("Heartbeat: silenced by post-run evaluation") return response - if job.payload.kind == "agent_turn" and job.payload.session_key: + if _is_bound_cron_job(job): return await _run_bound_cron_job(job) reminder_note = ( diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 01be99252..86901dab3 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -1373,6 +1373,151 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( } +def test_gateway_legacy_cron_payloads_with_session_key_stay_legacy( + monkeypatch, tmp_path: Path +) -> None: + config_file = _write_instance_config(tmp_path) + config = Config() + config.agents.defaults.workspace = str(tmp_path / "config-workspace") + bus = MagicMock() + bus.publish_outbound = AsyncMock() + seen: dict[str, object] = {"process_calls": [], "evaluations": [], "saved_keys": []} + + class _FakeSession: + def __init__(self) -> None: + self.messages = [] + + def add_message(self, role: str, content: str, **kwargs) -> None: + self.messages.append({"role": role, "content": content, **kwargs}) + + class _FakeSessionManager: + def __init__(self, _workspace: Path) -> None: + self.session = _FakeSession() + seen["session_manager"] = self + + def read_session_file(self, _key: str) -> dict[str, object]: + return {"metadata": {}} + + def get_or_create(self, key: str) -> _FakeSession: + seen["saved_keys"].append(key) + return self.session + + def save(self, session: _FakeSession) -> None: + seen["saved_session"] = session + + class _FakeCron: + def __init__(self, _store_path: Path) -> None: + self.on_job = None + seen["cron"] = self + + class _FakeAgentLoop: + @classmethod + def from_config(cls, config, bus=None, **extra): + return cls(**extra) + + def __init__(self, *args, **kwargs) -> None: + self.model = "test-model" + self.provider = kwargs.get("provider", object()) + self.tools = {} + + async def process_direct(self, prompt: str, **kwargs): + seen["process_calls"].append((prompt, kwargs)) + return OutboundMessage( + channel=kwargs["channel"], + chat_id=kwargs["chat_id"], + content="Legacy response.", + ) + + async def submit_automation_turn(self, _msg: InboundMessage): + raise AssertionError("legacy cron payload must not run as bound automation") + + async def close_mcp(self) -> None: + return None + + async def run(self) -> None: + return None + + def stop(self) -> None: + return None + + class _StopAfterCronSetup: + def __init__(self, *_args, **_kwargs) -> None: + raise _StopGatewayError("stop") + + async def _capture_evaluate_response(*args, **_kwargs) -> bool: + seen["evaluations"].append(args) + return True + + _patch_cli_command_runtime( + monkeypatch, + config, + message_bus=lambda: bus, + session_manager=_FakeSessionManager, + cron_service=_FakeCron, + ) + monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) + monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup) + monkeypatch.setattr( + "nanobot.cli.commands.evaluate_response", + _capture_evaluate_response, + ) + + result = runner.invoke(app, ["gateway", "--config", str(config_file)]) + assert isinstance(result.exception, _StopGatewayError) + cron = seen["cron"] + + silent_job = CronJob( + id="silent-legacy", + name="Silent legacy", + payload=CronPayload( + message="Run silently.", + deliver=False, + channel="telegram", + to="user-1", + session_key="telegram:user-1", + ), + ) + + response = asyncio.run(cron.on_job(silent_job)) + + assert response == "Legacy response." + prompt, kwargs = seen["process_calls"][-1] + assert "Reminder: Run silently." in prompt + assert kwargs["session_key"] == "cron:silent-legacy" + assert kwargs["channel"] == "telegram" + assert kwargs["chat_id"] == "user-1" + assert seen["evaluations"] == [] + bus.publish_outbound.assert_not_awaited() + + topic_job = CronJob( + id="topic-legacy", + name="Topic legacy", + payload=CronPayload( + message="Ping the topic.", + deliver=True, + channel="telegram", + to="-100123", + channel_meta={"message_thread_id": 42}, + session_key="telegram:-100123:topic:42", + ), + ) + + response = asyncio.run(cron.on_job(topic_job)) + + assert response == "Legacy response." + _prompt, kwargs = seen["process_calls"][-1] + assert kwargs["session_key"] == "cron:topic-legacy" + assert kwargs["channel"] == "telegram" + assert kwargs["chat_id"] == "-100123" + assert len(seen["evaluations"]) == 1 + bus.publish_outbound.assert_awaited_once() + delivered = bus.publish_outbound.await_args.args[0] + assert delivered.channel == "telegram" + assert delivered.chat_id == "-100123" + assert delivered.metadata["message_thread_id"] == 42 + assert seen["saved_keys"] == ["telegram:-100123:topic:42"] + + def test_gateway_bound_cron_runs_as_session_turn( monkeypatch, tmp_path: Path ) -> None: