diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index ba3f7f89b..bd40dd2aa 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -7,7 +7,6 @@ import signal import sys from collections.abc import Callable from contextlib import nullcontext, suppress -from contextvars import ContextVar from pathlib import Path from typing import Any @@ -54,7 +53,6 @@ from nanobot.agent.loop import AgentLoop # noqa: E402 from nanobot.cli.stream import StreamRenderer, ThinkingSpinner # noqa: E402 from nanobot.config.paths import get_workspace_path, is_default_workspace # noqa: E402 from nanobot.config.schema import Config # noqa: E402 -from nanobot.cron.webui_metadata import cron_proactive_delivery_metadata # noqa: E402 from nanobot.utils.evaluator import evaluate_response # noqa: E402 from nanobot.utils.helpers import sync_workspace_templates # noqa: E402 from nanobot.utils.restart import ( # noqa: E402 @@ -87,12 +85,6 @@ class SafeFileHistory(FileHistory): super().store_string(_sanitize_surrogates(string)) -_PROACTIVE_WEBUI_METADATA: ContextVar[dict[str, Any] | None] = ContextVar( - "proactive_webui_metadata", - default=None, -) - - app = typer.Typer( name="nanobot", context_settings={"help_option_names": ["-h", "--help"]}, @@ -950,7 +942,6 @@ def _run_gateway( health_server_enabled: bool = True, ) -> None: """Shared gateway runtime; ``open_browser_url`` opens a tab once channels are up.""" - from nanobot.agent.tools.cron import CronTool from nanobot.agent.tools.message import MessageTool from nanobot.bus.queue import MessageBus from nanobot.bus.runtime_events import RuntimeEventBus @@ -1022,9 +1013,6 @@ def _run_gateway( """Publish a user-visible message and mirror it into that channel's session.""" metadata = dict(msg.metadata or {}) record = record or bool(metadata.pop("_record_channel_delivery", False)) - proactive_webui_metadata = _PROACTIVE_WEBUI_METADATA.get() - if record and msg.channel == "websocket" and proactive_webui_metadata: - metadata = {**metadata, **proactive_webui_metadata} if metadata != (msg.metadata or {}): msg = OutboundMessage( channel=msg.channel, @@ -1179,73 +1167,12 @@ def _run_gateway( if is_bound_cron_job(job): return await run_bound_cron_job(job, agent=agent, cron=cron) - reminder_note = ( - "The scheduled time has arrived. Deliver this reminder to the user now, " - "as a brief and natural message in their language. Speak directly to them — " - "do not narrate progress, summarize, include user IDs, or add status reports " - "like 'Done' or 'Reminded'.\n\n" - f"Reminder: {job.payload.message}" + logger.warning( + "Cron: skipped unbound agent job '{}' ({}); recreate it from a chat session", + job.name, + job.id, ) - - cron_tool = agent.tools.get("cron") - cron_token = None - if isinstance(cron_tool, CronTool): - cron_token = cron_tool.set_cron_context(True) - - message_record_token = None - if isinstance(message_tool, MessageTool): - message_record_token = message_tool.set_record_channel_delivery(True) - - proactive_webui_metadata = cron_proactive_delivery_metadata( - "websocket", - None, - turn_seed=f"cron:{job.id}", - source_label=job.name, - ) - proactive_token = _PROACTIVE_WEBUI_METADATA.set(proactive_webui_metadata) - - try: - resp = await agent.process_direct( - reminder_note, - session_key=f"cron:{job.id}", - channel=job.payload.channel or "cli", - chat_id=job.payload.to or "direct", - on_progress=_silent, - ) - finally: - _PROACTIVE_WEBUI_METADATA.reset(proactive_token) - if isinstance(cron_tool, CronTool) and cron_token is not None: - cron_tool.reset_cron_context(cron_token) - if isinstance(message_tool, MessageTool) and message_record_token is not None: - message_tool.reset_record_channel_delivery(message_record_token) - - response = resp.content if resp else "" - - if job.payload.deliver and isinstance(message_tool, MessageTool) and message_tool._sent_in_turn: - return response - - if job.payload.deliver and job.payload.to and response: - should_notify = await evaluate_response( - response, reminder_note, agent.provider, agent.model, - ) - if should_notify: - proactive_metadata = cron_proactive_delivery_metadata( - job.payload.channel or "cli", - job.payload.channel_meta, - turn_seed=f"cron:{job.id}", - source_label=job.name, - ) - await _deliver_to_channel( - OutboundMessage( - channel=job.payload.channel or "cli", - chat_id=job.payload.to, - content=response, - metadata=proactive_metadata, - ), - record=True, - session_key=job.payload.session_key, - ) - return response + return None cron.on_job = on_cron_job diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 68145893e..1a66af007 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -72,6 +72,62 @@ def _validate_schedule_for_add(schedule: CronSchedule) -> None: raise ValueError(f"unknown timezone '{schedule.tz}'") from None +def _has_legacy_delivery_context(payload: CronPayload) -> bool: + return bool(payload.deliver or payload.channel or payload.to or payload.channel_meta) + + +def _legacy_session_key(payload: CronPayload) -> str | None: + if payload.session_key: + return payload.session_key + if payload.channel and payload.to: + return f"{payload.channel}:{payload.to}" + return None + + +def _disable_malformed_legacy_job(job: CronJob) -> None: + reason = "legacy cron payload is missing channel/to; recreate it from a chat session" + job.payload.deliver = False + job.payload.channel = None + job.payload.to = None + job.payload.channel_meta = {} + job.enabled = False + job.state.next_run_at_ms = None + job.state.last_status = "error" + job.state.last_error = reason + logger.warning("Cron: disabled malformed legacy job '{}' ({}): {}", job.name, job.id, reason) + + +def _normalize_agent_turn_job(job: CronJob) -> bool: + """Migrate legacy user cron payloads into session-bound payloads. + + Pre-bound user cron jobs stored their delivery target in ``channel``/``to``. + Normal user-created legacy jobs always have those fields; if they are + missing, keep the record for inspection but disable it instead of preserving + a runtime legacy execution path. + """ + payload = job.payload + if payload.kind != "agent_turn" or not _has_legacy_delivery_context(payload): + return False + + if not payload.channel or not payload.to: + _disable_malformed_legacy_job(job) + return True + + payload.session_key = _legacy_session_key(payload) + payload.origin_channel = payload.origin_channel or payload.channel + payload.origin_chat_id = payload.origin_chat_id or payload.to + if not payload.origin_metadata: + payload.origin_metadata = dict(payload.channel_meta or {}) + + payload.deliver = False + payload.channel = None + payload.to = None + payload.channel_meta = {} + job.updated_at_ms = max(job.updated_at_ms, _now_ms()) + logger.info("Cron: migrated legacy job '{}' ({}) to session-bound payload", job.name, job.id) + return True + + class CronService: """Service for managing and executing scheduled jobs.""" @@ -115,7 +171,7 @@ class CronService: jobs = [] version = data.get("version", 1) for j in data.get("jobs", []): - jobs.append(CronJob( + job = CronJob( id=j["id"], name=j["name"], enabled=j.get("enabled", True), @@ -170,7 +226,9 @@ class CronService: created_at_ms=j.get("createdAtMs", 0), updated_at_ms=j.get("updatedAtMs", 0), delete_after_run=j.get("deleteAfterRun", False), - )) + ) + _normalize_agent_turn_job(job) + jobs.append(job) except Exception: # Preserve the corrupt file for forensic recovery instead of # letting the next save overwrite it with an empty job list. @@ -196,6 +254,7 @@ class CronService: jobs_map = {j.id: j for j in self._store.jobs} def _update(params: dict): j = CronJob.from_dict(params) + _normalize_agent_turn_job(j) jobs_map[j.id] = j def _del(params: dict): @@ -570,6 +629,7 @@ class CronService: updated_at_ms=now, delete_after_run=delete_after_run, ) + _normalize_agent_turn_job(job) if self._running: store = self._load_store() store.jobs.append(job) @@ -678,6 +738,7 @@ class CronService: job.payload.to = to if delete_after_run is not None: job.delete_after_run = delete_after_run + _normalize_agent_turn_job(job) job.updated_at_ms = _now_ms() if job.enabled: diff --git a/nanobot/cron/session_turns.py b/nanobot/cron/session_turns.py index 49662e1e8..a85c47937 100644 --- a/nanobot/cron/session_turns.py +++ b/nanobot/cron/session_turns.py @@ -57,9 +57,14 @@ def cron_history_overrides(metadata: Mapping[str, Any] | None) -> tuple[str | No def is_bound_cron_job(job: CronJob) -> bool: - """True for new session-bound cron jobs, excluding legacy delivery payloads.""" + """True for session-bound cron jobs with complete delivery context.""" payload = job.payload - if payload.kind != "agent_turn" or not payload.session_key: + if ( + payload.kind != "agent_turn" + or not payload.session_key + or not payload.origin_channel + or not payload.origin_chat_id + ): return False return not ( payload.deliver diff --git a/tests/channels/test_websocket_http_routes.py b/tests/channels/test_websocket_http_routes.py index d8c137630..c364f5a36 100644 --- a/tests/channels/test_websocket_http_routes.py +++ b/tests/channels/test_websocket_http_routes.py @@ -186,11 +186,13 @@ async def test_session_automations_route_filters_by_webui_session( schedule=hourly, message=message, session_key=f"websocket:{to}", + origin_channel="websocket", + origin_chat_id=to, ) cron.add_job( name="Legacy same target", schedule=hourly, - message="Legacy job should not be treated as bound", + message="Legacy job should be migrated", deliver=True, channel="websocket", to="abc", @@ -228,7 +230,7 @@ async def test_session_automations_route_filters_by_webui_session( assert resp.status_code == 200 body = resp.json() - assert [job["name"] for job in body["jobs"]] == ["Morning check"] + assert [job["name"] for job in body["jobs"]] == ["Morning check", "Legacy same target"] job = body["jobs"][0] assert job["schedule"]["kind"] == "every" assert job["schedule"]["every_ms"] == 3_600_000 @@ -249,12 +251,16 @@ async def test_session_automations_route_ignores_unified_owner( schedule=hourly, message="Check the shared session", session_key=UNIFIED_SESSION_KEY, + origin_channel="websocket", + origin_chat_id="abc", ) cron.add_job( name="Visible chat job", schedule=hourly, message="Show for this chat", session_key="websocket:abc", + origin_channel="websocket", + origin_chat_id="abc", ) channel = _ch( bus, @@ -728,6 +734,8 @@ async def test_session_delete_blocks_when_bound_automation_exists( schedule=CronSchedule(kind="every", every_ms=86_400_000), message="Check the repo", session_key="websocket:doomed", + origin_channel="websocket", + origin_chat_id="doomed", ) channel = _ch(bus, session_manager=sm, cron_service=cron, port=29915) server_task = asyncio.create_task(channel.start()) @@ -767,6 +775,8 @@ async def test_session_delete_can_cascade_bound_automations( schedule=CronSchedule(kind="every", every_ms=86_400_000), message="Check the repo", session_key="websocket:doomed", + origin_channel="websocket", + origin_chat_id="doomed", ) cron.add_job( name="Legacy same target", @@ -793,9 +803,7 @@ async def test_session_delete_can_cascade_bound_automations( assert resp.json()["deleted"] is True assert not path.exists() assert cron.list_bound_cron_jobs_for_session("websocket:doomed") == [] - assert [job.name for job in cron.list_jobs(include_disabled=True)] == [ - "Legacy same target" - ] + assert cron.list_jobs(include_disabled=True) == [] finally: await channel.stop() await server_task @@ -813,6 +821,8 @@ async def test_session_delete_blocks_origin_automation_when_unified_enabled( schedule=CronSchedule(kind="every", every_ms=86_400_000), message="Check this chat", session_key="websocket:doomed", + origin_channel="websocket", + origin_chat_id="doomed", ) channel = _ch( bus, diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 52095cb46..abbd28427 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -1185,7 +1185,7 @@ def test_gateway_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path: assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json" -def test_gateway_cron_evaluator_receives_scheduled_reminder_context( +def test_gateway_unbound_agent_cron_is_skipped( monkeypatch, tmp_path: Path ) -> None: config_file = tmp_path / "instance" / "config.json" @@ -1250,11 +1250,10 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( seen["agent"] = self async def process_direct(self, *_args, **_kwargs): - return OutboundMessage( - channel="telegram", - chat_id="user-1", - content="Time to stretch.", - ) + raise AssertionError("unbound cron job must not use process_direct") + + async def submit_cron_turn(self, _msg: InboundMessage): + raise AssertionError("unbound cron job must not run as a bound cron turn") async def close_mcp(self) -> None: return None @@ -1270,16 +1269,10 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( raise _StopGatewayError("stop") async def _capture_evaluate_response( - response: str, - task_context: str, - provider_arg: object, - model: str, + *_args, + **_kwargs, ) -> bool: - seen["response"] = response - seen["task_context"] = task_context - seen["provider"] = provider_arg - seen["model"] = model - return True + raise AssertionError("unbound cron job must not be evaluated for delivery") monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron) monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) @@ -1314,214 +1307,9 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( response = asyncio.run(cron.on_job(job)) - assert response == "Time to stretch." - assert seen["response"] == "Time to stretch." - assert seen["provider"] is runtime_provider - assert seen["model"] == "runtime-model" - assert seen["task_context"] == ( - "The scheduled time has arrived. Deliver this reminder to the user now, " - "as a brief and natural message in their language. Speak directly to them — " - "do not narrate progress, summarize, include user IDs, or add status reports " - "like 'Done' or 'Reminded'.\n\n" - "Reminder: Remind me to stretch." - ) - bus.publish_outbound.assert_awaited_once_with( - OutboundMessage( - channel="telegram", - chat_id="user-1", - content="Time to stretch.", - ) - ) - assert seen["session_key"] == "telegram:user-1" - saved_session = seen["saved_session"] - assert isinstance(saved_session, _FakeSession) - assert saved_session.messages == [ - { - "role": "assistant", - "content": "Time to stretch.", - "_channel_delivery": True, - } - ] - - bus.publish_outbound.reset_mock() - old_turn_id = "turn-that-created-the-reminder" - websocket_job = CronJob( - id="drink-water", - name="drink water", - payload=CronPayload( - message="Remind me to drink water.", - deliver=True, - channel="websocket", - to="chat-1", - channel_meta={ - "webui": True, - WEBUI_TURN_METADATA_KEY: old_turn_id, - "workspace_scope": {"mode": "default"}, - }, - ), - ) - - response = asyncio.run(cron.on_job(websocket_job)) - - assert response == "Time to stretch." - bus.publish_outbound.assert_awaited_once() - delivered = bus.publish_outbound.await_args.args[0] - assert delivered.channel == "websocket" - assert delivered.chat_id == "chat-1" - assert delivered.metadata["webui"] is True - assert delivered.metadata["workspace_scope"] == {"mode": "default"} - assert delivered.metadata[WEBUI_TURN_METADATA_KEY].startswith("cron:drink-water:") - assert delivered.metadata[WEBUI_TURN_METADATA_KEY] != old_turn_id - assert delivered.metadata[WEBUI_MESSAGE_SOURCE_METADATA_KEY] == { - "kind": "cron", - "label": "drink water", - } - - -def test_gateway_legacy_cron_payloads_with_session_key_stay_legacy( - monkeypatch, tmp_path: Path -) -> None: - config_file = _write_instance_config(tmp_path) - config = Config() - config.agents.defaults.workspace = str(tmp_path / "config-workspace") - bus = MagicMock() - bus.publish_outbound = AsyncMock() - seen: dict[str, object] = {"process_calls": [], "evaluations": [], "saved_keys": []} - - class _FakeSession: - def __init__(self) -> None: - self.messages = [] - - def add_message(self, role: str, content: str, **kwargs) -> None: - self.messages.append({"role": role, "content": content, **kwargs}) - - class _FakeSessionManager: - def __init__(self, _workspace: Path) -> None: - self.session = _FakeSession() - seen["session_manager"] = self - - def read_session_file(self, _key: str) -> dict[str, object]: - return {"metadata": {}} - - def get_or_create(self, key: str) -> _FakeSession: - seen["saved_keys"].append(key) - return self.session - - def save(self, session: _FakeSession) -> None: - seen["saved_session"] = session - - class _FakeCron: - def __init__(self, _store_path: Path) -> None: - self.on_job = None - seen["cron"] = self - - class _FakeAgentLoop: - @classmethod - def from_config(cls, config, bus=None, **extra): - return cls(**extra) - - def __init__(self, *args, **kwargs) -> None: - self.model = "test-model" - self.provider = kwargs.get("provider", object()) - self.tools = {} - - async def process_direct(self, prompt: str, **kwargs): - seen["process_calls"].append((prompt, kwargs)) - return OutboundMessage( - channel=kwargs["channel"], - chat_id=kwargs["chat_id"], - content="Legacy response.", - ) - - async def submit_cron_turn(self, _msg: InboundMessage): - raise AssertionError("legacy cron payload must not run as bound cron turn") - - async def close_mcp(self) -> None: - return None - - async def run(self) -> None: - return None - - def stop(self) -> None: - return None - - class _StopAfterCronSetup: - def __init__(self, *_args, **_kwargs) -> None: - raise _StopGatewayError("stop") - - async def _capture_evaluate_response(*args, **_kwargs) -> bool: - seen["evaluations"].append(args) - return True - - _patch_cli_command_runtime( - monkeypatch, - config, - message_bus=lambda: bus, - session_manager=_FakeSessionManager, - cron_service=_FakeCron, - ) - monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) - monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup) - monkeypatch.setattr( - "nanobot.cli.commands.evaluate_response", - _capture_evaluate_response, - ) - - result = runner.invoke(app, ["gateway", "--config", str(config_file)]) - assert isinstance(result.exception, _StopGatewayError) - cron = seen["cron"] - - silent_job = CronJob( - id="silent-legacy", - name="Silent legacy", - payload=CronPayload( - message="Run silently.", - deliver=False, - channel="telegram", - to="user-1", - session_key="telegram:user-1", - ), - ) - - response = asyncio.run(cron.on_job(silent_job)) - - assert response == "Legacy response." - prompt, kwargs = seen["process_calls"][-1] - assert "Reminder: Run silently." in prompt - assert kwargs["session_key"] == "cron:silent-legacy" - assert kwargs["channel"] == "telegram" - assert kwargs["chat_id"] == "user-1" - assert seen["evaluations"] == [] + assert response is None bus.publish_outbound.assert_not_awaited() - topic_job = CronJob( - id="topic-legacy", - name="Topic legacy", - payload=CronPayload( - message="Ping the topic.", - deliver=True, - channel="telegram", - to="-100123", - channel_meta={"message_thread_id": 42}, - session_key="telegram:-100123:topic:42", - ), - ) - - response = asyncio.run(cron.on_job(topic_job)) - - assert response == "Legacy response." - _prompt, kwargs = seen["process_calls"][-1] - assert kwargs["session_key"] == "cron:topic-legacy" - assert kwargs["channel"] == "telegram" - assert kwargs["chat_id"] == "-100123" - assert len(seen["evaluations"]) == 1 - bus.publish_outbound.assert_awaited_once() - delivered = bus.publish_outbound.await_args.args[0] - assert delivered.channel == "telegram" - assert delivered.chat_id == "-100123" - assert delivered.metadata["message_thread_id"] == 42 - assert seen["saved_keys"] == ["telegram:-100123:topic:42"] - def test_gateway_bound_cron_runs_as_session_turn( monkeypatch, tmp_path: Path @@ -1724,109 +1512,6 @@ def test_gateway_bound_cron_runs_as_session_turn( assert msg.metadata["thread_id"] == "om_root123" -def test_gateway_cron_job_suppresses_intermediate_progress( - monkeypatch, tmp_path: Path -) -> None: - """Cron jobs must pass on_progress=_silent to process_direct so that - tool hints and streaming deltas are never leaked to the user channel - before evaluate_response decides whether to deliver.""" - config_file = tmp_path / "instance" / "config.json" - config_file.parent.mkdir(parents=True) - config_file.write_text("{}") - - config = Config() - config.agents.defaults.workspace = str(tmp_path / "config-workspace") - bus = MagicMock() - bus.publish_outbound = AsyncMock() - seen: dict[str, object] = {} - - monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None) - monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config) - monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None) - monkeypatch.setattr("nanobot.providers.factory.make_provider", lambda _config: _fake_provider()) - monkeypatch.setattr( - "nanobot.providers.factory.build_provider_snapshot", - lambda _config: _test_provider_snapshot(object(), _config), - ) - monkeypatch.setattr( - "nanobot.providers.factory.load_provider_snapshot", - lambda _config_path=None: _test_provider_snapshot(object(), config), - ) - monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus) - monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object()) - - class _FakeCron: - def __init__(self, _store_path: Path) -> None: - self.on_job = None - seen["cron"] = self - - class _FakeAgentLoop: - @classmethod - def from_config(cls, config, bus=None, **extra): - return cls(**extra) - def __init__(self, *args, **kwargs) -> None: - self.model = "test-model" - self.provider = object() - self.tools = {} - - async def process_direct(self, *_args, on_progress=None, **_kwargs): - seen["on_progress"] = on_progress - return OutboundMessage( - channel="telegram", - chat_id="user-1", - content="Done.", - ) - - async def close_mcp(self) -> None: - return None - - async def run(self) -> None: - return None - - def stop(self) -> None: - return None - - class _StopAfterCronSetup: - def __init__(self, *_args, **_kwargs) -> None: - raise _StopGatewayError("stop") - - async def _always_reject(*_args, **_kwargs) -> bool: - return False - - monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron) - monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) - monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup) - monkeypatch.setattr( - "nanobot.cli.commands.evaluate_response", - _always_reject, - ) - - result = runner.invoke(app, ["gateway", "--config", str(config_file)]) - assert isinstance(result.exception, _StopGatewayError) - - cron = seen["cron"] - job = CronJob( - id="cron-silent-test", - name="test-silent", - payload=CronPayload( - message="Run something.", - deliver=True, - channel="telegram", - to="user-1", - ), - ) - response = asyncio.run(cron.on_job(job)) - - assert response == "Done." - # on_progress must be a callable (the _silent noop), not None and not bus_progress - assert seen["on_progress"] is not None - assert callable(seen["on_progress"]) - # Verify it actually swallows calls (no side effects) - asyncio.run(seen["on_progress"]("tool_hint", "🔧 $ echo test")) - # Nothing published to bus since evaluator rejected - bus.publish_outbound.assert_not_awaited() - - def test_gateway_workspace_override_does_not_migrate_legacy_cron( monkeypatch, tmp_path: Path ) -> None: diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py index fc5194f22..7a0db0dd8 100644 --- a/tests/cron/test_cron_service.py +++ b/tests/cron/test_cron_service.py @@ -43,7 +43,7 @@ def test_add_job_accepts_valid_timezone(tmp_path) -> None: assert job.state.next_run_at_ms is not None -def test_add_job_preserves_channel_meta_and_session_key(tmp_path) -> None: +def test_add_job_migrates_legacy_delivery_context(tmp_path) -> None: service = CronService(tmp_path / "cron" / "jobs.json") meta = {"slack": {"thread_ts": "1234567890.123456", "channel_type": "channel"}} job = service.add_job( @@ -56,16 +56,108 @@ def test_add_job_preserves_channel_meta_and_session_key(tmp_path) -> None: channel_meta=meta, session_key="slack:C123:1234567890.123456", ) - assert job.payload.channel_meta == meta + assert job.payload.deliver is False + assert job.payload.channel is None + assert job.payload.to is None + assert job.payload.channel_meta == {} assert job.payload.session_key == "slack:C123:1234567890.123456" + assert job.payload.origin_channel == "slack" + assert job.payload.origin_chat_id == "C123" + assert job.payload.origin_metadata == meta reloaded = service.get_job(job.id) assert reloaded is not None - assert reloaded.payload.channel_meta == meta + assert reloaded.payload.channel_meta == {} assert reloaded.payload.session_key == "slack:C123:1234567890.123456" + assert reloaded.payload.origin_channel == "slack" + assert reloaded.payload.origin_chat_id == "C123" + assert reloaded.payload.origin_metadata == meta -def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> None: +def test_load_store_migrates_legacy_delivery_context(tmp_path) -> None: + store_path = tmp_path / "cron" / "jobs.json" + store_path.parent.mkdir(parents=True) + store_path.write_text( + json.dumps( + { + "version": 1, + "jobs": [ + { + "id": "legacy-1", + "name": "Legacy reminder", + "enabled": True, + "schedule": {"kind": "every", "everyMs": 60_000}, + "payload": { + "kind": "agent_turn", + "message": "check status", + "deliver": True, + "channel": "telegram", + "to": "user-1", + "channelMeta": {"message_thread_id": 42}, + "sessionKey": "telegram:user-1:topic:42", + }, + "state": {}, + "createdAtMs": 1, + "updatedAtMs": 1, + } + ], + } + ), + encoding="utf-8", + ) + + job = CronService(store_path).get_job("legacy-1") + + assert job is not None + assert job.payload.session_key == "telegram:user-1:topic:42" + assert job.payload.origin_channel == "telegram" + assert job.payload.origin_chat_id == "user-1" + assert job.payload.origin_metadata == {"message_thread_id": 42} + assert job.payload.deliver is False + assert job.payload.channel is None + assert job.payload.to is None + assert job.payload.channel_meta == {} + + +def test_load_store_disables_malformed_legacy_payload(tmp_path) -> None: + store_path = tmp_path / "cron" / "jobs.json" + store_path.parent.mkdir(parents=True) + store_path.write_text( + json.dumps( + { + "version": 1, + "jobs": [ + { + "id": "legacy-bad", + "name": "Broken legacy", + "enabled": True, + "schedule": {"kind": "every", "everyMs": 60_000}, + "payload": { + "kind": "agent_turn", + "message": "check status", + "deliver": True, + }, + "state": {"nextRunAtMs": 123}, + "createdAtMs": 1, + "updatedAtMs": 1, + } + ], + } + ), + encoding="utf-8", + ) + + job = CronService(store_path).get_job("legacy-bad") + + assert job is not None + assert job.enabled is False + assert job.state.next_run_at_ms is None + assert job.state.last_status == "error" + assert "missing channel/to" in (job.state.last_error or "") + assert job.payload.deliver is False + + +def test_list_bound_agent_jobs_includes_migrated_legacy_delivery_payloads(tmp_path) -> None: service = CronService(tmp_path / "cron" / "jobs.json") schedule = CronSchedule(kind="every", every_ms=60_000) bound = service.add_job( @@ -73,8 +165,10 @@ def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> No schedule=schedule, message="new bound job", session_key="websocket:chat-1", + origin_channel="websocket", + origin_chat_id="chat-1", ) - service.add_job( + migrated = service.add_job( name="Legacy same session", schedule=schedule, message="legacy job", @@ -84,7 +178,7 @@ def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> No session_key="websocket:chat-1", ) - assert service.list_bound_cron_jobs_for_session("websocket:chat-1") == [bound] + assert service.list_bound_cron_jobs_for_session("websocket:chat-1") == [bound, migrated] def test_add_job_preserves_origin_delivery_context(tmp_path) -> None: @@ -143,7 +237,10 @@ async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> No raw = json.loads(store_path.read_text(encoding="utf-8")) payload = raw["jobs"][0]["payload"] - assert payload["channelMeta"] == meta + assert payload["deliver"] is False + assert payload["channel"] is None + assert payload["to"] is None + assert payload["channelMeta"] == {} assert payload["sessionKey"] == "slack:C123:1234567890.123456" assert payload["originChannel"] == "slack" assert payload["originChatId"] == "C123" @@ -151,7 +248,7 @@ async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> No reloaded = CronService(store_path).get_job(job.id) assert reloaded is not None - assert reloaded.payload.channel_meta == meta + assert reloaded.payload.channel_meta == {} assert reloaded.payload.session_key == "slack:C123:1234567890.123456" assert reloaded.payload.origin_channel == "slack" assert reloaded.payload.origin_chat_id == "C123" @@ -648,28 +745,22 @@ def test_update_job_offline_writes_action(tmp_path) -> None: assert last["params"]["name"] == "updated-offline" -def test_update_job_sentinel_channel_and_to(tmp_path) -> None: - """Passing None clears channel/to; omitting leaves them unchanged.""" +def test_update_job_migrates_legacy_delivery_target(tmp_path) -> None: service = CronService(tmp_path / "cron" / "jobs.json") job = service.add_job( name="sentinel", schedule=CronSchedule(kind="every", every_ms=60_000), message="hello", - channel="telegram", - to="user123", ) - assert job.payload.channel == "telegram" - assert job.payload.to == "user123" - result = service.update_job(job.id, name="renamed") - assert isinstance(result, CronJob) - assert result.payload.channel == "telegram" - assert result.payload.to == "user123" - - result = service.update_job(job.id, channel=None, to=None) + result = service.update_job(job.id, channel="telegram", to="user123") assert isinstance(result, CronJob) + assert result.payload.session_key == "telegram:user123" + assert result.payload.origin_channel == "telegram" + assert result.payload.origin_chat_id == "user123" assert result.payload.channel is None assert result.payload.to is None + assert result.payload.channel_meta == {} @pytest.mark.asyncio