fix: preserve legacy cron delivery payloads

maintainer edit: keep existing cron jobs with legacy delivery fields on the legacy execution path, even when they already carry a sessionKey. This preserves deliver=false behavior and channel-specific routing metadata for upgraded jobs.
This commit is contained in:
chengyongru 2026-06-11 22:26:06 +08:00
parent f82ab9f192
commit b4b6c04657
2 changed files with 157 additions and 1 deletions

View File

@ -1189,6 +1189,17 @@ def _run_gateway(
)
return response
def _is_bound_cron_job(job: CronJob) -> bool:
payload = job.payload
if payload.kind != "agent_turn" or not payload.session_key:
return False
return not (
payload.deliver
or payload.channel
or payload.to
or payload.channel_meta
)
async def _deliver_to_channel(
msg: OutboundMessage, *, record: bool = False, session_key: str | None = None,
) -> None:
@ -1349,7 +1360,7 @@ def _run_gateway(
logger.info("Heartbeat: silenced by post-run evaluation")
return response
if job.payload.kind == "agent_turn" and job.payload.session_key:
if _is_bound_cron_job(job):
return await _run_bound_cron_job(job)
reminder_note = (

View File

@ -1373,6 +1373,151 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
}
def test_gateway_legacy_cron_payloads_with_session_key_stay_legacy(
monkeypatch, tmp_path: Path
) -> None:
config_file = _write_instance_config(tmp_path)
config = Config()
config.agents.defaults.workspace = str(tmp_path / "config-workspace")
bus = MagicMock()
bus.publish_outbound = AsyncMock()
seen: dict[str, object] = {"process_calls": [], "evaluations": [], "saved_keys": []}
class _FakeSession:
def __init__(self) -> None:
self.messages = []
def add_message(self, role: str, content: str, **kwargs) -> None:
self.messages.append({"role": role, "content": content, **kwargs})
class _FakeSessionManager:
def __init__(self, _workspace: Path) -> None:
self.session = _FakeSession()
seen["session_manager"] = self
def read_session_file(self, _key: str) -> dict[str, object]:
return {"metadata": {}}
def get_or_create(self, key: str) -> _FakeSession:
seen["saved_keys"].append(key)
return self.session
def save(self, session: _FakeSession) -> None:
seen["saved_session"] = session
class _FakeCron:
def __init__(self, _store_path: Path) -> None:
self.on_job = None
seen["cron"] = self
class _FakeAgentLoop:
@classmethod
def from_config(cls, config, bus=None, **extra):
return cls(**extra)
def __init__(self, *args, **kwargs) -> None:
self.model = "test-model"
self.provider = kwargs.get("provider", object())
self.tools = {}
async def process_direct(self, prompt: str, **kwargs):
seen["process_calls"].append((prompt, kwargs))
return OutboundMessage(
channel=kwargs["channel"],
chat_id=kwargs["chat_id"],
content="Legacy response.",
)
async def submit_automation_turn(self, _msg: InboundMessage):
raise AssertionError("legacy cron payload must not run as bound automation")
async def close_mcp(self) -> None:
return None
async def run(self) -> None:
return None
def stop(self) -> None:
return None
class _StopAfterCronSetup:
def __init__(self, *_args, **_kwargs) -> None:
raise _StopGatewayError("stop")
async def _capture_evaluate_response(*args, **_kwargs) -> bool:
seen["evaluations"].append(args)
return True
_patch_cli_command_runtime(
monkeypatch,
config,
message_bus=lambda: bus,
session_manager=_FakeSessionManager,
cron_service=_FakeCron,
)
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup)
monkeypatch.setattr(
"nanobot.cli.commands.evaluate_response",
_capture_evaluate_response,
)
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
assert isinstance(result.exception, _StopGatewayError)
cron = seen["cron"]
silent_job = CronJob(
id="silent-legacy",
name="Silent legacy",
payload=CronPayload(
message="Run silently.",
deliver=False,
channel="telegram",
to="user-1",
session_key="telegram:user-1",
),
)
response = asyncio.run(cron.on_job(silent_job))
assert response == "Legacy response."
prompt, kwargs = seen["process_calls"][-1]
assert "Reminder: Run silently." in prompt
assert kwargs["session_key"] == "cron:silent-legacy"
assert kwargs["channel"] == "telegram"
assert kwargs["chat_id"] == "user-1"
assert seen["evaluations"] == []
bus.publish_outbound.assert_not_awaited()
topic_job = CronJob(
id="topic-legacy",
name="Topic legacy",
payload=CronPayload(
message="Ping the topic.",
deliver=True,
channel="telegram",
to="-100123",
channel_meta={"message_thread_id": 42},
session_key="telegram:-100123:topic:42",
),
)
response = asyncio.run(cron.on_job(topic_job))
assert response == "Legacy response."
_prompt, kwargs = seen["process_calls"][-1]
assert kwargs["session_key"] == "cron:topic-legacy"
assert kwargs["channel"] == "telegram"
assert kwargs["chat_id"] == "-100123"
assert len(seen["evaluations"]) == 1
bus.publish_outbound.assert_awaited_once()
delivered = bus.publish_outbound.await_args.args[0]
assert delivered.channel == "telegram"
assert delivered.chat_id == "-100123"
assert delivered.metadata["message_thread_id"] == 42
assert seen["saved_keys"] == ["telegram:-100123:topic:42"]
def test_gateway_bound_cron_runs_as_session_turn(
monkeypatch, tmp_path: Path
) -> None: