diff --git a/.agent/cron-session-memory.md b/.agent/cron-session-memory.md new file mode 100644 index 000000000..d1b5be582 --- /dev/null +++ b/.agent/cron-session-memory.md @@ -0,0 +1,246 @@ +# Cron / Session / Memory Design Decisions + +This note records the agreed design direction for fixing the mismatch between +scheduled automations and chat session memory. + +## Problem + +User-created cron jobs currently run their agent turn under an internal key such +as `cron:{job.id}` and only deliver the final response back to the user channel. +That splits the turn's working memory from the session where the user sees and +continues the conversation. + +The visible failure mode is awkward: a cron job reports something into a chat, +the user discusses it in that chat, and the next cron run behaves as if that +discussion never happened. + +The fix is not to make cron a separate delivery system. A user automation should +be a scheduled input into a session. + +## Core Model + +For new user-created cron jobs, `payload.session_key` is the canonical anchor. + +- The cron job belongs to that session. +- The cron job reads that session's memory/history. +- The cron job produces a normal session turn. +- There is no separate delivery target concept for new jobs. + +Legacy fields remain in the store only for compatibility: + +- `payload.channel` +- `payload.to` +- `payload.channel_meta` +- `payload.deliver` + +These fields are legacy-only. New cron creation should not depend on them. + +## Job Categories + +Use explicit branching: + +- **Bound user automation**: `payload.kind == "agent_turn"` and + `payload.session_key` is present. This uses the new session-turn model. +- **Legacy unbound automation**: user job with no `payload.session_key`. Keep the + existing behavior. Do not migrate, infer, bind, or add UI for these jobs in + this change. +- **System job**: `payload.kind == "system_event"` or known internal jobs such + as `dream` / `heartbeat`. Keep their specialized paths. + +The project should not grow a compatibility subsystem for legacy jobs. Missing +`session_key` means old behavior. + +## New Job Creation + +`CronTool` must create user automations with a `session_key`. + +- If no request/session context exists, `cron action=add` should fail. +- Do not create new unbound jobs. +- Do not infer `session_key` from `channel/to` for new jobs. +- Remove `deliver` from the advertised tool schema. It can remain as a Python + compatibility argument, but it must not affect new bound jobs. +- New bound jobs should persist `message` and `session_key`; legacy delivery + fields should not be populated as part of the new path. + +## Execution Path + +Bound user automations should execute through `AgentLoop` as internal inbound +session events, not as an out-of-band `agent.process_direct()` call. + +The intended flow is: + +```text +cron due -> create automation inbound -> AgentLoop dispatches session turn +``` + +The inbound event should carry metadata identifying the automation, such as: + +- job id +- job name +- run id +- prompt reference +- persisted trigger content + +This keeps locking, runtime status, session persistence, and WebUI behavior on +the same path as normal chat turns. + +`session_key` is the ownership anchor, but an `InboundMessage` still needs an +execution context. Bound cron must resolve `channel`, `chat_id`, and any +channel metadata from the target session/session metadata. It must not fall back +to legacy `payload.channel`, `payload.to`, or `payload.channel_meta` for bound +jobs. Those fields are only for the legacy unbound path. + +The scheduler must not mark a bound job run as complete just because the inbound +event was queued. It should either wait for the automation turn to complete and +record the real outcome, or explicitly model the run as separate states such as +`queued` and `turn_completed`. A failed automation turn must be reflected in the +cron run record/job state, not hidden behind a successful enqueue. + +## Active Session Behavior + +Cron must not interrupt an active session turn. + +- If the target session is idle, run the automation turn immediately. +- If the target session is running, defer the automation until the current turn + completes. +- Do not inject the automation into the active turn's runtime context. +- Do not route automation messages into the existing mid-turn pending injection + queue. +- UI/runtime status may show that an automation is queued, but the current LLM + call should not see the queued automation. + +Automation inbound events need explicit metadata, for example +`_automation_trigger` plus `_defer_until_session_idle`. `AgentLoop.run()` must +recognize that metadata before the existing `_pending_queues` mid-turn injection +branch. If the session is active, the event goes to a deferred automation queue, +not the pending injection queue. + +The user experience goal is: cron can run after the current answer, but it +should not take over an answer already in progress. + +## Session History + +Do not persist the raw internal execution prompt as a normal user message. + +Instead, persist a readable automation trigger event, for example: + +```json +{ + "role": "user", + "content": "Scheduled automation triggered: daily monitor\n\nCheck ...", + "_automation_trigger": true, + "automation_id": "abc123", + "automation_name": "daily monitor", + "automation_run_id": "abc123:1770000000000", + "automation_prompt_ref": { + "id": "cron.agent_turn.reminder", + "version": 1, + "sha256": "..." + } +} +``` + +The assistant result should be saved as the normal assistant response for that +turn, with source metadata suitable for WebUI rendering. + +This gives future turns useful context without leaking internal instruction text +into the transcript. + +## Prompt Traceability + +The rendered execution prompt should remain traceable, but it should not be part +of normal session history. + +Use a named/versioned prompt reference in session history and save the full +rendered prompt in an internal run record. + +Preferred direction: + +- Move the cron execution prompt out of `commands.py` into a named template. +- Use a stable prompt id such as `cron.agent_turn.reminder`. +- Store `prompt_ref` and `automation_run_id` in session history. +- Store the full rendered prompt, prompt variables, and errors in an internal + run record. + +Avoid putting full prompt text into `jobs.json`; run records should not make the +cron store grow without bound. + +## Visibility and Evaluation + +A bound user automation is a real session turn. + +- If it succeeds, save and publish the assistant response. +- Do not pass bound automation responses through `evaluate_response()`. +- Keep `evaluate_response()` only for system/legacy paths where the old behavior + still applies. +- Avoid states where session history contains a response the user never saw. + +If a bound automation starts executing, it must leave a visible closure in the +session: + +- success response +- short failure message +- or an empty-result status message + +Full exceptions and diagnostic details belong in the internal run record, not in +the user-facing transcript. + +## Deleting Sessions + +Deleting a session with bound automations should be a two-step operation. + +Default delete behavior should block and return the associated automations: + +```json +{ + "deleted": false, + "blocked_by_automations": true, + "automations": [ + {"id": "abc123", "name": "daily monitor", "enabled": true} + ] +} +``` + +After explicit confirmation, the API may delete the bound user automations and +then delete the session/thread. + +Rules: + +- Only block on user-created bound jobs whose `payload.session_key` equals the + session being deleted. +- Do not block on system jobs. +- Do not block on legacy unbound jobs. +- If the user manually deletes files outside the WebUI/API, do not try to + compensate. + +## WebUI Scope + +This change should not grow into a full automation manager. + +Keep the scope focused: + +- Fix cron/session/memory semantics for new bound jobs. +- Preserve legacy job behavior. +- Add deletion protection for sessions with bound automations. +- Update the existing session automation panel only as needed for the new + bound-job status. + +Do not add deterministic legacy migration, legacy binding UI, or a global +calendar/task manager in this change. + +## Manual Run + +Do not add a user-visible "run now" feature as part of this design. + +`CronService.run_job()` may remain an internal/test helper. It should not become +a product surface, and the implementation should avoid creating a separate +execution path that behaves differently from scheduled runs. + +## Non-Goals + +- No legacy migration. +- No automatic binding of legacy jobs. +- No runtime-context prompt asking the model to bind jobs. +- No new global automation manager. +- No new delivery-target abstraction. +- No user-visible manual cron run. diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 3431237fa..4a9947e4a 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -39,6 +39,11 @@ from nanobot.bus.runtime_events import ( ) from nanobot.command import CommandContext, CommandRouter, register_builtin_commands from nanobot.config.schema import AgentDefaults, ModelPresetConfig +from nanobot.cron.automation import ( + automation_run_id, + automation_trigger, + defer_until_session_idle, +) from nanobot.providers.base import LLMProvider from nanobot.providers.factory import ProviderSnapshot from nanobot.security.workspace_access import ( @@ -53,6 +58,7 @@ from nanobot.session.goal_state import ( sustained_goal_active, ) from nanobot.session.manager import Session, SessionManager +from nanobot.session.routing import persist_routing_context from nanobot.utils.document import extract_documents, reference_non_image_attachments from nanobot.utils.helpers import image_placeholder_text from nanobot.utils.helpers import truncate_text as truncate_text_fn @@ -300,6 +306,10 @@ class AgentLoop: # When a session has an active task, new messages for that session # are routed here instead of creating a new task. self._pending_queues: dict[str, asyncio.Queue] = {} + # Scheduled automations wait for the current visible turn to finish. + # They must not be injected into the active model call as follow-up text. + self._deferred_automation_queues: dict[str, list[InboundMessage]] = {} + self._automation_waiters: dict[str, asyncio.Future[OutboundMessage | None]] = {} # NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3. _max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3")) self._concurrency_gate: asyncio.Semaphore | None = ( @@ -565,6 +575,55 @@ class AgentLoop: def _runtime_events(self) -> RuntimeEventPublisher: return ensure_runtime_event_publisher(self) + async def submit_automation_turn(self, msg: InboundMessage) -> OutboundMessage | None: + """Submit a scheduled automation as an internal session turn and wait for it.""" + run_id = automation_run_id(msg.metadata) + if not run_id: + raise ValueError("automation turn metadata must include a run_id") + loop = asyncio.get_running_loop() + future: asyncio.Future[OutboundMessage | None] = loop.create_future() + if run_id in self._automation_waiters: + raise RuntimeError(f"automation run {run_id!r} is already pending") + self._automation_waiters[run_id] = future + try: + if self._running: + await self.bus.publish_inbound(msg) + else: + await self._dispatch(msg) + return await future + finally: + self._automation_waiters.pop(run_id, None) + + def _complete_automation_turn( + self, + msg: InboundMessage, + *, + response: OutboundMessage | None = None, + error: BaseException | None = None, + ) -> None: + run_id = automation_run_id(msg.metadata) + if not run_id: + return + future = self._automation_waiters.get(run_id) + if future is None or future.done(): + return + if error is not None: + future.set_exception(error) + else: + future.set_result(response) + + def _defer_automation_turn(self, session_key: str, msg: InboundMessage) -> None: + self._deferred_automation_queues.setdefault(session_key, []).append(msg) + + async def _publish_next_deferred_automation(self, session_key: str) -> None: + queue = self._deferred_automation_queues.get(session_key) + if not queue: + return + msg = queue.pop(0) + if not queue: + self._deferred_automation_queues.pop(session_key, None) + await self.bus.publish_inbound(msg) + def _persist_user_message_early( self, msg: InboundMessage, @@ -583,6 +642,17 @@ class AgentLoop: extra: dict[str, Any] = ({"media": list(media_paths)} if media_paths else {}) | agent_context.session_extra(msg.metadata) extra.update(kwargs) text = msg.content if isinstance(msg.content, str) else "" + if trigger := automation_trigger(msg.metadata): + persist_content = trigger.get("persist_content") + if isinstance(persist_content, str) and persist_content.strip(): + text = persist_content + extra.update({ + "_automation_trigger": True, + "automation_id": trigger.get("job_id"), + "automation_name": trigger.get("job_name"), + "automation_run_id": trigger.get("run_id"), + "automation_prompt_ref": trigger.get("prompt_ref"), + }) session.add_message("user", text, **extra) self._mark_pending_user_turn(session) self.sessions.save(session) @@ -883,6 +953,22 @@ class AgentLoop: self.commands.dispatch_priority, ) continue + if ( + defer_until_session_idle(msg.metadata) + and effective_key in self._pending_queues + ): + pending_msg = msg + if effective_key != msg.session_key: + pending_msg = dataclasses.replace( + msg, + session_key_override=effective_key, + ) + self._defer_automation_turn(effective_key, pending_msg) + logger.info( + "Deferred automation turn for active session {}", + effective_key, + ) + continue # If this session already has an active pending queue (i.e. a task # is processing this session), route the message there for mid-turn # injection instead of creating a competing task. @@ -996,7 +1082,12 @@ class AgentLoop: session_key=session_key, metadata=msg.metadata, ) + self._complete_automation_turn(msg, response=response) except asyncio.CancelledError: + self._complete_automation_turn( + msg, + error=asyncio.CancelledError(), + ) logger.info("Task cancelled for session {}", session_key) # Preserve partial context from the interrupted turn so # the user does not lose tool results and assistant @@ -1022,7 +1113,7 @@ class AgentLoop: exc_info=True, ) raise - except Exception: + except Exception as exc: logger.exception("Error processing message for session {}", session_key) await self.bus.publish_outbound(OutboundMessage( channel=msg.channel, chat_id=msg.chat_id, @@ -1035,6 +1126,7 @@ class AgentLoop: session_key=session_key, metadata=msg.metadata, ) + self._complete_automation_turn(msg, error=exc) finally: # Drain any messages still in the pending queue and re-publish # them to the bus so they are processed as fresh inbound messages @@ -1065,12 +1157,14 @@ class AgentLoop: msg, session_key, "idle" ) self._runtime_events().clear_turn(session_key) + await self._publish_next_deferred_automation(session_key) finally: if pending is None: await self._runtime_events().run_status_changed( msg, session_key, "idle" ) self._runtime_events().clear_turn(session_key) + await self._publish_next_deferred_automation(session_key) async def close_mcp(self) -> None: """Drain pending background archives, then close MCP connections.""" @@ -1342,6 +1436,8 @@ class AgentLoop: ctx.session = self.sessions.get_or_create(ctx.session_key) await self._runtime_events().session_turn_started(msg, ctx.session_key) self.workspace_scopes.persist_message_scope(ctx.session, msg) + if persist_routing_context(ctx.session, msg): + self.sessions.save(ctx.session) if self._restore_runtime_checkpoint(ctx.session): self.sessions.save(ctx.session) diff --git a/nanobot/agent/tools/cron.py b/nanobot/agent/tools/cron.py index ff376a87b..7268b49c9 100644 --- a/nanobot/agent/tools/cron.py +++ b/nanobot/agent/tools/cron.py @@ -9,7 +9,6 @@ from typing import Any from nanobot.agent.tools.base import Tool, tool_parameters from nanobot.agent.tools.context import ContextAware, RequestContext from nanobot.agent.tools.schema import ( - BooleanSchema, IntegerSchema, StringSchema, tool_parameters_schema, @@ -38,10 +37,6 @@ _CRON_PARAMETERS = tool_parameters_schema( "ISO datetime for one-time execution (e.g. '2026-02-12T10:30:00'). " "Naive values use the tool's default timezone." ), - deliver=BooleanSchema( - description="Whether to deliver the execution result to the user channel (default true)", - default=True, - ), job_id=StringSchema("REQUIRED when action='remove'. Job ID to remove (obtain via action='list')."), required=["action"], description=( @@ -76,11 +71,11 @@ class CronTool(Tool, ContextAware): return cls(cron_service=ctx.cron_service, default_timezone=ctx.timezone) def set_context(self, ctx: RequestContext) -> None: - """Set the current session context for delivery.""" + """Set the current session context for scheduled automation ownership.""" self._channel.set(ctx.channel) self._chat_id.set(ctx.chat_id) self._metadata.set(ctx.metadata) - self._session_key.set(ctx.session_key or f"{ctx.channel}:{ctx.chat_id}") + self._session_key.set(ctx.session_key or "") def set_cron_context(self, active: bool): """Mark whether the tool is executing inside a cron job callback.""" @@ -170,10 +165,9 @@ class CronTool(Tool, ContextAware): "describing what to do when the job triggers " "(e.g. the reminder text). Retry including message=\"...\"." ) - channel = self._channel.get() - chat_id = self._chat_id.get() - if not channel or not chat_id: - return "Error: no session context (channel/chat_id)" + session_key = self._session_key.get() + if not session_key: + return "Error: scheduled automations must be created from a chat session" if tz and not cron_expr: return "Error: tz can only be used with cron_expr" if tz: @@ -210,12 +204,8 @@ class CronTool(Tool, ContextAware): name=name or message[:30], schedule=schedule, message=message, - deliver=deliver, - channel=channel, - to=chat_id, delete_after_run=delete_after, - channel_meta=self._metadata.get(), - session_key=self._session_key.get() or None, + session_key=session_key, ) return f"Created job '{job.name}' (id: {job.id})" diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 8f60fd9ed..3e71cb3ba 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -1,10 +1,12 @@ """CLI commands for nanobot.""" import asyncio +import hashlib import os import select import signal import sys +import time import uuid from collections.abc import Callable from contextlib import nullcontext, suppress @@ -975,12 +977,19 @@ def _run_gateway( from nanobot.bus.queue import MessageBus from nanobot.bus.runtime_events import RuntimeEventBus from nanobot.channels.manager import ChannelManager + from nanobot.cron.automation import ( + AUTOMATION_DEFER_UNTIL_IDLE_META, + AUTOMATION_TRIGGER_META, + ) from nanobot.cron.service import CronService from nanobot.cron.types import CronJob from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot from nanobot.providers.image_generation import image_gen_provider_configs + from nanobot.security.workspace_access import WORKSPACE_SCOPE_METADATA_KEY from nanobot.session.manager import SessionManager + from nanobot.session.routing import read_routing_context from nanobot.session.webui_turns import WebuiTurnCoordinator + from nanobot.utils.prompt_templates import render_template from nanobot.webui.token_usage import TokenUsageHook port = port if port is not None else config.gateway.port @@ -1025,7 +1034,7 @@ def _run_gateway( ).subscribe(runtime_events) from nanobot.agent.loop import UNIFIED_SESSION_KEY - from nanobot.bus.events import OutboundMessage + from nanobot.bus.events import InboundMessage, OutboundMessage def _channel_session_key(channel: str, chat_id: str) -> str: return ( @@ -1034,6 +1043,152 @@ def _run_gateway( else f"{channel}:{chat_id}" ) + def _session_metadata(session_key: str) -> dict[str, Any]: + data = session_manager.read_session_file(session_key) + metadata = data.get("metadata", {}) if isinstance(data, dict) else {} + return dict(metadata) if isinstance(metadata, dict) else {} + + def _bound_session_delivery_context( + session_key: str, + *, + turn_seed: str, + source_label: str | None, + ) -> tuple[str, str, dict[str, Any]]: + if ":" not in session_key: + raise ValueError(f"bound cron session_key is invalid: {session_key!r}") + channel, rest = session_key.split(":", 1) + if not channel or not rest: + raise ValueError(f"bound cron session_key is invalid: {session_key!r}") + + session_metadata = _session_metadata(session_key) + routed = read_routing_context(session_metadata) + if routed is not None: + channel, rest, metadata = routed + else: + metadata: dict[str, Any] = {} + + if channel == "websocket": + metadata["webui"] = True + scope = session_metadata.get(WORKSPACE_SCOPE_METADATA_KEY) + if isinstance(scope, dict): + metadata[WORKSPACE_SCOPE_METADATA_KEY] = dict(scope) + metadata.update( + _proactive_delivery_metadata( + "websocket", + metadata, + turn_seed=turn_seed, + source_label=source_label, + ) + ) + return channel, rest, metadata + + if channel == "slack" and ":" in rest: + chat_id, thread_ts = rest.split(":", 1) + if thread_ts: + metadata["slack"] = {"thread_ts": thread_ts} + return channel, chat_id, metadata + + return channel, rest, metadata + + def _automation_prompt_ref(prompt: str) -> dict[str, Any]: + return { + "id": "cron.agent_turn.reminder", + "version": 1, + "sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest(), + } + + async def _run_bound_cron_job(job: CronJob) -> str | None: + session_key = job.payload.session_key + if not session_key: + raise ValueError(f"cron job {job.id} is missing payload.session_key") + + prompt = render_template( + "agent/cron_reminder.md", + strip=True, + message=job.payload.message, + ) + prompt_ref = _automation_prompt_ref(prompt) + run_id = f"{job.id}:{int(time.time() * 1000)}:{uuid.uuid4().hex[:8]}" + channel, chat_id, metadata = _bound_session_delivery_context( + session_key, + turn_seed=f"cron:{job.id}", + source_label=job.name, + ) + metadata[AUTOMATION_TRIGGER_META] = { + "job_id": job.id, + "job_name": job.name, + "run_id": run_id, + "prompt_ref": prompt_ref, + "persist_content": ( + f"Scheduled automation triggered: {job.name}\n\n{job.payload.message}" + ), + } + metadata[AUTOMATION_DEFER_UNTIL_IDLE_META] = True + + cron.write_run_record( + run_id, + { + "job_id": job.id, + "job_name": job.name, + "session_key": session_key, + "status": "queued", + "prompt_ref": prompt_ref, + "prompt_vars": {"message": job.payload.message}, + "rendered_prompt": prompt, + }, + ) + + cron_tool = agent.tools.get("cron") + cron_token = None + if isinstance(cron_tool, CronTool): + cron_token = cron_tool.set_cron_context(True) + try: + resp = await agent.submit_automation_turn( + InboundMessage( + channel=channel, + sender_id="cron", + chat_id=chat_id, + content=prompt, + metadata=metadata, + session_key_override=session_key, + ) + ) + except (Exception, asyncio.CancelledError) as exc: + error_text = str(exc) or exc.__class__.__name__ + cron.write_run_record( + run_id, + { + "job_id": job.id, + "job_name": job.name, + "session_key": session_key, + "status": "error", + "error": error_text, + "prompt_ref": prompt_ref, + "prompt_vars": {"message": job.payload.message}, + "rendered_prompt": prompt, + }, + ) + raise + finally: + if isinstance(cron_tool, CronTool) and cron_token is not None: + cron_tool.reset_cron_context(cron_token) + + response = resp.content if resp else "" + cron.write_run_record( + run_id, + { + "job_id": job.id, + "job_name": job.name, + "session_key": session_key, + "status": "ok", + "prompt_ref": prompt_ref, + "prompt_vars": {"message": job.payload.message}, + "rendered_prompt": prompt, + "response": response, + }, + ) + return response + async def _deliver_to_channel( msg: OutboundMessage, *, record: bool = False, session_key: str | None = None, ) -> None: @@ -1194,6 +1349,9 @@ def _run_gateway( logger.info("Heartbeat: silenced by post-run evaluation") return response + if job.payload.kind == "agent_turn" and job.payload.session_key: + return await _run_bound_cron_job(job) + reminder_note = ( "The scheduled time has arrived. Deliver this reminder to the user now, " "as a brief and natural message in their language. Speak directly to them — " diff --git a/nanobot/cron/automation.py b/nanobot/cron/automation.py new file mode 100644 index 000000000..7eacac9ce --- /dev/null +++ b/nanobot/cron/automation.py @@ -0,0 +1,33 @@ +"""Shared metadata helpers for scheduled automation turns.""" + +from __future__ import annotations + +from typing import Any, Mapping + +AUTOMATION_TRIGGER_META = "_automation_trigger" +AUTOMATION_DEFER_UNTIL_IDLE_META = "_defer_until_session_idle" + + +def automation_trigger(metadata: Mapping[str, Any] | None) -> dict[str, Any] | None: + """Return structured automation trigger metadata when present.""" + raw = (metadata or {}).get(AUTOMATION_TRIGGER_META) + return raw if isinstance(raw, dict) else None + + +def is_automation_turn(metadata: Mapping[str, Any] | None) -> bool: + return automation_trigger(metadata) is not None + + +def defer_until_session_idle(metadata: Mapping[str, Any] | None) -> bool: + return bool( + is_automation_turn(metadata) + and (metadata or {}).get(AUTOMATION_DEFER_UNTIL_IDLE_META) is True + ) + + +def automation_run_id(metadata: Mapping[str, Any] | None) -> str | None: + trigger = automation_trigger(metadata) + if not trigger: + return None + value = trigger.get("run_id") + return value if isinstance(value, str) and value else None diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 31c5b50a7..a75d024dd 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -84,6 +84,7 @@ class CronService: ): self.store_path = store_path self._action_path = store_path.parent / "action.jsonl" + self._run_records_dir = store_path.parent / "runs" self._lock = FileLock(str(self._action_path.parent) + ".lock") self.on_job = on_job self._store: CronStore | None = None @@ -325,6 +326,23 @@ class CronService: tmp_path.unlink(missing_ok=True) raise + @staticmethod + def _safe_run_record_name(run_id: str) -> str: + return "".join(c if c.isalnum() or c in "._-" else "_" for c in run_id) + + def write_run_record(self, run_id: str, record: dict[str, Any]) -> None: + """Write an internal audit record for one cron execution.""" + name = self._safe_run_record_name(run_id) + if not name: + name = str(uuid.uuid4()) + path = self._run_records_dir / f"{name}.json" + payload = { + **record, + "run_id": run_id, + "updated_at_ms": _now_ms(), + } + self._atomic_write(path, json.dumps(payload, indent=2, ensure_ascii=False)) + async def start(self) -> None: """Start the cron service.""" self._running = True @@ -473,6 +491,20 @@ class CronService: jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled] return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf')) + def list_bound_agent_jobs_for_session( + self, + session_key: str, + *, + include_disabled: bool = True, + ) -> list[CronJob]: + """Return user-created bound automation jobs owned by *session_key*.""" + return [ + job + for job in self.list_jobs(include_disabled=include_disabled) + if job.payload.kind == "agent_turn" + and job.payload.session_key == session_key + ] + def add_job( self, name: str, diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 890b25c20..9041aa27e 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -36,6 +36,7 @@ _FORK_VOLATILE_METADATA_KEYS = { "pending_user_turn", "runtime_checkpoint", "thread_goal", + "_routing_context", "title", "title_user_edited", } diff --git a/nanobot/session/routing.py b/nanobot/session/routing.py new file mode 100644 index 000000000..cad4578c0 --- /dev/null +++ b/nanobot/session/routing.py @@ -0,0 +1,105 @@ +"""Persisted session routing context for proactive turns.""" + +from __future__ import annotations + +from typing import Any, Mapping + +from nanobot.bus.events import InboundMessage +from nanobot.cron.automation import is_automation_turn +from nanobot.session.manager import Session + +SESSION_ROUTING_METADATA_KEY = "_routing_context" + +_ROUTING_METADATA_KEYS = { + "chat_type", + "context_chat_id", + "conversation_type", + "event_id", + "message_thread_id", + "msg_type", + "parent_channel_id", + "parent_id", + "platform", + "root_id", + "thread_id", + "thread_reply_to_event_id", + "thread_root_event_id", +} +_CHANNEL_ROUTING_METADATA_KEYS = { + # Feishu needs a message anchor to reply into an existing topic. Other + # channels should avoid stale reply anchors for scheduled automation turns. + "feishu": {"message_id"}, +} +_SLACK_ROUTING_KEYS = {"channel_type", "thread_ts"} + + +def _scalar(value: Any) -> str | int | float | bool | None: + if value is None or isinstance(value, (str, int, float, bool)): + return value + return None + + +def _routing_metadata(channel: str, metadata: Mapping[str, Any] | None) -> dict[str, Any]: + if not isinstance(metadata, Mapping): + return {} + + out: dict[str, Any] = {} + keys = _ROUTING_METADATA_KEYS | _CHANNEL_ROUTING_METADATA_KEYS.get(channel, set()) + for key in keys: + if key not in metadata: + continue + value = _scalar(metadata.get(key)) + if value is not None: + out[key] = value + + slack = metadata.get("slack") + if isinstance(slack, Mapping): + slack_out = { + key: value + for key in _SLACK_ROUTING_KEYS + if (value := _scalar(slack.get(key))) is not None + } + if slack_out: + out["slack"] = slack_out + + return out + + +def routing_context_for_message(msg: InboundMessage) -> dict[str, Any]: + """Return the stable routing context needed to deliver future session turns.""" + return { + "channel": msg.channel, + "chat_id": msg.chat_id, + "metadata": _routing_metadata(msg.channel, msg.metadata), + } + + +def persist_routing_context(session: Session, msg: InboundMessage) -> bool: + """Persist the latest non-automation delivery context for a session.""" + if is_automation_turn(msg.metadata): + return False + context = routing_context_for_message(msg) + if session.metadata.get(SESSION_ROUTING_METADATA_KEY) == context: + return False + session.metadata[SESSION_ROUTING_METADATA_KEY] = context + return True + + +def read_routing_context(metadata: Mapping[str, Any] | None) -> tuple[str, str, dict[str, Any]] | None: + """Decode a persisted routing context from session metadata.""" + if not isinstance(metadata, Mapping): + return None + raw = metadata.get(SESSION_ROUTING_METADATA_KEY) + if not isinstance(raw, Mapping): + return None + + channel = raw.get("channel") + chat_id = raw.get("chat_id") + if not isinstance(channel, str) or not channel: + return None + if not isinstance(chat_id, str) or not chat_id: + return None + + route_meta = raw.get("metadata") + metadata_out = dict(route_meta) if isinstance(route_meta, Mapping) else {} + return channel, chat_id, metadata_out diff --git a/nanobot/templates/agent/cron_reminder.md b/nanobot/templates/agent/cron_reminder.md new file mode 100644 index 000000000..af9803d5a --- /dev/null +++ b/nanobot/templates/agent/cron_reminder.md @@ -0,0 +1,9 @@ +The scheduled time has arrived. Execute this scheduled automation now and report the result to the user in the same session. + +Rules: +- Speak directly to the user in their language. +- Do not narrate internal progress. +- Do not include user IDs. +- Do not add status reports like "Done" or "Reminded" unless they are the natural response. + +Automation: {{ message }} diff --git a/nanobot/webui/session_automations.py b/nanobot/webui/session_automations.py index 52d503f54..8a57b9442 100644 --- a/nanobot/webui/session_automations.py +++ b/nanobot/webui/session_automations.py @@ -8,7 +8,25 @@ from nanobot.cron.types import CronJob class _CronServiceLike(Protocol): - def list_jobs(self, *, include_disabled: bool = False) -> list[CronJob]: ... + def list_bound_agent_jobs_for_session( + self, + session_key: str, + *, + include_disabled: bool = True, + ) -> list[CronJob]: ... + + +def bound_session_automation_jobs( + cron_service: _CronServiceLike | None, + session_key: str, +) -> list[CronJob]: + """Return agent-turn automation jobs explicitly bound to *session_key*.""" + if cron_service is None: + return [] + return cron_service.list_bound_agent_jobs_for_session( + session_key, + include_disabled=True, + ) def session_automations_payload( @@ -16,22 +34,11 @@ def session_automations_payload( session_key: str, ) -> dict[str, Any]: """Return user-created automation jobs attached to a WebUI session.""" - jobs: list[CronJob] = [] - if cron_service is not None: - all_jobs = cron_service.list_jobs(include_disabled=True) - jobs = [job for job in all_jobs if _job_matches_session(job, session_key)] - return {"jobs": [_serialize_job(job) for job in jobs]} + return {"jobs": serialize_automation_jobs(bound_session_automation_jobs(cron_service, session_key))} -def _job_matches_session(job: CronJob, session_key: str) -> bool: - payload = job.payload - if payload.kind != "agent_turn": - return False - if payload.session_key: - return payload.session_key == session_key - if payload.channel and payload.to: - return f"{payload.channel}:{payload.to}" == session_key - return False +def serialize_automation_jobs(jobs: list[CronJob]) -> list[dict[str, Any]]: + return [_serialize_job(job) for job in jobs] def _serialize_job(job: CronJob) -> dict[str, Any]: diff --git a/nanobot/webui/ws_http.py b/nanobot/webui/ws_http.py index 101b309fe..e0e0d321f 100644 --- a/nanobot/webui/ws_http.py +++ b/nanobot/webui/ws_http.py @@ -61,7 +61,11 @@ from nanobot.webui.http_utils import ( safe_host_header as _safe_host_header, ) from nanobot.webui.media_gateway import WebUIMediaGateway -from nanobot.webui.session_automations import session_automations_payload +from nanobot.webui.session_automations import ( + bound_session_automation_jobs, + serialize_automation_jobs, + session_automations_payload, +) from nanobot.webui.session_list_index import list_webui_sessions from nanobot.webui.sidebar_state import ( read_webui_sidebar_state, @@ -446,6 +450,20 @@ class GatewayHTTPHandler: return _http_error(400, "invalid session key") if not _is_websocket_channel_session_key(decoded_key): return _http_error(404, "session not found") + query = _parse_query(request.path) + delete_automations = (_query_first(query, "delete_automations") or "").lower() + bound_jobs = bound_session_automation_jobs(self.cron_service, decoded_key) + if bound_jobs and delete_automations not in {"1", "true", "yes"}: + return _http_json_response( + { + "deleted": False, + "blocked_by_automations": True, + "automations": serialize_automation_jobs(bound_jobs), + } + ) + if bound_jobs and self.cron_service is not None: + for job in bound_jobs: + self.cron_service.remove_job(job.id) deleted = self.session_manager.delete_session(decoded_key) delete_webui_thread(decoded_key) return _http_json_response({"deleted": bool(deleted)}) diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py index 295bc4888..1c1f9de64 100644 --- a/tests/agent/test_loop_save_turn.py +++ b/tests/agent/test_loop_save_turn.py @@ -11,6 +11,7 @@ from nanobot.bus.queue import MessageBus from nanobot.providers.base import LLMResponse from nanobot.session.goal_state import GOAL_STATE_KEY from nanobot.session.manager import Session, SessionManager +from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY from nanobot.session.turn_continuation import ( INTERNAL_CONTINUATION_META, INTERNAL_CONTINUATION_RUN_STARTED_AT_META, @@ -827,6 +828,12 @@ async def test_process_message_uses_context_chat_id_for_runtime_prompt(tmp_path: assert result.chat_id == "thread-777" assert loop.context.build_messages.call_args.kwargs["chat_id"] == "parent-456" assert loop._run_agent_loop.call_args.kwargs["chat_id"] == "thread-777" + session = loop.sessions.get_or_create("discord:parent-456:thread:thread-777") + assert session.metadata[SESSION_ROUTING_METADATA_KEY] == { + "channel": "discord", + "chat_id": "thread-777", + "metadata": {"context_chat_id": "parent-456"}, + } @pytest.mark.asyncio diff --git a/tests/agent/test_runner_injections.py b/tests/agent/test_runner_injections.py index 3686574c8..b5d970a11 100644 --- a/tests/agent/test_runner_injections.py +++ b/tests/agent/test_runner_injections.py @@ -616,6 +616,54 @@ async def test_followup_routed_to_pending_queue(tmp_path): assert queued_msg.session_key == UNIFIED_SESSION_KEY +@pytest.mark.asyncio +async def test_automation_turn_deferred_while_session_active(tmp_path): + """Automation turns wait for the active session instead of becoming injections.""" + from nanobot.bus.events import InboundMessage + from nanobot.cron.automation import ( + AUTOMATION_DEFER_UNTIL_IDLE_META, + AUTOMATION_TRIGGER_META, + ) + + loop = _make_loop(tmp_path) + loop._dispatch = AsyncMock() # type: ignore[method-assign] + + session_key = "websocket:chat-1" + pending = asyncio.Queue(maxsize=20) + loop._pending_queues[session_key] = pending + + run_task = asyncio.create_task(loop.run()) + msg = InboundMessage( + channel="websocket", + sender_id="cron", + chat_id="chat-1", + content="scheduled work", + metadata={ + AUTOMATION_TRIGGER_META: {"run_id": "run-1"}, + AUTOMATION_DEFER_UNTIL_IDLE_META: True, + }, + session_key_override=session_key, + ) + await loop.bus.publish_inbound(msg) + + for _ in range(20): + if loop._deferred_automation_queues.get(session_key): + break + await asyncio.sleep(0.05) + + loop.stop() + await asyncio.wait_for(run_task, timeout=2) + + assert pending.empty() + assert loop._dispatch.await_count == 0 + assert loop._deferred_automation_queues[session_key] == [msg] + + await loop._publish_next_deferred_automation(session_key) + queued = await asyncio.wait_for(loop.bus.consume_inbound(), timeout=0.5) + assert queued is msg + assert session_key not in loop._deferred_automation_queues + + @pytest.mark.asyncio async def test_pending_queue_preserves_overflow_for_next_injection_cycle(tmp_path): """Pending queue should leave overflow messages queued for later drains.""" diff --git a/tests/agent/test_session_manager_history.py b/tests/agent/test_session_manager_history.py index 3441c4833..91520ed86 100644 --- a/tests/agent/test_session_manager_history.py +++ b/tests/agent/test_session_manager_history.py @@ -1,4 +1,5 @@ from nanobot.session.manager import Session, SessionManager +from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY def _assert_no_orphans(history: list[dict]) -> None: @@ -432,6 +433,11 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path): source.metadata["webui"] = True source.metadata["title"] = "Old title" source.metadata["goal_state"] = {"status": "active", "objective": "do not inherit"} + source.metadata[SESSION_ROUTING_METADATA_KEY] = { + "channel": "websocket", + "chat_id": "source", + "metadata": {}, + } source.add_message("user", "round1") source.add_message("assistant", "answer1") source.add_message("user", "round2 fork me") @@ -450,6 +456,7 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path): assert forked.metadata["webui"] is True assert "title" not in forked.metadata assert "goal_state" not in forked.metadata + assert SESSION_ROUTING_METADATA_KEY not in forked.metadata saved = manager.read_session_file("websocket:fork") assert [m["content"] for m in saved["messages"]] == ["round1", "answer1"] diff --git a/tests/channels/test_websocket_http_routes.py b/tests/channels/test_websocket_http_routes.py index 8eba67588..bf2dafe59 100644 --- a/tests/channels/test_websocket_http_routes.py +++ b/tests/channels/test_websocket_http_routes.py @@ -188,6 +188,13 @@ async def test_session_automations_route_filters_by_webui_session( to=to, session_key=f"websocket:{to}", ) + cron.add_job( + name="Legacy same target", + schedule=hourly, + message="Legacy job should not be treated as bound", + channel="websocket", + to="abc", + ) cron.register_system_job( CronJob( id="heartbeat", @@ -659,6 +666,91 @@ async def test_session_delete_removes_file( await server_task +@pytest.mark.asyncio +async def test_session_delete_blocks_when_bound_automation_exists( + bus: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path) + sm = _seed_session(tmp_path, key="websocket:doomed") + cron = CronService(tmp_path / "cron" / "jobs.json") + cron.add_job( + name="Daily check", + schedule=CronSchedule(kind="every", every_ms=86_400_000), + message="Check the repo", + session_key="websocket:doomed", + ) + channel = _ch(bus, session_manager=sm, cron_service=cron, port=29915) + server_task = asyncio.create_task(channel.start()) + await asyncio.sleep(0.3) + try: + boot = await _http_get("http://127.0.0.1:29915/webui/bootstrap") + token = boot.json()["token"] + auth = {"Authorization": f"Bearer {token}"} + + path = sm._get_session_path("websocket:doomed") + resp = await _http_get( + "http://127.0.0.1:29915/api/sessions/websocket:doomed/delete", + headers=auth, + ) + + assert resp.status_code == 200 + body = resp.json() + assert body["deleted"] is False + assert body["blocked_by_automations"] is True + assert [job["name"] for job in body["automations"]] == ["Daily check"] + assert path.exists() + assert cron.list_bound_agent_jobs_for_session("websocket:doomed") + finally: + await channel.stop() + await server_task + + +@pytest.mark.asyncio +async def test_session_delete_can_cascade_bound_automations( + bus: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path) + sm = _seed_session(tmp_path, key="websocket:doomed") + cron = CronService(tmp_path / "cron" / "jobs.json") + cron.add_job( + name="Daily check", + schedule=CronSchedule(kind="every", every_ms=86_400_000), + message="Check the repo", + session_key="websocket:doomed", + ) + cron.add_job( + name="Legacy same target", + schedule=CronSchedule(kind="every", every_ms=86_400_000), + message="Legacy job remains", + channel="websocket", + to="doomed", + ) + channel = _ch(bus, session_manager=sm, cron_service=cron, port=29916) + server_task = asyncio.create_task(channel.start()) + await asyncio.sleep(0.3) + try: + boot = await _http_get("http://127.0.0.1:29916/webui/bootstrap") + token = boot.json()["token"] + auth = {"Authorization": f"Bearer {token}"} + + path = sm._get_session_path("websocket:doomed") + resp = await _http_get( + "http://127.0.0.1:29916/api/sessions/websocket:doomed/delete?delete_automations=true", + headers=auth, + ) + + assert resp.status_code == 200 + assert resp.json()["deleted"] is True + assert not path.exists() + assert cron.list_bound_agent_jobs_for_session("websocket:doomed") == [] + assert [job.name for job in cron.list_jobs(include_disabled=True)] == [ + "Legacy same target" + ] + finally: + await channel.stop() + await server_task + + @pytest.mark.asyncio async def test_session_routes_accept_percent_encoded_websocket_keys( bus: MagicMock, tmp_path: Path diff --git a/tests/cli/test_commands.py b/tests/cli/test_commands.py index 3e30de858..01be99252 100644 --- a/tests/cli/test_commands.py +++ b/tests/cli/test_commands.py @@ -8,13 +8,14 @@ from unittest.mock import AsyncMock, MagicMock, patch import pytest from typer.testing import CliRunner -from nanobot.bus.events import OutboundMessage +from nanobot.bus.events import InboundMessage, OutboundMessage from nanobot.cli.commands import _proactive_delivery_metadata, app from nanobot.config.schema import Config from nanobot.cron.types import CronJob, CronPayload from nanobot.providers.factory import ProviderSnapshot, make_provider from nanobot.providers.openai_codex_provider import _strip_model_prefix from nanobot.providers.registry import find_by_name +from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY runner = CliRunner() @@ -1352,7 +1353,6 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( "webui_turn_id": old_turn_id, "workspace_scope": {"mode": "default"}, }, - session_key="websocket:chat-1", ), ) @@ -1373,6 +1373,175 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context( } +def test_gateway_bound_cron_runs_as_session_turn( + monkeypatch, tmp_path: Path +) -> None: + config_file = tmp_path / "instance" / "config.json" + config_file.parent.mkdir(parents=True) + config_file.write_text("{}") + + config = Config() + config.agents.defaults.workspace = str(tmp_path / "config-workspace") + provider = _fake_provider() + bus = MagicMock() + bus.publish_outbound = AsyncMock() + seen: dict[str, object] = {"run_records": []} + + monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None) + monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config) + monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None) + monkeypatch.setattr("nanobot.providers.factory.make_provider", lambda _config: provider) + monkeypatch.setattr( + "nanobot.providers.factory.build_provider_snapshot", + lambda _config: _test_provider_snapshot(provider, _config), + ) + monkeypatch.setattr( + "nanobot.providers.factory.load_provider_snapshot", + lambda _config_path=None: _test_provider_snapshot(provider, config), + ) + monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus) + + route_metadata = { + "websocket:chat-1": { + "workspace_scope": { + "project_path": str(tmp_path), + "access_mode": "restricted", + }, + SESSION_ROUTING_METADATA_KEY: { + "channel": "websocket", + "chat_id": "chat-1", + "metadata": {}, + }, + }, + "discord:456:thread:777": { + SESSION_ROUTING_METADATA_KEY: { + "channel": "discord", + "chat_id": "777", + "metadata": { + "context_chat_id": "456", + "parent_channel_id": "456", + "thread_id": "777", + }, + }, + }, + } + + class _FakeSessionManager: + def __init__(self, _workspace: Path) -> None: + pass + + def read_session_file(self, key: str) -> dict[str, object] | None: + return {"metadata": route_metadata.get(key, {})} + + monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager) + + class _FakeCron: + def __init__(self, _store_path: Path) -> None: + self.on_job = None + seen["cron"] = self + + def write_run_record(self, run_id: str, record: dict[str, object]) -> None: + seen["run_records"].append((run_id, record)) + + class _FakeAgentLoop: + @classmethod + def from_config(cls, config, bus=None, **extra): + return cls(**extra) + + def __init__(self, *args, **kwargs) -> None: + self.model = "test-model" + self.provider = kwargs.get("provider", object()) + self.tools = {} + seen["agent"] = self + + async def submit_automation_turn(self, msg: InboundMessage): + seen["automation_msg"] = msg + return OutboundMessage( + channel=msg.channel, + chat_id=msg.chat_id, + content="Checked the repo.", + ) + + async def close_mcp(self) -> None: + return None + + async def run(self) -> None: + return None + + def stop(self) -> None: + return None + + class _StopAfterCronSetup: + def __init__(self, *_args, **_kwargs) -> None: + raise _StopGatewayError("stop") + + async def _unexpected_evaluator(*_args, **_kwargs) -> bool: + raise AssertionError("bound cron must not use legacy response evaluator") + + monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron) + monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop) + monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup) + monkeypatch.setattr("nanobot.cli.commands.evaluate_response", _unexpected_evaluator) + + result = runner.invoke(app, ["gateway", "--config", str(config_file)]) + assert isinstance(result.exception, _StopGatewayError) + + cron = seen["cron"] + job = CronJob( + id="repo-check", + name="Repo check", + payload=CronPayload( + message="Check repository health.", + session_key="websocket:chat-1", + ), + ) + + response = asyncio.run(cron.on_job(job)) + + assert response == "Checked the repo." + msg = seen["automation_msg"] + assert isinstance(msg, InboundMessage) + assert msg.channel == "websocket" + assert msg.chat_id == "chat-1" + assert msg.sender_id == "cron" + assert msg.session_key_override == "websocket:chat-1" + assert "Automation: Check repository health." in msg.content + assert msg.metadata["webui"] is True + assert msg.metadata["workspace_scope"]["project_path"] == str(tmp_path) + assert msg.metadata["_webui_message_source"] == {"kind": "cron", "label": "Repo check"} + trigger = msg.metadata["_automation_trigger"] + assert trigger["job_id"] == "repo-check" + assert trigger["job_name"] == "Repo check" + assert trigger["persist_content"] == ( + "Scheduled automation triggered: Repo check\n\nCheck repository health." + ) + assert msg.metadata["_defer_until_session_idle"] is True + statuses = [record["status"] for _run_id, record in seen["run_records"]] + assert statuses == ["queued", "ok"] + assert seen["run_records"][0][0] == seen["run_records"][1][0] + + discord_job = CronJob( + id="thread-check", + name="Thread check", + payload=CronPayload( + message="Check the Discord thread.", + session_key="discord:456:thread:777", + ), + ) + + response = asyncio.run(cron.on_job(discord_job)) + + assert response == "Checked the repo." + msg = seen["automation_msg"] + assert isinstance(msg, InboundMessage) + assert msg.channel == "discord" + assert msg.chat_id == "777" + assert msg.session_key_override == "discord:456:thread:777" + assert msg.metadata["context_chat_id"] == "456" + assert msg.metadata["parent_channel_id"] == "456" + assert msg.metadata["thread_id"] == "777" + + def test_gateway_cron_job_suppresses_intermediate_progress( monkeypatch, tmp_path: Path ) -> None: diff --git a/tests/cron/test_cron_tool_list.py b/tests/cron/test_cron_tool_list.py index b67879715..23d426031 100644 --- a/tests/cron/test_cron_tool_list.py +++ b/tests/cron/test_cron_tool_list.py @@ -303,7 +303,9 @@ def test_remove_protected_dream_job_returns_clear_feedback(tmp_path) -> None: def test_add_cron_job_defaults_to_tool_timezone(tmp_path) -> None: tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai") - tool.set_context(RequestContext(channel="telegram", chat_id="chat-1")) + tool.set_context( + RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1") + ) result = tool._add_job(None, "Morning standup", None, "0 8 * * *", None, None) @@ -314,7 +316,9 @@ def test_add_cron_job_defaults_to_tool_timezone(tmp_path) -> None: def test_add_at_job_uses_default_timezone_for_naive_datetime(tmp_path) -> None: tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai") - tool.set_context(RequestContext(channel="telegram", chat_id="chat-1")) + tool.set_context( + RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1") + ) result = tool._add_job(None, "Morning reminder", None, None, None, "2026-03-25T08:00:00") @@ -324,26 +328,29 @@ def test_add_at_job_uses_default_timezone_for_naive_datetime(tmp_path) -> None: assert job.schedule.at_ms == expected -def test_add_job_delivers_by_default(tmp_path) -> None: +def test_add_job_binds_current_session_key(tmp_path) -> None: tool = _make_tool(tmp_path) - tool.set_context(RequestContext(channel="telegram", chat_id="chat-1")) + tool.set_context( + RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1") + ) result = tool._add_job(None, "Morning standup", 60, None, None, None) assert result.startswith("Created job") job = tool._cron.list_jobs()[0] - assert job.payload.deliver is True + assert job.payload.session_key == "telegram:chat-1" + assert job.payload.channel is None + assert job.payload.to is None -def test_add_job_can_disable_delivery(tmp_path) -> None: +def test_add_job_requires_session_key(tmp_path) -> None: tool = _make_tool(tmp_path) tool.set_context(RequestContext(channel="telegram", chat_id="chat-1")) result = tool._add_job(None, "Background refresh", 60, None, None, None, deliver=False) - assert result.startswith("Created job") - job = tool._cron.list_jobs()[0] - assert job.payload.deliver is False + assert result == "Error: scheduled automations must be created from a chat session" + assert tool._cron.list_jobs() == [] def test_cron_schema_advertises_action_specific_requirements(tmp_path) -> None: @@ -375,7 +382,9 @@ def test_validate_params_requires_message_only_for_add(tmp_path) -> None: def test_add_job_empty_message_returns_actionable_error(tmp_path) -> None: tool = _make_tool(tmp_path) - tool.set_context(RequestContext(channel="telegram", chat_id="chat-1")) + tool.set_context( + RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1") + ) result = tool._add_job(None, "", 60, None, None, None) @@ -383,8 +392,8 @@ def test_add_job_empty_message_returns_actionable_error(tmp_path) -> None: assert "Retry including message=" in result -def test_add_job_captures_metadata_and_session_key(tmp_path) -> None: - """CronTool stores channel metadata and session_key when adding a job.""" +def test_add_job_captures_only_session_key(tmp_path) -> None: + """CronTool stores the canonical session key without legacy delivery fields.""" tool = _make_tool(tmp_path) meta = {"slack": {"thread_ts": "111.222", "channel_type": "channel"}} tool.set_context(RequestContext( @@ -396,8 +405,10 @@ def test_add_job_captures_metadata_and_session_key(tmp_path) -> None: jobs = tool._cron.list_jobs() assert len(jobs) == 1 - assert jobs[0].payload.channel_meta == meta assert jobs[0].payload.session_key == "slack:C99:111.222" + assert jobs[0].payload.channel is None + assert jobs[0].payload.to is None + assert jobs[0].payload.channel_meta == {} def test_list_excludes_disabled_jobs(tmp_path) -> None: diff --git a/tests/cron/test_cron_tool_schema_contract.py b/tests/cron/test_cron_tool_schema_contract.py index e26989d85..b0454b9f7 100644 --- a/tests/cron/test_cron_tool_schema_contract.py +++ b/tests/cron/test_cron_tool_schema_contract.py @@ -41,7 +41,9 @@ class _SvcStub: @pytest.fixture def registry() -> ToolRegistry: tool = CronTool(_SvcStub(), default_timezone="UTC") - tool.set_context(RequestContext(channel="channel", chat_id="chat-id")) + tool.set_context( + RequestContext(channel="channel", chat_id="chat-id", session_key="channel:chat-id") + ) reg = ToolRegistry() reg.register(tool) return reg diff --git a/tests/session/test_routing.py b/tests/session/test_routing.py new file mode 100644 index 000000000..fdd882f90 --- /dev/null +++ b/tests/session/test_routing.py @@ -0,0 +1,53 @@ +from nanobot.bus.events import InboundMessage +from nanobot.session.routing import routing_context_for_message + + +def test_routing_context_keeps_telegram_topic_without_stale_message_id() -> None: + context = routing_context_for_message( + InboundMessage( + channel="telegram", + sender_id="user-1", + chat_id="-100123", + content="set a reminder", + metadata={ + "message_id": 100, + "message_thread_id": 42, + "_progress": True, + }, + session_key_override="telegram:-100123:topic:42", + ) + ) + + assert context == { + "channel": "telegram", + "chat_id": "-100123", + "metadata": {"message_thread_id": 42}, + } + + +def test_routing_context_keeps_feishu_topic_anchor() -> None: + context = routing_context_for_message( + InboundMessage( + channel="feishu", + sender_id="ou_user", + chat_id="oc_chat", + content="set a reminder", + metadata={ + "chat_type": "group", + "message_id": "om_msg", + "thread_id": "omt_thread", + "_progress": True, + }, + session_key_override="feishu:oc_chat:om_root", + ) + ) + + assert context == { + "channel": "feishu", + "chat_id": "oc_chat", + "metadata": { + "chat_type": "group", + "message_id": "om_msg", + "thread_id": "omt_thread", + }, + } diff --git a/tests/test_tool_contextvars.py b/tests/test_tool_contextvars.py index e2b7f66ab..a72296d67 100644 --- a/tests/test_tool_contextvars.py +++ b/tests/test_tool_contextvars.py @@ -99,14 +99,18 @@ async def test_cron_tool_keeps_task_local_context(tmp_path) -> None: release = asyncio.Event() async def task_one() -> str: - tool.set_context(RequestContext(channel="feishu", chat_id="chat-a")) + tool.set_context( + RequestContext(channel="feishu", chat_id="chat-a", session_key="feishu:chat-a") + ) entered.set() await release.wait() return await tool.execute(action="add", message="first", every_seconds=60) async def task_two() -> str: await entered.wait() - tool.set_context(RequestContext(channel="email", chat_id="chat-b")) + tool.set_context( + RequestContext(channel="email", chat_id="chat-b", session_key="email:chat-b") + ) release.set() return await tool.execute(action="add", message="second", every_seconds=60) @@ -116,8 +120,7 @@ async def test_cron_tool_keeps_task_local_context(tmp_path) -> None: assert result_two.startswith("Created job") jobs = tool._cron.list_jobs() - assert {job.payload.channel for job in jobs} == {"feishu", "email"} - assert {job.payload.to for job in jobs} == {"chat-a", "chat-b"} + assert {job.payload.session_key for job in jobs} == {"feishu:chat-a", "email:chat-b"} # --- Basic single-task regression tests --- @@ -228,15 +231,16 @@ async def test_spawn_tool_default_values_without_set_context() -> None: async def test_cron_tool_basic_set_context_and_execute(tmp_path) -> None: """Single task: set_context then add job should use correct target.""" tool = CronTool(CronService(tmp_path / "jobs.json")) - tool.set_context(RequestContext(channel="wechat", chat_id="user-789")) + tool.set_context( + RequestContext(channel="wechat", chat_id="user-789", session_key="wechat:user-789") + ) result = await tool.execute(action="add", message="standup", every_seconds=300) assert result.startswith("Created job") jobs = tool._cron.list_jobs() assert len(jobs) == 1 - assert jobs[0].payload.channel == "wechat" - assert jobs[0].payload.to == "user-789" + assert jobs[0].payload.session_key == "wechat:user-789" @pytest.mark.asyncio @@ -245,4 +249,4 @@ async def test_cron_tool_no_context_returns_error(tmp_path) -> None: tool = CronTool(CronService(tmp_path / "jobs.json")) result = await tool.execute(action="add", message="test", every_seconds=60) - assert result == "Error: no session context (channel/chat_id)" + assert result == "Error: scheduled automations must be created from a chat session" diff --git a/webui/src/App.tsx b/webui/src/App.tsx index 70c6ef6cf..55f94fb7c 100644 --- a/webui/src/App.tsx +++ b/webui/src/App.tsx @@ -36,6 +36,7 @@ import { ClientProvider, useClient } from "@/providers/ClientProvider"; import type { ChatSummary, RuntimeSurface, + SessionAutomationJob, SettingsPayload, WorkspaceScopePayload, WorkspacesPayload, @@ -546,6 +547,8 @@ function Shell({ const [pendingDelete, setPendingDelete] = useState<{ key: string; label: string; + automations?: SessionAutomationJob[]; + confirmAutomations?: boolean; } | null>(null); const [pendingRename, setPendingRename] = useState<{ key: string; @@ -1275,24 +1278,28 @@ function Shell({ const fallbackKey = deletingActive ? (sessions[currentIndex + 1]?.key ?? sessions[currentIndex - 1]?.key ?? null) : activeKey; - setPendingDelete(null); - if (deletingActive) { - navigate({ - view: "chat", - activeKey: fallbackKey, - settingsSection: "overview", - }, { replace: true }); - } try { - await deleteChat(key); - } catch (e) { + const result = await deleteChat( + key, + pendingDelete.confirmAutomations ? { deleteAutomations: true } : undefined, + ); + if (result.blocked_by_automations) { + setPendingDelete({ + ...pendingDelete, + automations: result.automations ?? [], + confirmAutomations: true, + }); + return; + } + setPendingDelete(null); if (deletingActive) { navigate({ view: "chat", - activeKey: key, + activeKey: fallbackKey, settingsSection: "overview", }, { replace: true }); } + } catch (e) { console.error("Failed to delete session", e); } }, [pendingDelete, deleteChat, activeKey, navigate, sessions]); @@ -1559,6 +1566,7 @@ function Shell({ setPendingDelete(null)} onConfirm={onConfirmDelete} /> diff --git a/webui/src/components/DeleteConfirm.tsx b/webui/src/components/DeleteConfirm.tsx index fdad9e3ac..2a58a7dc5 100644 --- a/webui/src/components/DeleteConfirm.tsx +++ b/webui/src/components/DeleteConfirm.tsx @@ -10,10 +10,12 @@ import { } from "@/components/ui/alert-dialog"; import { Trash2 } from "lucide-react"; import { useTranslation } from "react-i18next"; +import type { SessionAutomationJob } from "@/lib/types"; interface DeleteConfirmProps { open: boolean; title: string; + automations?: SessionAutomationJob[]; onCancel: () => void; onConfirm: () => void; } @@ -21,14 +23,18 @@ interface DeleteConfirmProps { export function DeleteConfirm({ open, title, + automations = [], onCancel, onConfirm, }: DeleteConfirmProps) { const { t } = useTranslation(); + const hasAutomations = automations.length > 0; + const visibleAutomations = automations.slice(0, 4); + const hiddenCount = Math.max(0, automations.length - visibleAutomations.length); return ( (!o ? onCancel() : undefined)}>
@@ -40,8 +46,31 @@ export function DeleteConfirm({ {t("deleteConfirm.title", { title })} - {t("deleteConfirm.description")} + {hasAutomations + ? t("deleteConfirm.automationsDescription", { + count: automations.length, + defaultValue: + "This chat has scheduled automations. Deleting it will also delete them.", + }) + : t("deleteConfirm.description")} + {hasAutomations ? ( +
+ {visibleAutomations.map((job) => ( +
+ {job.name || job.id} +
+ ))} + {hiddenCount > 0 ? ( +
+ {t("deleteConfirm.moreAutomations", { + count: hiddenCount, + defaultValue: "+ {{count}} more", + })} +
+ ) : null} +
+ ) : null} - {t("deleteConfirm.confirm")} + {hasAutomations + ? t("deleteConfirm.confirmWithAutomations", { + defaultValue: "Delete all", + }) + : t("deleteConfirm.confirm")} diff --git a/webui/src/hooks/useSessions.ts b/webui/src/hooks/useSessions.ts index ab2aed727..428b37312 100644 --- a/webui/src/hooks/useSessions.ts +++ b/webui/src/hooks/useSessions.ts @@ -9,7 +9,12 @@ import { listSessions, } from "@/lib/api"; import { deriveTitle } from "@/lib/format"; -import type { ChatSummary, UIMessage, WorkspaceScopePayload } from "@/lib/types"; +import type { + ChatSummary, + SessionDeleteResult, + UIMessage, + WorkspaceScopePayload, +} from "@/lib/types"; const EMPTY_MESSAGES: UIMessage[] = []; const INITIAL_HISTORY_PAGE_LIMIT = 160; @@ -31,7 +36,10 @@ export function useSessions(): { refresh: () => Promise; createChat: (workspaceScope?: WorkspaceScopePayload | null) => Promise; forkChat: (sourceChatId: string, beforeUserIndex: number, title?: string) => Promise; - deleteChat: (key: string) => Promise; + deleteChat: ( + key: string, + options?: { deleteAutomations?: boolean }, + ) => Promise; } { const { client, token } = useClient(); const [sessions, setSessions] = useState([]); @@ -124,10 +132,12 @@ export function useSessions(): { }, [client]); const deleteChat = useCallback( - async (key: string) => { - await apiDeleteSession(tokenRef.current, key); + async (key: string, options?: { deleteAutomations?: boolean }) => { + const result = await apiDeleteSession(tokenRef.current, key, options); + if (!result.deleted) return result; optimisticKeysRef.current.delete(key); setSessions((prev) => prev.filter((s) => s.key !== key)); + return result; }, [], ); diff --git a/webui/src/lib/api.ts b/webui/src/lib/api.ts index 9b8aa2551..5a6fee9cd 100644 --- a/webui/src/lib/api.ts +++ b/webui/src/lib/api.ts @@ -9,6 +9,7 @@ import type { NetworkSafetySettingsUpdate, ProviderModelsPayload, ProviderSettingsUpdate, + SessionDeleteResult, SessionAutomationsPayload, SettingsPayload, SettingsUpdate, @@ -211,13 +212,18 @@ export async function fetchSkillDetail( export async function deleteSession( token: string, key: string, + optionsOrBase?: { deleteAutomations?: boolean } | string, base: string = "", -): Promise { - const body = await request<{ deleted: boolean }>( - `${base}/api/sessions/${encodeURIComponent(key)}/delete`, +): Promise { + const options = typeof optionsOrBase === "string" ? undefined : optionsOrBase; + const resolvedBase = typeof optionsOrBase === "string" ? optionsOrBase : base; + const query = new URLSearchParams(); + if (options?.deleteAutomations) query.set("delete_automations", "true"); + const suffix = query.toString() ? `?${query}` : ""; + return request( + `${resolvedBase}/api/sessions/${encodeURIComponent(key)}/delete${suffix}`, token, ); - return body.deleted; } export async function fetchSettings( diff --git a/webui/src/lib/types.ts b/webui/src/lib/types.ts index 3365b83f4..f02a2e650 100644 --- a/webui/src/lib/types.ts +++ b/webui/src/lib/types.ts @@ -118,6 +118,12 @@ export interface SessionAutomationJob { export interface SessionAutomationsPayload { jobs: SessionAutomationJob[]; } +export interface SessionDeleteResult { + deleted: boolean; + blocked_by_automations?: boolean; + automations?: SessionAutomationJob[]; +} + export interface SkillSummary { name: string; description: string; diff --git a/webui/src/tests/api.test.ts b/webui/src/tests/api.test.ts index f4c5972f2..71a5b8648 100644 --- a/webui/src/tests/api.test.ts +++ b/webui/src/tests/api.test.ts @@ -131,6 +131,17 @@ describe("webui API helpers", () => { ); }); + it("passes the automation cascade flag when deleting a session", async () => { + await deleteSession("tok", "websocket:chat-1", { deleteAutomations: true }); + + expect(fetch).toHaveBeenCalledWith( + "/api/sessions/websocket%3Achat-1/delete?delete_automations=true", + expect.objectContaining({ + headers: { Authorization: "Bearer tok" }, + }), + ); + }); + it("serializes settings updates as a narrow query string", async () => { await updateSettings("tok", { modelPreset: "default", diff --git a/webui/src/tests/app-layout.test.tsx b/webui/src/tests/app-layout.test.tsx index 3fa3e8124..8735fa00b 100644 --- a/webui/src/tests/app-layout.test.tsx +++ b/webui/src/tests/app-layout.test.tsx @@ -149,6 +149,7 @@ vi.mock("@/hooks/useSessions", async (importOriginal) => { deleteChat: async (key: string) => { await deleteChatSpy(key); setSessions((prev: ChatSummary[]) => prev.filter((s) => s.key !== key)); + return { deleted: true }; }, }; }, diff --git a/webui/src/tests/useSessions.test.tsx b/webui/src/tests/useSessions.test.tsx index a606b249a..826a862a1 100644 --- a/webui/src/tests/useSessions.test.tsx +++ b/webui/src/tests/useSessions.test.tsx @@ -103,7 +103,7 @@ describe("useSessions", () => { preview: "Beta", }, ]); - vi.mocked(api.deleteSession).mockResolvedValue(true); + vi.mocked(api.deleteSession).mockResolvedValue({ deleted: true }); const { result } = renderHook(() => useSessions(), { wrapper: wrap(fakeClient()), @@ -115,10 +115,42 @@ describe("useSessions", () => { await result.current.deleteChat("websocket:chat-a"); }); - expect(api.deleteSession).toHaveBeenCalledWith("tok", "websocket:chat-a"); + expect(api.deleteSession).toHaveBeenCalledWith("tok", "websocket:chat-a", undefined); expect(result.current.sessions.map((s) => s.key)).toEqual(["websocket:chat-b"]); }); + it("keeps a session when delete is blocked by bound automations", async () => { + vi.mocked(api.listSessions).mockResolvedValue([ + { + key: "websocket:chat-a", + channel: "websocket", + chatId: "chat-a", + createdAt: "2026-04-16T10:00:00Z", + updatedAt: "2026-04-16T10:00:00Z", + preview: "Alpha", + }, + ]); + vi.mocked(api.deleteSession).mockResolvedValue({ + deleted: false, + blocked_by_automations: true, + automations: [], + }); + + const { result } = renderHook(() => useSessions(), { + wrapper: wrap(fakeClient()), + }); + + await waitFor(() => expect(result.current.sessions).toHaveLength(1)); + + let deleteResult: Awaited> | undefined; + await act(async () => { + deleteResult = await result.current.deleteChat("websocket:chat-a"); + }); + + expect(deleteResult?.blocked_by_automations).toBe(true); + expect(result.current.sessions.map((s) => s.key)).toEqual(["websocket:chat-a"]); + }); + it("refreshes sessions when the websocket reports a session update", async () => { vi.mocked(api.listSessions) .mockResolvedValueOnce([