From fb6dd111e1c11afaaa2a7d7ea5ee4a65ce4787d1 Mon Sep 17 00:00:00 2001 From: chengyongru <61816729+chengyongru@users.noreply.github.com> Date: Fri, 10 Apr 2026 17:43:42 +0800 Subject: [PATCH 1/7] =?UTF-8?q?feat(agent):=20auto=20compact=20=E2=80=94?= =?UTF-8?q?=20proactive=20session=20compression=20to=20reduce=20token=20co?= =?UTF-8?q?st=20and=20latency=20(#2982)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input. --- README.md | 26 + nanobot/agent/auto_compact.py | 82 +++ nanobot/agent/context.py | 9 +- nanobot/agent/loop.py | 38 +- nanobot/agent/memory.py | 4 + nanobot/cli/commands.py | 3 + nanobot/config/schema.py | 1 + nanobot/nanobot.py | 1 + nanobot/session/manager.py | 3 + tests/agent/test_auto_compact.py | 931 +++++++++++++++++++++++++++++++ 10 files changed, 1091 insertions(+), 7 deletions(-) create mode 100644 nanobot/agent/auto_compact.py create mode 100644 tests/agent/test_auto_compact.py diff --git a/README.md b/README.md index 6098c55ca..a9bf4b5e3 100644 --- a/README.md +++ b/README.md @@ -1503,6 +1503,32 @@ MCP tools are automatically discovered and registered on startup. The LLM can us **Docker security**: The official Docker image runs as a non-root user (`nanobot`, UID 1000) with bubblewrap pre-installed. When using `docker-compose.yml`, the container drops all Linux capabilities except `SYS_ADMIN` (required for bwrap's namespace isolation). +### Auto Compact + +When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input. + +```json +{ + "agents": { + "defaults": { + "sessionTtlMinutes": 15 + } + } +} +``` + +| Option | Default | Description | +|--------|---------|-------------| +| `agents.defaults.sessionTtlMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction. Set to `0` to disable. Recommended: `15` — matches typical LLM KV cache expiration, so compacted sessions won't waste cache on cold entries. | + +How it works: +1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration. +2. **Background compaction**: Expired sessions are summarized via LLM, then cleared. +3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted). + +> [!TIP] +> The summary survives bot restarts — it's stored in session metadata and recovered on the next message. + ### Timezone Time is context. Context should be precise. diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py new file mode 100644 index 000000000..171f5f55a --- /dev/null +++ b/nanobot/agent/auto_compact.py @@ -0,0 +1,82 @@ +"""Auto compact: proactive compression of idle sessions to reduce token cost and latency.""" + +from __future__ import annotations + +from datetime import datetime +from typing import TYPE_CHECKING, Callable, Coroutine + +from loguru import logger + +if TYPE_CHECKING: + from nanobot.agent.memory import Consolidator + from nanobot.session.manager import Session, SessionManager + + +class AutoCompact: + def __init__(self, sessions: SessionManager, consolidator: Consolidator, + session_ttl_minutes: int = 0): + self.sessions = sessions + self.consolidator = consolidator + self._ttl = session_ttl_minutes + self._archiving: set[str] = set() + self._summaries: dict[str, tuple[str, datetime]] = {} + + def _is_expired(self, ts: datetime | str | None) -> bool: + if self._ttl <= 0 or not ts: + return False + if isinstance(ts, str): + ts = datetime.fromisoformat(ts) + return (datetime.now() - ts).total_seconds() >= self._ttl * 60 + + @staticmethod + def _format_summary(text: str, last_active: datetime) -> str: + idle_min = int((datetime.now() - last_active).total_seconds() / 60) + return f"Inactive for {idle_min} minutes.\nPrevious conversation summary: {text}" + + def check_expired(self, schedule_background: Callable[[Coroutine], None]) -> None: + for info in self.sessions.list_sessions(): + key = info.get("key", "") + if key and key not in self._archiving and self._is_expired(info.get("updated_at")): + self._archiving.add(key) + logger.debug("Auto-compact: scheduling archival for {} (idle > {} min)", key, self._ttl) + schedule_background(self._archive(key)) + + async def _archive(self, key: str) -> None: + try: + self.sessions.invalidate(key) + session = self.sessions.get_or_create(key) + msgs = session.messages[session.last_consolidated:] + if not msgs: + logger.debug("Auto-compact: skipping {}, no un-consolidated messages", key) + session.updated_at = datetime.now() + self.sessions.save(session) + return + n = len(msgs) + last_active = session.updated_at + await self.consolidator.archive(msgs) + entry = self.consolidator.get_last_history_entry() + summary = (entry or {}).get("content", "") + if summary and summary != "(nothing)": + self._summaries[key] = (summary, last_active) + session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()} + session.clear() + self.sessions.save(session) + logger.info("Auto-compact: archived {} ({} messages, summary={})", key, n, bool(summary)) + except Exception: + logger.exception("Auto-compact: failed for {}", key) + finally: + self._archiving.discard(key) + + def prepare_session(self, session: Session, key: str) -> tuple[Session, str | None]: + if key in self._archiving or self._is_expired(session.updated_at): + logger.info("Auto-compact: reloading session {} (archiving={})", key, key in self._archiving) + session = self.sessions.get_or_create(key) + entry = self._summaries.pop(key, None) + if entry: + session.metadata.pop("_last_summary", None) + return session, self._format_summary(entry[0], entry[1]) + if not session.messages and "_last_summary" in session.metadata: + meta = session.metadata.pop("_last_summary") + self.sessions.save(session) + return session, self._format_summary(meta["text"], datetime.fromisoformat(meta["last_active"])) + return session, None diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index 3ac19e7f3..e3460ddfd 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -20,6 +20,7 @@ class ContextBuilder: BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"] _RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]" _MAX_RECENT_HISTORY = 50 + _RUNTIME_CONTEXT_END = "[/Runtime Context]" def __init__(self, workspace: Path, timezone: str | None = None): self.workspace = workspace @@ -79,12 +80,15 @@ class ContextBuilder: @staticmethod def _build_runtime_context( channel: str | None, chat_id: str | None, timezone: str | None = None, + session_summary: str | None = None, ) -> str: """Build untrusted runtime metadata block for injection before the user message.""" lines = [f"Current Time: {current_time_str(timezone)}"] if channel and chat_id: lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"] - return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) + if session_summary: + lines += ["", "[Resumed Session]", session_summary] + return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) + "\n" + ContextBuilder._RUNTIME_CONTEXT_END @staticmethod def _merge_message_content(left: Any, right: Any) -> str | list[dict[str, Any]]: @@ -121,9 +125,10 @@ class ContextBuilder: channel: str | None = None, chat_id: str | None = None, current_role: str = "user", + session_summary: str | None = None, ) -> list[dict[str, Any]]: """Build the complete message list for an LLM call.""" - runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone) + runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone, session_summary=session_summary) user_content = self._build_user_content(current_message, media) # Merge runtime context and user content into a single user message diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index bc83cc77c..65a5a1abc 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger +from nanobot.agent.auto_compact import AutoCompact from nanobot.agent.context import ContextBuilder from nanobot.agent.hook import AgentHook, AgentHookContext, CompositeHook from nanobot.agent.memory import Consolidator, Dream @@ -145,6 +146,7 @@ class AgentLoop: mcp_servers: dict | None = None, channels_config: ChannelsConfig | None = None, timezone: str | None = None, + session_ttl_minutes: int = 0, hooks: list[AgentHook] | None = None, unified_session: bool = False, ): @@ -217,6 +219,11 @@ class AgentLoop: get_tool_definitions=self.tools.get_definitions, max_completion_tokens=provider.generation.max_tokens, ) + self.auto_compact = AutoCompact( + sessions=self.sessions, + consolidator=self.consolidator, + session_ttl_minutes=session_ttl_minutes, + ) self.dream = Dream( store=self.context.memory, provider=provider, @@ -371,6 +378,7 @@ class AgentLoop: try: msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0) except asyncio.TimeoutError: + self.auto_compact.check_expired(self._schedule_background) continue except asyncio.CancelledError: # Preserve real task cancellation so shutdown can complete cleanly. @@ -497,13 +505,18 @@ class AgentLoop: session = self.sessions.get_or_create(key) if self._restore_runtime_checkpoint(session): self.sessions.save(session) + + session, pending = self.auto_compact.prepare_session(session, key) + await self.consolidator.maybe_consolidate_by_tokens(session) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) history = session.get_history(max_messages=0) current_role = "assistant" if msg.sender_id == "subagent" else "user" + messages = self.context.build_messages( history=history, current_message=msg.content, channel=channel, chat_id=chat_id, + session_summary=pending, current_role=current_role, ) final_content, _, all_msgs, _ = await self._run_agent_loop( @@ -525,6 +538,8 @@ class AgentLoop: if self._restore_runtime_checkpoint(session): self.sessions.save(session) + session, pending = self.auto_compact.prepare_session(session, key) + # Slash commands raw = msg.content.strip() ctx = CommandContext(msg=msg, session=session, key=key, raw=raw, loop=self) @@ -539,9 +554,11 @@ class AgentLoop: message_tool.start_turn() history = session.get_history(max_messages=0) + initial_messages = self.context.build_messages( history=history, current_message=msg.content, + session_summary=pending, media=msg.media if msg.media else None, channel=msg.channel, chat_id=msg.chat_id, ) @@ -645,12 +662,23 @@ class AgentLoop: entry["content"] = filtered elif role == "user": if isinstance(content, str) and content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG): - # Strip the runtime-context prefix, keep only the user text. - parts = content.split("\n\n", 1) - if len(parts) > 1 and parts[1].strip(): - entry["content"] = parts[1] + # Strip the entire runtime-context block (including any session summary). + # The block is bounded by _RUNTIME_CONTEXT_TAG and _RUNTIME_CONTEXT_END. + end_marker = ContextBuilder._RUNTIME_CONTEXT_END + end_pos = content.find(end_marker) + if end_pos >= 0: + after = content[end_pos + len(end_marker):].lstrip("\n") + if after: + entry["content"] = after + else: + continue else: - continue + # Fallback: no end marker found, strip the tag prefix + after_tag = content[len(ContextBuilder._RUNTIME_CONTEXT_TAG):].lstrip("\n") + if after_tag.strip(): + entry["content"] = after_tag + else: + continue if isinstance(content, list): filtered = self._sanitize_persisted_blocks(content, drop_runtime=True) if not filtered: diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 943d91855..26c5cd45f 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -374,6 +374,10 @@ class Consolidator: weakref.WeakValueDictionary() ) + def get_last_history_entry(self) -> dict[str, Any] | None: + """Return the most recent entry from history.jsonl.""" + return self.store._read_last_entry() + def get_lock(self, session_key: str) -> asyncio.Lock: """Return the shared consolidation lock for one session.""" return self._locks.setdefault(session_key, asyncio.Lock()) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index 5ce8b7937..9d818a9db 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -591,6 +591,7 @@ def serve( channels_config=runtime_config.channels, timezone=runtime_config.agents.defaults.timezone, unified_session=runtime_config.agents.defaults.unified_session, + session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes, ) model_name = runtime_config.agents.defaults.model @@ -683,6 +684,7 @@ def gateway( channels_config=config.channels, timezone=config.agents.defaults.timezone, unified_session=config.agents.defaults.unified_session, + session_ttl_minutes=config.agents.defaults.session_ttl_minutes, ) # Set cron callback (needs agent) @@ -915,6 +917,7 @@ def agent( channels_config=config.channels, timezone=config.agents.defaults.timezone, unified_session=config.agents.defaults.unified_session, + session_ttl_minutes=config.agents.defaults.session_ttl_minutes, ) restart_notice = consume_restart_notice_from_env() if restart_notice and should_show_cli_restart_notice(restart_notice, session_id): diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 2d31c8bf9..8ab68d7b5 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -77,6 +77,7 @@ class AgentDefaults(Base): reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode timezone: str = "UTC" # IANA timezone, e.g. "Asia/Shanghai", "America/New_York" unified_session: bool = False # Share one session across all channels (single-user multi-device) + session_ttl_minutes: int = Field(default=0, ge=0) # Auto /new after idle (0 = disabled) dream: DreamConfig = Field(default_factory=DreamConfig) diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index 9166acb27..df0e49842 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -82,6 +82,7 @@ class Nanobot: mcp_servers=config.tools.mcp_servers, timezone=defaults.timezone, unified_session=defaults.unified_session, + session_ttl_minutes=defaults.session_ttl_minutes, ) return cls(loop) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 27df31405..2ed0624a2 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -155,6 +155,7 @@ class SessionManager: messages = [] metadata = {} created_at = None + updated_at = None last_consolidated = 0 with open(path, encoding="utf-8") as f: @@ -168,6 +169,7 @@ class SessionManager: if data.get("_type") == "metadata": metadata = data.get("metadata", {}) created_at = datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None + updated_at = datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None last_consolidated = data.get("last_consolidated", 0) else: messages.append(data) @@ -176,6 +178,7 @@ class SessionManager: key=key, messages=messages, created_at=created_at or datetime.now(), + updated_at=updated_at or datetime.now(), metadata=metadata, last_consolidated=last_consolidated ) diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py new file mode 100644 index 000000000..8b26254e9 --- /dev/null +++ b/tests/agent/test_auto_compact.py @@ -0,0 +1,931 @@ +"""Tests for auto compact (idle TTL) feature.""" + +import asyncio +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock +from pathlib import Path + +import pytest + +from nanobot.agent.loop import AgentLoop +from nanobot.bus.events import InboundMessage +from nanobot.bus.queue import MessageBus +from nanobot.config.schema import AgentDefaults +from nanobot.command import CommandContext +from nanobot.providers.base import LLMResponse + + +def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop: + """Create a minimal AgentLoop for testing.""" + bus = MessageBus() + provider = MagicMock() + provider.get_default_model.return_value = "test-model" + provider.estimate_prompt_tokens.return_value = (10_000, "test") + provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[])) + provider.generation.max_tokens = 4096 + loop = AgentLoop( + bus=bus, + provider=provider, + workspace=tmp_path, + model="test-model", + context_window_tokens=128_000, + session_ttl_minutes=session_ttl_minutes, + ) + loop.tools.get_definitions = MagicMock(return_value=[]) + return loop + + +class TestSessionTTLConfig: + """Test session TTL configuration.""" + + def test_default_ttl_is_zero(self): + """Default TTL should be 0 (disabled).""" + defaults = AgentDefaults() + assert defaults.session_ttl_minutes == 0 + + def test_custom_ttl(self): + """Custom TTL should be stored correctly.""" + defaults = AgentDefaults(session_ttl_minutes=30) + assert defaults.session_ttl_minutes == 30 + + +class TestAgentLoopTTLParam: + """Test that AutoCompact receives and stores session_ttl_minutes.""" + + def test_loop_stores_ttl(self, tmp_path): + """AutoCompact should store the TTL value.""" + loop = _make_loop(tmp_path, session_ttl_minutes=25) + assert loop.auto_compact._ttl == 25 + + def test_loop_default_ttl_zero(self, tmp_path): + """AutoCompact default TTL should be 0 (disabled).""" + loop = _make_loop(tmp_path, session_ttl_minutes=0) + assert loop.auto_compact._ttl == 0 + + +class TestAutoCompact: + """Test the _archive method.""" + + @pytest.mark.asyncio + async def test_is_expired_boundary(self, tmp_path): + """Exactly at TTL boundary should be expired (>= not >).""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + ts = datetime.now() - timedelta(minutes=15) + assert loop.auto_compact._is_expired(ts) is True + ts2 = datetime.now() - timedelta(minutes=14, seconds=59) + assert loop.auto_compact._is_expired(ts2) is False + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_is_expired_string_timestamp(self, tmp_path): + """_is_expired should parse ISO string timestamps.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + ts = (datetime.now() - timedelta(minutes=20)).isoformat() + assert loop.auto_compact._is_expired(ts) is True + assert loop.auto_compact._is_expired(None) is False + assert loop.auto_compact._is_expired("") is False + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_check_expired_only_archives_expired_sessions(self, tmp_path): + """With multiple sessions, only the expired one should be archived.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + # Expired session + s1 = loop.sessions.get_or_create("cli:expired") + s1.add_message("user", "old") + s1.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(s1) + # Active session + s2 = loop.sessions.get_or_create("cli:active") + s2.add_message("user", "recent") + loop.sessions.save(s2) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.auto_compact.check_expired(loop._schedule_background) + await asyncio.sleep(0.1) + + active_after = loop.sessions.get_or_create("cli:active") + assert len(active_after.messages) == 1 + assert active_after.messages[0]["content"] == "recent" + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_archives_and_clears(self, tmp_path): + """_archive should archive un-consolidated messages and clear session.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + for i in range(4): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + loop.sessions.save(session) + + archived_messages = [] + + async def _fake_archive(messages): + archived_messages.extend(messages) + return True + + loop.consolidator.archive = _fake_archive + + await loop.auto_compact._archive("cli:test") + + assert len(archived_messages) == 8 + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_stores_summary(self, tmp_path): + """_archive should store the summary in _summaries.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "hello") + session.add_message("assistant", "hi there") + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", + } + + await loop.auto_compact._archive("cli:test") + + entry = loop.auto_compact._summaries.get("cli:test") + assert entry is not None + assert entry[0] == "User said hello." + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_empty_session(self, tmp_path): + """_archive on empty session should not archive.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + + archive_called = False + + async def _fake_archive(messages): + nonlocal archive_called + archive_called = True + return True + + loop.consolidator.archive = _fake_archive + + await loop.auto_compact._archive("cli:test") + + assert not archive_called + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_respects_last_consolidated(self, tmp_path): + """_archive should only archive un-consolidated messages.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + for i in range(10): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + session.last_consolidated = 18 + loop.sessions.save(session) + + archived_count = 0 + + async def _fake_archive(messages): + nonlocal archived_count + archived_count = len(messages) + return True + + loop.consolidator.archive = _fake_archive + + await loop.auto_compact._archive("cli:test") + + assert archived_count == 2 + await loop.close_mcp() + + +class TestAutoCompactIdleDetection: + """Test idle detection triggers auto-new in _process_message.""" + + @pytest.mark.asyncio + async def test_no_auto_compact_when_ttl_disabled(self, tmp_path): + """No auto-new should happen when TTL is 0 (disabled).""" + loop = _make_loop(tmp_path, session_ttl_minutes=0) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=30) + loop.sessions.save(session) + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg") + await loop._process_message(msg) + + session_after = loop.sessions.get_or_create("cli:test") + assert any(m["content"] == "old message" for m in session_after.messages) + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_triggers_on_idle(self, tmp_path): + """Proactive auto-new archives expired session; _process_message reloads it.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archived_messages = [] + + async def _fake_archive(messages): + archived_messages.extend(messages) + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # Simulate proactive archive completing before message arrives + await loop.auto_compact._archive("cli:test") + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg") + await loop._process_message(msg) + + session_after = loop.sessions.get_or_create("cli:test") + assert not any(m["content"] == "old message" for m in session_after.messages) + assert any(m["content"] == "new msg" for m in session_after.messages) + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_no_auto_compact_when_active(self, tmp_path): + """No auto-new should happen when session is recently active.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "recent message") + loop.sessions.save(session) + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg") + await loop._process_message(msg) + + session_after = loop.sessions.get_or_create("cli:test") + assert any(m["content"] == "recent message" for m in session_after.messages) + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_does_not_affect_priority_commands(self, tmp_path): + """Priority commands (/stop, /restart) bypass _process_message entirely via run().""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + # Priority commands are dispatched in run() before _process_message is called. + # Simulate that path directly via dispatch_priority. + raw = "/stop" + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content=raw) + ctx = CommandContext(msg=msg, session=session, key="cli:test", raw=raw, loop=loop) + result = await loop.commands.dispatch_priority(ctx) + assert result is not None + assert "stopped" in result.content.lower() or "no active task" in result.content.lower() + + # Session should be untouched since priority commands skip _process_message + session_after = loop.sessions.get_or_create("cli:test") + assert any(m["content"] == "old message" for m in session_after.messages) + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_with_slash_new(self, tmp_path): + """Auto-new fires before /new dispatches; session is cleared twice but idempotent.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + for i in range(4): + session.add_message("user", f"msg{i}") + session.add_message("assistant", f"resp{i}") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new") + response = await loop._process_message(msg) + + assert response is not None + assert "new session started" in response.content.lower() + + session_after = loop.sessions.get_or_create("cli:test") + # Session is empty (auto-new archived and cleared, /new cleared again) + assert len(session_after.messages) == 0 + await loop.close_mcp() + + +class TestAutoCompactSystemMessages: + """Test that auto-new also works for system messages.""" + + @pytest.mark.asyncio + async def test_auto_compact_triggers_for_system_messages(self, tmp_path): + """Proactive auto-new archives expired session; system messages reload it.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message from subagent context") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # Simulate proactive archive completing before system message arrives + await loop.auto_compact._archive("cli:test") + + msg = InboundMessage( + channel="system", sender_id="subagent", chat_id="cli:test", + content="subagent result", + ) + await loop._process_message(msg) + + session_after = loop.sessions.get_or_create("cli:test") + assert not any( + m["content"] == "old message from subagent context" + for m in session_after.messages + ) + await loop.close_mcp() + + +class TestAutoCompactEdgeCases: + """Edge cases for auto session new.""" + + @pytest.mark.asyncio + async def test_auto_compact_with_nothing_summary(self, tmp_path): + """Auto-new should not inject when archive produces '(nothing)'.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "thanks") + session.add_message("assistant", "you're welcome") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + loop.provider.chat_with_retry = AsyncMock( + return_value=LLMResponse(content="(nothing)", tool_calls=[]) + ) + + await loop.auto_compact._archive("cli:test") + + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 + # "(nothing)" summary should not be stored + assert "cli:test" not in loop.auto_compact._summaries + + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_archive_failure_still_clears(self, tmp_path): + """Auto-new should clear session even if LLM archive fails (raw_archive fallback).""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "important data") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + loop.provider.chat_with_retry = AsyncMock(side_effect=Exception("API down")) + + # Should not raise + await loop.auto_compact._archive("cli:test") + + session_after = loop.sessions.get_or_create("cli:test") + # Session should be cleared (archive falls back to raw dump) + assert len(session_after.messages) == 0 + + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_auto_compact_preserves_runtime_checkpoint_before_check(self, tmp_path): + """Runtime checkpoint is restored; proactive archive handles the expired session.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.metadata[AgentLoop._RUNTIME_CHECKPOINT_KEY] = { + "assistant_message": {"role": "assistant", "content": "interrupted response"}, + "completed_tool_results": [], + "pending_tool_calls": [], + } + session.add_message("user", "previous message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archived_messages = [] + + async def _fake_archive(messages): + archived_messages.extend(messages) + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # Simulate proactive archive completing before message arrives + await loop.auto_compact._archive("cli:test") + + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="continue") + await loop._process_message(msg) + + # The checkpoint-restored message should have been archived by proactive path + assert len(archived_messages) >= 1 + + await loop.close_mcp() + + +class TestAutoCompactIntegration: + """End-to-end test of auto session new feature.""" + + @pytest.mark.asyncio + async def test_full_lifecycle(self, tmp_path): + """ + Full lifecycle: messages -> idle -> auto-new -> archive -> clear -> summary injected as runtime context. + """ + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + + # Phase 1: User has a conversation + session.add_message("user", "I'm learning English, teach me past tense") + session.add_message("assistant", "Past tense is used for actions completed in the past...") + session.add_message("user", "Give me an example") + session.add_message("assistant", '"I walked to the store yesterday."') + loop.sessions.save(session) + + # Phase 2: Time passes (simulate idle) + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + # Phase 3: User returns with a new message + loop.provider.chat_with_retry = AsyncMock( + return_value=LLMResponse( + content="User is learning English past tense. Example: 'I walked to the store yesterday.'", + tool_calls=[], + ) + ) + + msg = InboundMessage( + channel="cli", sender_id="user", chat_id="test", + content="Let's continue, teach me present perfect", + ) + response = await loop._process_message(msg) + + # Phase 4: Verify + session_after = loop.sessions.get_or_create("cli:test") + + # Old messages should be gone + assert not any( + "past tense is used" in str(m.get("content", "")) for m in session_after.messages + ) + + # Summary should NOT be persisted in session (ephemeral, one-shot) + assert not any( + "[Resumed Session]" in str(m.get("content", "")) for m in session_after.messages + ) + # Runtime context end marker should NOT be persisted + assert not any( + "[/Runtime Context]" in str(m.get("content", "")) for m in session_after.messages + ) + + # Pending summary should be consumed (one-shot) + assert "cli:test" not in loop.auto_compact._summaries + + # The new message should be processed (response exists) + assert response is not None + + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_multi_paragraph_user_message_preserved(self, tmp_path): + """Multi-paragraph user messages must be fully preserved after auto-new.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # Simulate proactive archive completing before message arrives + await loop.auto_compact._archive("cli:test") + + msg = InboundMessage( + channel="cli", sender_id="user", chat_id="test", + content="Paragraph one\n\nParagraph two\n\nParagraph three", + ) + await loop._process_message(msg) + + session_after = loop.sessions.get_or_create("cli:test") + user_msgs = [m for m in session_after.messages if m.get("role") == "user"] + assert len(user_msgs) >= 1 + # All three paragraphs must be preserved + persisted = user_msgs[-1]["content"] + assert "Paragraph one" in persisted + assert "Paragraph two" in persisted + assert "Paragraph three" in persisted + # No runtime context markers in persisted message + assert "[Runtime Context" not in persisted + assert "[/Runtime Context]" not in persisted + await loop.close_mcp() + + +class TestProactiveAutoCompact: + """Test proactive auto-new on idle ticks (TimeoutError path in run loop).""" + + @staticmethod + async def _run_check_expired(loop): + """Helper: run check_expired via callback and wait for background tasks.""" + loop.auto_compact.check_expired(loop._schedule_background) + await asyncio.sleep(0.1) + + @pytest.mark.asyncio + async def test_no_check_when_ttl_disabled(self, tmp_path): + """check_expired should be a no-op when TTL is 0.""" + loop = _make_loop(tmp_path, session_ttl_minutes=0) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=30) + loop.sessions.save(session) + + await self._run_check_expired(loop) + + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 1 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_proactive_archive_on_idle_tick(self, tmp_path): + """Expired session should be archived during idle tick.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.add_message("assistant", "old response") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archived_messages = [] + + async def _fake_archive(messages): + archived_messages.extend(messages) + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User chatted about old things.", + } + + await self._run_check_expired(loop) + + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 0 + assert len(archived_messages) == 2 + entry = loop.auto_compact._summaries.get("cli:test") + assert entry is not None + assert entry[0] == "User chatted about old things." + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_no_proactive_archive_when_active(self, tmp_path): + """Recently active session should NOT be archived on idle tick.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "recent message") + loop.sessions.save(session) + + await self._run_check_expired(loop) + + session_after = loop.sessions.get_or_create("cli:test") + assert len(session_after.messages) == 1 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_no_duplicate_archive(self, tmp_path): + """Should not archive the same session twice if already in progress.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + started = asyncio.Event() + block_forever = asyncio.Event() + + async def _slow_archive(messages): + nonlocal archive_count + archive_count += 1 + started.set() + await block_forever.wait() + return True + + loop.consolidator.archive = _slow_archive + + # First call starts archiving via callback + loop.auto_compact.check_expired(loop._schedule_background) + await started.wait() + assert archive_count == 1 + + # Second call should skip (key is in _archiving) + loop.auto_compact.check_expired(loop._schedule_background) + await asyncio.sleep(0.05) + assert archive_count == 1 + + # Clean up + block_forever.set() + await asyncio.sleep(0.1) + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_proactive_archive_error_does_not_block(self, tmp_path): + """Proactive archive failure should be caught and not block future ticks.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _failing_archive(messages): + raise RuntimeError("LLM down") + + loop.consolidator.archive = _failing_archive + + # Should not raise + await self._run_check_expired(loop) + + # Key should be removed from _archiving (finally block) + assert "cli:test" not in loop.auto_compact._archiving + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_proactive_archive_skips_empty_sessions(self, tmp_path): + """Proactive archive should not call LLM for sessions with no un-consolidated messages.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_called = False + + async def _fake_archive(messages): + nonlocal archive_called + archive_called = True + return True + + loop.consolidator.archive = _fake_archive + + await self._run_check_expired(loop) + + assert not archive_called + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_no_reschedule_after_successful_archive(self, tmp_path): + """Already-archived session should NOT be re-scheduled on subsequent ticks.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "old message") + session.add_message("assistant", "old response") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # First tick: archives the session + await self._run_check_expired(loop) + assert archive_count == 1 + + # Second tick: should NOT re-schedule (updated_at is fresh after clear) + await self._run_check_expired(loop) + assert archive_count == 1 # Still 1, not re-scheduled + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_empty_skip_refreshes_updated_at_prevents_reschedule(self, tmp_path): + """Empty session skip refreshes updated_at, preventing immediate re-scheduling.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return True + + loop.consolidator.archive = _fake_archive + + # First tick: skips (no messages), refreshes updated_at + await self._run_check_expired(loop) + assert archive_count == 0 + + # Second tick: should NOT re-schedule because updated_at is fresh + await self._run_check_expired(loop) + assert archive_count == 0 + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_session_can_be_compacted_again_after_new_messages(self, tmp_path): + """After successful compact + user sends new messages + idle again, should compact again.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "first conversation") + session.add_message("assistant", "first response") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + archive_count = 0 + + async def _fake_archive(messages): + nonlocal archive_count + archive_count += 1 + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + # First compact cycle + await loop.auto_compact._archive("cli:test") + assert archive_count == 1 + + # User returns, sends new messages + msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="second topic") + await loop._process_message(msg) + + # Simulate idle again + loop.sessions.invalidate("cli:test") + session2 = loop.sessions.get_or_create("cli:test") + session2.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session2) + + # Second compact cycle should succeed + await loop.auto_compact._archive("cli:test") + assert archive_count == 2 + await loop.close_mcp() + + +class TestSummaryPersistence: + """Test that summary survives restart via session metadata.""" + + @pytest.mark.asyncio + async def test_summary_persisted_in_session_metadata(self, tmp_path): + """After archive, _last_summary should be in session metadata.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "hello") + session.add_message("assistant", "hi there") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", + } + + await loop.auto_compact._archive("cli:test") + + # Summary should be persisted in session metadata + session_after = loop.sessions.get_or_create("cli:test") + meta = session_after.metadata.get("_last_summary") + assert meta is not None + assert meta["text"] == "User said hello." + assert "last_active" in meta + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_summary_recovered_after_restart(self, tmp_path): + """Summary should be recovered from metadata when _summaries is empty (simulates restart).""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "hello") + session.add_message("assistant", "hi there") + last_active = datetime.now() - timedelta(minutes=20) + session.updated_at = last_active + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", + } + + # Archive + await loop.auto_compact._archive("cli:test") + + # Simulate restart: clear in-memory state + loop.auto_compact._summaries.clear() + loop.sessions.invalidate("cli:test") + + # prepare_session should recover summary from metadata + reloaded = loop.sessions.get_or_create("cli:test") + _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test") + + assert summary is not None + assert "User said hello." in summary + assert "Inactive for" in summary + # Metadata should be cleaned up after consumption + assert "_last_summary" not in reloaded.metadata + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_metadata_cleanup_no_leak(self, tmp_path): + """_last_summary should be removed from metadata after being consumed.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "hello") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + await loop.auto_compact._archive("cli:test") + + # Clear in-memory to force metadata path + loop.auto_compact._summaries.clear() + loop.sessions.invalidate("cli:test") + reloaded = loop.sessions.get_or_create("cli:test") + + # First call: consumes from metadata + _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test") + assert summary is not None + + # Second call: no summary (already consumed) + _, summary2 = loop.auto_compact.prepare_session(reloaded, "cli:test") + assert summary2 is None + assert "_last_summary" not in reloaded.metadata + await loop.close_mcp() + + @pytest.mark.asyncio + async def test_metadata_cleanup_on_inmemory_path(self, tmp_path): + """In-memory _summaries path should also clean up _last_summary from metadata.""" + loop = _make_loop(tmp_path, session_ttl_minutes=15) + session = loop.sessions.get_or_create("cli:test") + session.add_message("user", "hello") + session.updated_at = datetime.now() - timedelta(minutes=20) + loop.sessions.save(session) + + async def _fake_archive(messages): + return True + + loop.consolidator.archive = _fake_archive + loop.consolidator.get_last_history_entry = lambda: { + "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", + } + + await loop.auto_compact._archive("cli:test") + + # Both _summaries and metadata have the summary + assert "cli:test" in loop.auto_compact._summaries + loop.sessions.invalidate("cli:test") + reloaded = loop.sessions.get_or_create("cli:test") + assert "_last_summary" in reloaded.metadata + + # In-memory path is taken (no restart) + _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test") + assert summary is not None + # Metadata should also be cleaned up + assert "_last_summary" not in reloaded.metadata + await loop.close_mcp() From 69d60e2b063b1b5b649a97320b36d8b7612b7f8a Mon Sep 17 00:00:00 2001 From: chengyongru Date: Fri, 10 Apr 2026 18:03:36 +0800 Subject: [PATCH 2/7] fix(agent): handle UnicodeDecodeError in _read_last_entry history.jsonl may contain non-UTF-8 bytes (e.g. from email channel binary content), causing auto compact to fail when reading the last entry for summary generation. Catch UnicodeDecodeError alongside FileNotFoundError and JSONDecodeError. --- nanobot/agent/memory.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 26c5cd45f..e9662ff2c 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -290,7 +290,7 @@ class MemoryStore: if not lines: return None return json.loads(lines[-1]) - except (FileNotFoundError, json.JSONDecodeError): + except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError): return None def _write_entries(self, entries: list[dict[str, Any]]) -> None: From d03458f0346ffbc59996e265dece8520e79e974b Mon Sep 17 00:00:00 2001 From: chengyongru Date: Fri, 10 Apr 2026 18:14:14 +0800 Subject: [PATCH 3/7] fix(agent): eliminate race condition in auto compact summary retrieval MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Make Consolidator.archive() return the summary string directly instead of writing to history.jsonl then reading back via get_last_history_entry(). This eliminates a race condition where concurrent _archive calls for different sessions could read each other's summaries from the shared history file (cross-user context leak in multi-user deployments). Also removes Consolidator.get_last_history_entry() — no longer needed. --- nanobot/agent/auto_compact.py | 6 +-- nanobot/agent/memory.py | 14 +++--- tests/agent/test_auto_compact.py | 76 +++++++++----------------------- tests/agent/test_consolidator.py | 6 +-- 4 files changed, 31 insertions(+), 71 deletions(-) diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py index 171f5f55a..f30feac17 100644 --- a/nanobot/agent/auto_compact.py +++ b/nanobot/agent/auto_compact.py @@ -53,9 +53,7 @@ class AutoCompact: return n = len(msgs) last_active = session.updated_at - await self.consolidator.archive(msgs) - entry = self.consolidator.get_last_history_entry() - summary = (entry or {}).get("content", "") + summary = await self.consolidator.archive(msgs) or "" if summary and summary != "(nothing)": self._summaries[key] = (summary, last_active) session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()} @@ -71,6 +69,8 @@ class AutoCompact: if key in self._archiving or self._is_expired(session.updated_at): logger.info("Auto-compact: reloading session {} (archiving={})", key, key in self._archiving) session = self.sessions.get_or_create(key) + # Hot path: summary from in-memory dict (process hasn't restarted). + # Also clean metadata copy so stale _last_summary never leaks to disk. entry = self._summaries.pop(key, None) if entry: session.metadata.pop("_last_summary", None) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index e9662ff2c..04d988ee5 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -374,10 +374,6 @@ class Consolidator: weakref.WeakValueDictionary() ) - def get_last_history_entry(self) -> dict[str, Any] | None: - """Return the most recent entry from history.jsonl.""" - return self.store._read_last_entry() - def get_lock(self, session_key: str) -> asyncio.Lock: """Return the shared consolidation lock for one session.""" return self._locks.setdefault(session_key, asyncio.Lock()) @@ -437,13 +433,13 @@ class Consolidator: self._get_tool_definitions(), ) - async def archive(self, messages: list[dict]) -> bool: + async def archive(self, messages: list[dict]) -> str | None: """Summarize messages via LLM and append to history.jsonl. - Returns True on success (or degraded success), False if nothing to do. + Returns the summary text on success, None if nothing to archive. """ if not messages: - return False + return None try: formatted = MemoryStore._format_messages(messages) response = await self.provider.chat_with_retry( @@ -463,11 +459,11 @@ class Consolidator: ) summary = response.content or "[no summary]" self.store.append_history(summary) - return True + return summary except Exception: logger.warning("Consolidation LLM call failed, raw-dumping to history") self.store.raw_archive(messages) - return True + return None async def maybe_consolidate_by_tokens(self, session: Session) -> None: """Loop: archive old messages until prompt fits within safe budget. diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py index 8b26254e9..39792e290 100644 --- a/tests/agent/test_auto_compact.py +++ b/tests/agent/test_auto_compact.py @@ -101,7 +101,7 @@ class TestAutoCompact: loop.sessions.save(s2) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive loop.auto_compact.check_expired(loop._schedule_background) @@ -126,7 +126,7 @@ class TestAutoCompact: async def _fake_archive(messages): archived_messages.extend(messages) - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -147,12 +147,9 @@ class TestAutoCompact: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "User said hello." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", - } await loop.auto_compact._archive("cli:test") @@ -174,7 +171,7 @@ class TestAutoCompact: async def _fake_archive(messages): nonlocal archive_called archive_called = True - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -201,7 +198,7 @@ class TestAutoCompact: async def _fake_archive(messages): nonlocal archived_count archived_count = len(messages) - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -243,12 +240,9 @@ class TestAutoCompactIdleDetection: async def _fake_archive(messages): archived_messages.extend(messages) - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # Simulate proactive archive completing before message arrives await loop.auto_compact._archive("cli:test") @@ -311,7 +305,7 @@ class TestAutoCompactIdleDetection: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -340,12 +334,9 @@ class TestAutoCompactSystemMessages: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # Simulate proactive archive completing before system message arrives await loop.auto_compact._archive("cli:test") @@ -428,12 +419,9 @@ class TestAutoCompactEdgeCases: async def _fake_archive(messages): archived_messages.extend(messages) - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # Simulate proactive archive completing before message arrives await loop.auto_compact._archive("cli:test") @@ -518,12 +506,9 @@ class TestAutoCompactIntegration: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # Simulate proactive archive completing before message arrives await loop.auto_compact._archive("cli:test") @@ -586,12 +571,9 @@ class TestProactiveAutoCompact: async def _fake_archive(messages): archived_messages.extend(messages) - return True + return "User chatted about old things." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User chatted about old things.", - } await self._run_check_expired(loop) @@ -635,7 +617,7 @@ class TestProactiveAutoCompact: archive_count += 1 started.set() await block_forever.wait() - return True + return "Summary." loop.consolidator.archive = _slow_archive @@ -688,7 +670,7 @@ class TestProactiveAutoCompact: async def _fake_archive(messages): nonlocal archive_called archive_called = True - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -712,12 +694,9 @@ class TestProactiveAutoCompact: async def _fake_archive(messages): nonlocal archive_count archive_count += 1 - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # First tick: archives the session await self._run_check_expired(loop) @@ -741,7 +720,7 @@ class TestProactiveAutoCompact: async def _fake_archive(messages): nonlocal archive_count archive_count += 1 - return True + return "Summary." loop.consolidator.archive = _fake_archive @@ -769,12 +748,9 @@ class TestProactiveAutoCompact: async def _fake_archive(messages): nonlocal archive_count archive_count += 1 - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } # First compact cycle await loop.auto_compact._archive("cli:test") @@ -810,12 +786,9 @@ class TestSummaryPersistence: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "User said hello." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", - } await loop.auto_compact._archive("cli:test") @@ -839,12 +812,9 @@ class TestSummaryPersistence: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "User said hello." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.", - } # Archive await loop.auto_compact._archive("cli:test") @@ -874,12 +844,9 @@ class TestSummaryPersistence: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } await loop.auto_compact._archive("cli:test") @@ -908,12 +875,9 @@ class TestSummaryPersistence: loop.sessions.save(session) async def _fake_archive(messages): - return True + return "Summary." loop.consolidator.archive = _fake_archive - loop.consolidator.get_last_history_entry = lambda: { - "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.", - } await loop.auto_compact._archive("cli:test") diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py index b7989d9dd..28587e1b4 100644 --- a/tests/agent/test_consolidator.py +++ b/tests/agent/test_consolidator.py @@ -46,7 +46,7 @@ class TestConsolidatorSummarize: {"role": "assistant", "content": "Done, fixed the race condition."}, ] result = await consolidator.archive(messages) - assert result is True + assert result == "User fixed a bug in the auth module." entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 @@ -55,14 +55,14 @@ class TestConsolidatorSummarize: mock_provider.chat_with_retry.side_effect = Exception("API error") messages = [{"role": "user", "content": "hello"}] result = await consolidator.archive(messages) - assert result is True # always succeeds + assert result is None # no summary on raw dump fallback entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 assert "[RAW]" in entries[0]["content"] async def test_summarize_skips_empty_messages(self, consolidator): result = await consolidator.archive([]) - assert result is False + assert result is None class TestConsolidatorTokenBudget: From 1cb28b39a30cea4be4ca57e7a1997bb06e9f71b3 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sat, 11 Apr 2026 07:25:50 +0000 Subject: [PATCH 4/7] feat(agent): retain recent context during auto compact Keep a legal recent suffix in idle auto-compacted sessions so resumed chats preserve their freshest live context while older messages are summarized. Recover persisted summaries even when retained messages remain, and document the new behavior. --- README.md | 6 +- nanobot/agent/auto_compact.py | 51 +++++++++++--- tests/agent/test_auto_compact.py | 116 ++++++++++++++++--------------- 3 files changed, 104 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index a9bf4b5e3..88ff35f29 100644 --- a/README.md +++ b/README.md @@ -1505,7 +1505,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us ### Auto Compact -When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input. +When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input. ```json { @@ -1523,8 +1523,8 @@ When a user is idle for longer than a configured TTL, nanobot **proactively** co How it works: 1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration. -2. **Background compaction**: Expired sessions are summarized via LLM, then cleared. -3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted). +2. **Background compaction**: Expired sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages). +3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted) alongside the retained recent suffix. > [!TIP] > The summary survives bot restarts — it's stored in session metadata and recovered on the next message. diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py index f30feac17..47c7b5a36 100644 --- a/nanobot/agent/auto_compact.py +++ b/nanobot/agent/auto_compact.py @@ -3,16 +3,18 @@ from __future__ import annotations from datetime import datetime -from typing import TYPE_CHECKING, Callable, Coroutine +from typing import TYPE_CHECKING, Any, Callable, Coroutine from loguru import logger +from nanobot.session.manager import Session, SessionManager if TYPE_CHECKING: from nanobot.agent.memory import Consolidator - from nanobot.session.manager import Session, SessionManager class AutoCompact: + _RECENT_SUFFIX_MESSAGES = 8 + def __init__(self, sessions: SessionManager, consolidator: Consolidator, session_ttl_minutes: int = 0): self.sessions = sessions @@ -33,6 +35,27 @@ class AutoCompact: idle_min = int((datetime.now() - last_active).total_seconds() / 60) return f"Inactive for {idle_min} minutes.\nPrevious conversation summary: {text}" + def _split_unconsolidated( + self, session: Session, + ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]: + """Split live session tail into archiveable prefix and retained recent suffix.""" + tail = list(session.messages[session.last_consolidated:]) + if not tail: + return [], [] + + probe = Session( + key=session.key, + messages=tail.copy(), + created_at=session.created_at, + updated_at=session.updated_at, + metadata={}, + last_consolidated=0, + ) + probe.retain_recent_legal_suffix(self._RECENT_SUFFIX_MESSAGES) + kept = probe.messages + cut = len(tail) - len(kept) + return tail[:cut], kept + def check_expired(self, schedule_background: Callable[[Coroutine], None]) -> None: for info in self.sessions.list_sessions(): key = info.get("key", "") @@ -45,21 +68,31 @@ class AutoCompact: try: self.sessions.invalidate(key) session = self.sessions.get_or_create(key) - msgs = session.messages[session.last_consolidated:] - if not msgs: + archive_msgs, kept_msgs = self._split_unconsolidated(session) + if not archive_msgs and not kept_msgs: logger.debug("Auto-compact: skipping {}, no un-consolidated messages", key) session.updated_at = datetime.now() self.sessions.save(session) return - n = len(msgs) + last_active = session.updated_at - summary = await self.consolidator.archive(msgs) or "" + summary = "" + if archive_msgs: + summary = await self.consolidator.archive(archive_msgs) or "" if summary and summary != "(nothing)": self._summaries[key] = (summary, last_active) session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()} - session.clear() + session.messages = kept_msgs + session.last_consolidated = 0 + session.updated_at = datetime.now() self.sessions.save(session) - logger.info("Auto-compact: archived {} ({} messages, summary={})", key, n, bool(summary)) + logger.info( + "Auto-compact: archived {} (archived={}, kept={}, summary={})", + key, + len(archive_msgs), + len(kept_msgs), + bool(summary), + ) except Exception: logger.exception("Auto-compact: failed for {}", key) finally: @@ -75,7 +108,7 @@ class AutoCompact: if entry: session.metadata.pop("_last_summary", None) return session, self._format_summary(entry[0], entry[1]) - if not session.messages and "_last_summary" in session.metadata: + if "_last_summary" in session.metadata: meta = session.metadata.pop("_last_summary") self.sessions.save(session) return session, self._format_summary(meta["text"], datetime.fromisoformat(meta["last_active"])) diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py index 39792e290..8f1be03a2 100644 --- a/tests/agent/test_auto_compact.py +++ b/tests/agent/test_auto_compact.py @@ -35,6 +35,13 @@ def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop: return loop +def _add_turns(session, turns: int, *, prefix: str = "msg") -> None: + """Append simple user/assistant turns to a session.""" + for i in range(turns): + session.add_message("user", f"{prefix} user {i}") + session.add_message("assistant", f"{prefix} assistant {i}") + + class TestSessionTTLConfig: """Test session TTL configuration.""" @@ -113,13 +120,11 @@ class TestAutoCompact: await loop.close_mcp() @pytest.mark.asyncio - async def test_auto_compact_archives_and_clears(self, tmp_path): - """_archive should archive un-consolidated messages and clear session.""" + async def test_auto_compact_archives_prefix_and_keeps_recent_suffix(self, tmp_path): + """_archive should summarize the old prefix and keep a recent legal suffix.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - for i in range(4): - session.add_message("user", f"msg{i}") - session.add_message("assistant", f"resp{i}") + _add_turns(session, 6) loop.sessions.save(session) archived_messages = [] @@ -132,9 +137,11 @@ class TestAutoCompact: await loop.auto_compact._archive("cli:test") - assert len(archived_messages) == 8 + assert len(archived_messages) == 4 session_after = loop.sessions.get_or_create("cli:test") - assert len(session_after.messages) == 0 + assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES + assert session_after.messages[0]["content"] == "msg user 2" + assert session_after.messages[-1]["content"] == "msg assistant 5" await loop.close_mcp() @pytest.mark.asyncio @@ -142,8 +149,7 @@ class TestAutoCompact: """_archive should store the summary in _summaries.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "hello") - session.add_message("assistant", "hi there") + _add_turns(session, 6, prefix="hello") loop.sessions.save(session) async def _fake_archive(messages): @@ -157,7 +163,7 @@ class TestAutoCompact: assert entry is not None assert entry[0] == "User said hello." session_after = loop.sessions.get_or_create("cli:test") - assert len(session_after.messages) == 0 + assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES await loop.close_mcp() @pytest.mark.asyncio @@ -187,9 +193,7 @@ class TestAutoCompact: """_archive should only archive un-consolidated messages.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - for i in range(10): - session.add_message("user", f"msg{i}") - session.add_message("assistant", f"resp{i}") + _add_turns(session, 14) session.last_consolidated = 18 loop.sessions.save(session) @@ -232,7 +236,7 @@ class TestAutoCompactIdleDetection: """Proactive auto-new archives expired session; _process_message reloads it.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message") + _add_turns(session, 6, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -251,7 +255,8 @@ class TestAutoCompactIdleDetection: await loop._process_message(msg) session_after = loop.sessions.get_or_create("cli:test") - assert not any(m["content"] == "old message" for m in session_after.messages) + assert len(archived_messages) == 4 + assert not any(m["content"] == "old user 0" for m in session_after.messages) assert any(m["content"] == "new msg" for m in session_after.messages) await loop.close_mcp() @@ -329,7 +334,7 @@ class TestAutoCompactSystemMessages: """Proactive auto-new archives expired session; system messages reload it.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message from subagent context") + _add_turns(session, 6, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -349,7 +354,7 @@ class TestAutoCompactSystemMessages: session_after = loop.sessions.get_or_create("cli:test") assert not any( - m["content"] == "old message from subagent context" + m["content"] == "old user 0" for m in session_after.messages ) await loop.close_mcp() @@ -363,8 +368,7 @@ class TestAutoCompactEdgeCases: """Auto-new should not inject when archive produces '(nothing)'.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "thanks") - session.add_message("assistant", "you're welcome") + _add_turns(session, 6, prefix="thanks") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -375,18 +379,18 @@ class TestAutoCompactEdgeCases: await loop.auto_compact._archive("cli:test") session_after = loop.sessions.get_or_create("cli:test") - assert len(session_after.messages) == 0 + assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES # "(nothing)" summary should not be stored assert "cli:test" not in loop.auto_compact._summaries await loop.close_mcp() @pytest.mark.asyncio - async def test_auto_compact_archive_failure_still_clears(self, tmp_path): - """Auto-new should clear session even if LLM archive fails (raw_archive fallback).""" + async def test_auto_compact_archive_failure_still_keeps_recent_suffix(self, tmp_path): + """Auto-new should keep the recent suffix even if LLM archive falls back to raw dump.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "important data") + _add_turns(session, 6, prefix="important") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -396,14 +400,13 @@ class TestAutoCompactEdgeCases: await loop.auto_compact._archive("cli:test") session_after = loop.sessions.get_or_create("cli:test") - # Session should be cleared (archive falls back to raw dump) - assert len(session_after.messages) == 0 + assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES await loop.close_mcp() @pytest.mark.asyncio async def test_auto_compact_preserves_runtime_checkpoint_before_check(self, tmp_path): - """Runtime checkpoint is restored; proactive archive handles the expired session.""" + """Short expired sessions keep recent messages; checkpoint restore still works on resume.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") session.metadata[AgentLoop._RUNTIME_CHECKPOINT_KEY] = { @@ -429,8 +432,10 @@ class TestAutoCompactEdgeCases: msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="continue") await loop._process_message(msg) - # The checkpoint-restored message should have been archived by proactive path - assert len(archived_messages) >= 1 + session_after = loop.sessions.get_or_create("cli:test") + assert archived_messages == [] + assert any(m["content"] == "previous message" for m in session_after.messages) + assert any(m["content"] == "interrupted response" for m in session_after.messages) await loop.close_mcp() @@ -446,11 +451,17 @@ class TestAutoCompactIntegration: loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - # Phase 1: User has a conversation + # Phase 1: User has a conversation longer than the retained recent suffix session.add_message("user", "I'm learning English, teach me past tense") session.add_message("assistant", "Past tense is used for actions completed in the past...") session.add_message("user", "Give me an example") session.add_message("assistant", '"I walked to the store yesterday."') + session.add_message("user", "Give me another example") + session.add_message("assistant", '"She visited Paris last year."') + session.add_message("user", "Quiz me") + session.add_message("assistant", "What is the past tense of go?") + session.add_message("user", "I think it is went") + session.add_message("assistant", "Correct.") loop.sessions.save(session) # Phase 2: Time passes (simulate idle) @@ -474,7 +485,7 @@ class TestAutoCompactIntegration: # Phase 4: Verify session_after = loop.sessions.get_or_create("cli:test") - # Old messages should be gone + # The oldest messages should be trimmed from live session history assert not any( "past tense is used" in str(m.get("content", "")) for m in session_after.messages ) @@ -497,8 +508,8 @@ class TestAutoCompactIntegration: await loop.close_mcp() @pytest.mark.asyncio - async def test_multi_paragraph_user_message_preserved(self, tmp_path): - """Multi-paragraph user messages must be fully preserved after auto-new.""" + async def test_runtime_context_markers_not_persisted_for_multi_paragraph_turn(self, tmp_path): + """Auto-compact resume context must not leak runtime markers into persisted session history.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") session.add_message("user", "old message") @@ -520,16 +531,11 @@ class TestAutoCompactIntegration: await loop._process_message(msg) session_after = loop.sessions.get_or_create("cli:test") - user_msgs = [m for m in session_after.messages if m.get("role") == "user"] - assert len(user_msgs) >= 1 - # All three paragraphs must be preserved - persisted = user_msgs[-1]["content"] - assert "Paragraph one" in persisted - assert "Paragraph two" in persisted - assert "Paragraph three" in persisted - # No runtime context markers in persisted message - assert "[Runtime Context" not in persisted - assert "[/Runtime Context]" not in persisted + assert any(m.get("content") == "old message" for m in session_after.messages) + for persisted in session_after.messages: + content = str(persisted.get("content", "")) + assert "[Runtime Context" not in content + assert "[/Runtime Context]" not in content await loop.close_mcp() @@ -562,8 +568,7 @@ class TestProactiveAutoCompact: """Expired session should be archived during idle tick.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message") - session.add_message("assistant", "old response") + _add_turns(session, 5, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -578,7 +583,7 @@ class TestProactiveAutoCompact: await self._run_check_expired(loop) session_after = loop.sessions.get_or_create("cli:test") - assert len(session_after.messages) == 0 + assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES assert len(archived_messages) == 2 entry = loop.auto_compact._summaries.get("cli:test") assert entry is not None @@ -604,7 +609,7 @@ class TestProactiveAutoCompact: """Should not archive the same session twice if already in progress.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message") + _add_turns(session, 6, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -641,7 +646,7 @@ class TestProactiveAutoCompact: """Proactive archive failure should be caught and not block future ticks.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message") + _add_turns(session, 6, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -684,8 +689,7 @@ class TestProactiveAutoCompact: """Already-archived session should NOT be re-scheduled on subsequent ticks.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "old message") - session.add_message("assistant", "old response") + _add_turns(session, 5, prefix="old") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -738,8 +742,7 @@ class TestProactiveAutoCompact: """After successful compact + user sends new messages + idle again, should compact again.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "first conversation") - session.add_message("assistant", "first response") + _add_turns(session, 5, prefix="first") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -780,8 +783,7 @@ class TestSummaryPersistence: """After archive, _last_summary should be in session metadata.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "hello") - session.add_message("assistant", "hi there") + _add_turns(session, 6, prefix="hello") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -805,8 +807,7 @@ class TestSummaryPersistence: """Summary should be recovered from metadata when _summaries is empty (simulates restart).""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "hello") - session.add_message("assistant", "hi there") + _add_turns(session, 6, prefix="hello") last_active = datetime.now() - timedelta(minutes=20) session.updated_at = last_active loop.sessions.save(session) @@ -825,6 +826,7 @@ class TestSummaryPersistence: # prepare_session should recover summary from metadata reloaded = loop.sessions.get_or_create("cli:test") + assert len(reloaded.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test") assert summary is not None @@ -839,7 +841,7 @@ class TestSummaryPersistence: """_last_summary should be removed from metadata after being consumed.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "hello") + _add_turns(session, 6, prefix="hello") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) @@ -870,7 +872,7 @@ class TestSummaryPersistence: """In-memory _summaries path should also clean up _last_summary from metadata.""" loop = _make_loop(tmp_path, session_ttl_minutes=15) session = loop.sessions.get_or_create("cli:test") - session.add_message("user", "hello") + _add_turns(session, 6, prefix="hello") session.updated_at = datetime.now() - timedelta(minutes=20) loop.sessions.save(session) From 84e840659aabc5682d3699c9466a050d82f644df Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sat, 11 Apr 2026 07:32:56 +0000 Subject: [PATCH 5/7] refactor(config): rename auto compact config key Prefer the more user-friendly idleCompactAfterMinutes name for auto compact while keeping sessionTtlMinutes as a backward-compatible alias. Update tests and README to document the retained recent-context behavior and the new preferred key. --- README.md | 13 ++++++++----- nanobot/config/schema.py | 7 ++++++- tests/agent/test_auto_compact.py | 17 +++++++++++++++++ 3 files changed, 31 insertions(+), 6 deletions(-) diff --git a/README.md b/README.md index 88ff35f29..856986754 100644 --- a/README.md +++ b/README.md @@ -1505,13 +1505,13 @@ MCP tools are automatically discovered and registered on startup. The LLM can us ### Auto Compact -When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input. +When a user is idle for longer than a configured threshold, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input. ```json { "agents": { "defaults": { - "sessionTtlMinutes": 15 + "idleCompactAfterMinutes": 15 } } } @@ -1519,15 +1519,18 @@ When a user is idle for longer than a configured TTL, nanobot **proactively** co | Option | Default | Description | |--------|---------|-------------| -| `agents.defaults.sessionTtlMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction. Set to `0` to disable. Recommended: `15` — matches typical LLM KV cache expiration, so compacted sessions won't waste cache on cold entries. | +| `agents.defaults.idleCompactAfterMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction starts. Set to `0` to disable. Recommended: `15` — close to a typical LLM KV cache expiry window, so stale sessions get compacted before the user returns. | + +`sessionTtlMinutes` remains accepted as a legacy alias for backward compatibility, but `idleCompactAfterMinutes` is the preferred config key going forward. How it works: 1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration. -2. **Background compaction**: Expired sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages). +2. **Background compaction**: Idle sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages). 3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted) alongside the retained recent suffix. +4. **Restart-safe resume**: The summary is also mirrored into session metadata so it can still be recovered after a process restart. > [!TIP] -> The summary survives bot restarts — it's stored in session metadata and recovered on the next message. +> Think of auto compact as "summarize older context, keep the freshest live turns." It is not a hard session reset. ### Timezone diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index 8ab68d7b5..67cce4470 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -77,7 +77,12 @@ class AgentDefaults(Base): reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode timezone: str = "UTC" # IANA timezone, e.g. "Asia/Shanghai", "America/New_York" unified_session: bool = False # Share one session across all channels (single-user multi-device) - session_ttl_minutes: int = Field(default=0, ge=0) # Auto /new after idle (0 = disabled) + session_ttl_minutes: int = Field( + default=0, + ge=0, + validation_alias=AliasChoices("idleCompactAfterMinutes", "sessionTtlMinutes"), + serialization_alias="idleCompactAfterMinutes", + ) # Auto-compact idle threshold in minutes (0 = disabled) dream: DreamConfig = Field(default_factory=DreamConfig) diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py index 8f1be03a2..b3462820b 100644 --- a/tests/agent/test_auto_compact.py +++ b/tests/agent/test_auto_compact.py @@ -55,6 +55,23 @@ class TestSessionTTLConfig: defaults = AgentDefaults(session_ttl_minutes=30) assert defaults.session_ttl_minutes == 30 + def test_user_friendly_alias_is_supported(self): + """Config should accept idleCompactAfterMinutes as the preferred JSON key.""" + defaults = AgentDefaults.model_validate({"idleCompactAfterMinutes": 30}) + assert defaults.session_ttl_minutes == 30 + + def test_legacy_alias_is_still_supported(self): + """Config should still accept the old sessionTtlMinutes key for compatibility.""" + defaults = AgentDefaults.model_validate({"sessionTtlMinutes": 30}) + assert defaults.session_ttl_minutes == 30 + + def test_serializes_with_user_friendly_alias(self): + """Config dumps should use idleCompactAfterMinutes for JSON output.""" + defaults = AgentDefaults(session_ttl_minutes=30) + data = defaults.model_dump(mode="json", by_alias=True) + assert data["idleCompactAfterMinutes"] == 30 + assert "sessionTtlMinutes" not in data + class TestAgentLoopTTLParam: """Test that AutoCompact receives and stores session_ttl_minutes.""" From 5932482d01bb442e99143cabea2c0a0c272c5a1b Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Sat, 11 Apr 2026 07:49:31 +0000 Subject: [PATCH 6/7] refactor(agent): rename auto compact module Rename the auto compact module to autocompact.py for a cleaner path while keeping the AutoCompact type and behavior unchanged. Update the agent loop import to match. --- nanobot/agent/{auto_compact.py => autocompact.py} | 0 nanobot/agent/loop.py | 2 +- 2 files changed, 1 insertion(+), 1 deletion(-) rename nanobot/agent/{auto_compact.py => autocompact.py} (100%) diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/autocompact.py similarity index 100% rename from nanobot/agent/auto_compact.py rename to nanobot/agent/autocompact.py diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 65a5a1abc..05a27349f 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -13,7 +13,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable from loguru import logger -from nanobot.agent.auto_compact import AutoCompact +from nanobot.agent.autocompact import AutoCompact from nanobot.agent.context import ContextBuilder from nanobot.agent.hook import AgentHook, AgentHookContext, CompositeHook from nanobot.agent.memory import Consolidator, Dream From e0ba56808967f6bac652fa8e5ed13ac32874f2e4 Mon Sep 17 00:00:00 2001 From: weitongtong Date: Sat, 11 Apr 2026 14:34:45 +0800 Subject: [PATCH 7/7] =?UTF-8?q?fix(cron):=20=E4=BF=AE=E5=A4=8D=E5=9B=BA?= =?UTF-8?q?=E5=AE=9A=E9=97=B4=E9=9A=94=E4=BB=BB=E5=8A=A1=E5=9B=A0=20store?= =?UTF-8?q?=20=E5=B9=B6=E5=8F=91=E6=9B=BF=E6=8D=A2=E5=AF=BC=E8=87=B4?= =?UTF-8?q?=E7=9A=84=E9=87=8D=E5=A4=8D=E6=89=A7=E8=A1=8C?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit _on_timer 中 await _execute_job 让出控制权期间,前端轮询触发的 list_jobs 调用 _load_store 从磁盘重新加载覆盖 self._store, 已执行任务的状态被旧值回退,导致再次触发。 引入 _timer_active 标志位,在任务执行期间阻止并发 _load_store 替换 store。同时修复 store 为空时未重新 arm timer 的问题。 Made-with: Cursor --- nanobot/cron/service.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 267613012..165ce54d7 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -80,6 +80,7 @@ class CronService: self._store: CronStore | None = None self._timer_task: asyncio.Task | None = None self._running = False + self._timer_active = False self.max_sleep_ms = max_sleep_ms def _load_jobs(self) -> tuple[list[CronJob], int]: @@ -171,7 +172,11 @@ class CronService: def _load_store(self) -> CronStore: """Load jobs from disk. Reloads automatically if file was modified externally. - Reload every time because it needs to merge operations on the jobs object from other instances. + - During _on_timer execution, return the existing store to prevent concurrent + _load_store calls (e.g. from list_jobs polling) from replacing it mid-execution. """ + if self._timer_active and self._store: + return self._store jobs, version = self._load_jobs() self._store = CronStore(version=version, jobs=jobs) self._merge_action() @@ -290,18 +295,23 @@ class CronService: """Handle timer tick - run due jobs.""" self._load_store() if not self._store: + self._arm_timer() return - now = _now_ms() - due_jobs = [ - j for j in self._store.jobs - if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms - ] + self._timer_active = True + try: + now = _now_ms() + due_jobs = [ + j for j in self._store.jobs + if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms + ] - for job in due_jobs: - await self._execute_job(job) + for job in due_jobs: + await self._execute_job(job) - self._save_store() + self._save_store() + finally: + self._timer_active = False self._arm_timer() async def _execute_job(self, job: CronJob) -> None: