fix: use shared bound cron predicate

maintainer edit: make gateway execution, WebUI automation listing, and delete protection agree on the new bound cron shape. Legacy delivery payloads that carry sessionKey are excluded from the WebUI-bound automation surface.
This commit is contained in:
chengyongru 2026-06-11 23:09:21 +08:00
parent b4b6c04657
commit 3725b42e0e
5 changed files with 43 additions and 15 deletions

View File

@ -980,6 +980,7 @@ def _run_gateway(
from nanobot.cron.automation import ( from nanobot.cron.automation import (
AUTOMATION_DEFER_UNTIL_IDLE_META, AUTOMATION_DEFER_UNTIL_IDLE_META,
AUTOMATION_TRIGGER_META, AUTOMATION_TRIGGER_META,
is_bound_agent_job,
) )
from nanobot.cron.service import CronService from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob from nanobot.cron.types import CronJob
@ -1189,17 +1190,6 @@ def _run_gateway(
) )
return response return response
def _is_bound_cron_job(job: CronJob) -> bool:
payload = job.payload
if payload.kind != "agent_turn" or not payload.session_key:
return False
return not (
payload.deliver
or payload.channel
or payload.to
or payload.channel_meta
)
async def _deliver_to_channel( async def _deliver_to_channel(
msg: OutboundMessage, *, record: bool = False, session_key: str | None = None, msg: OutboundMessage, *, record: bool = False, session_key: str | None = None,
) -> None: ) -> None:
@ -1360,7 +1350,7 @@ def _run_gateway(
logger.info("Heartbeat: silenced by post-run evaluation") logger.info("Heartbeat: silenced by post-run evaluation")
return response return response
if _is_bound_cron_job(job): if is_bound_agent_job(job):
return await _run_bound_cron_job(job) return await _run_bound_cron_job(job)
reminder_note = ( reminder_note = (

View File

@ -4,6 +4,8 @@ from __future__ import annotations
from typing import Any, Mapping from typing import Any, Mapping
from nanobot.cron.types import CronJob
AUTOMATION_TRIGGER_META = "_automation_trigger" AUTOMATION_TRIGGER_META = "_automation_trigger"
AUTOMATION_DEFER_UNTIL_IDLE_META = "_defer_until_session_idle" AUTOMATION_DEFER_UNTIL_IDLE_META = "_defer_until_session_idle"
@ -31,3 +33,16 @@ def automation_run_id(metadata: Mapping[str, Any] | None) -> str | None:
return None return None
value = trigger.get("run_id") value = trigger.get("run_id")
return value if isinstance(value, str) and value else None return value if isinstance(value, str) and value else None
def is_bound_agent_job(job: CronJob) -> bool:
"""True for new session-bound user automations, excluding legacy delivery payloads."""
payload = job.payload
if payload.kind != "agent_turn" or not payload.session_key:
return False
return not (
payload.deliver
or payload.channel
or payload.to
or payload.channel_meta
)

View File

@ -14,6 +14,7 @@ from typing import Any, Callable, Coroutine, Literal
from filelock import FileLock from filelock import FileLock
from loguru import logger from loguru import logger
from nanobot.cron.automation import is_bound_agent_job
from nanobot.cron.types import ( from nanobot.cron.types import (
CronJob, CronJob,
CronJobState, CronJobState,
@ -508,7 +509,7 @@ class CronService:
return [ return [
job job
for job in self.list_jobs(include_disabled=include_disabled) for job in self.list_jobs(include_disabled=include_disabled)
if job.payload.kind == "agent_turn" if is_bound_agent_job(job)
and job.payload.session_key == session_key and job.payload.session_key == session_key
] ]

View File

@ -184,16 +184,16 @@ async def test_session_automations_route_filters_by_webui_session(
name=name, name=name,
schedule=hourly, schedule=hourly,
message=message, message=message,
channel="websocket",
to=to,
session_key=f"websocket:{to}", session_key=f"websocket:{to}",
) )
cron.add_job( cron.add_job(
name="Legacy same target", name="Legacy same target",
schedule=hourly, schedule=hourly,
message="Legacy job should not be treated as bound", message="Legacy job should not be treated as bound",
deliver=True,
channel="websocket", channel="websocket",
to="abc", to="abc",
session_key="websocket:abc",
) )
cron.register_system_job( cron.register_system_job(
CronJob( CronJob(

View File

@ -65,6 +65,28 @@ def test_add_job_preserves_channel_meta_and_session_key(tmp_path) -> None:
assert reloaded.payload.session_key == "slack:C123:1234567890.123456" assert reloaded.payload.session_key == "slack:C123:1234567890.123456"
def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> None:
service = CronService(tmp_path / "cron" / "jobs.json")
schedule = CronSchedule(kind="every", every_ms=60_000)
bound = service.add_job(
name="Bound",
schedule=schedule,
message="new bound job",
session_key="websocket:chat-1",
)
service.add_job(
name="Legacy same session",
schedule=schedule,
message="legacy job",
deliver=True,
channel="websocket",
to="chat-1",
session_key="websocket:chat-1",
)
assert service.list_bound_agent_jobs_for_session("websocket:chat-1") == [bound]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> None: async def test_channel_meta_and_session_key_survive_store_reload(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json" store_path = tmp_path / "cron" / "jobs.json"