refactor: bind cron jobs to origin sessions

This commit is contained in:
chengyongru 2026-06-12 14:00:53 +08:00
parent 271b3651d7
commit 80524e9e88
14 changed files with 47 additions and 267 deletions

View File

@ -213,20 +213,23 @@ Rules:
session being deleted.
- Do not block on system jobs.
- Do not block on legacy unbound jobs.
- In unified-session mode, WebUI chats display cron jobs owned by
`unified:default`, but deleting an individual `websocket:*` thread should not
block on or delete those unified cron jobs.
- In unified-session mode, WebUI-created cron jobs still belong to the concrete
`websocket:*` chat that created them, so deleting that chat should block on or
delete those jobs.
- If the user manually deletes files outside the WebUI/API, do not try to
compensate.
## Unified Session Mode
When `unified_session` is enabled, WebUI-created cron jobs should bind to the
same unified session as normal WebUI chat turns: `unified:default`.
When `unified_session` is enabled, WebUI-created cron jobs should still bind to
the concrete WebUI chat that created them, for example `websocket:<chat_id>`.
The cron trigger is delivered through that original chat. `AgentLoop` then
applies `unified_session` normally, so the turn's memory/session context may be
`unified:default` even though the cron job's ownership key is concrete.
- All WebUI chats should display cron jobs owned by `unified:default`.
- Individual WebUI thread deletion should remain scoped to the concrete
`websocket:*` thread being deleted.
- Each WebUI chat should display cron jobs owned by that concrete chat.
- Individual WebUI thread deletion should block on cron jobs owned by that
concrete `websocket:*` thread.
- Toggling `unified_session` does not migrate existing cron jobs. Existing jobs
keep their stored `payload.session_key` and continue to execute against that
owner until explicitly removed or recreated.

View File

@ -59,7 +59,6 @@ from nanobot.session.goal_state import (
)
from nanobot.session.keys import UNIFIED_SESSION_KEY, session_key_for_channel
from nanobot.session.manager import Session, SessionManager
from nanobot.session.routing import persist_routing_context
from nanobot.utils.document import extract_documents, reference_non_image_attachments
from nanobot.utils.helpers import image_placeholder_text
from nanobot.utils.helpers import truncate_text as truncate_text_fn
@ -1389,8 +1388,6 @@ class AgentLoop:
ctx.session = self.sessions.get_or_create(ctx.session_key)
await self._runtime_events().session_turn_started(msg, ctx.session_key)
self.workspace_scopes.persist_message_scope(ctx.session, msg)
if persist_routing_context(ctx.session, msg):
self.sessions.save(ctx.session)
if self._restore_runtime_checkpoint(ctx.session):
self.sessions.save(ctx.session)

View File

@ -75,7 +75,7 @@ class CronTool(Tool, ContextAware):
self._channel.set(ctx.channel)
self._chat_id.set(ctx.chat_id)
self._metadata.set(ctx.metadata)
self._session_key.set(ctx.session_key or "")
self._session_key.set(f"{ctx.channel}:{ctx.chat_id}" if ctx.channel and ctx.chat_id else "")
def set_cron_context(self, active: bool):
"""Mark whether the tool is executing inside a cron job callback."""

View File

@ -988,9 +988,7 @@ def _run_gateway(
from nanobot.cron.types import CronJob
from nanobot.providers.factory import build_provider_snapshot, load_provider_snapshot
from nanobot.providers.image_generation import image_gen_provider_configs
from nanobot.security.workspace_access import WORKSPACE_SCOPE_METADATA_KEY
from nanobot.session.manager import SessionManager
from nanobot.session.routing import read_routing_context
from nanobot.session.webui_turns import WebuiTurnCoordinator
from nanobot.utils.prompt_templates import render_template
from nanobot.webui.token_usage import TokenUsageHook
@ -1046,11 +1044,6 @@ def _run_gateway(
unified_session=config.agents.defaults.unified_session,
)
def _session_metadata(session_key: str) -> dict[str, Any]:
data = session_manager.read_session_file(session_key)
metadata = data.get("metadata", {}) if isinstance(data, dict) else {}
return dict(metadata) if isinstance(metadata, dict) else {}
def _bound_session_delivery_context(
session_key: str,
*,
@ -1063,18 +1056,10 @@ def _run_gateway(
if not channel or not rest:
raise ValueError(f"bound cron session_key is invalid: {session_key!r}")
session_metadata = _session_metadata(session_key)
routed = read_routing_context(session_metadata)
if routed is not None:
channel, rest, metadata = routed
else:
metadata: dict[str, Any] = {}
metadata: dict[str, Any] = {}
if channel == "websocket":
metadata["webui"] = True
scope = session_metadata.get(WORKSPACE_SCOPE_METADATA_KEY)
if isinstance(scope, dict):
metadata[WORKSPACE_SCOPE_METADATA_KEY] = dict(scope)
metadata.update(
_proactive_delivery_metadata(
"websocket",
@ -1156,7 +1141,6 @@ def _run_gateway(
chat_id=chat_id,
content=prompt,
metadata=metadata,
session_key_override=session_key,
)
)
except (Exception, asyncio.CancelledError) as exc:

View File

@ -14,7 +14,6 @@ from typing import Any
from loguru import logger
from nanobot.config.paths import get_legacy_sessions_dir
from nanobot.session.metadata import SESSION_ROUTING_METADATA_KEY
from nanobot.utils.helpers import (
ensure_dir,
estimate_message_tokens,
@ -37,7 +36,6 @@ _FORK_VOLATILE_METADATA_KEYS = {
"pending_user_turn",
"runtime_checkpoint",
"thread_goal",
SESSION_ROUTING_METADATA_KEY,
"title",
"title_user_edited",
}

View File

@ -1,3 +0,0 @@
"""Shared session metadata keys."""
SESSION_ROUTING_METADATA_KEY = "_routing_context"

View File

@ -1,104 +0,0 @@
"""Persisted session routing context for proactive turns."""
from __future__ import annotations
from typing import Any, Mapping
from nanobot.bus.events import InboundMessage
from nanobot.cron.session_turns import is_cron_turn
from nanobot.session.manager import Session
from nanobot.session.metadata import SESSION_ROUTING_METADATA_KEY
_ROUTING_METADATA_KEYS = {
"chat_type",
"context_chat_id",
"conversation_type",
"event_id",
"message_thread_id",
"msg_type",
"parent_channel_id",
"parent_id",
"platform",
"root_id",
"thread_id",
"thread_reply_to_event_id",
"thread_root_event_id",
}
_CHANNEL_ROUTING_METADATA_KEYS = {
# Feishu needs a message anchor to reply into an existing topic. Other
# channels should avoid stale reply anchors for scheduled cron turns.
"feishu": {"message_id"},
}
_SLACK_ROUTING_KEYS = {"channel_type", "thread_ts"}
def _scalar(value: Any) -> str | int | float | bool | None:
if value is None or isinstance(value, (str, int, float, bool)):
return value
return None
def _routing_metadata(channel: str, metadata: Mapping[str, Any] | None) -> dict[str, Any]:
if not isinstance(metadata, Mapping):
return {}
out: dict[str, Any] = {}
keys = _ROUTING_METADATA_KEYS | _CHANNEL_ROUTING_METADATA_KEYS.get(channel, set())
for key in keys:
if key not in metadata:
continue
value = _scalar(metadata.get(key))
if value is not None:
out[key] = value
slack = metadata.get("slack")
if isinstance(slack, Mapping):
slack_out = {
key: value
for key in _SLACK_ROUTING_KEYS
if (value := _scalar(slack.get(key))) is not None
}
if slack_out:
out["slack"] = slack_out
return out
def routing_context_for_message(msg: InboundMessage) -> dict[str, Any]:
"""Return the stable routing context needed to deliver future session turns."""
return {
"channel": msg.channel,
"chat_id": msg.chat_id,
"metadata": _routing_metadata(msg.channel, msg.metadata),
}
def persist_routing_context(session: Session, msg: InboundMessage) -> bool:
"""Persist the latest non-cron delivery context for a session."""
if is_cron_turn(msg.metadata):
return False
context = routing_context_for_message(msg)
if session.metadata.get(SESSION_ROUTING_METADATA_KEY) == context:
return False
session.metadata[SESSION_ROUTING_METADATA_KEY] = context
return True
def read_routing_context(metadata: Mapping[str, Any] | None) -> tuple[str, str, dict[str, Any]] | None:
"""Decode a persisted routing context from session metadata."""
if not isinstance(metadata, Mapping):
return None
raw = metadata.get(SESSION_ROUTING_METADATA_KEY)
if not isinstance(raw, Mapping):
return None
channel = raw.get("channel")
chat_id = raw.get("chat_id")
if not isinstance(channel, str) or not channel:
return None
if not isinstance(chat_id, str) or not chat_id:
return None
route_meta = raw.get("metadata")
metadata_out = dict(route_meta) if isinstance(route_meta, Mapping) else {}
return channel, chat_id, metadata_out

View File

@ -21,7 +21,6 @@ from websockets.http11 import Request as WsRequest
from websockets.http11 import Response
from nanobot.command.builtin import builtin_command_palette
from nanobot.session.keys import UNIFIED_SESSION_KEY
from nanobot.utils.subagent_channel_display import scrub_subagent_messages_for_channel
from nanobot.webui.file_preview import WebUIFilePreviewError, file_preview_payload
from nanobot.webui.gateway_tokens import GatewayTokenStore, token_response_payload
@ -473,8 +472,6 @@ class GatewayHTTPHandler:
def _automation_display_key(self, session_key: str) -> str:
"""Return the cron ownership key shown for this WebUI thread."""
if self._unified_session:
return UNIFIED_SESSION_KEY
return session_key
# -- Media routes -------------------------------------------------------

View File

@ -12,7 +12,6 @@ from nanobot.cron.session_turns import CRON_HISTORY_META, CRON_TRIGGER_META
from nanobot.providers.base import LLMResponse
from nanobot.session.goal_state import GOAL_STATE_KEY
from nanobot.session.manager import Session, SessionManager
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
from nanobot.session.turn_continuation import (
INTERNAL_CONTINUATION_META,
INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
@ -864,12 +863,6 @@ async def test_process_message_uses_context_chat_id_for_runtime_prompt(tmp_path:
assert result.chat_id == "thread-777"
assert loop.context.build_messages.call_args.kwargs["chat_id"] == "parent-456"
assert loop._run_agent_loop.call_args.kwargs["chat_id"] == "thread-777"
session = loop.sessions.get_or_create("discord:parent-456:thread:thread-777")
assert session.metadata[SESSION_ROUTING_METADATA_KEY] == {
"channel": "discord",
"chat_id": "thread-777",
"metadata": {"context_chat_id": "parent-456"},
}
@pytest.mark.asyncio

View File

@ -1,5 +1,4 @@
from nanobot.session.manager import Session, SessionManager
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
def _assert_no_orphans(history: list[dict]) -> None:
@ -433,11 +432,6 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path):
source.metadata["webui"] = True
source.metadata["title"] = "Old title"
source.metadata["goal_state"] = {"status": "active", "objective": "do not inherit"}
source.metadata[SESSION_ROUTING_METADATA_KEY] = {
"channel": "websocket",
"chat_id": "source",
"metadata": {},
}
source.add_message("user", "round1")
source.add_message("assistant", "answer1")
source.add_message("user", "round2 fork me")
@ -456,7 +450,6 @@ def test_fork_session_before_user_index_copies_only_prefix(tmp_path):
assert forked.metadata["webui"] is True
assert "title" not in forked.metadata
assert "goal_state" not in forked.metadata
assert SESSION_ROUTING_METADATA_KEY not in forked.metadata
saved = manager.read_session_file("websocket:fork")
assert [m["content"] for m in saved["messages"]] == ["round1", "answer1"]

View File

@ -243,7 +243,7 @@ async def test_session_automations_route_filters_by_webui_session(
@pytest.mark.asyncio
async def test_session_automations_route_uses_unified_owner_when_enabled(
async def test_session_automations_route_uses_origin_owner_when_unified_enabled(
bus: MagicMock, tmp_path: Path
) -> None:
cron = CronService(tmp_path / "cron" / "jobs.json")
@ -255,9 +255,9 @@ async def test_session_automations_route_uses_unified_owner_when_enabled(
session_key=UNIFIED_SESSION_KEY,
)
cron.add_job(
name="Visible thread only",
name="Visible chat job",
schedule=hourly,
message="Do not show in unified mode",
message="Show for this chat",
session_key="websocket:abc",
)
channel = _ch(
@ -274,14 +274,19 @@ async def test_session_automations_route_uses_unified_owner_when_enabled(
token = boot.json()["token"]
auth = {"Authorization": f"Bearer {token}"}
for key in ("websocket%3Aabc", "websocket%3Aother"):
resp = await _http_get(
f"http://127.0.0.1:29917/api/sessions/{key}/automations",
headers=auth,
)
assert resp.status_code == 200
body = resp.json()
assert [job["name"] for job in body["jobs"]] == ["Unified check"]
resp = await _http_get(
"http://127.0.0.1:29917/api/sessions/websocket%3Aabc/automations",
headers=auth,
)
assert resp.status_code == 200
assert [job["name"] for job in resp.json()["jobs"]] == ["Visible chat job"]
resp = await _http_get(
"http://127.0.0.1:29917/api/sessions/websocket%3Aother/automations",
headers=auth,
)
assert resp.status_code == 200
assert resp.json()["jobs"] == []
finally:
await channel.stop()
await server_task
@ -802,17 +807,17 @@ async def test_session_delete_can_cascade_bound_automations(
@pytest.mark.asyncio
async def test_session_delete_does_not_cascade_unified_automations(
async def test_session_delete_blocks_origin_automation_when_unified_enabled(
bus: MagicMock, tmp_path: Path, monkeypatch: pytest.MonkeyPatch
) -> None:
monkeypatch.setattr("nanobot.config.paths.get_data_dir", lambda: tmp_path)
sm = _seed_session(tmp_path, key="websocket:doomed")
cron = CronService(tmp_path / "cron" / "jobs.json")
cron.add_job(
name="Shared daily check",
name="Chat daily check",
schedule=CronSchedule(kind="every", every_ms=86_400_000),
message="Check the shared session",
session_key=UNIFIED_SESSION_KEY,
message="Check this chat",
session_key="websocket:doomed",
)
channel = _ch(
bus,
@ -835,10 +840,13 @@ async def test_session_delete_does_not_cascade_unified_automations(
)
assert resp.status_code == 200
assert resp.json()["deleted"] is True
assert not path.exists()
assert [job.name for job in cron.list_bound_cron_jobs_for_session(UNIFIED_SESSION_KEY)] == [
"Shared daily check"
body = resp.json()
assert body["deleted"] is False
assert body["blocked_by_automations"] is True
assert [job["name"] for job in body["automations"]] == ["Chat daily check"]
assert path.exists()
assert [job.name for job in cron.list_bound_cron_jobs_for_session("websocket:doomed")] == [
"Chat daily check"
]
finally:
await channel.stop()

View File

@ -16,7 +16,6 @@ from nanobot.cron.types import CronJob, CronPayload
from nanobot.providers.factory import ProviderSnapshot, make_provider
from nanobot.providers.openai_codex_provider import _strip_model_prefix
from nanobot.providers.registry import find_by_name
from nanobot.session.routing import SESSION_ROUTING_METADATA_KEY
from nanobot.webui.metadata import WEBUI_MESSAGE_SOURCE_METADATA_KEY, WEBUI_TURN_METADATA_KEY
runner = CliRunner()
@ -1548,38 +1547,10 @@ def test_gateway_bound_cron_runs_as_session_turn(
)
monkeypatch.setattr("nanobot.bus.queue.MessageBus", lambda: bus)
route_metadata = {
"websocket:chat-1": {
"workspace_scope": {
"project_path": str(tmp_path),
"access_mode": "restricted",
},
SESSION_ROUTING_METADATA_KEY: {
"channel": "websocket",
"chat_id": "chat-1",
"metadata": {},
},
},
"discord:456:thread:777": {
SESSION_ROUTING_METADATA_KEY: {
"channel": "discord",
"chat_id": "777",
"metadata": {
"context_chat_id": "456",
"parent_channel_id": "456",
"thread_id": "777",
},
},
},
}
class _FakeSessionManager:
def __init__(self, _workspace: Path) -> None:
pass
def read_session_file(self, key: str) -> dict[str, object] | None:
return {"metadata": route_metadata.get(key, {})}
monkeypatch.setattr("nanobot.session.manager.SessionManager", _FakeSessionManager)
class _FakeCron:
@ -1651,10 +1622,9 @@ def test_gateway_bound_cron_runs_as_session_turn(
assert msg.channel == "websocket"
assert msg.chat_id == "chat-1"
assert msg.sender_id == "cron"
assert msg.session_key_override == "websocket:chat-1"
assert msg.session_key_override is None
assert "Cron job: Check repository health." in msg.content
assert msg.metadata["webui"] is True
assert msg.metadata["workspace_scope"]["project_path"] == str(tmp_path)
assert msg.metadata[WEBUI_MESSAGE_SOURCE_METADATA_KEY] == {
"kind": "cron",
"label": "Repo check",
@ -1675,7 +1645,7 @@ def test_gateway_bound_cron_runs_as_session_turn(
name="Thread check",
payload=CronPayload(
message="Check the Discord thread.",
session_key="discord:456:thread:777",
session_key="discord:777",
),
)
@ -1686,10 +1656,7 @@ def test_gateway_bound_cron_runs_as_session_turn(
assert isinstance(msg, InboundMessage)
assert msg.channel == "discord"
assert msg.chat_id == "777"
assert msg.session_key_override == "discord:456:thread:777"
assert msg.metadata["context_chat_id"] == "456"
assert msg.metadata["parent_channel_id"] == "456"
assert msg.metadata["thread_id"] == "777"
assert msg.session_key_override is None
def test_gateway_cron_job_suppresses_intermediate_progress(

View File

@ -1,53 +0,0 @@
from nanobot.bus.events import InboundMessage
from nanobot.session.routing import routing_context_for_message
def test_routing_context_keeps_telegram_topic_without_stale_message_id() -> None:
context = routing_context_for_message(
InboundMessage(
channel="telegram",
sender_id="user-1",
chat_id="-100123",
content="set a reminder",
metadata={
"message_id": 100,
"message_thread_id": 42,
"_progress": True,
},
session_key_override="telegram:-100123:topic:42",
)
)
assert context == {
"channel": "telegram",
"chat_id": "-100123",
"metadata": {"message_thread_id": 42},
}
def test_routing_context_keeps_feishu_topic_anchor() -> None:
context = routing_context_for_message(
InboundMessage(
channel="feishu",
sender_id="ou_user",
chat_id="oc_chat",
content="set a reminder",
metadata={
"chat_type": "group",
"message_id": "om_msg",
"thread_id": "omt_thread",
"_progress": True,
},
session_key_override="feishu:oc_chat:om_root",
)
)
assert context == {
"channel": "feishu",
"chat_id": "oc_chat",
"metadata": {
"chat_type": "group",
"message_id": "om_msg",
"thread_id": "omt_thread",
},
}

View File

@ -246,8 +246,8 @@ async def test_cron_tool_basic_set_context_and_execute(tmp_path) -> None:
@pytest.mark.asyncio
async def test_webui_cron_tool_uses_unified_session_when_enabled(tmp_path) -> None:
"""WebUI-created automations should follow unified session ownership."""
async def test_webui_cron_tool_uses_origin_session_when_unified_enabled(tmp_path) -> None:
"""WebUI-created cron jobs stay attached to the creating chat."""
tool = CronTool(CronService(tmp_path / "jobs.json"))
class _Tools:
@ -271,7 +271,7 @@ async def test_webui_cron_tool_uses_unified_session_when_enabled(tmp_path) -> No
jobs = tool._cron.list_jobs()
assert len(jobs) == 1
assert jobs[0].payload.session_key == UNIFIED_SESSION_KEY
assert jobs[0].payload.session_key == "websocket:chat-123"
@pytest.mark.asyncio
@ -280,4 +280,4 @@ async def test_cron_tool_no_context_returns_error(tmp_path) -> None:
tool = CronTool(CronService(tmp_path / "jobs.json"))
result = await tool.execute(action="add", message="test", every_seconds=60)
assert result == "Error: scheduled automations must be created from a chat session"
assert result == "Error: scheduled cron jobs must be created from a chat session"