refactor: migrate legacy cron payloads to bound sessions

This commit is contained in:
chengyongru 2026-06-12 16:51:20 +08:00
parent af8192dc38
commit 8335554894
6 changed files with 210 additions and 431 deletions

View File

@ -7,7 +7,6 @@ import signal
import sys
from collections.abc import Callable
from contextlib import nullcontext, suppress
from contextvars import ContextVar
from pathlib import Path
from typing import Any
@ -54,7 +53,6 @@ from nanobot.agent.loop import AgentLoop # noqa: E402
from nanobot.cli.stream import StreamRenderer, ThinkingSpinner # noqa: E402
from nanobot.config.paths import get_workspace_path, is_default_workspace # noqa: E402
from nanobot.config.schema import Config # noqa: E402
from nanobot.cron.webui_metadata import cron_proactive_delivery_metadata # noqa: E402
from nanobot.utils.evaluator import evaluate_response # noqa: E402
from nanobot.utils.helpers import sync_workspace_templates # noqa: E402
from nanobot.utils.restart import ( # noqa: E402
@ -87,12 +85,6 @@ class SafeFileHistory(FileHistory):
super().store_string(_sanitize_surrogates(string))
_PROACTIVE_WEBUI_METADATA: ContextVar[dict[str, Any] | None] = ContextVar(
"proactive_webui_metadata",
default=None,
)
app = typer.Typer(
name="nanobot",
context_settings={"help_option_names": ["-h", "--help"]},
@ -950,7 +942,6 @@ def _run_gateway(
health_server_enabled: bool = True,
) -> None:
"""Shared gateway runtime; ``open_browser_url`` opens a tab once channels are up."""
from nanobot.agent.tools.cron import CronTool
from nanobot.agent.tools.message import MessageTool
from nanobot.bus.queue import MessageBus
from nanobot.bus.runtime_events import RuntimeEventBus
@ -1022,9 +1013,6 @@ def _run_gateway(
"""Publish a user-visible message and mirror it into that channel's session."""
metadata = dict(msg.metadata or {})
record = record or bool(metadata.pop("_record_channel_delivery", False))
proactive_webui_metadata = _PROACTIVE_WEBUI_METADATA.get()
if record and msg.channel == "websocket" and proactive_webui_metadata:
metadata = {**metadata, **proactive_webui_metadata}
if metadata != (msg.metadata or {}):
msg = OutboundMessage(
channel=msg.channel,
@ -1179,73 +1167,12 @@ def _run_gateway(
if is_bound_cron_job(job):
return await run_bound_cron_job(job, agent=agent, cron=cron)
reminder_note = (
"The scheduled time has arrived. Deliver this reminder to the user now, "
"as a brief and natural message in their language. Speak directly to them — "
"do not narrate progress, summarize, include user IDs, or add status reports "
"like 'Done' or 'Reminded'.\n\n"
f"Reminder: {job.payload.message}"
logger.warning(
"Cron: skipped unbound agent job '{}' ({}); recreate it from a chat session",
job.name,
job.id,
)
cron_tool = agent.tools.get("cron")
cron_token = None
if isinstance(cron_tool, CronTool):
cron_token = cron_tool.set_cron_context(True)
message_record_token = None
if isinstance(message_tool, MessageTool):
message_record_token = message_tool.set_record_channel_delivery(True)
proactive_webui_metadata = cron_proactive_delivery_metadata(
"websocket",
None,
turn_seed=f"cron:{job.id}",
source_label=job.name,
)
proactive_token = _PROACTIVE_WEBUI_METADATA.set(proactive_webui_metadata)
try:
resp = await agent.process_direct(
reminder_note,
session_key=f"cron:{job.id}",
channel=job.payload.channel or "cli",
chat_id=job.payload.to or "direct",
on_progress=_silent,
)
finally:
_PROACTIVE_WEBUI_METADATA.reset(proactive_token)
if isinstance(cron_tool, CronTool) and cron_token is not None:
cron_tool.reset_cron_context(cron_token)
if isinstance(message_tool, MessageTool) and message_record_token is not None:
message_tool.reset_record_channel_delivery(message_record_token)
response = resp.content if resp else ""
if job.payload.deliver and isinstance(message_tool, MessageTool) and message_tool._sent_in_turn:
return response
if job.payload.deliver and job.payload.to and response:
should_notify = await evaluate_response(
response, reminder_note, agent.provider, agent.model,
)
if should_notify:
proactive_metadata = cron_proactive_delivery_metadata(
job.payload.channel or "cli",
job.payload.channel_meta,
turn_seed=f"cron:{job.id}",
source_label=job.name,
)
await _deliver_to_channel(
OutboundMessage(
channel=job.payload.channel or "cli",
chat_id=job.payload.to,
content=response,
metadata=proactive_metadata,
),
record=True,
session_key=job.payload.session_key,
)
return response
return None
cron.on_job = on_cron_job

View File

@ -72,6 +72,62 @@ def _validate_schedule_for_add(schedule: CronSchedule) -> None:
raise ValueError(f"unknown timezone '{schedule.tz}'") from None
def _has_legacy_delivery_context(payload: CronPayload) -> bool:
return bool(payload.deliver or payload.channel or payload.to or payload.channel_meta)
def _legacy_session_key(payload: CronPayload) -> str | None:
if payload.session_key:
return payload.session_key
if payload.channel and payload.to:
return f"{payload.channel}:{payload.to}"
return None
def _disable_malformed_legacy_job(job: CronJob) -> None:
reason = "legacy cron payload is missing channel/to; recreate it from a chat session"
job.payload.deliver = False
job.payload.channel = None
job.payload.to = None
job.payload.channel_meta = {}
job.enabled = False
job.state.next_run_at_ms = None
job.state.last_status = "error"
job.state.last_error = reason
logger.warning("Cron: disabled malformed legacy job '{}' ({}): {}", job.name, job.id, reason)
def _normalize_agent_turn_job(job: CronJob) -> bool:
"""Migrate legacy user cron payloads into session-bound payloads.
Pre-bound user cron jobs stored their delivery target in ``channel``/``to``.
Normal user-created legacy jobs always have those fields; if they are
missing, keep the record for inspection but disable it instead of preserving
a runtime legacy execution path.
"""
payload = job.payload
if payload.kind != "agent_turn" or not _has_legacy_delivery_context(payload):
return False
if not payload.channel or not payload.to:
_disable_malformed_legacy_job(job)
return True
payload.session_key = _legacy_session_key(payload)
payload.origin_channel = payload.origin_channel or payload.channel
payload.origin_chat_id = payload.origin_chat_id or payload.to
if not payload.origin_metadata:
payload.origin_metadata = dict(payload.channel_meta or {})
payload.deliver = False
payload.channel = None
payload.to = None
payload.channel_meta = {}
job.updated_at_ms = max(job.updated_at_ms, _now_ms())
logger.info("Cron: migrated legacy job '{}' ({}) to session-bound payload", job.name, job.id)
return True
class CronService:
"""Service for managing and executing scheduled jobs."""
@ -115,7 +171,7 @@ class CronService:
jobs = []
version = data.get("version", 1)
for j in data.get("jobs", []):
jobs.append(CronJob(
job = CronJob(
id=j["id"],
name=j["name"],
enabled=j.get("enabled", True),
@ -170,7 +226,9 @@ class CronService:
created_at_ms=j.get("createdAtMs", 0),
updated_at_ms=j.get("updatedAtMs", 0),
delete_after_run=j.get("deleteAfterRun", False),
))
)
_normalize_agent_turn_job(job)
jobs.append(job)
except Exception:
# Preserve the corrupt file for forensic recovery instead of
# letting the next save overwrite it with an empty job list.
@ -196,6 +254,7 @@ class CronService:
jobs_map = {j.id: j for j in self._store.jobs}
def _update(params: dict):
j = CronJob.from_dict(params)
_normalize_agent_turn_job(j)
jobs_map[j.id] = j
def _del(params: dict):
@ -570,6 +629,7 @@ class CronService:
updated_at_ms=now,
delete_after_run=delete_after_run,
)
_normalize_agent_turn_job(job)
if self._running:
store = self._load_store()
store.jobs.append(job)
@ -678,6 +738,7 @@ class CronService:
job.payload.to = to
if delete_after_run is not None:
job.delete_after_run = delete_after_run
_normalize_agent_turn_job(job)
job.updated_at_ms = _now_ms()
if job.enabled:

View File

@ -57,9 +57,14 @@ def cron_history_overrides(metadata: Mapping[str, Any] | None) -> tuple[str | No
def is_bound_cron_job(job: CronJob) -> bool:
"""True for new session-bound cron jobs, excluding legacy delivery payloads."""
"""True for session-bound cron jobs with complete delivery context."""
payload = job.payload
if payload.kind != "agent_turn" or not payload.session_key:
if (
payload.kind != "agent_turn"
or not payload.session_key
or not payload.origin_channel
or not payload.origin_chat_id
):
return False
return not (
payload.deliver

View File

@ -186,11 +186,13 @@ async def test_session_automations_route_filters_by_webui_session(
schedule=hourly,
message=message,
session_key=f"websocket:{to}",
origin_channel="websocket",
origin_chat_id=to,
)
cron.add_job(
name="Legacy same target",
schedule=hourly,
message="Legacy job should not be treated as bound",
message="Legacy job should be migrated",
deliver=True,
channel="websocket",
to="abc",
@ -228,7 +230,7 @@ async def test_session_automations_route_filters_by_webui_session(
assert resp.status_code == 200
body = resp.json()
assert [job["name"] for job in body["jobs"]] == ["Morning check"]
assert [job["name"] for job in body["jobs"]] == ["Morning check", "Legacy same target"]
job = body["jobs"][0]
assert job["schedule"]["kind"] == "every"
assert job["schedule"]["every_ms"] == 3_600_000
@ -249,12 +251,16 @@ async def test_session_automations_route_ignores_unified_owner(
schedule=hourly,
message="Check the shared session",
session_key=UNIFIED_SESSION_KEY,
origin_channel="websocket",
origin_chat_id="abc",
)
cron.add_job(
name="Visible chat job",
schedule=hourly,
message="Show for this chat",
session_key="websocket:abc",
origin_channel="websocket",
origin_chat_id="abc",
)
channel = _ch(
bus,
@ -728,6 +734,8 @@ async def test_session_delete_blocks_when_bound_automation_exists(
schedule=CronSchedule(kind="every", every_ms=86_400_000),
message="Check the repo",
session_key="websocket:doomed",
origin_channel="websocket",
origin_chat_id="doomed",
)
channel = _ch(bus, session_manager=sm, cron_service=cron, port=29915)
server_task = asyncio.create_task(channel.start())
@ -767,6 +775,8 @@ async def test_session_delete_can_cascade_bound_automations(
schedule=CronSchedule(kind="every", every_ms=86_400_000),
message="Check the repo",
session_key="websocket:doomed",
origin_channel="websocket",
origin_chat_id="doomed",
)
cron.add_job(
name="Legacy same target",
@ -793,9 +803,7 @@ async def test_session_delete_can_cascade_bound_automations(
assert resp.json()["deleted"] is True
assert not path.exists()
assert cron.list_bound_cron_jobs_for_session("websocket:doomed") == []
assert [job.name for job in cron.list_jobs(include_disabled=True)] == [
"Legacy same target"
]
assert cron.list_jobs(include_disabled=True) == []
finally:
await channel.stop()
await server_task
@ -813,6 +821,8 @@ async def test_session_delete_blocks_origin_automation_when_unified_enabled(
schedule=CronSchedule(kind="every", every_ms=86_400_000),
message="Check this chat",
session_key="websocket:doomed",
origin_channel="websocket",
origin_chat_id="doomed",
)
channel = _ch(
bus,

View File

@ -1185,7 +1185,7 @@ def test_gateway_uses_workspace_directory_for_cron_store(monkeypatch, tmp_path:
assert seen["cron_store"] == config.workspace_path / "cron" / "jobs.json"
def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
def test_gateway_unbound_agent_cron_is_skipped(
monkeypatch, tmp_path: Path
) -> None:
config_file = tmp_path / "instance" / "config.json"
@ -1250,11 +1250,10 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
seen["agent"] = self
async def process_direct(self, *_args, **_kwargs):
return OutboundMessage(
channel="telegram",
chat_id="user-1",
content="Time to stretch.",
)
raise AssertionError("unbound cron job must not use process_direct")
async def submit_cron_turn(self, _msg: InboundMessage):
raise AssertionError("unbound cron job must not run as a bound cron turn")
async def close_mcp(self) -> None:
return None
@ -1270,16 +1269,10 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
raise _StopGatewayError("stop")
async def _capture_evaluate_response(
response: str,
task_context: str,
provider_arg: object,
model: str,
*_args,
**_kwargs,
) -> bool:
seen["response"] = response
seen["task_context"] = task_context
seen["provider"] = provider_arg
seen["model"] = model
return True
raise AssertionError("unbound cron job must not be evaluated for delivery")
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
@ -1314,214 +1307,9 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
response = asyncio.run(cron.on_job(job))
assert response == "Time to stretch."
assert seen["response"] == "Time to stretch."
assert seen["provider"] is runtime_provider
assert seen["model"] == "runtime-model"
assert seen["task_context"] == (
"The scheduled time has arrived. Deliver this reminder to the user now, "
"as a brief and natural message in their language. Speak directly to them — "
"do not narrate progress, summarize, include user IDs, or add status reports "
"like 'Done' or 'Reminded'.\n\n"
"Reminder: Remind me to stretch."
)
bus.publish_outbound.assert_awaited_once_with(
OutboundMessage(
channel="telegram",
chat_id="user-1",
content="Time to stretch.",
)
)
assert seen["session_key"] == "telegram:user-1"
saved_session = seen["saved_session"]
assert isinstance(saved_session, _FakeSession)
assert saved_session.messages == [
{
"role": "assistant",
"content": "Time to stretch.",
"_channel_delivery": True,
}
]
bus.publish_outbound.reset_mock()
old_turn_id = "turn-that-created-the-reminder"
websocket_job = CronJob(
id="drink-water",
name="drink water",
payload=CronPayload(
message="Remind me to drink water.",
deliver=True,
channel="websocket",
to="chat-1",
channel_meta={
"webui": True,
WEBUI_TURN_METADATA_KEY: old_turn_id,
"workspace_scope": {"mode": "default"},
},
),
)
response = asyncio.run(cron.on_job(websocket_job))
assert response == "Time to stretch."
bus.publish_outbound.assert_awaited_once()
delivered = bus.publish_outbound.await_args.args[0]
assert delivered.channel == "websocket"
assert delivered.chat_id == "chat-1"
assert delivered.metadata["webui"] is True
assert delivered.metadata["workspace_scope"] == {"mode": "default"}
assert delivered.metadata[WEBUI_TURN_METADATA_KEY].startswith("cron:drink-water:")
assert delivered.metadata[WEBUI_TURN_METADATA_KEY] != old_turn_id
assert delivered.metadata[WEBUI_MESSAGE_SOURCE_METADATA_KEY] == {
"kind": "cron",
"label": "drink water",
}
def test_gateway_legacy_cron_payloads_with_session_key_stay_legacy(
monkeypatch, tmp_path: Path
) -> None:
config_file = _write_instance_config(tmp_path)
config = Config()
config.agents.defaults.workspace = str(tmp_path / "config-workspace")
bus = MagicMock()
bus.publish_outbound = AsyncMock()
seen: dict[str, object] = {"process_calls": [], "evaluations": [], "saved_keys": []}
class _FakeSession:
def __init__(self) -> None:
self.messages = []
def add_message(self, role: str, content: str, **kwargs) -> None:
self.messages.append({"role": role, "content": content, **kwargs})
class _FakeSessionManager:
def __init__(self, _workspace: Path) -> None:
self.session = _FakeSession()
seen["session_manager"] = self
def read_session_file(self, _key: str) -> dict[str, object]:
return {"metadata": {}}
def get_or_create(self, key: str) -> _FakeSession:
seen["saved_keys"].append(key)
return self.session
def save(self, session: _FakeSession) -> None:
seen["saved_session"] = session
class _FakeCron:
def __init__(self, _store_path: Path) -> None:
self.on_job = None
seen["cron"] = self
class _FakeAgentLoop:
@classmethod
def from_config(cls, config, bus=None, **extra):
return cls(**extra)
def __init__(self, *args, **kwargs) -> None:
self.model = "test-model"
self.provider = kwargs.get("provider", object())
self.tools = {}
async def process_direct(self, prompt: str, **kwargs):
seen["process_calls"].append((prompt, kwargs))
return OutboundMessage(
channel=kwargs["channel"],
chat_id=kwargs["chat_id"],
content="Legacy response.",
)
async def submit_cron_turn(self, _msg: InboundMessage):
raise AssertionError("legacy cron payload must not run as bound cron turn")
async def close_mcp(self) -> None:
return None
async def run(self) -> None:
return None
def stop(self) -> None:
return None
class _StopAfterCronSetup:
def __init__(self, *_args, **_kwargs) -> None:
raise _StopGatewayError("stop")
async def _capture_evaluate_response(*args, **_kwargs) -> bool:
seen["evaluations"].append(args)
return True
_patch_cli_command_runtime(
monkeypatch,
config,
message_bus=lambda: bus,
session_manager=_FakeSessionManager,
cron_service=_FakeCron,
)
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup)
monkeypatch.setattr(
"nanobot.cli.commands.evaluate_response",
_capture_evaluate_response,
)
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGatewayError)
cron = seen["cron"]
silent_job = CronJob(
id="silent-legacy",
name="Silent legacy",
payload=CronPayload(
message="Run silently.",
deliver=False,
channel="telegram",
to="user-1",
session_key="telegram:user-1",
),
)
response = asyncio.run(cron.on_job(silent_job))
assert response == "Legacy response."
prompt, kwargs = seen["process_calls"][-1]
assert "Reminder: Run silently." in prompt
assert kwargs["session_key"] == "cron:silent-legacy"
assert kwargs["channel"] == "telegram"
assert kwargs["chat_id"] == "user-1"
assert seen["evaluations"] == []
assert response is None
bus.publish_outbound.assert_not_awaited()
topic_job = CronJob(
id="topic-legacy",
name="Topic legacy",
payload=CronPayload(
message="Ping the topic.",
deliver=True,
channel="telegram",
to="-100123",
channel_meta={"message_thread_id": 42},
session_key="telegram:-100123:topic:42",
),
)
response = asyncio.run(cron.on_job(topic_job))
assert response == "Legacy response."
_prompt, kwargs = seen["process_calls"][-1]
assert kwargs["session_key"] == "cron:topic-legacy"
assert kwargs["channel"] == "telegram"
assert kwargs["chat_id"] == "-100123"
assert len(seen["evaluations"]) == 1
bus.publish_outbound.assert_awaited_once()
delivered = bus.publish_outbound.await_args.args[0]
assert delivered.channel == "telegram"
assert delivered.chat_id == "-100123"
assert delivered.metadata["message_thread_id"] == 42
assert seen["saved_keys"] == ["telegram:-100123:topic:42"]
def test_gateway_bound_cron_runs_as_session_turn(
monkeypatch, tmp_path: Path
@ -1724,109 +1512,6 @@ def test_gateway_bound_cron_runs_as_session_turn(
assert msg.metadata["thread_id"] == "om_root123"
def test_gateway_cron_job_suppresses_intermediate_progress(
monkeypatch, tmp_path: Path
) -> None:
"""Cron jobs must pass on_progress=_silent to process_direct so that
tool hints and streaming deltas are never leaked to the user channel
before evaluate_response decides whether to deliver."""
config_file = tmp_path / "instance" / "config.json"
config_file.parent.mkdir(parents=True)
config_file.write_text("{}")
config = Config()
config.agents.defaults.workspace = str(tmp_path / "config-workspace")
bus = MagicMock()
bus.publish_outbound = AsyncMock()
seen: dict[str, object] = {}
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
monkeypatch.setattr("nanobot.providers.factory.make_provider", lambda _config: _fake_provider())
monkeypatch.setattr(
"nanobot.providers.factory.build_provider_snapshot",
lambda _config: _test_provider_snapshot(object(), _config),
)
monkeypatch.setattr(
"nanobot.providers.factory.load_provider_snapshot",
lambda _config_path=None: _test_provider_snapshot(object(), config),
)
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus)
monkeypatch.setattr("nanobot.session.manager.SessionManager", lambda _workspace: object())
class _FakeCron:
def __init__(self, _store_path: Path) -> None:
self.on_job = None
seen["cron"] = self
class _FakeAgentLoop:
@classmethod
def from_config(cls, config, bus=None, **extra):
return cls(**extra)
def __init__(self, *args, **kwargs) -> None:
self.model = "test-model"
self.provider = object()
self.tools = {}
async def process_direct(self, *_args, on_progress=None, **_kwargs):
seen["on_progress"] = on_progress
return OutboundMessage(
channel="telegram",
chat_id="user-1",
content="Done.",
)
async def close_mcp(self) -> None:
return None
async def run(self) -> None:
return None
def stop(self) -> None:
return None
class _StopAfterCronSetup:
def __init__(self, *_args, **_kwargs) -> None:
raise _StopGatewayError("stop")
async def _always_reject(*_args, **_kwargs) -> bool:
return False
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup)
monkeypatch.setattr(
"nanobot.cli.commands.evaluate_response",
_always_reject,
)
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGatewayError)
cron = seen["cron"]
job = CronJob(
id="cron-silent-test",
name="test-silent",
payload=CronPayload(
message="Run something.",
deliver=True,
channel="telegram",
to="user-1",
),
)
response = asyncio.run(cron.on_job(job))
assert response == "Done."
# on_progress must be a callable (the _silent noop), not None and not bus_progress
assert seen["on_progress"] is not None
assert callable(seen["on_progress"])
# Verify it actually swallows calls (no side effects)
asyncio.run(seen["on_progress"]("tool_hint", "🔧 $ echo test"))
# Nothing published to bus since evaluator rejected
bus.publish_outbound.assert_not_awaited()
def test_gateway_workspace_override_does_not_migrate_legacy_cron(
monkeypatch, tmp_path: Path
) -> None:

View File

@ -43,7 +43,7 @@ def test_add_job_accepts_valid_timezone(tmp_path) -> None:
assert job.state.next_run_at_ms is not None
def test_add_job_preserves_channel_meta_and_session_key(tmp_path) -> None:
def test_add_job_migrates_legacy_delivery_context(tmp_path) -> None:
service = CronService(tmp_path / "cron" / "jobs.json")
meta = {"slack": {"thread_ts": "1234567890.123456", "channel_type": "channel"}}
job = service.add_job(
@ -56,16 +56,108 @@ def test_add_job_preserves_channel_meta_and_session_key(tmp_path) -> None:
channel_meta=meta,
session_key="slack:C123:1234567890.123456",
)
assert job.payload.channel_meta == meta
assert job.payload.deliver is False
assert job.payload.channel is None
assert job.payload.to is None
assert job.payload.channel_meta == {}
assert job.payload.session_key == "slack:C123:1234567890.123456"
assert job.payload.origin_channel == "slack"
assert job.payload.origin_chat_id == "C123"
assert job.payload.origin_metadata == meta
reloaded = service.get_job(job.id)
assert reloaded is not None
assert reloaded.payload.channel_meta == meta
assert reloaded.payload.channel_meta == {}
assert reloaded.payload.session_key == "slack:C123:1234567890.123456"
assert reloaded.payload.origin_channel == "slack"
assert reloaded.payload.origin_chat_id == "C123"
assert reloaded.payload.origin_metadata == meta
def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> None:
def test_load_store_migrates_legacy_delivery_context(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
store_path.parent.mkdir(parents=True)
store_path.write_text(
json.dumps(
{
"version": 1,
"jobs": [
{
"id": "legacy-1",
"name": "Legacy reminder",
"enabled": True,
"schedule": {"kind": "every", "everyMs": 60_000},
"payload": {
"kind": "agent_turn",
"message": "check status",
"deliver": True,
"channel": "telegram",
"to": "user-1",
"channelMeta": {"message_thread_id": 42},
"sessionKey": "telegram:user-1:topic:42",
},
"state": {},
"createdAtMs": 1,
"updatedAtMs": 1,
}
],
}
),
encoding="utf-8",
)
job = CronService(store_path).get_job("legacy-1")
assert job is not None
assert job.payload.session_key == "telegram:user-1:topic:42"
assert job.payload.origin_channel == "telegram"
assert job.payload.origin_chat_id == "user-1"
assert job.payload.origin_metadata == {"message_thread_id": 42}
assert job.payload.deliver is False
assert job.payload.channel is None
assert job.payload.to is None
assert job.payload.channel_meta == {}
def test_load_store_disables_malformed_legacy_payload(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
store_path.parent.mkdir(parents=True)
store_path.write_text(
json.dumps(
{
"version": 1,
"jobs": [
{
"id": "legacy-bad",
"name": "Broken legacy",
"enabled": True,
"schedule": {"kind": "every", "everyMs": 60_000},
"payload": {
"kind": "agent_turn",
"message": "check status",
"deliver": True,
},
"state": {"nextRunAtMs": 123},
"createdAtMs": 1,
"updatedAtMs": 1,
}
],
}
),
encoding="utf-8",
)
job = CronService(store_path).get_job("legacy-bad")
assert job is not None
assert job.enabled is False
assert job.state.next_run_at_ms is None
assert job.state.last_status == "error"
assert "missing channel/to" in (job.state.last_error or "")
assert job.payload.deliver is False
def test_list_bound_agent_jobs_includes_migrated_legacy_delivery_payloads(tmp_path) -> None:
service = CronService(tmp_path / "cron" / "jobs.json")
schedule = CronSchedule(kind="every", every_ms=60_000)
bound = service.add_job(
@ -73,8 +165,10 @@ def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> No
schedule=schedule,
message="new bound job",
session_key="websocket:chat-1",
origin_channel="websocket",
origin_chat_id="chat-1",
)
service.add_job(
migrated = service.add_job(
name="Legacy same session",
schedule=schedule,
message="legacy job",
@ -84,7 +178,7 @@ def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> No
session_key="websocket:chat-1",
)
assert service.list_bound_cron_jobs_for_session("websocket:chat-1") == [bound]
assert service.list_bound_cron_jobs_for_session("websocket:chat-1") == [bound, migrated]
def test_add_job_preserves_origin_delivery_context(tmp_path) -> None:
@ -143,7 +237,10 @@ async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> No
raw = json.loads(store_path.read_text(encoding="utf-8"))
payload = raw["jobs"][0]["payload"]
assert payload["channelMeta"] == meta
assert payload["deliver"] is False
assert payload["channel"] is None
assert payload["to"] is None
assert payload["channelMeta"] == {}
assert payload["sessionKey"] == "slack:C123:1234567890.123456"
assert payload["originChannel"] == "slack"
assert payload["originChatId"] == "C123"
@ -151,7 +248,7 @@ async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> No
reloaded = CronService(store_path).get_job(job.id)
assert reloaded is not None
assert reloaded.payload.channel_meta == meta
assert reloaded.payload.channel_meta == {}
assert reloaded.payload.session_key == "slack:C123:1234567890.123456"
assert reloaded.payload.origin_channel == "slack"
assert reloaded.payload.origin_chat_id == "C123"
@ -648,28 +745,22 @@ def test_update_job_offline_writes_action(tmp_path) -> None:
assert last["params"]["name"] == "updated-offline"
def test_update_job_sentinel_channel_and_to(tmp_path) -> None:
"""Passing None clears channel/to; omitting leaves them unchanged."""
def test_update_job_migrates_legacy_delivery_target(tmp_path) -> None:
service = CronService(tmp_path / "cron" / "jobs.json")
job = service.add_job(
name="sentinel",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
channel="telegram",
to="user123",
)
assert job.payload.channel == "telegram"
assert job.payload.to == "user123"
result = service.update_job(job.id, name="renamed")
assert isinstance(result, CronJob)
assert result.payload.channel == "telegram"
assert result.payload.to == "user123"
result = service.update_job(job.id, channel=None, to=None)
result = service.update_job(job.id, channel="telegram", to="user123")
assert isinstance(result, CronJob)
assert result.payload.session_key == "telegram:user123"
assert result.payload.origin_channel == "telegram"
assert result.payload.origin_chat_id == "user123"
assert result.payload.channel is None
assert result.payload.to is None
assert result.payload.channel_meta == {}
@pytest.mark.asyncio