mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-06-15 15:24:06 +00:00
feat(cron): bind scheduled automations to sessions
This commit is contained in:
parent
ffae1dca6d
commit
a326ba40f4
246
.agent/cron-session-memory.md
Normal file
246
.agent/cron-session-memory.md
Normal file
@ -0,0 +1,246 @@
|
|||||||
|
# Cron / Session / Memory Design Decisions
|
||||||
|
|
||||||
|
This note records the agreed design direction for fixing the mismatch between
|
||||||
|
scheduled automations and chat session memory.
|
||||||
|
|
||||||
|
## Problem
|
||||||
|
|
||||||
|
User-created cron jobs currently run their agent turn under an internal key such
|
||||||
|
as `cron:{job.id}` and only deliver the final response back to the user channel.
|
||||||
|
That splits the turn's working memory from the session where the user sees and
|
||||||
|
continues the conversation.
|
||||||
|
|
||||||
|
The visible failure mode is awkward: a cron job reports something into a chat,
|
||||||
|
the user discusses it in that chat, and the next cron run behaves as if that
|
||||||
|
discussion never happened.
|
||||||
|
|
||||||
|
The fix is not to make cron a separate delivery system. A user automation should
|
||||||
|
be a scheduled input into a session.
|
||||||
|
|
||||||
|
## Core Model
|
||||||
|
|
||||||
|
For new user-created cron jobs, `payload.session_key` is the canonical anchor.
|
||||||
|
|
||||||
|
- The cron job belongs to that session.
|
||||||
|
- The cron job reads that session's memory/history.
|
||||||
|
- The cron job produces a normal session turn.
|
||||||
|
- There is no separate delivery target concept for new jobs.
|
||||||
|
|
||||||
|
Legacy fields remain in the store only for compatibility:
|
||||||
|
|
||||||
|
- `payload.channel`
|
||||||
|
- `payload.to`
|
||||||
|
- `payload.channel_meta`
|
||||||
|
- `payload.deliver`
|
||||||
|
|
||||||
|
These fields are legacy-only. New cron creation should not depend on them.
|
||||||
|
|
||||||
|
## Job Categories
|
||||||
|
|
||||||
|
Use explicit branching:
|
||||||
|
|
||||||
|
- **Bound user automation**: `payload.kind == "agent_turn"` and
|
||||||
|
`payload.session_key` is present. This uses the new session-turn model.
|
||||||
|
- **Legacy unbound automation**: user job with no `payload.session_key`. Keep the
|
||||||
|
existing behavior. Do not migrate, infer, bind, or add UI for these jobs in
|
||||||
|
this change.
|
||||||
|
- **System job**: `payload.kind == "system_event"` or known internal jobs such
|
||||||
|
as `dream` / `heartbeat`. Keep their specialized paths.
|
||||||
|
|
||||||
|
The project should not grow a compatibility subsystem for legacy jobs. Missing
|
||||||
|
`session_key` means old behavior.
|
||||||
|
|
||||||
|
## New Job Creation
|
||||||
|
|
||||||
|
`CronTool` must create user automations with a `session_key`.
|
||||||
|
|
||||||
|
- If no request/session context exists, `cron action=add` should fail.
|
||||||
|
- Do not create new unbound jobs.
|
||||||
|
- Do not infer `session_key` from `channel/to` for new jobs.
|
||||||
|
- Remove `deliver` from the advertised tool schema. It can remain as a Python
|
||||||
|
compatibility argument, but it must not affect new bound jobs.
|
||||||
|
- New bound jobs should persist `message` and `session_key`; legacy delivery
|
||||||
|
fields should not be populated as part of the new path.
|
||||||
|
|
||||||
|
## Execution Path
|
||||||
|
|
||||||
|
Bound user automations should execute through `AgentLoop` as internal inbound
|
||||||
|
session events, not as an out-of-band `agent.process_direct()` call.
|
||||||
|
|
||||||
|
The intended flow is:
|
||||||
|
|
||||||
|
```text
|
||||||
|
cron due -> create automation inbound -> AgentLoop dispatches session turn
|
||||||
|
```
|
||||||
|
|
||||||
|
The inbound event should carry metadata identifying the automation, such as:
|
||||||
|
|
||||||
|
- job id
|
||||||
|
- job name
|
||||||
|
- run id
|
||||||
|
- prompt reference
|
||||||
|
- persisted trigger content
|
||||||
|
|
||||||
|
This keeps locking, runtime status, session persistence, and WebUI behavior on
|
||||||
|
the same path as normal chat turns.
|
||||||
|
|
||||||
|
`session_key` is the ownership anchor, but an `InboundMessage` still needs an
|
||||||
|
execution context. Bound cron must resolve `channel`, `chat_id`, and any
|
||||||
|
channel metadata from the target session/session metadata. It must not fall back
|
||||||
|
to legacy `payload.channel`, `payload.to`, or `payload.channel_meta` for bound
|
||||||
|
jobs. Those fields are only for the legacy unbound path.
|
||||||
|
|
||||||
|
The scheduler must not mark a bound job run as complete just because the inbound
|
||||||
|
event was queued. It should either wait for the automation turn to complete and
|
||||||
|
record the real outcome, or explicitly model the run as separate states such as
|
||||||
|
`queued` and `turn_completed`. A failed automation turn must be reflected in the
|
||||||
|
cron run record/job state, not hidden behind a successful enqueue.
|
||||||
|
|
||||||
|
## Active Session Behavior
|
||||||
|
|
||||||
|
Cron must not interrupt an active session turn.
|
||||||
|
|
||||||
|
- If the target session is idle, run the automation turn immediately.
|
||||||
|
- If the target session is running, defer the automation until the current turn
|
||||||
|
completes.
|
||||||
|
- Do not inject the automation into the active turn's runtime context.
|
||||||
|
- Do not route automation messages into the existing mid-turn pending injection
|
||||||
|
queue.
|
||||||
|
- UI/runtime status may show that an automation is queued, but the current LLM
|
||||||
|
call should not see the queued automation.
|
||||||
|
|
||||||
|
Automation inbound events need explicit metadata, for example
|
||||||
|
`_automation_trigger` plus `_defer_until_session_idle`. `AgentLoop.run()` must
|
||||||
|
recognize that metadata before the existing `_pending_queues` mid-turn injection
|
||||||
|
branch. If the session is active, the event goes to a deferred automation queue,
|
||||||
|
not the pending injection queue.
|
||||||
|
|
||||||
|
The user experience goal is: cron can run after the current answer, but it
|
||||||
|
should not take over an answer already in progress.
|
||||||
|
|
||||||
|
## Session History
|
||||||
|
|
||||||
|
Do not persist the raw internal execution prompt as a normal user message.
|
||||||
|
|
||||||
|
Instead, persist a readable automation trigger event, for example:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"role": "user",
|
||||||
|
"content": "Scheduled automation triggered: daily monitor\n\nCheck ...",
|
||||||
|
"_automation_trigger": true,
|
||||||
|
"automation_id": "abc123",
|
||||||
|
"automation_name": "daily monitor",
|
||||||
|
"automation_run_id": "abc123:1770000000000",
|
||||||
|
"automation_prompt_ref": {
|
||||||
|
"id": "cron.agent_turn.reminder",
|
||||||
|
"version": 1,
|
||||||
|
"sha256": "..."
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
The assistant result should be saved as the normal assistant response for that
|
||||||
|
turn, with source metadata suitable for WebUI rendering.
|
||||||
|
|
||||||
|
This gives future turns useful context without leaking internal instruction text
|
||||||
|
into the transcript.
|
||||||
|
|
||||||
|
## Prompt Traceability
|
||||||
|
|
||||||
|
The rendered execution prompt should remain traceable, but it should not be part
|
||||||
|
of normal session history.
|
||||||
|
|
||||||
|
Use a named/versioned prompt reference in session history and save the full
|
||||||
|
rendered prompt in an internal run record.
|
||||||
|
|
||||||
|
Preferred direction:
|
||||||
|
|
||||||
|
- Move the cron execution prompt out of `commands.py` into a named template.
|
||||||
|
- Use a stable prompt id such as `cron.agent_turn.reminder`.
|
||||||
|
- Store `prompt_ref` and `automation_run_id` in session history.
|
||||||
|
- Store the full rendered prompt, prompt variables, and errors in an internal
|
||||||
|
run record.
|
||||||
|
|
||||||
|
Avoid putting full prompt text into `jobs.json`; run records should not make the
|
||||||
|
cron store grow without bound.
|
||||||
|
|
||||||
|
## Visibility and Evaluation
|
||||||
|
|
||||||
|
A bound user automation is a real session turn.
|
||||||
|
|
||||||
|
- If it succeeds, save and publish the assistant response.
|
||||||
|
- Do not pass bound automation responses through `evaluate_response()`.
|
||||||
|
- Keep `evaluate_response()` only for system/legacy paths where the old behavior
|
||||||
|
still applies.
|
||||||
|
- Avoid states where session history contains a response the user never saw.
|
||||||
|
|
||||||
|
If a bound automation starts executing, it must leave a visible closure in the
|
||||||
|
session:
|
||||||
|
|
||||||
|
- success response
|
||||||
|
- short failure message
|
||||||
|
- or an empty-result status message
|
||||||
|
|
||||||
|
Full exceptions and diagnostic details belong in the internal run record, not in
|
||||||
|
the user-facing transcript.
|
||||||
|
|
||||||
|
## Deleting Sessions
|
||||||
|
|
||||||
|
Deleting a session with bound automations should be a two-step operation.
|
||||||
|
|
||||||
|
Default delete behavior should block and return the associated automations:
|
||||||
|
|
||||||
|
```json
|
||||||
|
{
|
||||||
|
"deleted": false,
|
||||||
|
"blocked_by_automations": true,
|
||||||
|
"automations": [
|
||||||
|
{"id": "abc123", "name": "daily monitor", "enabled": true}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
After explicit confirmation, the API may delete the bound user automations and
|
||||||
|
then delete the session/thread.
|
||||||
|
|
||||||
|
Rules:
|
||||||
|
|
||||||
|
- Only block on user-created bound jobs whose `payload.session_key` equals the
|
||||||
|
session being deleted.
|
||||||
|
- Do not block on system jobs.
|
||||||
|
- Do not block on legacy unbound jobs.
|
||||||
|
- If the user manually deletes files outside the WebUI/API, do not try to
|
||||||
|
compensate.
|
||||||
|
|
||||||
|
## WebUI Scope
|
||||||
|
|
||||||
|
This change should not grow into a full automation manager.
|
||||||
|
|
||||||
|
Keep the scope focused:
|
||||||
|
|
||||||
|
- Fix cron/session/memory semantics for new bound jobs.
|
||||||
|
- Preserve legacy job behavior.
|
||||||
|
- Add deletion protection for sessions with bound automations.
|
||||||
|
- Update the existing session automation panel only as needed for the new
|
||||||
|
bound-job status.
|
||||||
|
|
||||||
|
Do not add deterministic legacy migration, legacy binding UI, or a global
|
||||||
|
calendar/task manager in this change.
|
||||||
|
|
||||||
|
## Manual Run
|
||||||
|
|
||||||
|
Do not add a user-visible "run now" feature as part of this design.
|
||||||
|
|
||||||
|
`CronService.run_job()` may remain an internal/test helper. It should not become
|
||||||
|
a product surface, and the implementation should avoid creating a separate
|
||||||
|
execution path that behaves differently from scheduled runs.
|
||||||
|
|
||||||
|
## Non-Goals
|
||||||
|
|
||||||
|
- No legacy migration.
|
||||||
|
- No automatic binding of legacy jobs.
|
||||||
|
- No runtime-context prompt asking the model to bind jobs.
|
||||||
|
- No new global automation manager.
|
||||||
|
- No new delivery-target abstraction.
|
||||||
|
- No user-visible manual cron run.
|
||||||
@ -39,6 +39,11 @@ from nanobot.bus.runtime_events import (
|
|||||||
)
|
)
|
||||||
from nanobot.command import CommandContext, CommandRouter, register_builtin_commands
|
from nanobot.command import CommandContext, CommandRouter, register_builtin_commands
|
||||||
from nanobot.config.schema import AgentDefaults, ModelPresetConfig
|
from nanobot.config.schema import AgentDefaults, ModelPresetConfig
|
||||||
|
from nanobot.cron.automation import (
|
||||||
|
automation_run_id,
|
||||||
|
automation_trigger,
|
||||||
|
defer_until_session_idle,
|
||||||
|
)
|
||||||
from nanobot.providers.base import LLMProvider
|
from nanobot.providers.base import LLMProvider
|
||||||
from nanobot.providers.factory import ProviderSnapshot
|
from nanobot.providers.factory import ProviderSnapshot
|
||||||
from nanobot.security.workspace_access import (
|
from nanobot.security.workspace_access import (
|
||||||
@ -53,6 +58,7 @@ from nanobot.session.goal_state import (
|
|||||||
sustained_goal_active,
|
sustained_goal_active,
|
||||||
)
|
)
|
||||||
from nanobot.session.manager import Session, SessionManager
|
from nanobot.session.manager import Session, SessionManager
|
||||||
|
from nanobot.session.routing import persist_routing_context
|
||||||
from nanobot.utils.document import extract_documents, reference_non_image_attachments
|
from nanobot.utils.document import extract_documents, reference_non_image_attachments
|
||||||
from nanobot.utils.helpers import image_placeholder_text
|
from nanobot.utils.helpers import image_placeholder_text
|
||||||
from nanobot.utils.helpers import truncate_text as truncate_text_fn
|
from nanobot.utils.helpers import truncate_text as truncate_text_fn
|
||||||
@ -300,6 +306,10 @@ class AgentLoop:
|
|||||||
# When a session has an active task, new messages for that session
|
# When a session has an active task, new messages for that session
|
||||||
# are routed here instead of creating a new task.
|
# are routed here instead of creating a new task.
|
||||||
self._pending_queues: dict[str, asyncio.Queue] = {}
|
self._pending_queues: dict[str, asyncio.Queue] = {}
|
||||||
|
# Scheduled automations wait for the current visible turn to finish.
|
||||||
|
# They must not be injected into the active model call as follow-up text.
|
||||||
|
self._deferred_automation_queues: dict[str, list[InboundMessage]] = {}
|
||||||
|
self._automation_waiters: dict[str, asyncio.Future[OutboundMessage | None]] = {}
|
||||||
# NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3.
|
# NANOBOT_MAX_CONCURRENT_REQUESTS: <=0 means unlimited; default 3.
|
||||||
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
|
_max = int(os.environ.get("NANOBOT_MAX_CONCURRENT_REQUESTS", "3"))
|
||||||
self._concurrency_gate: asyncio.Semaphore | None = (
|
self._concurrency_gate: asyncio.Semaphore | None = (
|
||||||
@ -565,6 +575,55 @@ class AgentLoop:
|
|||||||
def _runtime_events(self) -> RuntimeEventPublisher:
|
def _runtime_events(self) -> RuntimeEventPublisher:
|
||||||
return ensure_runtime_event_publisher(self)
|
return ensure_runtime_event_publisher(self)
|
||||||
|
|
||||||
|
async def submit_automation_turn(self, msg: InboundMessage) -> OutboundMessage | None:
|
||||||
|
"""Submit a scheduled automation as an internal session turn and wait for it."""
|
||||||
|
run_id = automation_run_id(msg.metadata)
|
||||||
|
if not run_id:
|
||||||
|
raise ValueError("automation turn metadata must include a run_id")
|
||||||
|
loop = asyncio.get_running_loop()
|
||||||
|
future: asyncio.Future[OutboundMessage | None] = loop.create_future()
|
||||||
|
if run_id in self._automation_waiters:
|
||||||
|
raise RuntimeError(f"automation run {run_id!r} is already pending")
|
||||||
|
self._automation_waiters[run_id] = future
|
||||||
|
try:
|
||||||
|
if self._running:
|
||||||
|
await self.bus.publish_inbound(msg)
|
||||||
|
else:
|
||||||
|
await self._dispatch(msg)
|
||||||
|
return await future
|
||||||
|
finally:
|
||||||
|
self._automation_waiters.pop(run_id, None)
|
||||||
|
|
||||||
|
def _complete_automation_turn(
|
||||||
|
self,
|
||||||
|
msg: InboundMessage,
|
||||||
|
*,
|
||||||
|
response: OutboundMessage | None = None,
|
||||||
|
error: BaseException | None = None,
|
||||||
|
) -> None:
|
||||||
|
run_id = automation_run_id(msg.metadata)
|
||||||
|
if not run_id:
|
||||||
|
return
|
||||||
|
future = self._automation_waiters.get(run_id)
|
||||||
|
if future is None or future.done():
|
||||||
|
return
|
||||||
|
if error is not None:
|
||||||
|
future.set_exception(error)
|
||||||
|
else:
|
||||||
|
future.set_result(response)
|
||||||
|
|
||||||
|
def _defer_automation_turn(self, session_key: str, msg: InboundMessage) -> None:
|
||||||
|
self._deferred_automation_queues.setdefault(session_key, []).append(msg)
|
||||||
|
|
||||||
|
async def _publish_next_deferred_automation(self, session_key: str) -> None:
|
||||||
|
queue = self._deferred_automation_queues.get(session_key)
|
||||||
|
if not queue:
|
||||||
|
return
|
||||||
|
msg = queue.pop(0)
|
||||||
|
if not queue:
|
||||||
|
self._deferred_automation_queues.pop(session_key, None)
|
||||||
|
await self.bus.publish_inbound(msg)
|
||||||
|
|
||||||
def _persist_user_message_early(
|
def _persist_user_message_early(
|
||||||
self,
|
self,
|
||||||
msg: InboundMessage,
|
msg: InboundMessage,
|
||||||
@ -583,6 +642,17 @@ class AgentLoop:
|
|||||||
extra: dict[str, Any] = ({"media": list(media_paths)} if media_paths else {}) | agent_context.session_extra(msg.metadata)
|
extra: dict[str, Any] = ({"media": list(media_paths)} if media_paths else {}) | agent_context.session_extra(msg.metadata)
|
||||||
extra.update(kwargs)
|
extra.update(kwargs)
|
||||||
text = msg.content if isinstance(msg.content, str) else ""
|
text = msg.content if isinstance(msg.content, str) else ""
|
||||||
|
if trigger := automation_trigger(msg.metadata):
|
||||||
|
persist_content = trigger.get("persist_content")
|
||||||
|
if isinstance(persist_content, str) and persist_content.strip():
|
||||||
|
text = persist_content
|
||||||
|
extra.update({
|
||||||
|
"_automation_trigger": True,
|
||||||
|
"automation_id": trigger.get("job_id"),
|
||||||
|
"automation_name": trigger.get("job_name"),
|
||||||
|
"automation_run_id": trigger.get("run_id"),
|
||||||
|
"automation_prompt_ref": trigger.get("prompt_ref"),
|
||||||
|
})
|
||||||
session.add_message("user", text, **extra)
|
session.add_message("user", text, **extra)
|
||||||
self._mark_pending_user_turn(session)
|
self._mark_pending_user_turn(session)
|
||||||
self.sessions.save(session)
|
self.sessions.save(session)
|
||||||
@ -883,6 +953,22 @@ class AgentLoop:
|
|||||||
self.commands.dispatch_priority,
|
self.commands.dispatch_priority,
|
||||||
)
|
)
|
||||||
continue
|
continue
|
||||||
|
if (
|
||||||
|
defer_until_session_idle(msg.metadata)
|
||||||
|
and effective_key in self._pending_queues
|
||||||
|
):
|
||||||
|
pending_msg = msg
|
||||||
|
if effective_key != msg.session_key:
|
||||||
|
pending_msg = dataclasses.replace(
|
||||||
|
msg,
|
||||||
|
session_key_override=effective_key,
|
||||||
|
)
|
||||||
|
self._defer_automation_turn(effective_key, pending_msg)
|
||||||
|
logger.info(
|
||||||
|
"Deferred automation turn for active session {}",
|
||||||
|
effective_key,
|
||||||
|
)
|
||||||
|
continue
|
||||||
# If this session already has an active pending queue (i.e. a task
|
# If this session already has an active pending queue (i.e. a task
|
||||||
# is processing this session), route the message there for mid-turn
|
# is processing this session), route the message there for mid-turn
|
||||||
# injection instead of creating a competing task.
|
# injection instead of creating a competing task.
|
||||||
@ -996,7 +1082,12 @@ class AgentLoop:
|
|||||||
session_key=session_key,
|
session_key=session_key,
|
||||||
metadata=msg.metadata,
|
metadata=msg.metadata,
|
||||||
)
|
)
|
||||||
|
self._complete_automation_turn(msg, response=response)
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
|
self._complete_automation_turn(
|
||||||
|
msg,
|
||||||
|
error=asyncio.CancelledError(),
|
||||||
|
)
|
||||||
logger.info("Task cancelled for session {}", session_key)
|
logger.info("Task cancelled for session {}", session_key)
|
||||||
# Preserve partial context from the interrupted turn so
|
# Preserve partial context from the interrupted turn so
|
||||||
# the user does not lose tool results and assistant
|
# the user does not lose tool results and assistant
|
||||||
@ -1022,7 +1113,7 @@ class AgentLoop:
|
|||||||
exc_info=True,
|
exc_info=True,
|
||||||
)
|
)
|
||||||
raise
|
raise
|
||||||
except Exception:
|
except Exception as exc:
|
||||||
logger.exception("Error processing message for session {}", session_key)
|
logger.exception("Error processing message for session {}", session_key)
|
||||||
await self.bus.publish_outbound(OutboundMessage(
|
await self.bus.publish_outbound(OutboundMessage(
|
||||||
channel=msg.channel, chat_id=msg.chat_id,
|
channel=msg.channel, chat_id=msg.chat_id,
|
||||||
@ -1035,6 +1126,7 @@ class AgentLoop:
|
|||||||
session_key=session_key,
|
session_key=session_key,
|
||||||
metadata=msg.metadata,
|
metadata=msg.metadata,
|
||||||
)
|
)
|
||||||
|
self._complete_automation_turn(msg, error=exc)
|
||||||
finally:
|
finally:
|
||||||
# Drain any messages still in the pending queue and re-publish
|
# Drain any messages still in the pending queue and re-publish
|
||||||
# them to the bus so they are processed as fresh inbound messages
|
# them to the bus so they are processed as fresh inbound messages
|
||||||
@ -1065,12 +1157,14 @@ class AgentLoop:
|
|||||||
msg, session_key, "idle"
|
msg, session_key, "idle"
|
||||||
)
|
)
|
||||||
self._runtime_events().clear_turn(session_key)
|
self._runtime_events().clear_turn(session_key)
|
||||||
|
await self._publish_next_deferred_automation(session_key)
|
||||||
finally:
|
finally:
|
||||||
if pending is None:
|
if pending is None:
|
||||||
await self._runtime_events().run_status_changed(
|
await self._runtime_events().run_status_changed(
|
||||||
msg, session_key, "idle"
|
msg, session_key, "idle"
|
||||||
)
|
)
|
||||||
self._runtime_events().clear_turn(session_key)
|
self._runtime_events().clear_turn(session_key)
|
||||||
|
await self._publish_next_deferred_automation(session_key)
|
||||||
|
|
||||||
async def close_mcp(self) -> None:
|
async def close_mcp(self) -> None:
|
||||||
"""Drain pending background archives, then close MCP connections."""
|
"""Drain pending background archives, then close MCP connections."""
|
||||||
@ -1342,6 +1436,8 @@ class AgentLoop:
|
|||||||
ctx.session = self.sessions.get_or_create(ctx.session_key)
|
ctx.session = self.sessions.get_or_create(ctx.session_key)
|
||||||
await self._runtime_events().session_turn_started(msg, ctx.session_key)
|
await self._runtime_events().session_turn_started(msg, ctx.session_key)
|
||||||
self.workspace_scopes.persist_message_scope(ctx.session, msg)
|
self.workspace_scopes.persist_message_scope(ctx.session, msg)
|
||||||
|
if persist_routing_context(ctx.session, msg):
|
||||||
|
self.sessions.save(ctx.session)
|
||||||
|
|
||||||
if self._restore_runtime_checkpoint(ctx.session):
|
if self._restore_runtime_checkpoint(ctx.session):
|
||||||
self.sessions.save(ctx.session)
|
self.sessions.save(ctx.session)
|
||||||
|
|||||||
@ -9,7 +9,6 @@ from typing import Any
|
|||||||
from nanobot.agent.tools.base import Tool, tool_parameters
|
from nanobot.agent.tools.base import Tool, tool_parameters
|
||||||
from nanobot.agent.tools.context import ContextAware, RequestContext
|
from nanobot.agent.tools.context import ContextAware, RequestContext
|
||||||
from nanobot.agent.tools.schema import (
|
from nanobot.agent.tools.schema import (
|
||||||
BooleanSchema,
|
|
||||||
IntegerSchema,
|
IntegerSchema,
|
||||||
StringSchema,
|
StringSchema,
|
||||||
tool_parameters_schema,
|
tool_parameters_schema,
|
||||||
@ -38,10 +37,6 @@ _CRON_PARAMETERS = tool_parameters_schema(
|
|||||||
"ISO datetime for one-time execution (e.g. '2026-02-12T10:30:00'). "
|
"ISO datetime for one-time execution (e.g. '2026-02-12T10:30:00'). "
|
||||||
"Naive values use the tool's default timezone."
|
"Naive values use the tool's default timezone."
|
||||||
),
|
),
|
||||||
deliver=BooleanSchema(
|
|
||||||
description="Whether to deliver the execution result to the user channel (default true)",
|
|
||||||
default=True,
|
|
||||||
),
|
|
||||||
job_id=StringSchema("REQUIRED when action='remove'. Job ID to remove (obtain via action='list')."),
|
job_id=StringSchema("REQUIRED when action='remove'. Job ID to remove (obtain via action='list')."),
|
||||||
required=["action"],
|
required=["action"],
|
||||||
description=(
|
description=(
|
||||||
@ -76,11 +71,11 @@ class CronTool(Tool, ContextAware):
|
|||||||
return cls(cron_service=ctx.cron_service, default_timezone=ctx.timezone)
|
return cls(cron_service=ctx.cron_service, default_timezone=ctx.timezone)
|
||||||
|
|
||||||
def set_context(self, ctx: RequestContext) -> None:
|
def set_context(self, ctx: RequestContext) -> None:
|
||||||
"""Set the current session context for delivery."""
|
"""Set the current session context for scheduled automation ownership."""
|
||||||
self._channel.set(ctx.channel)
|
self._channel.set(ctx.channel)
|
||||||
self._chat_id.set(ctx.chat_id)
|
self._chat_id.set(ctx.chat_id)
|
||||||
self._metadata.set(ctx.metadata)
|
self._metadata.set(ctx.metadata)
|
||||||
self._session_key.set(ctx.session_key or f"{ctx.channel}:{ctx.chat_id}")
|
self._session_key.set(ctx.session_key or "")
|
||||||
|
|
||||||
def set_cron_context(self, active: bool):
|
def set_cron_context(self, active: bool):
|
||||||
"""Mark whether the tool is executing inside a cron job callback."""
|
"""Mark whether the tool is executing inside a cron job callback."""
|
||||||
@ -170,10 +165,9 @@ class CronTool(Tool, ContextAware):
|
|||||||
"describing what to do when the job triggers "
|
"describing what to do when the job triggers "
|
||||||
"(e.g. the reminder text). Retry including message=\"...\"."
|
"(e.g. the reminder text). Retry including message=\"...\"."
|
||||||
)
|
)
|
||||||
channel = self._channel.get()
|
session_key = self._session_key.get()
|
||||||
chat_id = self._chat_id.get()
|
if not session_key:
|
||||||
if not channel or not chat_id:
|
return "Error: scheduled automations must be created from a chat session"
|
||||||
return "Error: no session context (channel/chat_id)"
|
|
||||||
if tz and not cron_expr:
|
if tz and not cron_expr:
|
||||||
return "Error: tz can only be used with cron_expr"
|
return "Error: tz can only be used with cron_expr"
|
||||||
if tz:
|
if tz:
|
||||||
@ -210,12 +204,8 @@ class CronTool(Tool, ContextAware):
|
|||||||
name=name or message[:30],
|
name=name or message[:30],
|
||||||
schedule=schedule,
|
schedule=schedule,
|
||||||
message=message,
|
message=message,
|
||||||
deliver=deliver,
|
|
||||||
channel=channel,
|
|
||||||
to=chat_id,
|
|
||||||
delete_after_run=delete_after,
|
delete_after_run=delete_after,
|
||||||
channel_meta=self._metadata.get(),
|
session_key=session_key,
|
||||||
session_key=self._session_key.get() or None,
|
|
||||||
)
|
)
|
||||||
return f"Created job '{job.name}' (id: {job.id})"
|
return f"Created job '{job.name}' (id: {job.id})"
|
||||||
|
|
||||||
|
|||||||
@ -1,10 +1,12 @@
|
|||||||
"""CLI commands for nanobot."""
|
"""CLI commands for nanobot."""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
|
import hashlib
|
||||||
import os
|
import os
|
||||||
import select
|
import select
|
||||||
import signal
|
import signal
|
||||||
import sys
|
import sys
|
||||||
|
import time
|
||||||
import uuid
|
import uuid
|
||||||
from collections.abc import Callable
|
from collections.abc import Callable
|
||||||
from contextlib import nullcontext, suppress
|
from contextlib import nullcontext, suppress
|
||||||
@ -975,12 +977,19 @@ def _run_gateway(
|
|||||||
from nanobot.bus.queue import MessageBus
|
from nanobot.bus.queue import MessageBus
|
||||||
from nanobot.bus.runtime_events import RuntimeEventBus
|
from nanobot.bus.runtime_events import RuntimeEventBus
|
||||||
from nanobot.channels.manager import ChannelManager
|
from nanobot.channels.manager import ChannelManager
|
||||||
|
from nanobot.cron.automation import (
|
||||||
|
AUTOMATION_DEFER_UNTIL_IDLE_META,
|
||||||
|
AUTOMATION_TRIGGER_META,
|
||||||
|
)
|
||||||
from nanobot.cron.service import CronService
|
from nanobot.cron.service import CronService
|
||||||
from nanobot.cron.types import CronJob
|
from nanobot.cron.types import CronJob
|
||||||
from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot
|
from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot
|
||||||
from nanobot.providers.image_generation import image_gen_provider_configs
|
from nanobot.providers.image_generation import image_gen_provider_configs
|
||||||
|
from nanobot.security.workspace_access import WORKSPACE_SCOPE_METADATA_KEY
|
||||||
from nanobot.session.manager import SessionManager
|
from nanobot.session.manager import SessionManager
|
||||||
|
from nanobot.session.routing import read_routing_context
|
||||||
from nanobot.session.webui_turns import WebuiTurnCoordinator
|
from nanobot.session.webui_turns import WebuiTurnCoordinator
|
||||||
|
from nanobot.utils.prompt_templates import render_template
|
||||||
from nanobot.webui.token_usage import TokenUsageHook
|
from nanobot.webui.token_usage import TokenUsageHook
|
||||||
|
|
||||||
port = port if port is not None else config.gateway.port
|
port = port if port is not None else config.gateway.port
|
||||||
@ -1025,7 +1034,7 @@ def _run_gateway(
|
|||||||
).subscribe(runtime_events)
|
).subscribe(runtime_events)
|
||||||
|
|
||||||
from nanobot.agent.loop import UNIFIED_SESSION_KEY
|
from nanobot.agent.loop import UNIFIED_SESSION_KEY
|
||||||
from nanobot.bus.events import OutboundMessage
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||||||
|
|
||||||
def _channel_session_key(channel: str, chat_id: str) -> str:
|
def _channel_session_key(channel: str, chat_id: str) -> str:
|
||||||
return (
|
return (
|
||||||
@ -1034,6 +1043,152 @@ def _run_gateway(
|
|||||||
else f"{channel}:{chat_id}"
|
else f"{channel}:{chat_id}"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def _session_metadata(session_key: str) -> dict[str, Any]:
|
||||||
|
data = session_manager.read_session_file(session_key)
|
||||||
|
metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
|
||||||
|
return dict(metadata) if isinstance(metadata, dict) else {}
|
||||||
|
|
||||||
|
def _bound_session_delivery_context(
|
||||||
|
session_key: str,
|
||||||
|
*,
|
||||||
|
turn_seed: str,
|
||||||
|
source_label: str | None,
|
||||||
|
) -> tuple[str, str, dict[str, Any]]:
|
||||||
|
if ":" not in session_key:
|
||||||
|
raise ValueError(f"bound cron session_key is invalid: {session_key!r}")
|
||||||
|
channel, rest = session_key.split(":", 1)
|
||||||
|
if not channel or not rest:
|
||||||
|
raise ValueError(f"bound cron session_key is invalid: {session_key!r}")
|
||||||
|
|
||||||
|
session_metadata = _session_metadata(session_key)
|
||||||
|
routed = read_routing_context(session_metadata)
|
||||||
|
if routed is not None:
|
||||||
|
channel, rest, metadata = routed
|
||||||
|
else:
|
||||||
|
metadata: dict[str, Any] = {}
|
||||||
|
|
||||||
|
if channel == "websocket":
|
||||||
|
metadata["webui"] = True
|
||||||
|
scope = session_metadata.get(WORKSPACE_SCOPE_METADATA_KEY)
|
||||||
|
if isinstance(scope, dict):
|
||||||
|
metadata[WORKSPACE_SCOPE_METADATA_KEY] = dict(scope)
|
||||||
|
metadata.update(
|
||||||
|
_proactive_delivery_metadata(
|
||||||
|
"websocket",
|
||||||
|
metadata,
|
||||||
|
turn_seed=turn_seed,
|
||||||
|
source_label=source_label,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
return channel, rest, metadata
|
||||||
|
|
||||||
|
if channel == "slack" and ":" in rest:
|
||||||
|
chat_id, thread_ts = rest.split(":", 1)
|
||||||
|
if thread_ts:
|
||||||
|
metadata["slack"] = {"thread_ts": thread_ts}
|
||||||
|
return channel, chat_id, metadata
|
||||||
|
|
||||||
|
return channel, rest, metadata
|
||||||
|
|
||||||
|
def _automation_prompt_ref(prompt: str) -> dict[str, Any]:
|
||||||
|
return {
|
||||||
|
"id": "cron.agent_turn.reminder",
|
||||||
|
"version": 1,
|
||||||
|
"sha256": hashlib.sha256(prompt.encode("utf-8")).hexdigest(),
|
||||||
|
}
|
||||||
|
|
||||||
|
async def _run_bound_cron_job(job: CronJob) -> str | None:
|
||||||
|
session_key = job.payload.session_key
|
||||||
|
if not session_key:
|
||||||
|
raise ValueError(f"cron job {job.id} is missing payload.session_key")
|
||||||
|
|
||||||
|
prompt = render_template(
|
||||||
|
"agent/cron_reminder.md",
|
||||||
|
strip=True,
|
||||||
|
message=job.payload.message,
|
||||||
|
)
|
||||||
|
prompt_ref = _automation_prompt_ref(prompt)
|
||||||
|
run_id = f"{job.id}:{int(time.time() * 1000)}:{uuid.uuid4().hex[:8]}"
|
||||||
|
channel, chat_id, metadata = _bound_session_delivery_context(
|
||||||
|
session_key,
|
||||||
|
turn_seed=f"cron:{job.id}",
|
||||||
|
source_label=job.name,
|
||||||
|
)
|
||||||
|
metadata[AUTOMATION_TRIGGER_META] = {
|
||||||
|
"job_id": job.id,
|
||||||
|
"job_name": job.name,
|
||||||
|
"run_id": run_id,
|
||||||
|
"prompt_ref": prompt_ref,
|
||||||
|
"persist_content": (
|
||||||
|
f"Scheduled automation triggered: {job.name}\n\n{job.payload.message}"
|
||||||
|
),
|
||||||
|
}
|
||||||
|
metadata[AUTOMATION_DEFER_UNTIL_IDLE_META] = True
|
||||||
|
|
||||||
|
cron.write_run_record(
|
||||||
|
run_id,
|
||||||
|
{
|
||||||
|
"job_id": job.id,
|
||||||
|
"job_name": job.name,
|
||||||
|
"session_key": session_key,
|
||||||
|
"status": "queued",
|
||||||
|
"prompt_ref": prompt_ref,
|
||||||
|
"prompt_vars": {"message": job.payload.message},
|
||||||
|
"rendered_prompt": prompt,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
|
cron_tool = agent.tools.get("cron")
|
||||||
|
cron_token = None
|
||||||
|
if isinstance(cron_tool, CronTool):
|
||||||
|
cron_token = cron_tool.set_cron_context(True)
|
||||||
|
try:
|
||||||
|
resp = await agent.submit_automation_turn(
|
||||||
|
InboundMessage(
|
||||||
|
channel=channel,
|
||||||
|
sender_id="cron",
|
||||||
|
chat_id=chat_id,
|
||||||
|
content=prompt,
|
||||||
|
metadata=metadata,
|
||||||
|
session_key_override=session_key,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except (Exception, asyncio.CancelledError) as exc:
|
||||||
|
error_text = str(exc) or exc.__class__.__name__
|
||||||
|
cron.write_run_record(
|
||||||
|
run_id,
|
||||||
|
{
|
||||||
|
"job_id": job.id,
|
||||||
|
"job_name": job.name,
|
||||||
|
"session_key": session_key,
|
||||||
|
"status": "error",
|
||||||
|
"error": error_text,
|
||||||
|
"prompt_ref": prompt_ref,
|
||||||
|
"prompt_vars": {"message": job.payload.message},
|
||||||
|
"rendered_prompt": prompt,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
raise
|
||||||
|
finally:
|
||||||
|
if isinstance(cron_tool, CronTool) and cron_token is not None:
|
||||||
|
cron_tool.reset_cron_context(cron_token)
|
||||||
|
|
||||||
|
response = resp.content if resp else ""
|
||||||
|
cron.write_run_record(
|
||||||
|
run_id,
|
||||||
|
{
|
||||||
|
"job_id": job.id,
|
||||||
|
"job_name": job.name,
|
||||||
|
"session_key": session_key,
|
||||||
|
"status": "ok",
|
||||||
|
"prompt_ref": prompt_ref,
|
||||||
|
"prompt_vars": {"message": job.payload.message},
|
||||||
|
"rendered_prompt": prompt,
|
||||||
|
"response": response,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
return response
|
||||||
|
|
||||||
async def _deliver_to_channel(
|
async def _deliver_to_channel(
|
||||||
msg: OutboundMessage, *, record: bool = False, session_key: str | None = None,
|
msg: OutboundMessage, *, record: bool = False, session_key: str | None = None,
|
||||||
) -> None:
|
) -> None:
|
||||||
@ -1194,6 +1349,9 @@ def _run_gateway(
|
|||||||
logger.info("Heartbeat: silenced by post-run evaluation")
|
logger.info("Heartbeat: silenced by post-run evaluation")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
|
if job.payload.kind == "agent_turn" and job.payload.session_key:
|
||||||
|
return await _run_bound_cron_job(job)
|
||||||
|
|
||||||
reminder_note = (
|
reminder_note = (
|
||||||
"The scheduled time has arrived. Deliver this reminder to the user now, "
|
"The scheduled time has arrived. Deliver this reminder to the user now, "
|
||||||
"as a brief and natural message in their language. Speak directly to them — "
|
"as a brief and natural message in their language. Speak directly to them — "
|
||||||
|
|||||||
33
nanobot/cron/automation.py
Normal file
33
nanobot/cron/automation.py
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
"""Shared metadata helpers for scheduled automation turns."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Mapping
|
||||||
|
|
||||||
|
AUTOMATION_TRIGGER_META = "_automation_trigger"
|
||||||
|
AUTOMATION_DEFER_UNTIL_IDLE_META = "_defer_until_session_idle"
|
||||||
|
|
||||||
|
|
||||||
|
def automation_trigger(metadata: Mapping[str, Any] | None) -> dict[str, Any] | None:
|
||||||
|
"""Return structured automation trigger metadata when present."""
|
||||||
|
raw = (metadata or {}).get(AUTOMATION_TRIGGER_META)
|
||||||
|
return raw if isinstance(raw, dict) else None
|
||||||
|
|
||||||
|
|
||||||
|
def is_automation_turn(metadata: Mapping[str, Any] | None) -> bool:
|
||||||
|
return automation_trigger(metadata) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def defer_until_session_idle(metadata: Mapping[str, Any] | None) -> bool:
|
||||||
|
return bool(
|
||||||
|
is_automation_turn(metadata)
|
||||||
|
and (metadata or {}).get(AUTOMATION_DEFER_UNTIL_IDLE_META) is True
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def automation_run_id(metadata: Mapping[str, Any] | None) -> str | None:
|
||||||
|
trigger = automation_trigger(metadata)
|
||||||
|
if not trigger:
|
||||||
|
return None
|
||||||
|
value = trigger.get("run_id")
|
||||||
|
return value if isinstance(value, str) and value else None
|
||||||
@ -84,6 +84,7 @@ class CronService:
|
|||||||
):
|
):
|
||||||
self.store_path = store_path
|
self.store_path = store_path
|
||||||
self._action_path = store_path.parent / "action.jsonl"
|
self._action_path = store_path.parent / "action.jsonl"
|
||||||
|
self._run_records_dir = store_path.parent / "runs"
|
||||||
self._lock = FileLock(str(self._action_path.parent) + ".lock")
|
self._lock = FileLock(str(self._action_path.parent) + ".lock")
|
||||||
self.on_job = on_job
|
self.on_job = on_job
|
||||||
self._store: CronStore | None = None
|
self._store: CronStore | None = None
|
||||||
@ -325,6 +326,23 @@ class CronService:
|
|||||||
tmp_path.unlink(missing_ok=True)
|
tmp_path.unlink(missing_ok=True)
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def _safe_run_record_name(run_id: str) -> str:
|
||||||
|
return "".join(c if c.isalnum() or c in "._-" else "_" for c in run_id)
|
||||||
|
|
||||||
|
def write_run_record(self, run_id: str, record: dict[str, Any]) -> None:
|
||||||
|
"""Write an internal audit record for one cron execution."""
|
||||||
|
name = self._safe_run_record_name(run_id)
|
||||||
|
if not name:
|
||||||
|
name = str(uuid.uuid4())
|
||||||
|
path = self._run_records_dir / f"{name}.json"
|
||||||
|
payload = {
|
||||||
|
**record,
|
||||||
|
"run_id": run_id,
|
||||||
|
"updated_at_ms": _now_ms(),
|
||||||
|
}
|
||||||
|
self._atomic_write(path, json.dumps(payload, indent=2, ensure_ascii=False))
|
||||||
|
|
||||||
async def start(self) -> None:
|
async def start(self) -> None:
|
||||||
"""Start the cron service."""
|
"""Start the cron service."""
|
||||||
self._running = True
|
self._running = True
|
||||||
@ -473,6 +491,20 @@ class CronService:
|
|||||||
jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled]
|
jobs = store.jobs if include_disabled else [j for j in store.jobs if j.enabled]
|
||||||
return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf'))
|
return sorted(jobs, key=lambda j: j.state.next_run_at_ms or float('inf'))
|
||||||
|
|
||||||
|
def list_bound_agent_jobs_for_session(
|
||||||
|
self,
|
||||||
|
session_key: str,
|
||||||
|
*,
|
||||||
|
include_disabled: bool = True,
|
||||||
|
) -> list[CronJob]:
|
||||||
|
"""Return user-created bound automation jobs owned by *session_key*."""
|
||||||
|
return [
|
||||||
|
job
|
||||||
|
for job in self.list_jobs(include_disabled=include_disabled)
|
||||||
|
if job.payload.kind == "agent_turn"
|
||||||
|
and job.payload.session_key == session_key
|
||||||
|
]
|
||||||
|
|
||||||
def add_job(
|
def add_job(
|
||||||
self,
|
self,
|
||||||
name: str,
|
name: str,
|
||||||
|
|||||||
@ -36,6 +36,7 @@ _FORK_VOLATILE_METADATA_KEYS = {
|
|||||||
"pending_user_turn",
|
"pending_user_turn",
|
||||||
"runtime_checkpoint",
|
"runtime_checkpoint",
|
||||||
"thread_goal",
|
"thread_goal",
|
||||||
|
"_routing_context",
|
||||||
"title",
|
"title",
|
||||||
"title_user_edited",
|
"title_user_edited",
|
||||||
}
|
}
|
||||||
|
|||||||
105
nanobot/session/routing.py
Normal file
105
nanobot/session/routing.py
Normal file
@ -0,0 +1,105 @@
|
|||||||
|
"""Persisted session routing context for proactive turns."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from typing import Any, Mapping
|
||||||
|
|
||||||
|
from nanobot.bus.events import InboundMessage
|
||||||
|
from nanobot.cron.automation import is_automation_turn
|
||||||
|
from nanobot.session.manager import Session
|
||||||
|
|
||||||
|
SESSION_ROUTING_METADATA_KEY = "_routing_context"
|
||||||
|
|
||||||
|
_ROUTING_METADATA_KEYS = {
|
||||||
|
"chat_type",
|
||||||
|
"context_chat_id",
|
||||||
|
"conversation_type",
|
||||||
|
"event_id",
|
||||||
|
"message_thread_id",
|
||||||
|
"msg_type",
|
||||||
|
"parent_channel_id",
|
||||||
|
"parent_id",
|
||||||
|
"platform",
|
||||||
|
"root_id",
|
||||||
|
"thread_id",
|
||||||
|
"thread_reply_to_event_id",
|
||||||
|
"thread_root_event_id",
|
||||||
|
}
|
||||||
|
_CHANNEL_ROUTING_METADATA_KEYS = {
|
||||||
|
# Feishu needs a message anchor to reply into an existing topic. Other
|
||||||
|
# channels should avoid stale reply anchors for scheduled automation turns.
|
||||||
|
"feishu": {"message_id"},
|
||||||
|
}
|
||||||
|
_SLACK_ROUTING_KEYS = {"channel_type", "thread_ts"}
|
||||||
|
|
||||||
|
|
||||||
|
def _scalar(value: Any) -> str | int | float | bool | None:
|
||||||
|
if value is None or isinstance(value, (str, int, float, bool)):
|
||||||
|
return value
|
||||||
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def _routing_metadata(channel: str, metadata: Mapping[str, Any] | None) -> dict[str, Any]:
|
||||||
|
if not isinstance(metadata, Mapping):
|
||||||
|
return {}
|
||||||
|
|
||||||
|
out: dict[str, Any] = {}
|
||||||
|
keys = _ROUTING_METADATA_KEYS | _CHANNEL_ROUTING_METADATA_KEYS.get(channel, set())
|
||||||
|
for key in keys:
|
||||||
|
if key not in metadata:
|
||||||
|
continue
|
||||||
|
value = _scalar(metadata.get(key))
|
||||||
|
if value is not None:
|
||||||
|
out[key] = value
|
||||||
|
|
||||||
|
slack = metadata.get("slack")
|
||||||
|
if isinstance(slack, Mapping):
|
||||||
|
slack_out = {
|
||||||
|
key: value
|
||||||
|
for key in _SLACK_ROUTING_KEYS
|
||||||
|
if (value := _scalar(slack.get(key))) is not None
|
||||||
|
}
|
||||||
|
if slack_out:
|
||||||
|
out["slack"] = slack_out
|
||||||
|
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
def routing_context_for_message(msg: InboundMessage) -> dict[str, Any]:
|
||||||
|
"""Return the stable routing context needed to deliver future session turns."""
|
||||||
|
return {
|
||||||
|
"channel": msg.channel,
|
||||||
|
"chat_id": msg.chat_id,
|
||||||
|
"metadata": _routing_metadata(msg.channel, msg.metadata),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def persist_routing_context(session: Session, msg: InboundMessage) -> bool:
|
||||||
|
"""Persist the latest non-automation delivery context for a session."""
|
||||||
|
if is_automation_turn(msg.metadata):
|
||||||
|
return False
|
||||||
|
context = routing_context_for_message(msg)
|
||||||
|
if session.metadata.get(SESSION_ROUTING_METADATA_KEY) == context:
|
||||||
|
return False
|
||||||
|
session.metadata[SESSION_ROUTING_METADATA_KEY] = context
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def read_routing_context(metadata: Mapping[str, Any] | None) -> tuple[str, str, dict[str, Any]] | None:
|
||||||
|
"""Decode a persisted routing context from session metadata."""
|
||||||
|
if not isinstance(metadata, Mapping):
|
||||||
|
return None
|
||||||
|
raw = metadata.get(SESSION_ROUTING_METADATA_KEY)
|
||||||
|
if not isinstance(raw, Mapping):
|
||||||
|
return None
|
||||||
|
|
||||||
|
channel = raw.get("channel")
|
||||||
|
chat_id = raw.get("chat_id")
|
||||||
|
if not isinstance(channel, str) or not channel:
|
||||||
|
return None
|
||||||
|
if not isinstance(chat_id, str) or not chat_id:
|
||||||
|
return None
|
||||||
|
|
||||||
|
route_meta = raw.get("metadata")
|
||||||
|
metadata_out = dict(route_meta) if isinstance(route_meta, Mapping) else {}
|
||||||
|
return channel, chat_id, metadata_out
|
||||||
9
nanobot/templates/agent/cron_reminder.md
Normal file
9
nanobot/templates/agent/cron_reminder.md
Normal file
@ -0,0 +1,9 @@
|
|||||||
|
The scheduled time has arrived. Execute this scheduled automation now and report the result to the user in the same session.
|
||||||
|
|
||||||
|
Rules:
|
||||||
|
- Speak directly to the user in their language.
|
||||||
|
- Do not narrate internal progress.
|
||||||
|
- Do not include user IDs.
|
||||||
|
- Do not add status reports like "Done" or "Reminded" unless they are the natural response.
|
||||||
|
|
||||||
|
Automation: {{ message }}
|
||||||
@ -8,7 +8,25 @@ from nanobot.cron.types import CronJob
|
|||||||
|
|
||||||
|
|
||||||
class _CronServiceLike(Protocol):
|
class _CronServiceLike(Protocol):
|
||||||
def list_jobs(self, *, include_disabled: bool = False) -> list[CronJob]: ...
|
def list_bound_agent_jobs_for_session(
|
||||||
|
self,
|
||||||
|
session_key: str,
|
||||||
|
*,
|
||||||
|
include_disabled: bool = True,
|
||||||
|
) -> list[CronJob]: ...
|
||||||
|
|
||||||
|
|
||||||
|
def bound_session_automation_jobs(
|
||||||
|
cron_service: _CronServiceLike | None,
|
||||||
|
session_key: str,
|
||||||
|
) -> list[CronJob]:
|
||||||
|
"""Return agent-turn automation jobs explicitly bound to *session_key*."""
|
||||||
|
if cron_service is None:
|
||||||
|
return []
|
||||||
|
return cron_service.list_bound_agent_jobs_for_session(
|
||||||
|
session_key,
|
||||||
|
include_disabled=True,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def session_automations_payload(
|
def session_automations_payload(
|
||||||
@ -16,22 +34,11 @@ def session_automations_payload(
|
|||||||
session_key: str,
|
session_key: str,
|
||||||
) -> dict[str, Any]:
|
) -> dict[str, Any]:
|
||||||
"""Return user-created automation jobs attached to a WebUI session."""
|
"""Return user-created automation jobs attached to a WebUI session."""
|
||||||
jobs: list[CronJob] = []
|
return {"jobs": serialize_automation_jobs(bound_session_automation_jobs(cron_service, session_key))}
|
||||||
if cron_service is not None:
|
|
||||||
all_jobs = cron_service.list_jobs(include_disabled=True)
|
|
||||||
jobs = [job for job in all_jobs if _job_matches_session(job, session_key)]
|
|
||||||
return {"jobs": [_serialize_job(job) for job in jobs]}
|
|
||||||
|
|
||||||
|
|
||||||
def _job_matches_session(job: CronJob, session_key: str) -> bool:
|
def serialize_automation_jobs(jobs: list[CronJob]) -> list[dict[str, Any]]:
|
||||||
payload = job.payload
|
return [_serialize_job(job) for job in jobs]
|
||||||
if payload.kind != "agent_turn":
|
|
||||||
return False
|
|
||||||
if payload.session_key:
|
|
||||||
return payload.session_key == session_key
|
|
||||||
if payload.channel and payload.to:
|
|
||||||
return f"{payload.channel}:{payload.to}" == session_key
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def _serialize_job(job: CronJob) -> dict[str, Any]:
|
def _serialize_job(job: CronJob) -> dict[str, Any]:
|
||||||
|
|||||||
@ -61,7 +61,11 @@ from nanobot.webui.http_utils import (
|
|||||||
safe_host_header as _safe_host_header,
|
safe_host_header as _safe_host_header,
|
||||||
)
|
)
|
||||||
from nanobot.webui.media_gateway import WebUIMediaGateway
|
from nanobot.webui.media_gateway import WebUIMediaGateway
|
||||||
from nanobot.webui.session_automations import session_automations_payload
|
from nanobot.webui.session_automations import (
|
||||||
|
bound_session_automation_jobs,
|
||||||
|
serialize_automation_jobs,
|
||||||
|
session_automations_payload,
|
||||||
|
)
|
||||||
from nanobot.webui.session_list_index import list_webui_sessions
|
from nanobot.webui.session_list_index import list_webui_sessions
|
||||||
from nanobot.webui.sidebar_state import (
|
from nanobot.webui.sidebar_state import (
|
||||||
read_webui_sidebar_state,
|
read_webui_sidebar_state,
|
||||||
@ -446,6 +450,20 @@ class GatewayHTTPHandler:
|
|||||||
return _http_error(400, "invalid session key")
|
return _http_error(400, "invalid session key")
|
||||||
if not _is_websocket_channel_session_key(decoded_key):
|
if not _is_websocket_channel_session_key(decoded_key):
|
||||||
return _http_error(404, "session not found")
|
return _http_error(404, "session not found")
|
||||||
|
query = _parse_query(request.path)
|
||||||
|
delete_automations = (_query_first(query, "delete_automations") or "").lower()
|
||||||
|
bound_jobs = bound_session_automation_jobs(self.cron_service, decoded_key)
|
||||||
|
if bound_jobs and delete_automations not in {"1", "true", "yes"}:
|
||||||
|
return _http_json_response(
|
||||||
|
{
|
||||||
|
"deleted": False,
|
||||||
|
"blocked_by_automations": True,
|
||||||
|
"automations": serialize_automation_jobs(bound_jobs),
|
||||||
|
}
|
||||||
|
)
|
||||||
|
if bound_jobs and self.cron_service is not None:
|
||||||
|
for job in bound_jobs:
|
||||||
|
self.cron_service.remove_job(job.id)
|
||||||
deleted = self.session_manager.delete_session(decoded_key)
|
deleted = self.session_manager.delete_session(decoded_key)
|
||||||
delete_webui_thread(decoded_key)
|
delete_webui_thread(decoded_key)
|
||||||
return _http_json_response({"deleted": bool(deleted)})
|
return _http_json_response({"deleted": bool(deleted)})
|
||||||
|
|||||||
@ -11,6 +11,7 @@ from nanobot.bus.queue import MessageBus
|
|||||||
from nanobot.providers.base import LLMResponse
|
from nanobot.providers.base import LLMResponse
|
||||||
from nanobot.session.goal_state import GOAL_STATE_KEY
|
from nanobot.session.goal_state import GOAL_STATE_KEY
|
||||||
from nanobot.session.manager import Session, SessionManager
|
from nanobot.session.manager import Session, SessionManager
|
||||||
|
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
|
||||||
from nanobot.session.turn_continuation import (
|
from nanobot.session.turn_continuation import (
|
||||||
INTERNAL_CONTINUATION_META,
|
INTERNAL_CONTINUATION_META,
|
||||||
INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
|
INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
|
||||||
@ -827,6 +828,12 @@ async def test_process_message_uses_context_chat_id_for_runtime_prompt(tmp_path:
|
|||||||
assert result.chat_id == "thread-777"
|
assert result.chat_id == "thread-777"
|
||||||
assert loop.context.build_messages.call_args.kwargs["chat_id"] == "parent-456"
|
assert loop.context.build_messages.call_args.kwargs["chat_id"] == "parent-456"
|
||||||
assert loop._run_agent_loop.call_args.kwargs["chat_id"] == "thread-777"
|
assert loop._run_agent_loop.call_args.kwargs["chat_id"] == "thread-777"
|
||||||
|
session = loop.sessions.get_or_create("discord:parent-456:thread:thread-777")
|
||||||
|
assert session.metadata[SESSION_ROUTING_METADATA_KEY] == {
|
||||||
|
"channel": "discord",
|
||||||
|
"chat_id": "thread-777",
|
||||||
|
"metadata": {"context_chat_id": "parent-456"},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
|
|||||||
@ -616,6 +616,54 @@ async def test_followup_routed_to_pending_queue(tmp_path):
|
|||||||
assert queued_msg.session_key == UNIFIED_SESSION_KEY
|
assert queued_msg.session_key == UNIFIED_SESSION_KEY
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_automation_turn_deferred_while_session_active(tmp_path):
|
||||||
|
"""Automation turns wait for the active session instead of becoming injections."""
|
||||||
|
from nanobot.bus.events import InboundMessage
|
||||||
|
from nanobot.cron.automation import (
|
||||||
|
AUTOMATION_DEFER_UNTIL_IDLE_META,
|
||||||
|
AUTOMATION_TRIGGER_META,
|
||||||
|
)
|
||||||
|
|
||||||
|
loop = _make_loop(tmp_path)
|
||||||
|
loop._dispatch = AsyncMock() # type: ignore[method-assign]
|
||||||
|
|
||||||
|
session_key = "websocket:chat-1"
|
||||||
|
pending = asyncio.Queue(maxsize=20)
|
||||||
|
loop._pending_queues[session_key] = pending
|
||||||
|
|
||||||
|
run_task = asyncio.create_task(loop.run())
|
||||||
|
msg = InboundMessage(
|
||||||
|
channel="websocket",
|
||||||
|
sender_id="cron",
|
||||||
|
chat_id="chat-1",
|
||||||
|
content="scheduled work",
|
||||||
|
metadata={
|
||||||
|
AUTOMATION_TRIGGER_META: {"run_id": "run-1"},
|
||||||
|
AUTOMATION_DEFER_UNTIL_IDLE_META: True,
|
||||||
|
},
|
||||||
|
session_key_override=session_key,
|
||||||
|
)
|
||||||
|
await loop.bus.publish_inbound(msg)
|
||||||
|
|
||||||
|
for _ in range(20):
|
||||||
|
if loop._deferred_automation_queues.get(session_key):
|
||||||
|
break
|
||||||
|
await asyncio.sleep(0.05)
|
||||||
|
|
||||||
|
loop.stop()
|
||||||
|
await asyncio.wait_for(run_task, timeout=2)
|
||||||
|
|
||||||
|
assert pending.empty()
|
||||||
|
assert loop._dispatch.await_count == 0
|
||||||
|
assert loop._deferred_automation_queues[session_key] == [msg]
|
||||||
|
|
||||||
|
await loop._publish_next_deferred_automation(session_key)
|
||||||
|
queued = await asyncio.wait_for(loop.bus.consume_inbound(), timeout=0.5)
|
||||||
|
assert queued is msg
|
||||||
|
assert session_key not in loop._deferred_automation_queues
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_pending_queue_preserves_overflow_for_next_injection_cycle(tmp_path):
|
async def test_pending_queue_preserves_overflow_for_next_injection_cycle(tmp_path):
|
||||||
"""Pending queue should leave overflow messages queued for later drains."""
|
"""Pending queue should leave overflow messages queued for later drains."""
|
||||||
|
|||||||
@ -1,4 +1,5 @@
|
|||||||
from nanobot.session.manager import Session, SessionManager
|
from nanobot.session.manager import Session, SessionManager
|
||||||
|
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
|
||||||
|
|
||||||
|
|
||||||
def _assert_no_orphans(history: list[dict]) -> None:
|
def _assert_no_orphans(history: list[dict]) -> None:
|
||||||
@ -432,6 +433,11 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path):
|
|||||||
source.metadata["webui"] = True
|
source.metadata["webui"] = True
|
||||||
source.metadata["title"] = "Old title"
|
source.metadata["title"] = "Old title"
|
||||||
source.metadata["goal_state"] = {"status": "active", "objective": "do not inherit"}
|
source.metadata["goal_state"] = {"status": "active", "objective": "do not inherit"}
|
||||||
|
source.metadata[SESSION_ROUTING_METADATA_KEY] = {
|
||||||
|
"channel": "websocket",
|
||||||
|
"chat_id": "source",
|
||||||
|
"metadata": {},
|
||||||
|
}
|
||||||
source.add_message("user", "round1")
|
source.add_message("user", "round1")
|
||||||
source.add_message("assistant", "answer1")
|
source.add_message("assistant", "answer1")
|
||||||
source.add_message("user", "round2 fork me")
|
source.add_message("user", "round2 fork me")
|
||||||
@ -450,6 +456,7 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path):
|
|||||||
assert forked.metadata["webui"] is True
|
assert forked.metadata["webui"] is True
|
||||||
assert "title" not in forked.metadata
|
assert "title" not in forked.metadata
|
||||||
assert "goal_state" not in forked.metadata
|
assert "goal_state" not in forked.metadata
|
||||||
|
assert SESSION_ROUTING_METADATA_KEY not in forked.metadata
|
||||||
saved = manager.read_session_file("websocket:fork")
|
saved = manager.read_session_file("websocket:fork")
|
||||||
assert [m["content"] for m in saved["messages"]] == ["round1", "answer1"]
|
assert [m["content"] for m in saved["messages"]] == ["round1", "answer1"]
|
||||||
|
|
||||||
|
|||||||
@ -188,6 +188,13 @@ async def test_session_automations_route_filters_by_webui_session(
|
|||||||
to=to,
|
to=to,
|
||||||
session_key=f"websocket:{to}",
|
session_key=f"websocket:{to}",
|
||||||
)
|
)
|
||||||
|
cron.add_job(
|
||||||
|
name="Legacy same target",
|
||||||
|
schedule=hourly,
|
||||||
|
message="Legacy job should not be treated as bound",
|
||||||
|
channel="websocket",
|
||||||
|
to="abc",
|
||||||
|
)
|
||||||
cron.register_system_job(
|
cron.register_system_job(
|
||||||
CronJob(
|
CronJob(
|
||||||
id="heartbeat",
|
id="heartbeat",
|
||||||
@ -659,6 +666,91 @@ async def test_session_delete_removes_file(
|
|||||||
await server_task
|
await server_task
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_session_delete_blocks_when_bound_automation_exists(
|
||||||
|
bus: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path)
|
||||||
|
sm = _seed_session(tmp_path, key="websocket:doomed")
|
||||||
|
cron = CronService(tmp_path / "cron" / "jobs.json")
|
||||||
|
cron.add_job(
|
||||||
|
name="Daily check",
|
||||||
|
schedule=CronSchedule(kind="every", every_ms=86_400_000),
|
||||||
|
message="Check the repo",
|
||||||
|
session_key="websocket:doomed",
|
||||||
|
)
|
||||||
|
channel = _ch(bus, session_manager=sm, cron_service=cron, port=29915)
|
||||||
|
server_task = asyncio.create_task(channel.start())
|
||||||
|
await asyncio.sleep(0.3)
|
||||||
|
try:
|
||||||
|
boot = await _http_get("http://127.0.0.1:29915/webui/bootstrap")
|
||||||
|
token = boot.json()["token"]
|
||||||
|
auth = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
path = sm._get_session_path("websocket:doomed")
|
||||||
|
resp = await _http_get(
|
||||||
|
"http://127.0.0.1:29915/api/sessions/websocket:doomed/delete",
|
||||||
|
headers=auth,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
body = resp.json()
|
||||||
|
assert body["deleted"] is False
|
||||||
|
assert body["blocked_by_automations"] is True
|
||||||
|
assert [job["name"] for job in body["automations"]] == ["Daily check"]
|
||||||
|
assert path.exists()
|
||||||
|
assert cron.list_bound_agent_jobs_for_session("websocket:doomed")
|
||||||
|
finally:
|
||||||
|
await channel.stop()
|
||||||
|
await server_task
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_session_delete_can_cascade_bound_automations(
|
||||||
|
bus: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
|
||||||
|
) -> None:
|
||||||
|
monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path)
|
||||||
|
sm = _seed_session(tmp_path, key="websocket:doomed")
|
||||||
|
cron = CronService(tmp_path / "cron" / "jobs.json")
|
||||||
|
cron.add_job(
|
||||||
|
name="Daily check",
|
||||||
|
schedule=CronSchedule(kind="every", every_ms=86_400_000),
|
||||||
|
message="Check the repo",
|
||||||
|
session_key="websocket:doomed",
|
||||||
|
)
|
||||||
|
cron.add_job(
|
||||||
|
name="Legacy same target",
|
||||||
|
schedule=CronSchedule(kind="every", every_ms=86_400_000),
|
||||||
|
message="Legacy job remains",
|
||||||
|
channel="websocket",
|
||||||
|
to="doomed",
|
||||||
|
)
|
||||||
|
channel = _ch(bus, session_manager=sm, cron_service=cron, port=29916)
|
||||||
|
server_task = asyncio.create_task(channel.start())
|
||||||
|
await asyncio.sleep(0.3)
|
||||||
|
try:
|
||||||
|
boot = await _http_get("http://127.0.0.1:29916/webui/bootstrap")
|
||||||
|
token = boot.json()["token"]
|
||||||
|
auth = {"Authorization": f"Bearer {token}"}
|
||||||
|
|
||||||
|
path = sm._get_session_path("websocket:doomed")
|
||||||
|
resp = await _http_get(
|
||||||
|
"http://127.0.0.1:29916/api/sessions/websocket:doomed/delete?delete_automations=true",
|
||||||
|
headers=auth,
|
||||||
|
)
|
||||||
|
|
||||||
|
assert resp.status_code == 200
|
||||||
|
assert resp.json()["deleted"] is True
|
||||||
|
assert not path.exists()
|
||||||
|
assert cron.list_bound_agent_jobs_for_session("websocket:doomed") == []
|
||||||
|
assert [job.name for job in cron.list_jobs(include_disabled=True)] == [
|
||||||
|
"Legacy same target"
|
||||||
|
]
|
||||||
|
finally:
|
||||||
|
await channel.stop()
|
||||||
|
await server_task
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_session_routes_accept_percent_encoded_websocket_keys(
|
async def test_session_routes_accept_percent_encoded_websocket_keys(
|
||||||
bus: MagicMock, tmp_path: Path
|
bus: MagicMock, tmp_path: Path
|
||||||
|
|||||||
@ -8,13 +8,14 @@ from unittest.mock import AsyncMock, MagicMock, patch
|
|||||||
import pytest
|
import pytest
|
||||||
from typer.testing import CliRunner
|
from typer.testing import CliRunner
|
||||||
|
|
||||||
from nanobot.bus.events import OutboundMessage
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
||||||
from nanobot.cli.commands import _proactive_delivery_metadata, app
|
from nanobot.cli.commands import _proactive_delivery_metadata, app
|
||||||
from nanobot.config.schema import Config
|
from nanobot.config.schema import Config
|
||||||
from nanobot.cron.types import CronJob, CronPayload
|
from nanobot.cron.types import CronJob, CronPayload
|
||||||
from nanobot.providers.factory import ProviderSnapshot, make_provider
|
from nanobot.providers.factory import ProviderSnapshot, make_provider
|
||||||
from nanobot.providers.openai_codex_provider import _strip_model_prefix
|
from nanobot.providers.openai_codex_provider import _strip_model_prefix
|
||||||
from nanobot.providers.registry import find_by_name
|
from nanobot.providers.registry import find_by_name
|
||||||
|
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
|
||||||
|
|
||||||
runner = CliRunner()
|
runner = CliRunner()
|
||||||
|
|
||||||
@ -1352,7 +1353,6 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
|
|||||||
"webui_turn_id": old_turn_id,
|
"webui_turn_id": old_turn_id,
|
||||||
"workspace_scope": {"mode": "default"},
|
"workspace_scope": {"mode": "default"},
|
||||||
},
|
},
|
||||||
session_key="websocket:chat-1",
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -1373,6 +1373,175 @@ def test_gateway_cron_evaluator_receives_scheduled_reminder_context(
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_gateway_bound_cron_runs_as_session_turn(
|
||||||
|
monkeypatch, tmp_path: Path
|
||||||
|
) -> None:
|
||||||
|
config_file = tmp_path / "instance" / "config.json"
|
||||||
|
config_file.parent.mkdir(parents=True)
|
||||||
|
config_file.write_text("{}")
|
||||||
|
|
||||||
|
config = Config()
|
||||||
|
config.agents.defaults.workspace = str(tmp_path / "config-workspace")
|
||||||
|
provider = _fake_provider()
|
||||||
|
bus = MagicMock()
|
||||||
|
bus.publish_outbound = AsyncMock()
|
||||||
|
seen: dict[str, object] = {"run_records": []}
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.config.loader.set_config_path", lambda _path: None)
|
||||||
|
monkeypatch.setattr("nanobot.config.loader.load_config", lambda _path=None: config)
|
||||||
|
monkeypatch.setattr("nanobot.cli.commands.sync_workspace_templates", lambda _path: None)
|
||||||
|
monkeypatch.setattr("nanobot.providers.factory.make_provider", lambda _config: provider)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.providers.factory.build_provider_snapshot",
|
||||||
|
lambda _config: _test_provider_snapshot(provider, _config),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr(
|
||||||
|
"nanobot.providers.factory.load_provider_snapshot",
|
||||||
|
lambda _config_path=None: _test_provider_snapshot(provider, config),
|
||||||
|
)
|
||||||
|
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus)
|
||||||
|
|
||||||
|
route_metadata = {
|
||||||
|
"websocket:chat-1": {
|
||||||
|
"workspace_scope": {
|
||||||
|
"project_path": str(tmp_path),
|
||||||
|
"access_mode": "restricted",
|
||||||
|
},
|
||||||
|
SESSION_ROUTING_METADATA_KEY: {
|
||||||
|
"channel": "websocket",
|
||||||
|
"chat_id": "chat-1",
|
||||||
|
"metadata": {},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
"discord:456:thread:777": {
|
||||||
|
SESSION_ROUTING_METADATA_KEY: {
|
||||||
|
"channel": "discord",
|
||||||
|
"chat_id": "777",
|
||||||
|
"metadata": {
|
||||||
|
"context_chat_id": "456",
|
||||||
|
"parent_channel_id": "456",
|
||||||
|
"thread_id": "777",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
class _FakeSessionManager:
|
||||||
|
def __init__(self, _workspace: Path) -> None:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def read_session_file(self, key: str) -> dict[str, object] | None:
|
||||||
|
return {"metadata": route_metadata.get(key, {})}
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager)
|
||||||
|
|
||||||
|
class _FakeCron:
|
||||||
|
def __init__(self, _store_path: Path) -> None:
|
||||||
|
self.on_job = None
|
||||||
|
seen["cron"] = self
|
||||||
|
|
||||||
|
def write_run_record(self, run_id: str, record: dict[str, object]) -> None:
|
||||||
|
seen["run_records"].append((run_id, record))
|
||||||
|
|
||||||
|
class _FakeAgentLoop:
|
||||||
|
@classmethod
|
||||||
|
def from_config(cls, config, bus=None, **extra):
|
||||||
|
return cls(**extra)
|
||||||
|
|
||||||
|
def __init__(self, *args, **kwargs) -> None:
|
||||||
|
self.model = "test-model"
|
||||||
|
self.provider = kwargs.get("provider", object())
|
||||||
|
self.tools = {}
|
||||||
|
seen["agent"] = self
|
||||||
|
|
||||||
|
async def submit_automation_turn(self, msg: InboundMessage):
|
||||||
|
seen["automation_msg"] = msg
|
||||||
|
return OutboundMessage(
|
||||||
|
channel=msg.channel,
|
||||||
|
chat_id=msg.chat_id,
|
||||||
|
content="Checked the repo.",
|
||||||
|
)
|
||||||
|
|
||||||
|
async def close_mcp(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
async def run(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
def stop(self) -> None:
|
||||||
|
return None
|
||||||
|
|
||||||
|
class _StopAfterCronSetup:
|
||||||
|
def __init__(self, *_args, **_kwargs) -> None:
|
||||||
|
raise _StopGatewayError("stop")
|
||||||
|
|
||||||
|
async def _unexpected_evaluator(*_args, **_kwargs) -> bool:
|
||||||
|
raise AssertionError("bound cron must not use legacy response evaluator")
|
||||||
|
|
||||||
|
monkeypatch.setattr("nanobot.cron.service.CronService", _FakeCron)
|
||||||
|
monkeypatch.setattr("nanobot.cli.commands.AgentLoop", _FakeAgentLoop)
|
||||||
|
monkeypatch.setattr("nanobot.channels.manager.ChannelManager", _StopAfterCronSetup)
|
||||||
|
monkeypatch.setattr("nanobot.cli.commands.evaluate_response", _unexpected_evaluator)
|
||||||
|
|
||||||
|
result = runner.invoke(app, ["gateway", "--config", str(config_file)])
|
||||||
|
assert isinstance(result.exception, _StopGatewayError)
|
||||||
|
|
||||||
|
cron = seen["cron"]
|
||||||
|
job = CronJob(
|
||||||
|
id="repo-check",
|
||||||
|
name="Repo check",
|
||||||
|
payload=CronPayload(
|
||||||
|
message="Check repository health.",
|
||||||
|
session_key="websocket:chat-1",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
response = asyncio.run(cron.on_job(job))
|
||||||
|
|
||||||
|
assert response == "Checked the repo."
|
||||||
|
msg = seen["automation_msg"]
|
||||||
|
assert isinstance(msg, InboundMessage)
|
||||||
|
assert msg.channel == "websocket"
|
||||||
|
assert msg.chat_id == "chat-1"
|
||||||
|
assert msg.sender_id == "cron"
|
||||||
|
assert msg.session_key_override == "websocket:chat-1"
|
||||||
|
assert "Automation: Check repository health." in msg.content
|
||||||
|
assert msg.metadata["webui"] is True
|
||||||
|
assert msg.metadata["workspace_scope"]["project_path"] == str(tmp_path)
|
||||||
|
assert msg.metadata["_webui_message_source"] == {"kind": "cron", "label": "Repo check"}
|
||||||
|
trigger = msg.metadata["_automation_trigger"]
|
||||||
|
assert trigger["job_id"] == "repo-check"
|
||||||
|
assert trigger["job_name"] == "Repo check"
|
||||||
|
assert trigger["persist_content"] == (
|
||||||
|
"Scheduled automation triggered: Repo check\n\nCheck repository health."
|
||||||
|
)
|
||||||
|
assert msg.metadata["_defer_until_session_idle"] is True
|
||||||
|
statuses = [record["status"] for _run_id, record in seen["run_records"]]
|
||||||
|
assert statuses == ["queued", "ok"]
|
||||||
|
assert seen["run_records"][0][0] == seen["run_records"][1][0]
|
||||||
|
|
||||||
|
discord_job = CronJob(
|
||||||
|
id="thread-check",
|
||||||
|
name="Thread check",
|
||||||
|
payload=CronPayload(
|
||||||
|
message="Check the Discord thread.",
|
||||||
|
session_key="discord:456:thread:777",
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
response = asyncio.run(cron.on_job(discord_job))
|
||||||
|
|
||||||
|
assert response == "Checked the repo."
|
||||||
|
msg = seen["automation_msg"]
|
||||||
|
assert isinstance(msg, InboundMessage)
|
||||||
|
assert msg.channel == "discord"
|
||||||
|
assert msg.chat_id == "777"
|
||||||
|
assert msg.session_key_override == "discord:456:thread:777"
|
||||||
|
assert msg.metadata["context_chat_id"] == "456"
|
||||||
|
assert msg.metadata["parent_channel_id"] == "456"
|
||||||
|
assert msg.metadata["thread_id"] == "777"
|
||||||
|
|
||||||
|
|
||||||
def test_gateway_cron_job_suppresses_intermediate_progress(
|
def test_gateway_cron_job_suppresses_intermediate_progress(
|
||||||
monkeypatch, tmp_path: Path
|
monkeypatch, tmp_path: Path
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|||||||
@ -303,7 +303,9 @@ def test_remove_protected_dream_job_returns_clear_feedback(tmp_path) -> None:
|
|||||||
|
|
||||||
def test_add_cron_job_defaults_to_tool_timezone(tmp_path) -> None:
|
def test_add_cron_job_defaults_to_tool_timezone(tmp_path) -> None:
|
||||||
tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai")
|
tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai")
|
||||||
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1")
|
||||||
|
)
|
||||||
|
|
||||||
result = tool._add_job(None, "Morning standup", None, "0 8 * * *", None, None)
|
result = tool._add_job(None, "Morning standup", None, "0 8 * * *", None, None)
|
||||||
|
|
||||||
@ -314,7 +316,9 @@ def test_add_cron_job_defaults_to_tool_timezone(tmp_path) -> None:
|
|||||||
|
|
||||||
def test_add_at_job_uses_default_timezone_for_naive_datetime(tmp_path) -> None:
|
def test_add_at_job_uses_default_timezone_for_naive_datetime(tmp_path) -> None:
|
||||||
tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai")
|
tool = _make_tool_with_tz(tmp_path, "Asia/Shanghai")
|
||||||
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1")
|
||||||
|
)
|
||||||
|
|
||||||
result = tool._add_job(None, "Morning reminder", None, None, None, "2026-03-25T08:00:00")
|
result = tool._add_job(None, "Morning reminder", None, None, None, "2026-03-25T08:00:00")
|
||||||
|
|
||||||
@ -324,26 +328,29 @@ def test_add_at_job_uses_default_timezone_for_naive_datetime(tmp_path) -> None:
|
|||||||
assert job.schedule.at_ms == expected
|
assert job.schedule.at_ms == expected
|
||||||
|
|
||||||
|
|
||||||
def test_add_job_delivers_by_default(tmp_path) -> None:
|
def test_add_job_binds_current_session_key(tmp_path) -> None:
|
||||||
tool = _make_tool(tmp_path)
|
tool = _make_tool(tmp_path)
|
||||||
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1")
|
||||||
|
)
|
||||||
|
|
||||||
result = tool._add_job(None, "Morning standup", 60, None, None, None)
|
result = tool._add_job(None, "Morning standup", 60, None, None, None)
|
||||||
|
|
||||||
assert result.startswith("Created job")
|
assert result.startswith("Created job")
|
||||||
job = tool._cron.list_jobs()[0]
|
job = tool._cron.list_jobs()[0]
|
||||||
assert job.payload.deliver is True
|
assert job.payload.session_key == "telegram:chat-1"
|
||||||
|
assert job.payload.channel is None
|
||||||
|
assert job.payload.to is None
|
||||||
|
|
||||||
|
|
||||||
def test_add_job_can_disable_delivery(tmp_path) -> None:
|
def test_add_job_requires_session_key(tmp_path) -> None:
|
||||||
tool = _make_tool(tmp_path)
|
tool = _make_tool(tmp_path)
|
||||||
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
||||||
|
|
||||||
result = tool._add_job(None, "Background refresh", 60, None, None, None, deliver=False)
|
result = tool._add_job(None, "Background refresh", 60, None, None, None, deliver=False)
|
||||||
|
|
||||||
assert result.startswith("Created job")
|
assert result == "Error: scheduled automations must be created from a chat session"
|
||||||
job = tool._cron.list_jobs()[0]
|
assert tool._cron.list_jobs() == []
|
||||||
assert job.payload.deliver is False
|
|
||||||
|
|
||||||
|
|
||||||
def test_cron_schema_advertises_action_specific_requirements(tmp_path) -> None:
|
def test_cron_schema_advertises_action_specific_requirements(tmp_path) -> None:
|
||||||
@ -375,7 +382,9 @@ def test_validate_params_requires_message_only_for_add(tmp_path) -> None:
|
|||||||
|
|
||||||
def test_add_job_empty_message_returns_actionable_error(tmp_path) -> None:
|
def test_add_job_empty_message_returns_actionable_error(tmp_path) -> None:
|
||||||
tool = _make_tool(tmp_path)
|
tool = _make_tool(tmp_path)
|
||||||
tool.set_context(RequestContext(channel="telegram", chat_id="chat-1"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="telegram", chat_id="chat-1", session_key="telegram:chat-1")
|
||||||
|
)
|
||||||
|
|
||||||
result = tool._add_job(None, "", 60, None, None, None)
|
result = tool._add_job(None, "", 60, None, None, None)
|
||||||
|
|
||||||
@ -383,8 +392,8 @@ def test_add_job_empty_message_returns_actionable_error(tmp_path) -> None:
|
|||||||
assert "Retry including message=" in result
|
assert "Retry including message=" in result
|
||||||
|
|
||||||
|
|
||||||
def test_add_job_captures_metadata_and_session_key(tmp_path) -> None:
|
def test_add_job_captures_only_session_key(tmp_path) -> None:
|
||||||
"""CronTool stores channel metadata and session_key when adding a job."""
|
"""CronTool stores the canonical session key without legacy delivery fields."""
|
||||||
tool = _make_tool(tmp_path)
|
tool = _make_tool(tmp_path)
|
||||||
meta = {"slack": {"thread_ts": "111.222", "channel_type": "channel"}}
|
meta = {"slack": {"thread_ts": "111.222", "channel_type": "channel"}}
|
||||||
tool.set_context(RequestContext(
|
tool.set_context(RequestContext(
|
||||||
@ -396,8 +405,10 @@ def test_add_job_captures_metadata_and_session_key(tmp_path) -> None:
|
|||||||
|
|
||||||
jobs = tool._cron.list_jobs()
|
jobs = tool._cron.list_jobs()
|
||||||
assert len(jobs) == 1
|
assert len(jobs) == 1
|
||||||
assert jobs[0].payload.channel_meta == meta
|
|
||||||
assert jobs[0].payload.session_key == "slack:C99:111.222"
|
assert jobs[0].payload.session_key == "slack:C99:111.222"
|
||||||
|
assert jobs[0].payload.channel is None
|
||||||
|
assert jobs[0].payload.to is None
|
||||||
|
assert jobs[0].payload.channel_meta == {}
|
||||||
|
|
||||||
|
|
||||||
def test_list_excludes_disabled_jobs(tmp_path) -> None:
|
def test_list_excludes_disabled_jobs(tmp_path) -> None:
|
||||||
|
|||||||
@ -41,7 +41,9 @@ class _SvcStub:
|
|||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def registry() -> ToolRegistry:
|
def registry() -> ToolRegistry:
|
||||||
tool = CronTool(_SvcStub(), default_timezone="UTC")
|
tool = CronTool(_SvcStub(), default_timezone="UTC")
|
||||||
tool.set_context(RequestContext(channel="channel", chat_id="chat-id"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="channel", chat_id="chat-id", session_key="channel:chat-id")
|
||||||
|
)
|
||||||
reg = ToolRegistry()
|
reg = ToolRegistry()
|
||||||
reg.register(tool)
|
reg.register(tool)
|
||||||
return reg
|
return reg
|
||||||
|
|||||||
53
tests/session/test_routing.py
Normal file
53
tests/session/test_routing.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
from nanobot.bus.events import InboundMessage
|
||||||
|
from nanobot.session.routing import routing_context_for_message
|
||||||
|
|
||||||
|
|
||||||
|
def test_routing_context_keeps_telegram_topic_without_stale_message_id() -> None:
|
||||||
|
context = routing_context_for_message(
|
||||||
|
InboundMessage(
|
||||||
|
channel="telegram",
|
||||||
|
sender_id="user-1",
|
||||||
|
chat_id="-100123",
|
||||||
|
content="set a reminder",
|
||||||
|
metadata={
|
||||||
|
"message_id": 100,
|
||||||
|
"message_thread_id": 42,
|
||||||
|
"_progress": True,
|
||||||
|
},
|
||||||
|
session_key_override="telegram:-100123:topic:42",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert context == {
|
||||||
|
"channel": "telegram",
|
||||||
|
"chat_id": "-100123",
|
||||||
|
"metadata": {"message_thread_id": 42},
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def test_routing_context_keeps_feishu_topic_anchor() -> None:
|
||||||
|
context = routing_context_for_message(
|
||||||
|
InboundMessage(
|
||||||
|
channel="feishu",
|
||||||
|
sender_id="ou_user",
|
||||||
|
chat_id="oc_chat",
|
||||||
|
content="set a reminder",
|
||||||
|
metadata={
|
||||||
|
"chat_type": "group",
|
||||||
|
"message_id": "om_msg",
|
||||||
|
"thread_id": "omt_thread",
|
||||||
|
"_progress": True,
|
||||||
|
},
|
||||||
|
session_key_override="feishu:oc_chat:om_root",
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
assert context == {
|
||||||
|
"channel": "feishu",
|
||||||
|
"chat_id": "oc_chat",
|
||||||
|
"metadata": {
|
||||||
|
"chat_type": "group",
|
||||||
|
"message_id": "om_msg",
|
||||||
|
"thread_id": "omt_thread",
|
||||||
|
},
|
||||||
|
}
|
||||||
@ -99,14 +99,18 @@ async def test_cron_tool_keeps_task_local_context(tmp_path) -> None:
|
|||||||
release = asyncio.Event()
|
release = asyncio.Event()
|
||||||
|
|
||||||
async def task_one() -> str:
|
async def task_one() -> str:
|
||||||
tool.set_context(RequestContext(channel="feishu", chat_id="chat-a"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="feishu", chat_id="chat-a", session_key="feishu:chat-a")
|
||||||
|
)
|
||||||
entered.set()
|
entered.set()
|
||||||
await release.wait()
|
await release.wait()
|
||||||
return await tool.execute(action="add", message="first", every_seconds=60)
|
return await tool.execute(action="add", message="first", every_seconds=60)
|
||||||
|
|
||||||
async def task_two() -> str:
|
async def task_two() -> str:
|
||||||
await entered.wait()
|
await entered.wait()
|
||||||
tool.set_context(RequestContext(channel="email", chat_id="chat-b"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="email", chat_id="chat-b", session_key="email:chat-b")
|
||||||
|
)
|
||||||
release.set()
|
release.set()
|
||||||
return await tool.execute(action="add", message="second", every_seconds=60)
|
return await tool.execute(action="add", message="second", every_seconds=60)
|
||||||
|
|
||||||
@ -116,8 +120,7 @@ async def test_cron_tool_keeps_task_local_context(tmp_path) -> None:
|
|||||||
assert result_two.startswith("Created job")
|
assert result_two.startswith("Created job")
|
||||||
|
|
||||||
jobs = tool._cron.list_jobs()
|
jobs = tool._cron.list_jobs()
|
||||||
assert {job.payload.channel for job in jobs} == {"feishu", "email"}
|
assert {job.payload.session_key for job in jobs} == {"feishu:chat-a", "email:chat-b"}
|
||||||
assert {job.payload.to for job in jobs} == {"chat-a", "chat-b"}
|
|
||||||
|
|
||||||
|
|
||||||
# --- Basic single-task regression tests ---
|
# --- Basic single-task regression tests ---
|
||||||
@ -228,15 +231,16 @@ async def test_spawn_tool_default_values_without_set_context() -> None:
|
|||||||
async def test_cron_tool_basic_set_context_and_execute(tmp_path) -> None:
|
async def test_cron_tool_basic_set_context_and_execute(tmp_path) -> None:
|
||||||
"""Single task: set_context then add job should use correct target."""
|
"""Single task: set_context then add job should use correct target."""
|
||||||
tool = CronTool(CronService(tmp_path / "jobs.json"))
|
tool = CronTool(CronService(tmp_path / "jobs.json"))
|
||||||
tool.set_context(RequestContext(channel="wechat", chat_id="user-789"))
|
tool.set_context(
|
||||||
|
RequestContext(channel="wechat", chat_id="user-789", session_key="wechat:user-789")
|
||||||
|
)
|
||||||
|
|
||||||
result = await tool.execute(action="add", message="standup", every_seconds=300)
|
result = await tool.execute(action="add", message="standup", every_seconds=300)
|
||||||
assert result.startswith("Created job")
|
assert result.startswith("Created job")
|
||||||
|
|
||||||
jobs = tool._cron.list_jobs()
|
jobs = tool._cron.list_jobs()
|
||||||
assert len(jobs) == 1
|
assert len(jobs) == 1
|
||||||
assert jobs[0].payload.channel == "wechat"
|
assert jobs[0].payload.session_key == "wechat:user-789"
|
||||||
assert jobs[0].payload.to == "user-789"
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
@ -245,4 +249,4 @@ async def test_cron_tool_no_context_returns_error(tmp_path) -> None:
|
|||||||
tool = CronTool(CronService(tmp_path / "jobs.json"))
|
tool = CronTool(CronService(tmp_path / "jobs.json"))
|
||||||
|
|
||||||
result = await tool.execute(action="add", message="test", every_seconds=60)
|
result = await tool.execute(action="add", message="test", every_seconds=60)
|
||||||
assert result == "Error: no session context (channel/chat_id)"
|
assert result == "Error: scheduled automations must be created from a chat session"
|
||||||
|
|||||||
@ -36,6 +36,7 @@ import { ClientProvider, useClient } from "@/providers/ClientProvider";
|
|||||||
import type {
|
import type {
|
||||||
ChatSummary,
|
ChatSummary,
|
||||||
RuntimeSurface,
|
RuntimeSurface,
|
||||||
|
SessionAutomationJob,
|
||||||
SettingsPayload,
|
SettingsPayload,
|
||||||
WorkspaceScopePayload,
|
WorkspaceScopePayload,
|
||||||
WorkspacesPayload,
|
WorkspacesPayload,
|
||||||
@ -546,6 +547,8 @@ function Shell({
|
|||||||
const [pendingDelete, setPendingDelete] = useState<{
|
const [pendingDelete, setPendingDelete] = useState<{
|
||||||
key: string;
|
key: string;
|
||||||
label: string;
|
label: string;
|
||||||
|
automations?: SessionAutomationJob[];
|
||||||
|
confirmAutomations?: boolean;
|
||||||
} | null>(null);
|
} | null>(null);
|
||||||
const [pendingRename, setPendingRename] = useState<{
|
const [pendingRename, setPendingRename] = useState<{
|
||||||
key: string;
|
key: string;
|
||||||
@ -1275,24 +1278,28 @@ function Shell({
|
|||||||
const fallbackKey = deletingActive
|
const fallbackKey = deletingActive
|
||||||
? (sessions[currentIndex + 1]?.key ?? sessions[currentIndex - 1]?.key ?? null)
|
? (sessions[currentIndex + 1]?.key ?? sessions[currentIndex - 1]?.key ?? null)
|
||||||
: activeKey;
|
: activeKey;
|
||||||
setPendingDelete(null);
|
|
||||||
if (deletingActive) {
|
|
||||||
navigate({
|
|
||||||
view: "chat",
|
|
||||||
activeKey: fallbackKey,
|
|
||||||
settingsSection: "overview",
|
|
||||||
}, { replace: true });
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
await deleteChat(key);
|
const result = await deleteChat(
|
||||||
} catch (e) {
|
key,
|
||||||
|
pendingDelete.confirmAutomations ? { deleteAutomations: true } : undefined,
|
||||||
|
);
|
||||||
|
if (result.blocked_by_automations) {
|
||||||
|
setPendingDelete({
|
||||||
|
...pendingDelete,
|
||||||
|
automations: result.automations ?? [],
|
||||||
|
confirmAutomations: true,
|
||||||
|
});
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
setPendingDelete(null);
|
||||||
if (deletingActive) {
|
if (deletingActive) {
|
||||||
navigate({
|
navigate({
|
||||||
view: "chat",
|
view: "chat",
|
||||||
activeKey: key,
|
activeKey: fallbackKey,
|
||||||
settingsSection: "overview",
|
settingsSection: "overview",
|
||||||
}, { replace: true });
|
}, { replace: true });
|
||||||
}
|
}
|
||||||
|
} catch (e) {
|
||||||
console.error("Failed to delete session", e);
|
console.error("Failed to delete session", e);
|
||||||
}
|
}
|
||||||
}, [pendingDelete, deleteChat, activeKey, navigate, sessions]);
|
}, [pendingDelete, deleteChat, activeKey, navigate, sessions]);
|
||||||
@ -1559,6 +1566,7 @@ function Shell({
|
|||||||
<DeleteConfirm
|
<DeleteConfirm
|
||||||
open={!!pendingDelete}
|
open={!!pendingDelete}
|
||||||
title={pendingDelete?.label ?? ""}
|
title={pendingDelete?.label ?? ""}
|
||||||
|
automations={pendingDelete?.confirmAutomations ? pendingDelete.automations : undefined}
|
||||||
onCancel={() => setPendingDelete(null)}
|
onCancel={() => setPendingDelete(null)}
|
||||||
onConfirm={onConfirmDelete}
|
onConfirm={onConfirmDelete}
|
||||||
/>
|
/>
|
||||||
|
|||||||
@ -10,10 +10,12 @@ import {
|
|||||||
} from "@/components/ui/alert-dialog";
|
} from "@/components/ui/alert-dialog";
|
||||||
import { Trash2 } from "lucide-react";
|
import { Trash2 } from "lucide-react";
|
||||||
import { useTranslation } from "react-i18next";
|
import { useTranslation } from "react-i18next";
|
||||||
|
import type { SessionAutomationJob } from "@/lib/types";
|
||||||
|
|
||||||
interface DeleteConfirmProps {
|
interface DeleteConfirmProps {
|
||||||
open: boolean;
|
open: boolean;
|
||||||
title: string;
|
title: string;
|
||||||
|
automations?: SessionAutomationJob[];
|
||||||
onCancel: () => void;
|
onCancel: () => void;
|
||||||
onConfirm: () => void;
|
onConfirm: () => void;
|
||||||
}
|
}
|
||||||
@ -21,14 +23,18 @@ interface DeleteConfirmProps {
|
|||||||
export function DeleteConfirm({
|
export function DeleteConfirm({
|
||||||
open,
|
open,
|
||||||
title,
|
title,
|
||||||
|
automations = [],
|
||||||
onCancel,
|
onCancel,
|
||||||
onConfirm,
|
onConfirm,
|
||||||
}: DeleteConfirmProps) {
|
}: DeleteConfirmProps) {
|
||||||
const { t } = useTranslation();
|
const { t } = useTranslation();
|
||||||
|
const hasAutomations = automations.length > 0;
|
||||||
|
const visibleAutomations = automations.slice(0, 4);
|
||||||
|
const hiddenCount = Math.max(0, automations.length - visibleAutomations.length);
|
||||||
return (
|
return (
|
||||||
<AlertDialog open={open} onOpenChange={(o) => (!o ? onCancel() : undefined)}>
|
<AlertDialog open={open} onOpenChange={(o) => (!o ? onCancel() : undefined)}>
|
||||||
<AlertDialogContent
|
<AlertDialogContent
|
||||||
className="w-[min(calc(100vw-2rem),22.75rem)] gap-0 rounded-[28px] border border-white/70 bg-card/95 p-5 text-center shadow-[0_24px_80px_rgba(15,23,42,0.20)] backdrop-blur-xl data-[state=open]:zoom-in-95 sm:rounded-[28px]"
|
className="w-[min(calc(100vw-2rem),24rem)] gap-0 rounded-[28px] border border-white/70 bg-card/95 p-5 text-center shadow-[0_24px_80px_rgba(15,23,42,0.20)] backdrop-blur-xl data-[state=open]:zoom-in-95 sm:rounded-[28px]"
|
||||||
>
|
>
|
||||||
<AlertDialogHeader className="items-center space-y-0 text-center">
|
<AlertDialogHeader className="items-center space-y-0 text-center">
|
||||||
<div className="mb-5 grid h-16 w-16 place-items-center rounded-full bg-destructive/10 text-destructive">
|
<div className="mb-5 grid h-16 w-16 place-items-center rounded-full bg-destructive/10 text-destructive">
|
||||||
@ -40,8 +46,31 @@ export function DeleteConfirm({
|
|||||||
{t("deleteConfirm.title", { title })}
|
{t("deleteConfirm.title", { title })}
|
||||||
</AlertDialogTitle>
|
</AlertDialogTitle>
|
||||||
<AlertDialogDescription className="mt-3 max-w-[17rem] text-center text-[14px] leading-6 text-muted-foreground">
|
<AlertDialogDescription className="mt-3 max-w-[17rem] text-center text-[14px] leading-6 text-muted-foreground">
|
||||||
{t("deleteConfirm.description")}
|
{hasAutomations
|
||||||
|
? t("deleteConfirm.automationsDescription", {
|
||||||
|
count: automations.length,
|
||||||
|
defaultValue:
|
||||||
|
"This chat has scheduled automations. Deleting it will also delete them.",
|
||||||
|
})
|
||||||
|
: t("deleteConfirm.description")}
|
||||||
</AlertDialogDescription>
|
</AlertDialogDescription>
|
||||||
|
{hasAutomations ? (
|
||||||
|
<div className="mt-4 max-h-32 w-full overflow-y-auto rounded-2xl bg-muted/55 px-3 py-2 text-left">
|
||||||
|
{visibleAutomations.map((job) => (
|
||||||
|
<div key={job.id} className="truncate text-[13px] leading-6 text-foreground">
|
||||||
|
{job.name || job.id}
|
||||||
|
</div>
|
||||||
|
))}
|
||||||
|
{hiddenCount > 0 ? (
|
||||||
|
<div className="text-[13px] leading-6 text-muted-foreground">
|
||||||
|
{t("deleteConfirm.moreAutomations", {
|
||||||
|
count: hiddenCount,
|
||||||
|
defaultValue: "+ {{count}} more",
|
||||||
|
})}
|
||||||
|
</div>
|
||||||
|
) : null}
|
||||||
|
</div>
|
||||||
|
) : null}
|
||||||
</AlertDialogHeader>
|
</AlertDialogHeader>
|
||||||
<AlertDialogFooter className="mt-7 grid grid-cols-2 gap-3 space-x-0">
|
<AlertDialogFooter className="mt-7 grid grid-cols-2 gap-3 space-x-0">
|
||||||
<AlertDialogCancel
|
<AlertDialogCancel
|
||||||
@ -54,7 +83,11 @@ export function DeleteConfirm({
|
|||||||
onClick={onConfirm}
|
onClick={onConfirm}
|
||||||
className="h-11 rounded-full bg-destructive px-5 text-[15px] font-semibold text-destructive-foreground shadow-[0_10px_25px_rgba(239,68,68,0.28)] hover:bg-destructive/90"
|
className="h-11 rounded-full bg-destructive px-5 text-[15px] font-semibold text-destructive-foreground shadow-[0_10px_25px_rgba(239,68,68,0.28)] hover:bg-destructive/90"
|
||||||
>
|
>
|
||||||
{t("deleteConfirm.confirm")}
|
{hasAutomations
|
||||||
|
? t("deleteConfirm.confirmWithAutomations", {
|
||||||
|
defaultValue: "Delete all",
|
||||||
|
})
|
||||||
|
: t("deleteConfirm.confirm")}
|
||||||
</AlertDialogAction>
|
</AlertDialogAction>
|
||||||
</AlertDialogFooter>
|
</AlertDialogFooter>
|
||||||
</AlertDialogContent>
|
</AlertDialogContent>
|
||||||
|
|||||||
@ -9,7 +9,12 @@ import {
|
|||||||
listSessions,
|
listSessions,
|
||||||
} from "@/lib/api";
|
} from "@/lib/api";
|
||||||
import { deriveTitle } from "@/lib/format";
|
import { deriveTitle } from "@/lib/format";
|
||||||
import type { ChatSummary, UIMessage, WorkspaceScopePayload } from "@/lib/types";
|
import type {
|
||||||
|
ChatSummary,
|
||||||
|
SessionDeleteResult,
|
||||||
|
UIMessage,
|
||||||
|
WorkspaceScopePayload,
|
||||||
|
} from "@/lib/types";
|
||||||
|
|
||||||
const EMPTY_MESSAGES: UIMessage[] = [];
|
const EMPTY_MESSAGES: UIMessage[] = [];
|
||||||
const INITIAL_HISTORY_PAGE_LIMIT = 160;
|
const INITIAL_HISTORY_PAGE_LIMIT = 160;
|
||||||
@ -31,7 +36,10 @@ export function useSessions(): {
|
|||||||
refresh: () => Promise<void>;
|
refresh: () => Promise<void>;
|
||||||
createChat: (workspaceScope?: WorkspaceScopePayload | null) => Promise<string>;
|
createChat: (workspaceScope?: WorkspaceScopePayload | null) => Promise<string>;
|
||||||
forkChat: (sourceChatId: string, beforeUserIndex: number, title?: string) => Promise<string>;
|
forkChat: (sourceChatId: string, beforeUserIndex: number, title?: string) => Promise<string>;
|
||||||
deleteChat: (key: string) => Promise<void>;
|
deleteChat: (
|
||||||
|
key: string,
|
||||||
|
options?: { deleteAutomations?: boolean },
|
||||||
|
) => Promise<SessionDeleteResult>;
|
||||||
} {
|
} {
|
||||||
const { client, token } = useClient();
|
const { client, token } = useClient();
|
||||||
const [sessions, setSessions] = useState<ChatSummary[]>([]);
|
const [sessions, setSessions] = useState<ChatSummary[]>([]);
|
||||||
@ -124,10 +132,12 @@ export function useSessions(): {
|
|||||||
}, [client]);
|
}, [client]);
|
||||||
|
|
||||||
const deleteChat = useCallback(
|
const deleteChat = useCallback(
|
||||||
async (key: string) => {
|
async (key: string, options?: { deleteAutomations?: boolean }) => {
|
||||||
await apiDeleteSession(tokenRef.current, key);
|
const result = await apiDeleteSession(tokenRef.current, key, options);
|
||||||
|
if (!result.deleted) return result;
|
||||||
optimisticKeysRef.current.delete(key);
|
optimisticKeysRef.current.delete(key);
|
||||||
setSessions((prev) => prev.filter((s) => s.key !== key));
|
setSessions((prev) => prev.filter((s) => s.key !== key));
|
||||||
|
return result;
|
||||||
},
|
},
|
||||||
[],
|
[],
|
||||||
);
|
);
|
||||||
|
|||||||
@ -9,6 +9,7 @@ import type {
|
|||||||
NetworkSafetySettingsUpdate,
|
NetworkSafetySettingsUpdate,
|
||||||
ProviderModelsPayload,
|
ProviderModelsPayload,
|
||||||
ProviderSettingsUpdate,
|
ProviderSettingsUpdate,
|
||||||
|
SessionDeleteResult,
|
||||||
SessionAutomationsPayload,
|
SessionAutomationsPayload,
|
||||||
SettingsPayload,
|
SettingsPayload,
|
||||||
SettingsUpdate,
|
SettingsUpdate,
|
||||||
@ -211,13 +212,18 @@ export async function fetchSkillDetail(
|
|||||||
export async function deleteSession(
|
export async function deleteSession(
|
||||||
token: string,
|
token: string,
|
||||||
key: string,
|
key: string,
|
||||||
|
optionsOrBase?: { deleteAutomations?: boolean } | string,
|
||||||
base: string = "",
|
base: string = "",
|
||||||
): Promise<boolean> {
|
): Promise<SessionDeleteResult> {
|
||||||
const body = await request<{ deleted: boolean }>(
|
const options = typeof optionsOrBase === "string" ? undefined : optionsOrBase;
|
||||||
`${base}/api/sessions/${encodeURIComponent(key)}/delete`,
|
const resolvedBase = typeof optionsOrBase === "string" ? optionsOrBase : base;
|
||||||
|
const query = new URLSearchParams();
|
||||||
|
if (options?.deleteAutomations) query.set("delete_automations", "true");
|
||||||
|
const suffix = query.toString() ? `?${query}` : "";
|
||||||
|
return request<SessionDeleteResult>(
|
||||||
|
`${resolvedBase}/api/sessions/${encodeURIComponent(key)}/delete${suffix}`,
|
||||||
token,
|
token,
|
||||||
);
|
);
|
||||||
return body.deleted;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
export async function fetchSettings(
|
export async function fetchSettings(
|
||||||
|
|||||||
@ -118,6 +118,12 @@ export interface SessionAutomationJob {
|
|||||||
|
|
||||||
export interface SessionAutomationsPayload { jobs: SessionAutomationJob[]; }
|
export interface SessionAutomationsPayload { jobs: SessionAutomationJob[]; }
|
||||||
|
|
||||||
|
export interface SessionDeleteResult {
|
||||||
|
deleted: boolean;
|
||||||
|
blocked_by_automations?: boolean;
|
||||||
|
automations?: SessionAutomationJob[];
|
||||||
|
}
|
||||||
|
|
||||||
export interface SkillSummary {
|
export interface SkillSummary {
|
||||||
name: string;
|
name: string;
|
||||||
description: string;
|
description: string;
|
||||||
|
|||||||
@ -131,6 +131,17 @@ describe("webui API helpers", () => {
|
|||||||
);
|
);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("passes the automation cascade flag when deleting a session", async () => {
|
||||||
|
await deleteSession("tok", "websocket:chat-1", { deleteAutomations: true });
|
||||||
|
|
||||||
|
expect(fetch).toHaveBeenCalledWith(
|
||||||
|
"/api/sessions/websocket%3Achat-1/delete?delete_automations=true",
|
||||||
|
expect.objectContaining({
|
||||||
|
headers: { Authorization: "Bearer tok" },
|
||||||
|
}),
|
||||||
|
);
|
||||||
|
});
|
||||||
|
|
||||||
it("serializes settings updates as a narrow query string", async () => {
|
it("serializes settings updates as a narrow query string", async () => {
|
||||||
await updateSettings("tok", {
|
await updateSettings("tok", {
|
||||||
modelPreset: "default",
|
modelPreset: "default",
|
||||||
|
|||||||
@ -149,6 +149,7 @@ vi.mock("@/hooks/useSessions", async (importOriginal) => {
|
|||||||
deleteChat: async (key: string) => {
|
deleteChat: async (key: string) => {
|
||||||
await deleteChatSpy(key);
|
await deleteChatSpy(key);
|
||||||
setSessions((prev: ChatSummary[]) => prev.filter((s) => s.key !== key));
|
setSessions((prev: ChatSummary[]) => prev.filter((s) => s.key !== key));
|
||||||
|
return { deleted: true };
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
},
|
},
|
||||||
|
|||||||
@ -103,7 +103,7 @@ describe("useSessions", () => {
|
|||||||
preview: "Beta",
|
preview: "Beta",
|
||||||
},
|
},
|
||||||
]);
|
]);
|
||||||
vi.mocked(api.deleteSession).mockResolvedValue(true);
|
vi.mocked(api.deleteSession).mockResolvedValue({ deleted: true });
|
||||||
|
|
||||||
const { result } = renderHook(() => useSessions(), {
|
const { result } = renderHook(() => useSessions(), {
|
||||||
wrapper: wrap(fakeClient()),
|
wrapper: wrap(fakeClient()),
|
||||||
@ -115,10 +115,42 @@ describe("useSessions", () => {
|
|||||||
await result.current.deleteChat("websocket:chat-a");
|
await result.current.deleteChat("websocket:chat-a");
|
||||||
});
|
});
|
||||||
|
|
||||||
expect(api.deleteSession).toHaveBeenCalledWith("tok", "websocket:chat-a");
|
expect(api.deleteSession).toHaveBeenCalledWith("tok", "websocket:chat-a", undefined);
|
||||||
expect(result.current.sessions.map((s) => s.key)).toEqual(["websocket:chat-b"]);
|
expect(result.current.sessions.map((s) => s.key)).toEqual(["websocket:chat-b"]);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it("keeps a session when delete is blocked by bound automations", async () => {
|
||||||
|
vi.mocked(api.listSessions).mockResolvedValue([
|
||||||
|
{
|
||||||
|
key: "websocket:chat-a",
|
||||||
|
channel: "websocket",
|
||||||
|
chatId: "chat-a",
|
||||||
|
createdAt: "2026-04-16T10:00:00Z",
|
||||||
|
updatedAt: "2026-04-16T10:00:00Z",
|
||||||
|
preview: "Alpha",
|
||||||
|
},
|
||||||
|
]);
|
||||||
|
vi.mocked(api.deleteSession).mockResolvedValue({
|
||||||
|
deleted: false,
|
||||||
|
blocked_by_automations: true,
|
||||||
|
automations: [],
|
||||||
|
});
|
||||||
|
|
||||||
|
const { result } = renderHook(() => useSessions(), {
|
||||||
|
wrapper: wrap(fakeClient()),
|
||||||
|
});
|
||||||
|
|
||||||
|
await waitFor(() => expect(result.current.sessions).toHaveLength(1));
|
||||||
|
|
||||||
|
let deleteResult: Awaited<ReturnType<typeof result.current.deleteChat>> | undefined;
|
||||||
|
await act(async () => {
|
||||||
|
deleteResult = await result.current.deleteChat("websocket:chat-a");
|
||||||
|
});
|
||||||
|
|
||||||
|
expect(deleteResult?.blocked_by_automations).toBe(true);
|
||||||
|
expect(result.current.sessions.map((s) => s.key)).toEqual(["websocket:chat-a"]);
|
||||||
|
});
|
||||||
|
|
||||||
it("refreshes sessions when the websocket reports a session update", async () => {
|
it("refreshes sessions when the websocket reports a session update", async () => {
|
||||||
vi.mocked(api.listSessions)
|
vi.mocked(api.listSessions)
|
||||||
.mockResolvedValueOnce([
|
.mockResolvedValueOnce([
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user