diff --git a/.agent/cron-session-memory.md b/.agent/cron-session-memory.md index ecc2ee75a..57f5afc17 100644 --- a/.agent/cron-session-memory.md +++ b/.agent/cron-session-memory.md @@ -1,7 +1,7 @@ # Cron / Session / Memory Design Decisions This note records the agreed design direction for fixing the mismatch between -scheduled automations and chat session memory. +scheduled cron jobs and chat session memory. ## Problem @@ -14,7 +14,7 @@ The visible failure mode is awkward: a cron job reports something into a chat, the user discusses it in that chat, and the next cron run behaves as if that discussion never happened. -The fix is not to make cron a separate delivery system. A user automation should +The fix is not to make cron a separate delivery system. A user cron job should be a scheduled input into a session. ## Core Model @@ -39,11 +39,11 @@ These fields are legacy-only. New cron creation should not depend on them. Use explicit branching: -- **Bound user automation**: `payload.kind == "agent_turn"`, +- **Bound user cron job**: `payload.kind == "agent_turn"`, `payload.session_key` is present, and no legacy delivery fields (`deliver`, `channel`, `to`, or `channel_meta`) are set. This uses the new session-turn model. -- **Legacy unbound automation**: user job with no `payload.session_key`. Keep the +- **Legacy unbound cron job**: user job with no `payload.session_key`. Keep the existing behavior. Do not migrate, infer, bind, or add UI for these jobs in this change. - **System job**: `payload.kind == "system_event"` or known internal jobs such @@ -54,7 +54,7 @@ The project should not grow a compatibility subsystem for legacy jobs. Missing ## New Job Creation -`CronTool` must create user automations with a `session_key`. +`CronTool` must create user cron jobs with a `session_key`. - If no request/session context exists, `cron action=add` should fail. - Do not create new unbound jobs. @@ -66,16 +66,16 @@ The project should not grow a compatibility subsystem for legacy jobs. Missing ## Execution Path -Bound user automations should execute through `AgentLoop` as internal inbound +Bound user cron jobs should execute through `AgentLoop` as internal inbound session events, not as an out-of-band `agent.process_direct()` call. The intended flow is: ```text -cron due -> create automation inbound -> AgentLoop dispatches session turn +cron due -> create cron inbound -> AgentLoop dispatches session turn ``` -The inbound event should carry metadata identifying the automation, such as: +The inbound event should carry metadata identifying the cron run, such as: - job id - job name @@ -93,28 +93,28 @@ to legacy `payload.channel`, `payload.to`, or `payload.channel_meta` for bound jobs. Those fields are only for the legacy unbound path. The scheduler must not mark a bound job run as complete just because the inbound -event was queued. It should either wait for the automation turn to complete and +event was queued. It should either wait for the cron turn to complete and record the real outcome, or explicitly model the run as separate states such as -`queued` and `turn_completed`. A failed automation turn must be reflected in the +`queued` and `turn_completed`. A failed cron turn must be reflected in the cron run record/job state, not hidden behind a successful enqueue. ## Active Session Behavior Cron must not interrupt an active session turn. -- If the target session is idle, run the automation turn immediately. -- If the target session is running, defer the automation until the current turn +- If the target session is idle, run the cron turn immediately. +- If the target session is running, defer the cron turn until the current turn completes. -- Do not inject the automation into the active turn's runtime context. -- Do not route automation messages into the existing mid-turn pending injection +- Do not inject the cron turn into the active turn's runtime context. +- Do not route cron messages into the existing mid-turn pending injection queue. -- UI/runtime status may show that an automation is queued, but the current LLM - call should not see the queued automation. +- UI/runtime status may show that a cron run is queued, but the current LLM + call should not see the queued cron turn. -Automation inbound events need explicit metadata, for example -`_automation_trigger` plus `_defer_until_session_idle`. `AgentLoop.run()` must +Cron inbound events need explicit metadata, for example +`_cron_trigger` plus `_cron_defer_until_session_idle`. `AgentLoop.run()` must recognize that metadata before the existing `_pending_queues` mid-turn injection -branch. If the session is active, the event goes to a deferred automation queue, +branch. If the session is active, the event goes to a deferred cron queue, not the pending injection queue. The user experience goal is: cron can run after the current answer, but it @@ -124,17 +124,17 @@ should not take over an answer already in progress. Do not persist the raw internal execution prompt as a normal user message. -Instead, persist a readable automation trigger event, for example: +Instead, persist a readable cron trigger event, for example: ```json { "role": "user", - "content": "Scheduled automation triggered: daily monitor\n\nCheck ...", - "_automation_turn": true, - "automation_id": "abc123", - "automation_name": "daily monitor", - "automation_run_id": "abc123:1770000000000", - "automation_prompt_ref": { + "content": "Scheduled cron job triggered: daily monitor\n\nCheck ...", + "_cron_turn": true, + "cron_job_id": "abc123", + "cron_job_name": "daily monitor", + "cron_run_id": "abc123:1770000000000", + "cron_prompt_ref": { "id": "cron.agent_turn.reminder", "version": 1, "sha256": "..." @@ -160,7 +160,7 @@ Preferred direction: - Move the cron execution prompt out of `commands.py` into a named template. - Use a stable prompt id such as `cron.agent_turn.reminder`. -- Store `prompt_ref` and `automation_run_id` in session history. +- Store `prompt_ref` and `cron_run_id` in session history. - Store the full rendered prompt, prompt variables, and errors in an internal run record. @@ -169,15 +169,15 @@ cron store grow without bound. ## Visibility and Evaluation -A bound user automation is a real session turn. +A bound user cron job is a real session turn. - If it succeeds, save and publish the assistant response. -- Do not pass bound automation responses through `evaluate_response()`. +- Do not pass bound cron responses through `evaluate_response()`. - Keep `evaluate_response()` only for system/legacy paths where the old behavior still applies. - Avoid states where session history contains a response the user never saw. -If a bound automation starts executing, it must leave a visible closure in the +If a bound cron job starts executing, it must leave a visible closure in the session: - success response @@ -189,9 +189,10 @@ the user-facing transcript. ## Deleting Sessions -Deleting a session with bound automations should be a two-step operation. +Deleting a session with bound cron jobs should be a two-step operation. -Default delete behavior should block and return the associated automations: +Default delete behavior should block and return the associated cron jobs. +Existing WebUI/API response field names are kept for compatibility: ```json { @@ -203,7 +204,7 @@ Default delete behavior should block and return the associated automations: } ``` -After explicit confirmation, the API may delete the bound user automations and +After explicit confirmation, the API may delete the bound user cron jobs and then delete the session/thread. Rules: @@ -212,18 +213,18 @@ Rules: session being deleted. - Do not block on system jobs. - Do not block on legacy unbound jobs. -- In unified-session mode, WebUI chats display automations owned by +- In unified-session mode, WebUI chats display cron jobs owned by `unified:default`, but deleting an individual `websocket:*` thread should not - block on or delete those unified automations. + block on or delete those unified cron jobs. - If the user manually deletes files outside the WebUI/API, do not try to compensate. ## Unified Session Mode -When `unified_session` is enabled, WebUI-created automations should bind to the +When `unified_session` is enabled, WebUI-created cron jobs should bind to the same unified session as normal WebUI chat turns: `unified:default`. -- All WebUI chats should display automations owned by `unified:default`. +- All WebUI chats should display cron jobs owned by `unified:default`. - Individual WebUI thread deletion should remain scoped to the concrete `websocket:*` thread being deleted. - Toggling `unified_session` does not migrate existing cron jobs. Existing jobs @@ -232,15 +233,15 @@ same unified session as normal WebUI chat turns: `unified:default`. ## WebUI Scope -This change should not grow into a full automation manager. +This change should not grow into a global scheduler/task manager. Keep the scope focused: - Fix cron/session/memory semantics for new bound jobs. - Preserve legacy job behavior. -- Add deletion protection for sessions with bound automations. -- Update the existing session automation panel only as needed for the new - bound-job status. +- Add deletion protection for sessions with bound cron jobs. +- Update the existing WebUI panel that lists scheduled jobs only as needed for + the new bound-job status. Do not add deterministic legacy migration, legacy binding UI, or a global calendar/task manager in this change. @@ -258,6 +259,6 @@ execution path that behaves differently from scheduled runs. - No legacy migration. - No automatic binding of legacy jobs. - No runtime-context prompt asking the model to bind jobs. -- No new global automation manager. +- No new global scheduler/task manager. - No new delivery-target abstraction. - No user-visible manual cron run. diff --git a/nanobot/agent/cron_turns.py b/nanobot/agent/cron_turns.py new file mode 100644 index 000000000..54c34095e --- /dev/null +++ b/nanobot/agent/cron_turns.py @@ -0,0 +1,88 @@ +"""Coordination for scheduled cron turns.""" + +from __future__ import annotations + +import asyncio +from collections.abc import Awaitable, Callable, Iterable + +from nanobot.bus.events import InboundMessage, OutboundMessage +from nanobot.cron.session_turns import cron_run_id, defer_cron_until_session_idle + + +class CronTurnCoordinator: + """Manage scheduled cron turns without mixing them into live injections.""" + + def __init__( + self, + *, + publish_inbound: Callable[[InboundMessage], Awaitable[None]], + dispatch: Callable[[InboundMessage], Awaitable[object]], + is_running: Callable[[], bool], + ) -> None: + self._publish_inbound = publish_inbound + self._dispatch = dispatch + self._is_running = is_running + self.deferred_queues: dict[str, list[InboundMessage]] = {} + self._waiters: dict[str, asyncio.Future[OutboundMessage | None]] = {} + + async def submit(self, msg: InboundMessage) -> OutboundMessage | None: + """Submit a scheduled cron turn and wait for its session response.""" + run_id = cron_run_id(msg.metadata) + if not run_id: + raise ValueError("cron turn metadata must include a run_id") + if run_id in self._waiters: + raise RuntimeError(f"cron run {run_id!r} is already pending") + + loop = asyncio.get_running_loop() + future: asyncio.Future[OutboundMessage | None] = loop.create_future() + self._waiters[run_id] = future + try: + if self._is_running(): + await self._publish_inbound(msg) + else: + await self._dispatch(msg) + return await future + finally: + self._waiters.pop(run_id, None) + + def should_defer( + self, + msg: InboundMessage, + *, + session_key: str, + active_session_keys: Iterable[str], + ) -> bool: + return ( + defer_cron_until_session_idle(msg.metadata) + and session_key in active_session_keys + ) + + def complete( + self, + msg: InboundMessage, + *, + response: OutboundMessage | None = None, + error: BaseException | None = None, + ) -> None: + run_id = cron_run_id(msg.metadata) + if not run_id: + return + future = self._waiters.get(run_id) + if future is None or future.done(): + return + if error is not None: + future.set_exception(error) + else: + future.set_result(response) + + def defer(self, session_key: str, msg: InboundMessage) -> None: + self.deferred_queues.setdefault(session_key, []).append(msg) + + async def publish_next_deferred(self, session_key: str) -> None: + queue = self.deferred_queues.get(session_key) + if not queue: + return + msg = queue.pop(0) + if not queue: + self.deferred_queues.pop(session_key, None) + await self._publish_inbound(msg) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 2c1cf2375..fddc8180e 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -19,6 +19,7 @@ from nanobot.agent import context as agent_context from nanobot.agent import model_presets as preset_helpers from nanobot.agent.autocompact import AutoCompact from nanobot.agent.context import ContextBuilder +from nanobot.agent.cron_turns import CronTurnCoordinator from nanobot.agent.hook import AgentHook, CompositeHook from nanobot.agent.memory import Consolidator from nanobot.agent.progress_hook import AgentProgressHook @@ -39,11 +40,9 @@ from nanobot.bus.runtime_events import ( ) from nanobot.command import CommandContext, CommandRouter, register_builtin_commands from nanobot.config.schema import AgentDefaults, ModelPresetConfig -from nanobot.cron.automation import ( - AUTOMATION_HISTORY_META, - automation_run_id, - automation_trigger, - defer_until_session_idle, +from nanobot.cron.session_turns import ( + CRON_HISTORY_META, + cron_trigger, ) from nanobot.providers.base import LLMProvider from nanobot.providers.factory import ProviderSnapshot @@ -306,10 +305,11 @@ class AgentLoop: # When a session has an active task, new messages for that session # are routed here instead of creating a new task. self._pending_queues: dict[str, asyncio.Queue] = {} - # Scheduled automations wait for the current visible turn to finish. - # They must not be injected into the active model call as follow-up text. - self._deferred_automation_queues: dict[str, list[InboundMessage]] = {} - self._automation_waiters: dict[str, asyncio.Future[OutboundMessage | None]] = {} + self._cron_turns = CronTurnCoordinator( + publish_inbound=self.bus.publish_inbound, + dispatch=self._dispatch, + is_running=lambda: self._running, + ) # NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3. _max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3")) self._concurrency_gate: asyncio.Semaphore | None = ( @@ -573,54 +573,8 @@ class AgentLoop: def _runtime_events(self) -> RuntimeEventPublisher: return ensure_runtime_event_publisher(self) - async def submit_automation_turn(self, msg: InboundMessage) -> OutboundMessage | None: - """Submit a scheduled automation as an internal session turn and wait for it.""" - run_id = automation_run_id(msg.metadata) - if not run_id: - raise ValueError("automation turn metadata must include a run_id") - loop = asyncio.get_running_loop() - future: asyncio.Future[OutboundMessage | None] = loop.create_future() - if run_id in self._automation_waiters: - raise RuntimeError(f"automation run {run_id!r} is already pending") - self._automation_waiters[run_id] = future - try: - if self._running: - await self.bus.publish_inbound(msg) - else: - await self._dispatch(msg) - return await future - finally: - self._automation_waiters.pop(run_id, None) - - def _complete_automation_turn( - self, - msg: InboundMessage, - *, - response: OutboundMessage | None = None, - error: BaseException | None = None, - ) -> None: - run_id = automation_run_id(msg.metadata) - if not run_id: - return - future = self._automation_waiters.get(run_id) - if future is None or future.done(): - return - if error is not None: - future.set_exception(error) - else: - future.set_result(response) - - def _defer_automation_turn(self, session_key: str, msg: InboundMessage) -> None: - self._deferred_automation_queues.setdefault(session_key, []).append(msg) - - async def _publish_next_deferred_automation(self, session_key: str) -> None: - queue = self._deferred_automation_queues.get(session_key) - if not queue: - return - msg = queue.pop(0) - if not queue: - self._deferred_automation_queues.pop(session_key, None) - await self.bus.publish_inbound(msg) + async def submit_cron_turn(self, msg: InboundMessage) -> OutboundMessage | None: + return await self._cron_turns.submit(msg) def _persist_user_message_early( self, @@ -640,16 +594,16 @@ class AgentLoop: extra: dict[str, Any] = ({"media": list(media_paths)} if media_paths else {}) | agent_context.session_extra(msg.metadata) extra.update(kwargs) text = msg.content if isinstance(msg.content, str) else "" - if trigger := automation_trigger(msg.metadata): + if trigger := cron_trigger(msg.metadata): persist_content = trigger.get("persist_content") if isinstance(persist_content, str) and persist_content.strip(): text = persist_content extra.update({ - AUTOMATION_HISTORY_META: True, - "automation_id": trigger.get("job_id"), - "automation_name": trigger.get("job_name"), - "automation_run_id": trigger.get("run_id"), - "automation_prompt_ref": trigger.get("prompt_ref"), + CRON_HISTORY_META: True, + "cron_job_id": trigger.get("job_id"), + "cron_job_name": trigger.get("job_name"), + "cron_run_id": trigger.get("run_id"), + "cron_prompt_ref": trigger.get("prompt_ref"), }) session.add_message("user", text, **extra) self._mark_pending_user_turn(session) @@ -951,9 +905,10 @@ class AgentLoop: self.commands.dispatch_priority, ) continue - if ( - defer_until_session_idle(msg.metadata) - and effective_key in self._pending_queues + if self._cron_turns.should_defer( + msg, + session_key=effective_key, + active_session_keys=self._pending_queues.keys(), ): pending_msg = msg if effective_key != msg.session_key: @@ -961,9 +916,9 @@ class AgentLoop: msg, session_key_override=effective_key, ) - self._defer_automation_turn(effective_key, pending_msg) + self._cron_turns.defer(effective_key, pending_msg) logger.info( - "Deferred automation turn for active session {}", + "Deferred cron turn for active session {}", effective_key, ) continue @@ -1080,9 +1035,9 @@ class AgentLoop: session_key=session_key, metadata=msg.metadata, ) - self._complete_automation_turn(msg, response=response) + self._cron_turns.complete(msg, response=response) except asyncio.CancelledError: - self._complete_automation_turn( + self._cron_turns.complete( msg, error=asyncio.CancelledError(), ) @@ -1124,7 +1079,7 @@ class AgentLoop: session_key=session_key, metadata=msg.metadata, ) - self._complete_automation_turn(msg, error=exc) + self._cron_turns.complete(msg, error=exc) finally: # Drain any messages still in the pending queue and re-publish # them to the bus so they are processed as fresh inbound messages @@ -1155,14 +1110,14 @@ class AgentLoop: msg, session_key, "idle" ) self._runtime_events().clear_turn(session_key) - await self._publish_next_deferred_automation(session_key) + await self._cron_turns.publish_next_deferred(session_key) finally: if pending is None: await self._runtime_events().run_status_changed( msg, session_key, "idle" ) self._runtime_events().clear_turn(session_key) - await self._publish_next_deferred_automation(session_key) + await self._cron_turns.publish_next_deferred(session_key) async def close_mcp(self) -> None: """Drain pending background archives, then close MCP connections.""" diff --git a/nanobot/agent/tools/cron.py b/nanobot/agent/tools/cron.py index b1d3b41e2..07584b71e 100644 --- a/nanobot/agent/tools/cron.py +++ b/nanobot/agent/tools/cron.py @@ -71,7 +71,7 @@ class CronTool(Tool, ContextAware): return cls(cron_service=ctx.cron_service, default_timezone=ctx.timezone) def set_context(self, ctx: RequestContext) -> None: - """Set the current session context for scheduled automation ownership.""" + """Set the current session context for scheduled cron job ownership.""" self._channel.set(ctx.channel) self._chat_id.set(ctx.chat_id) self._metadata.set(ctx.metadata) @@ -166,7 +166,7 @@ class CronTool(Tool, ContextAware): ) session_key = self._session_key.get() if not session_key: - return "Error: scheduled automations must be created from a chat session" + return "Error: scheduled cron jobs must be created from a chat session" if tz and not cron_expr: return "Error: tz can only be used with cron_expr" if tz: diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 0f48635e2..9de150115 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -979,12 +979,12 @@ def _run_gateway( from nanobot.bus.queue import MessageBus from nanobot.bus.runtime_events import RuntimeEventBus from nanobot.channels.manager import ChannelManager - from nanobot.cron.automation import ( - AUTOMATION_DEFER_UNTIL_IDLE_META, - AUTOMATION_TRIGGER_META, - is_bound_agent_job, - ) from nanobot.cron.service import CronService + from nanobot.cron.session_turns import ( + CRON_DEFER_UNTIL_IDLE_META, + CRON_TRIGGER_META, + is_bound_cron_job, + ) from nanobot.cron.types import CronJob from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot from nanobot.providers.image_generation import image_gen_provider_configs @@ -1093,7 +1093,7 @@ def _run_gateway( return channel, rest, metadata - def _automation_prompt_ref(prompt: str) -> dict[str, Any]: + def _cron_prompt_ref(prompt: str) -> dict[str, Any]: return { "id": "cron.agent_turn.reminder", "version": 1, @@ -1110,23 +1110,23 @@ def _run_gateway( strip=True, message=job.payload.message, ) - prompt_ref = _automation_prompt_ref(prompt) + prompt_ref = _cron_prompt_ref(prompt) run_id = f"{job.id}:{int(time.time() * 1000)}:{uuid.uuid4().hex[:8]}" channel, chat_id, metadata = _bound_session_delivery_context( session_key, turn_seed=f"cron:{job.id}", source_label=job.name, ) - metadata[AUTOMATION_TRIGGER_META] = { + metadata[CRON_TRIGGER_META] = { "job_id": job.id, "job_name": job.name, "run_id": run_id, "prompt_ref": prompt_ref, "persist_content": ( - f"Scheduled automation triggered: {job.name}\n\n{job.payload.message}" + f"Scheduled cron job triggered: {job.name}\n\n{job.payload.message}" ), } - metadata[AUTOMATION_DEFER_UNTIL_IDLE_META] = True + metadata[CRON_DEFER_UNTIL_IDLE_META] = True run_record_base: dict[str, Any] = { "job_id": job.id, "job_name": job.name, @@ -1149,7 +1149,7 @@ def _run_gateway( if isinstance(cron_tool, CronTool): cron_token = cron_tool.set_cron_context(True) try: - resp = await agent.submit_automation_turn( + resp = await agent.submit_cron_turn( InboundMessage( channel=channel, sender_id="cron", @@ -1345,7 +1345,7 @@ def _run_gateway( logger.info("Heartbeat: silenced by post-run evaluation") return response - if is_bound_agent_job(job): + if is_bound_cron_job(job): return await _run_bound_cron_job(job) reminder_note = ( diff --git a/nanobot/cron/automation.py b/nanobot/cron/automation.py deleted file mode 100644 index f619870ff..000000000 --- a/nanobot/cron/automation.py +++ /dev/null @@ -1,49 +0,0 @@ -"""Shared metadata helpers for scheduled automation turns.""" - -from __future__ import annotations - -from typing import Any, Mapping - -from nanobot.cron.types import CronJob - -AUTOMATION_TRIGGER_META = "_automation_trigger" -AUTOMATION_DEFER_UNTIL_IDLE_META = "_defer_until_session_idle" -AUTOMATION_HISTORY_META = "_automation_turn" - - -def automation_trigger(metadata: Mapping[str, Any] | None) -> dict[str, Any] | None: - """Return structured automation trigger metadata when present.""" - raw = (metadata or {}).get(AUTOMATION_TRIGGER_META) - return raw if isinstance(raw, dict) else None - - -def is_automation_turn(metadata: Mapping[str, Any] | None) -> bool: - return automation_trigger(metadata) is not None - - -def defer_until_session_idle(metadata: Mapping[str, Any] | None) -> bool: - return bool( - is_automation_turn(metadata) - and (metadata or {}).get(AUTOMATION_DEFER_UNTIL_IDLE_META) is True - ) - - -def automation_run_id(metadata: Mapping[str, Any] | None) -> str | None: - trigger = automation_trigger(metadata) - if not trigger: - return None - value = trigger.get("run_id") - return value if isinstance(value, str) and value else None - - -def is_bound_agent_job(job: CronJob) -> bool: - """True for new session-bound user automations, excluding legacy delivery payloads.""" - payload = job.payload - if payload.kind != "agent_turn" or not payload.session_key: - return False - return not ( - payload.deliver - or payload.channel - or payload.to - or payload.channel_meta - ) diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 30ee7aea9..57c4dc204 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -14,7 +14,7 @@ from typing import Any, Callable, Coroutine, Literal from filelock import FileLock from loguru import logger -from nanobot.cron.automation import is_bound_agent_job +from nanobot.cron.session_turns import is_bound_cron_job from nanobot.cron.types import ( CronJob, CronJobState, @@ -499,17 +499,17 @@ class CronService: jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled] return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf')) - def list_bound_agent_jobs_for_session( + def list_bound_cron_jobs_for_session( self, session_key: str, *, include_disabled: bool = True, ) -> list[CronJob]: - """Return user-created bound automation jobs owned by *session_key*.""" + """Return user-created bound cron jobs owned by *session_key*.""" return [ job for job in self.list_jobs(include_disabled=include_disabled) - if is_bound_agent_job(job) + if is_bound_cron_job(job) and job.payload.session_key == session_key ] diff --git a/nanobot/cron/session_turns.py b/nanobot/cron/session_turns.py new file mode 100644 index 000000000..7a55e36ef --- /dev/null +++ b/nanobot/cron/session_turns.py @@ -0,0 +1,49 @@ +"""Shared metadata helpers for scheduled cron session turns.""" + +from __future__ import annotations + +from typing import Any, Mapping + +from nanobot.cron.types import CronJob + +CRON_TRIGGER_META = "_cron_trigger" +CRON_DEFER_UNTIL_IDLE_META = "_cron_defer_until_session_idle" +CRON_HISTORY_META = "_cron_turn" + + +def cron_trigger(metadata: Mapping[str, Any] | None) -> dict[str, Any] | None: + """Return structured cron trigger metadata when present.""" + raw = (metadata or {}).get(CRON_TRIGGER_META) + return raw if isinstance(raw, dict) else None + + +def is_cron_turn(metadata: Mapping[str, Any] | None) -> bool: + return cron_trigger(metadata) is not None + + +def defer_cron_until_session_idle(metadata: Mapping[str, Any] | None) -> bool: + return bool( + is_cron_turn(metadata) + and (metadata or {}).get(CRON_DEFER_UNTIL_IDLE_META) is True + ) + + +def cron_run_id(metadata: Mapping[str, Any] | None) -> str | None: + trigger = cron_trigger(metadata) + if not trigger: + return None + value = trigger.get("run_id") + return value if isinstance(value, str) and value else None + + +def is_bound_cron_job(job: CronJob) -> bool: + """True for new session-bound cron jobs, excluding legacy delivery payloads.""" + payload = job.payload + if payload.kind != "agent_turn" or not payload.session_key: + return False + return not ( + payload.deliver + or payload.channel + or payload.to + or payload.channel_meta + ) diff --git a/nanobot/session/routing.py b/nanobot/session/routing.py index 5ae53c552..5d7c97791 100644 --- a/nanobot/session/routing.py +++ b/nanobot/session/routing.py @@ -5,7 +5,7 @@ from __future__ import annotations from typing import Any, Mapping from nanobot.bus.events import InboundMessage -from nanobot.cron.automation import is_automation_turn +from nanobot.cron.session_turns import is_cron_turn from nanobot.session.manager import Session from nanobot.session.metadata import SESSION_ROUTING_METADATA_KEY @@ -26,7 +26,7 @@ _ROUTING_METADATA_KEYS = { } _CHANNEL_ROUTING_METADATA_KEYS = { # Feishu needs a message anchor to reply into an existing topic. Other - # channels should avoid stale reply anchors for scheduled automation turns. + # channels should avoid stale reply anchors for scheduled cron turns. "feishu": {"message_id"}, } _SLACK_ROUTING_KEYS = {"channel_type", "thread_ts"} @@ -74,8 +74,8 @@ def routing_context_for_message(msg: InboundMessage) -> dict[str, Any]: def persist_routing_context(session: Session, msg: InboundMessage) -> bool: - """Persist the latest non-automation delivery context for a session.""" - if is_automation_turn(msg.metadata): + """Persist the latest non-cron delivery context for a session.""" + if is_cron_turn(msg.metadata): return False context = routing_context_for_message(msg) if session.metadata.get(SESSION_ROUTING_METADATA_KEY) == context: diff --git a/nanobot/templates/agent/cron_reminder.md b/nanobot/templates/agent/cron_reminder.md index af9803d5a..64f94f21b 100644 --- a/nanobot/templates/agent/cron_reminder.md +++ b/nanobot/templates/agent/cron_reminder.md @@ -1,4 +1,4 @@ -The scheduled time has arrived. Execute this scheduled automation now and report the result to the user in the same session. +The scheduled time has arrived. Execute this scheduled cron job now and report the result to the user in the same session. Rules: - Speak directly to the user in their language. @@ -6,4 +6,4 @@ Rules: - Do not include user IDs. - Do not add status reports like "Done" or "Reminded" unless they are the natural response. -Automation: {{ message }} +Cron job: {{ message }} diff --git a/nanobot/webui/session_automations.py b/nanobot/webui/session_automations.py index 19af69a64..3eb56e5b2 100644 --- a/nanobot/webui/session_automations.py +++ b/nanobot/webui/session_automations.py @@ -8,7 +8,7 @@ from nanobot.cron.types import CronJob class _CronServiceLike(Protocol): - def list_bound_agent_jobs_for_session( + def list_bound_cron_jobs_for_session( self, session_key: str, *, @@ -23,7 +23,7 @@ def session_automation_jobs( """Return user automations attached to the WebUI session.""" if cron_service is None: return [] - return cron_service.list_bound_agent_jobs_for_session( + return cron_service.list_bound_cron_jobs_for_session( session_key, include_disabled=True, ) diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py index 0b212f5fc..5295065d0 100644 --- a/tests/agent/test_loop_save_turn.py +++ b/tests/agent/test_loop_save_turn.py @@ -8,7 +8,7 @@ from nanobot.agent.context import ContextBuilder from nanobot.agent.loop import AgentLoop from nanobot.bus.events import InboundMessage from nanobot.bus.queue import MessageBus -from nanobot.cron.automation import AUTOMATION_HISTORY_META, AUTOMATION_TRIGGER_META +from nanobot.cron.session_turns import CRON_HISTORY_META, CRON_TRIGGER_META from nanobot.providers.base import LLMResponse from nanobot.session.goal_state import GOAL_STATE_KEY from nanobot.session.manager import Session, SessionManager @@ -66,7 +66,7 @@ def test_agent_loop_llm_runtime_reflects_current_provider_and_model(tmp_path: Pa assert runtime.model == "next-model" -def test_persist_automation_turn_uses_distinct_history_marker(tmp_path: Path) -> None: +def test_persist_cron_turn_uses_distinct_history_marker(tmp_path: Path) -> None: loop = _make_full_loop(tmp_path) session = loop.sessions.get_or_create("websocket:auto") prompt_ref = {"id": "cron.agent_turn.reminder", "version": 1, "sha256": "abc"} @@ -76,14 +76,14 @@ def test_persist_automation_turn_uses_distinct_history_marker(tmp_path: Path) -> channel="websocket", sender_id="cron", chat_id="auto", - content="Automation: internal prompt", + content="Cron job: internal prompt", metadata={ - AUTOMATION_TRIGGER_META: { + CRON_TRIGGER_META: { "job_id": "job-1", "job_name": "Daily check", "run_id": "job-1:1", "prompt_ref": prompt_ref, - "persist_content": "Scheduled automation triggered: Daily check", + "persist_content": "Scheduled cron job triggered: Daily check", } }, ), @@ -92,13 +92,13 @@ def test_persist_automation_turn_uses_distinct_history_marker(tmp_path: Path) -> assert persisted is True message = session.messages[-1] - assert message["content"] == "Scheduled automation triggered: Daily check" - assert message[AUTOMATION_HISTORY_META] is True - assert AUTOMATION_TRIGGER_META not in message - assert message["automation_id"] == "job-1" - assert message["automation_name"] == "Daily check" - assert message["automation_run_id"] == "job-1:1" - assert message["automation_prompt_ref"] == prompt_ref + assert message["content"] == "Scheduled cron job triggered: Daily check" + assert message[CRON_HISTORY_META] is True + assert CRON_TRIGGER_META not in message + assert message["cron_job_id"] == "job-1" + assert message["cron_job_name"] == "Daily check" + assert message["cron_run_id"] == "job-1:1" + assert message["cron_prompt_ref"] == prompt_ref def test_clean_generated_title_strips_reasoning_tags() -> None: diff --git a/tests/agent/test_runner_injections.py b/tests/agent/test_runner_injections.py index 3133a1698..97d653f3a 100644 --- a/tests/agent/test_runner_injections.py +++ b/tests/agent/test_runner_injections.py @@ -617,12 +617,12 @@ async def test_followup_routed_to_pending_queue(tmp_path): @pytest.mark.asyncio -async def test_automation_turn_deferred_while_session_active(tmp_path): - """Automation turns wait for the active session instead of becoming injections.""" +async def test_cron_turn_deferred_while_session_active(tmp_path): + """Cron turns wait for the active session instead of becoming injections.""" from nanobot.bus.events import InboundMessage - from nanobot.cron.automation import ( - AUTOMATION_DEFER_UNTIL_IDLE_META, - AUTOMATION_TRIGGER_META, + from nanobot.cron.session_turns import ( + CRON_DEFER_UNTIL_IDLE_META, + CRON_TRIGGER_META, ) loop = _make_loop(tmp_path) @@ -639,15 +639,15 @@ async def test_automation_turn_deferred_while_session_active(tmp_path): chat_id="chat-1", content="scheduled work", metadata={ - AUTOMATION_TRIGGER_META: {"run_id": "run-1"}, - AUTOMATION_DEFER_UNTIL_IDLE_META: True, + CRON_TRIGGER_META: {"run_id": "run-1"}, + CRON_DEFER_UNTIL_IDLE_META: True, }, session_key_override=session_key, ) await loop.bus.publish_inbound(msg) for _ in range(20): - if loop._deferred_automation_queues.get(session_key): + if loop._cron_turns.deferred_queues.get(session_key): break await asyncio.sleep(0.05) @@ -656,12 +656,12 @@ async def test_automation_turn_deferred_while_session_active(tmp_path): assert pending.empty() assert loop._dispatch.await_count == 0 - assert loop._deferred_automation_queues[session_key] == [msg] + assert loop._cron_turns.deferred_queues[session_key] == [msg] - await loop._publish_next_deferred_automation(session_key) + await loop._cron_turns.publish_next_deferred(session_key) queued = await asyncio.wait_for(loop.bus.consume_inbound(), timeout=0.5) assert queued is msg - assert session_key not in loop._deferred_automation_queues + assert session_key not in loop._cron_turns.deferred_queues @pytest.mark.asyncio diff --git a/tests/agent/test_task_cancel.py b/tests/agent/test_task_cancel.py index 0111c2f59..e6a59d5e1 100644 --- a/tests/agent/test_task_cancel.py +++ b/tests/agent/test_task_cancel.py @@ -10,6 +10,7 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from nanobot.config.schema import AgentDefaults +from nanobot.session.keys import UNIFIED_SESSION_KEY _MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars @@ -450,12 +451,12 @@ class TestSubagentAnnounceSessionKey: so the result matches the pending queue key.""" mgr, bus = self._make_mgr() - origin = {"channel": "telegram", "chat_id": "111", "session_key": "unified:default"} + origin = {"channel": "telegram", "chat_id": "111", "session_key": UNIFIED_SESSION_KEY} await mgr._announce_result("sub-1", "label", "task", "result", origin, "ok") msg = await bus.consume_inbound() - assert msg.session_key_override == "unified:default" - assert msg.session_key == "unified:default" + assert msg.session_key_override == UNIFIED_SESSION_KEY + assert msg.session_key == UNIFIED_SESSION_KEY @pytest.mark.asyncio async def test_announce_uses_raw_key_in_normal_mode(self): @@ -505,9 +506,9 @@ class TestSubagentAnnounceSessionKey: ) await mgr._run_subagent( "sub-4", "task", "label", - {"channel": "telegram", "chat_id": "444", "session_key": "unified:default"}, + {"channel": "telegram", "chat_id": "444", "session_key": UNIFIED_SESSION_KEY}, status, ) msg = await bus.consume_inbound() - assert msg.session_key_override == "unified:default" + assert msg.session_key_override == UNIFIED_SESSION_KEY diff --git a/tests/agent/test_unified_session.py b/tests/agent/test_unified_session.py index 48fd91bdc..d6059f747 100644 --- a/tests/agent/test_unified_session.py +++ b/tests/agent/test_unified_session.py @@ -25,9 +25,9 @@ from nanobot.bus.queue import MessageBus from nanobot.command.builtin import cmd_new, register_builtin_commands from nanobot.command.router import CommandContext, CommandRouter from nanobot.config.schema import AgentDefaults, Config +from nanobot.session.keys import UNIFIED_SESSION_KEY from nanobot.session.manager import Session, SessionManager - # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- @@ -39,8 +39,8 @@ def _make_loop(tmp_path: Path, unified_session: bool = False) -> AgentLoop: provider.get_default_model.return_value = "test-model" with patch("nanobot.agent.loop.SessionManager"), \ - patch("nanobot.agent.loop.SubagentManager") as MockSubMgr: - MockSubMgr.return_value.cancel_by_session = AsyncMock(return_value=0) + patch("nanobot.agent.loop.SubagentManager") as mock_sub_mgr: + mock_sub_mgr.return_value.cancel_by_session = AsyncMock(return_value=0) loop = AgentLoop( bus=bus, provider=provider, @@ -415,10 +415,8 @@ class TestStopCommandWithUnifiedSession: @pytest.mark.asyncio async def test_active_tasks_use_effective_key_in_unified_mode(self, tmp_path: Path): """When unified_session=True, tasks are stored under UNIFIED_SESSION_KEY.""" - from nanobot.agent.loop import UNIFIED_SESSION_KEY - loop = _make_loop(tmp_path, unified_session=True) - + # Create a message from telegram channel msg = _make_msg(channel="telegram", chat_id="123456") @@ -443,7 +441,6 @@ class TestStopCommandWithUnifiedSession: @pytest.mark.asyncio async def test_stop_command_finds_task_in_unified_mode(self, tmp_path: Path): """cmd_stop can cancel tasks when unified_session=True.""" - from nanobot.agent.loop import UNIFIED_SESSION_KEY from nanobot.command.builtin import cmd_stop loop = _make_loop(tmp_path, unified_session=True) @@ -476,7 +473,6 @@ class TestStopCommandWithUnifiedSession: @pytest.mark.asyncio async def test_stop_command_uses_effective_key_without_session_override(self, tmp_path: Path): """Priority /stop must cancel the unified session even before dispatch rewrites the message.""" - from nanobot.agent.loop import UNIFIED_SESSION_KEY from nanobot.command.builtin import cmd_stop loop = _make_loop(tmp_path, unified_session=True) @@ -502,7 +498,6 @@ class TestStopCommandWithUnifiedSession: @pytest.mark.asyncio async def test_stop_command_cross_channel_in_unified_mode(self, tmp_path: Path): """In unified mode, /stop from one channel cancels tasks from another channel.""" - from nanobot.agent.loop import UNIFIED_SESSION_KEY from nanobot.command.builtin import cmd_stop loop = _make_loop(tmp_path, unified_session=True) diff --git a/tests/channels/test_websocket_http_routes.py b/tests/channels/test_websocket_http_routes.py index f8fd70c15..50e935f8d 100644 --- a/tests/channels/test_websocket_http_routes.py +++ b/tests/channels/test_websocket_http_routes.py @@ -749,7 +749,7 @@ async def test_session_delete_blocks_when_bound_automation_exists( assert body["blocked_by_automations"] is True assert [job["name"] for job in body["automations"]] == ["Daily check"] assert path.exists() - assert cron.list_bound_agent_jobs_for_session("websocket:doomed") + assert cron.list_bound_cron_jobs_for_session("websocket:doomed") finally: await channel.stop() await server_task @@ -792,7 +792,7 @@ async def test_session_delete_can_cascade_bound_automations( assert resp.status_code == 200 assert resp.json()["deleted"] is True assert not path.exists() - assert cron.list_bound_agent_jobs_for_session("websocket:doomed") == [] + assert cron.list_bound_cron_jobs_for_session("websocket:doomed") == [] assert [job.name for job in cron.list_jobs(include_disabled=True)] == [ "Legacy same target" ] @@ -837,7 +837,7 @@ async def test_session_delete_does_not_cascade_unified_automations( assert resp.status_code == 200 assert resp.json()["deleted"] is True assert not path.exists() - assert [job.name for job in cron.list_bound_agent_jobs_for_session(UNIFIED_SESSION_KEY)] == [ + assert [job.name for job in cron.list_bound_cron_jobs_for_session(UNIFIED_SESSION_KEY)] == [ "Shared daily check" ] finally: diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 975126ad8..74d623696 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -11,7 +11,7 @@ from typer.testing import CliRunner from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.cli.commands import _proactive_delivery_metadata, app from nanobot.config.schema import Config -from nanobot.cron.automation import AUTOMATION_DEFER_UNTIL_IDLE_META, AUTOMATION_TRIGGER_META +from nanobot.cron.session_turns import CRON_DEFER_UNTIL_IDLE_META, CRON_TRIGGER_META from nanobot.cron.types import CronJob, CronPayload from nanobot.providers.factory import ProviderSnapshot, make_provider from nanobot.providers.openai_codex_provider import _strip_model_prefix @@ -1430,8 +1430,8 @@ def test_gateway_legacy_cron_payloads_with_session_key_stay_legacy( content="Legacy response.", ) - async def submit_automation_turn(self, _msg: InboundMessage): - raise AssertionError("legacy cron payload must not run as bound automation") + async def submit_cron_turn(self, _msg: InboundMessage): + raise AssertionError("legacy cron payload must not run as bound cron turn") async def close_mcp(self) -> None: return None @@ -1601,8 +1601,8 @@ def test_gateway_bound_cron_runs_as_session_turn( self.tools = {} seen["agent"] = self - async def submit_automation_turn(self, msg: InboundMessage): - seen["automation_msg"] = msg + async def submit_cron_turn(self, msg: InboundMessage): + seen["cron_msg"] = msg return OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, @@ -1646,26 +1646,26 @@ def test_gateway_bound_cron_runs_as_session_turn( response = asyncio.run(cron.on_job(job)) assert response == "Checked the repo." - msg = seen["automation_msg"] + msg = seen["cron_msg"] assert isinstance(msg, InboundMessage) assert msg.channel == "websocket" assert msg.chat_id == "chat-1" assert msg.sender_id == "cron" assert msg.session_key_override == "websocket:chat-1" - assert "Automation: Check repository health." in msg.content + assert "Cron job: Check repository health." in msg.content assert msg.metadata["webui"] is True assert msg.metadata["workspace_scope"]["project_path"] == str(tmp_path) assert msg.metadata[WEBUI_MESSAGE_SOURCE_METADATA_KEY] == { "kind": "cron", "label": "Repo check", } - trigger = msg.metadata[AUTOMATION_TRIGGER_META] + trigger = msg.metadata[CRON_TRIGGER_META] assert trigger["job_id"] == "repo-check" assert trigger["job_name"] == "Repo check" assert trigger["persist_content"] == ( - "Scheduled automation triggered: Repo check\n\nCheck repository health." + "Scheduled cron job triggered: Repo check\n\nCheck repository health." ) - assert msg.metadata[AUTOMATION_DEFER_UNTIL_IDLE_META] is True + assert msg.metadata[CRON_DEFER_UNTIL_IDLE_META] is True statuses = [record["status"] for _run_id, record in seen["run_records"]] assert statuses == ["queued", "ok"] assert seen["run_records"][0][0] == seen["run_records"][1][0] @@ -1682,7 +1682,7 @@ def test_gateway_bound_cron_runs_as_session_turn( response = asyncio.run(cron.on_job(discord_job)) assert response == "Checked the repo." - msg = seen["automation_msg"] + msg = seen["cron_msg"] assert isinstance(msg, InboundMessage) assert msg.channel == "discord" assert msg.chat_id == "777" diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py index f258fdd22..98e37dc13 100644 --- a/tests/cron/test_cron_service.py +++ b/tests/cron/test_cron_service.py @@ -84,7 +84,7 @@ def test_list_bound_agent_jobs_excludes_legacy_delivery_payloads(tmp_path) -> No session_key="websocket:chat-1", ) - assert service.list_bound_agent_jobs_for_session("websocket:chat-1") == [bound] + assert service.list_bound_cron_jobs_for_session("websocket:chat-1") == [bound] @pytest.mark.asyncio diff --git a/tests/cron/test_cron_tool_list.py b/tests/cron/test_cron_tool_list.py index 499979a66..4af4de13a 100644 --- a/tests/cron/test_cron_tool_list.py +++ b/tests/cron/test_cron_tool_list.py @@ -349,7 +349,7 @@ def test_add_job_requires_session_key(tmp_path) -> None: result = tool._add_job(None, "Background refresh", 60, None, None, None) - assert result == "Error: scheduled automations must be created from a chat session" + assert result == "Error: scheduled cron jobs must be created from a chat session" assert tool._cron.list_jobs() == []