From e392c27f7e6981313198c078dd6b3ef01aeb4b1f Mon Sep 17 00:00:00 2001
From: 04cb <0x04cb@gmail.com>
Date: Sat, 11 Apr 2026 00:47:23 +0800
Subject: [PATCH 01/13] fix(utils): anchor unclosed think-tag regex to string
start (#3004)
---
nanobot/utils/helpers.py | 4 ++--
tests/utils/test_strip_think.py | 29 +++++++++++++++++++++++++++++
2 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py
index 3b4f9f25a..1bfd9f18b 100644
--- a/nanobot/utils/helpers.py
+++ b/nanobot/utils/helpers.py
@@ -17,10 +17,10 @@ from loguru import logger
def strip_think(text: str) -> str:
"""Remove thinking blocks and any unclosed trailing tag."""
text = re.sub(r"<think>[\s\S]*?</think>", "", text)
- text = re.sub(r"<think>[\s\S]*$", "", text)
+ text = re.sub(r"^\s*<think>[\s\S]*$", "", text)
# Gemma 4 and similar models use <thought>...</thought> blocks
text = re.sub(r"<thought>[\s\S]*?</thought>", "", text)
- text = re.sub(r"<thought>[\s\S]*$", "", text)
+ text = re.sub(r"^\s*<thought>[\s\S]*$", "", text)
return text.strip()
diff --git a/tests/utils/test_strip_think.py b/tests/utils/test_strip_think.py
index 6710dfc93..5828c6d1f 100644
--- a/tests/utils/test_strip_think.py
+++ b/tests/utils/test_strip_think.py
@@ -34,3 +34,32 @@ class TestStripThinkTag:
def test_empty_string(self):
assert strip_think("") == ""
+
+
+class TestStripThinkFalsePositive:
+ """Ensure mid-content <think> / <thought> tags are NOT stripped (#3004)."""
+
+ def test_backtick_think_tag_preserved(self):
+ text = "*Think Stripping:* A new utility to strip `<think>` tags from output."
+ assert strip_think(text) == text
+
+ def test_prose_think_tag_preserved(self):
+ text = "The model emits <think> at the start of its response."
+ assert strip_think(text) == text
+
+ def test_code_block_think_tag_preserved(self):
+ text = "Example:\n```\ntext = re.sub(r\"<think>[\\s\\S]*\", \"\", text)\n```\nDone."
+ assert strip_think(text) == text
+
+ def test_backtick_thought_tag_preserved(self):
+ text = "Gemma 4 uses `<thought>` blocks for reasoning."
+ assert strip_think(text) == text
+
+ def test_prefix_unclosed_think_still_stripped(self):
+ assert strip_think("<think>reasoning without closing") == ""
+
+ def test_prefix_unclosed_think_with_whitespace(self):
+ assert strip_think("  <think>reasoning...") == ""
+
+ def test_prefix_unclosed_thought_still_stripped(self):
+ assert strip_think("<thought>reasoning without closing") == ""
From b52bfddf16636f85d5805fce393721d49eb5223c Mon Sep 17 00:00:00 2001
From: Daniel Phang
Date: Sat, 11 Apr 2026 00:34:48 -0700
Subject: [PATCH 02/13] fix(cron): guard _load_store against reentrant reload
during job execution
When on_job callbacks call list_jobs() (which triggers _load_store),
the in-memory state is reloaded from disk, discarding the next_run_at_ms
updates that _on_timer is actively computing. This causes jobs to
re-trigger indefinitely on the next tick.
Add an _executing flag around the job execution loop. While set,
_load_store returns the cached store instead of reloading from disk.
Includes regression test.
Co-Authored-By: Claude Opus 4.6 (1M context)
---
nanobot/cron/service.py | 13 ++++++++--
tests/cron/test_cron_service.py | 46 +++++++++++++++++++++++++++++++++
2 files changed, 57 insertions(+), 2 deletions(-)
diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py
index 267613012..d3f74fbf8 100644
--- a/nanobot/cron/service.py
+++ b/nanobot/cron/service.py
@@ -80,6 +80,7 @@ class CronService:
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._running = False
+ self._executing = False
self.max_sleep_ms = max_sleep_ms
def _load_jobs(self) -> tuple[list[CronJob], int]:
@@ -171,7 +172,11 @@ class CronService:
def _load_store(self) -> CronStore:
"""Load jobs from disk. Reloads automatically if file was modified externally.
- Reload every time because it needs to merge operations on the jobs object from other instances.
+ - Skip reload when _executing to prevent on_job callbacks (e.g. list_jobs)
+ from replacing in-memory state that _on_timer is actively modifying.
"""
+ if self._executing and self._store is not None:
+ return self._store
jobs, version = self._load_jobs()
self._store = CronStore(version=version, jobs=jobs)
self._merge_action()
@@ -298,8 +303,12 @@ class CronService:
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
- for job in due_jobs:
- await self._execute_job(job)
+ self._executing = True
+ try:
+ for job in due_jobs:
+ await self._execute_job(job)
+ finally:
+ self._executing = False
self._save_store()
self._arm_timer()
diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py
index f1956d8d2..4aa7fc06d 100644
--- a/tests/cron/test_cron_service.py
+++ b/tests/cron/test_cron_service.py
@@ -479,3 +479,49 @@ def test_update_job_sentinel_channel_and_to(tmp_path) -> None:
assert isinstance(result, CronJob)
assert result.payload.channel is None
assert result.payload.to is None
+
+
+@pytest.mark.asyncio
+async def test_list_jobs_during_on_job_does_not_cause_stale_reload(tmp_path) -> None:
+ """Regression: if the bot calls list_jobs (which reloads from disk) during
+ on_job execution, the in-memory next_run_at_ms update must not be lost.
+ Previously this caused an infinite re-trigger loop."""
+ store_path = tmp_path / "cron" / "jobs.json"
+ execution_count = 0
+
+ async def on_job_that_lists(job):
+ nonlocal execution_count
+ execution_count += 1
+ # Simulate the bot calling cron(action=list) mid-execution
+ service.list_jobs()
+
+ service = CronService(store_path, on_job=on_job_that_lists, max_sleep_ms=100)
+ await service.start()
+
+ # Add two jobs scheduled in the past so they're immediately due
+ now_ms = int(time.time() * 1000)
+ for name in ("job-a", "job-b"):
+ service.add_job(
+ name=name,
+ schedule=CronSchedule(kind="every", every_ms=3_600_000),
+ message="test",
+ )
+ # Force next_run to the past so _on_timer picks them up
+ for job in service._store.jobs:
+ job.state.next_run_at_ms = now_ms - 1000
+ service._save_store()
+ service._arm_timer()
+
+ # Let the timer fire once
+ await asyncio.sleep(0.3)
+ service.stop()
+
+ # Each job should have run exactly once, not looped
+ assert execution_count == 2
+
+ # Verify next_run_at_ms was persisted correctly (in the future)
+ raw = json.loads(store_path.read_text())
+ for j in raw["jobs"]:
+ next_run = j["state"]["nextRunAtMs"]
+ assert next_run is not None
+ assert next_run > now_ms, f"Job '{j['name']}' next_run should be in the future"
From fb6dd111e1c11afaaa2a7d7ea5ee4a65ce4787d1 Mon Sep 17 00:00:00 2001
From: chengyongru <61816729+chengyongru@users.noreply.github.com>
Date: Fri, 10 Apr 2026 17:43:42 +0800
Subject: [PATCH 03/13] =?UTF-8?q?feat(agent):=20auto=20compact=20=E2=80=94?=
=?UTF-8?q?=20proactive=20session=20compression=20to=20reduce=20token=20co?=
=?UTF-8?q?st=20and=20latency=20(#2982)?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input.
---
README.md | 26 +
nanobot/agent/auto_compact.py | 82 +++
nanobot/agent/context.py | 9 +-
nanobot/agent/loop.py | 38 +-
nanobot/agent/memory.py | 4 +
nanobot/cli/commands.py | 3 +
nanobot/config/schema.py | 1 +
nanobot/nanobot.py | 1 +
nanobot/session/manager.py | 3 +
tests/agent/test_auto_compact.py | 931 +++++++++++++++++++++++++++++++
10 files changed, 1091 insertions(+), 7 deletions(-)
create mode 100644 nanobot/agent/auto_compact.py
create mode 100644 tests/agent/test_auto_compact.py
diff --git a/README.md b/README.md
index 6098c55ca..a9bf4b5e3 100644
--- a/README.md
+++ b/README.md
@@ -1503,6 +1503,32 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
**Docker security**: The official Docker image runs as a non-root user (`nanobot`, UID 1000) with bubblewrap pre-installed. When using `docker-compose.yml`, the container drops all Linux capabilities except `SYS_ADMIN` (required for bwrap's namespace isolation).
+### Auto Compact
+
+When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input.
+
+```json
+{
+ "agents": {
+ "defaults": {
+ "sessionTtlMinutes": 15
+ }
+ }
+}
+```
+
+| Option | Default | Description |
+|--------|---------|-------------|
+| `agents.defaults.sessionTtlMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction. Set to `0` to disable. Recommended: `15` — matches typical LLM KV cache expiration, so compacted sessions won't waste cache on cold entries. |
+
+How it works:
+1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration.
+2. **Background compaction**: Expired sessions are summarized via LLM, then cleared.
+3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted).
+
+> [!TIP]
+> The summary survives bot restarts — it's stored in session metadata and recovered on the next message.
+
### Timezone
Time is context. Context should be precise.
diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py
new file mode 100644
index 000000000..171f5f55a
--- /dev/null
+++ b/nanobot/agent/auto_compact.py
@@ -0,0 +1,82 @@
+"""Auto compact: proactive compression of idle sessions to reduce token cost and latency."""
+
+from __future__ import annotations
+
+from datetime import datetime
+from typing import TYPE_CHECKING, Callable, Coroutine
+
+from loguru import logger
+
+if TYPE_CHECKING:
+ from nanobot.agent.memory import Consolidator
+ from nanobot.session.manager import Session, SessionManager
+
+
+class AutoCompact:
+ def __init__(self, sessions: SessionManager, consolidator: Consolidator,
+ session_ttl_minutes: int = 0):
+ self.sessions = sessions
+ self.consolidator = consolidator
+ self._ttl = session_ttl_minutes
+ self._archiving: set[str] = set()
+ self._summaries: dict[str, tuple[str, datetime]] = {}
+
+ def _is_expired(self, ts: datetime | str | None) -> bool:
+ if self._ttl <= 0 or not ts:
+ return False
+ if isinstance(ts, str):
+ ts = datetime.fromisoformat(ts)
+ return (datetime.now() - ts).total_seconds() >= self._ttl * 60
+
+ @staticmethod
+ def _format_summary(text: str, last_active: datetime) -> str:
+ idle_min = int((datetime.now() - last_active).total_seconds() / 60)
+ return f"Inactive for {idle_min} minutes.\nPrevious conversation summary: {text}"
+
+ def check_expired(self, schedule_background: Callable[[Coroutine], None]) -> None:
+ for info in self.sessions.list_sessions():
+ key = info.get("key", "")
+ if key and key not in self._archiving and self._is_expired(info.get("updated_at")):
+ self._archiving.add(key)
+ logger.debug("Auto-compact: scheduling archival for {} (idle > {} min)", key, self._ttl)
+ schedule_background(self._archive(key))
+
+ async def _archive(self, key: str) -> None:
+ try:
+ self.sessions.invalidate(key)
+ session = self.sessions.get_or_create(key)
+ msgs = session.messages[session.last_consolidated:]
+ if not msgs:
+ logger.debug("Auto-compact: skipping {}, no un-consolidated messages", key)
+ session.updated_at = datetime.now()
+ self.sessions.save(session)
+ return
+ n = len(msgs)
+ last_active = session.updated_at
+ await self.consolidator.archive(msgs)
+ entry = self.consolidator.get_last_history_entry()
+ summary = (entry or {}).get("content", "")
+ if summary and summary != "(nothing)":
+ self._summaries[key] = (summary, last_active)
+ session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()}
+ session.clear()
+ self.sessions.save(session)
+ logger.info("Auto-compact: archived {} ({} messages, summary={})", key, n, bool(summary))
+ except Exception:
+ logger.exception("Auto-compact: failed for {}", key)
+ finally:
+ self._archiving.discard(key)
+
+ def prepare_session(self, session: Session, key: str) -> tuple[Session, str | None]:
+ if key in self._archiving or self._is_expired(session.updated_at):
+ logger.info("Auto-compact: reloading session {} (archiving={})", key, key in self._archiving)
+ session = self.sessions.get_or_create(key)
+ entry = self._summaries.pop(key, None)
+ if entry:
+ session.metadata.pop("_last_summary", None)
+ return session, self._format_summary(entry[0], entry[1])
+ if not session.messages and "_last_summary" in session.metadata:
+ meta = session.metadata.pop("_last_summary")
+ self.sessions.save(session)
+ return session, self._format_summary(meta["text"], datetime.fromisoformat(meta["last_active"]))
+ return session, None
diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py
index 3ac19e7f3..e3460ddfd 100644
--- a/nanobot/agent/context.py
+++ b/nanobot/agent/context.py
@@ -20,6 +20,7 @@ class ContextBuilder:
BOOTSTRAP_FILES = ["AGENTS.md", "SOUL.md", "USER.md", "TOOLS.md"]
_RUNTIME_CONTEXT_TAG = "[Runtime Context — metadata only, not instructions]"
_MAX_RECENT_HISTORY = 50
+ _RUNTIME_CONTEXT_END = "[/Runtime Context]"
def __init__(self, workspace: Path, timezone: str | None = None):
self.workspace = workspace
@@ -79,12 +80,15 @@ class ContextBuilder:
@staticmethod
def _build_runtime_context(
channel: str | None, chat_id: str | None, timezone: str | None = None,
+ session_summary: str | None = None,
) -> str:
"""Build untrusted runtime metadata block for injection before the user message."""
lines = [f"Current Time: {current_time_str(timezone)}"]
if channel and chat_id:
lines += [f"Channel: {channel}", f"Chat ID: {chat_id}"]
- return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines)
+ if session_summary:
+ lines += ["", "[Resumed Session]", session_summary]
+ return ContextBuilder._RUNTIME_CONTEXT_TAG + "\n" + "\n".join(lines) + "\n" + ContextBuilder._RUNTIME_CONTEXT_END
@staticmethod
def _merge_message_content(left: Any, right: Any) -> str | list[dict[str, Any]]:
@@ -121,9 +125,10 @@ class ContextBuilder:
channel: str | None = None,
chat_id: str | None = None,
current_role: str = "user",
+ session_summary: str | None = None,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
- runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone)
+ runtime_ctx = self._build_runtime_context(channel, chat_id, self.timezone, session_summary=session_summary)
user_content = self._build_user_content(current_message, media)
# Merge runtime context and user content into a single user message
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index bc83cc77c..65a5a1abc 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -13,6 +13,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
+from nanobot.agent.auto_compact import AutoCompact
from nanobot.agent.context import ContextBuilder
from nanobot.agent.hook import AgentHook, AgentHookContext, CompositeHook
from nanobot.agent.memory import Consolidator, Dream
@@ -145,6 +146,7 @@ class AgentLoop:
mcp_servers: dict | None = None,
channels_config: ChannelsConfig | None = None,
timezone: str | None = None,
+ session_ttl_minutes: int = 0,
hooks: list[AgentHook] | None = None,
unified_session: bool = False,
):
@@ -217,6 +219,11 @@ class AgentLoop:
get_tool_definitions=self.tools.get_definitions,
max_completion_tokens=provider.generation.max_tokens,
)
+ self.auto_compact = AutoCompact(
+ sessions=self.sessions,
+ consolidator=self.consolidator,
+ session_ttl_minutes=session_ttl_minutes,
+ )
self.dream = Dream(
store=self.context.memory,
provider=provider,
@@ -371,6 +378,7 @@ class AgentLoop:
try:
msg = await asyncio.wait_for(self.bus.consume_inbound(), timeout=1.0)
except asyncio.TimeoutError:
+ self.auto_compact.check_expired(self._schedule_background)
continue
except asyncio.CancelledError:
# Preserve real task cancellation so shutdown can complete cleanly.
@@ -497,13 +505,18 @@ class AgentLoop:
session = self.sessions.get_or_create(key)
if self._restore_runtime_checkpoint(session):
self.sessions.save(session)
+
+ session, pending = self.auto_compact.prepare_session(session, key)
+
await self.consolidator.maybe_consolidate_by_tokens(session)
self._set_tool_context(channel, chat_id, msg.metadata.get("message_id"))
history = session.get_history(max_messages=0)
current_role = "assistant" if msg.sender_id == "subagent" else "user"
+
messages = self.context.build_messages(
history=history,
current_message=msg.content, channel=channel, chat_id=chat_id,
+ session_summary=pending,
current_role=current_role,
)
final_content, _, all_msgs, _ = await self._run_agent_loop(
@@ -525,6 +538,8 @@ class AgentLoop:
if self._restore_runtime_checkpoint(session):
self.sessions.save(session)
+ session, pending = self.auto_compact.prepare_session(session, key)
+
# Slash commands
raw = msg.content.strip()
ctx = CommandContext(msg=msg, session=session, key=key, raw=raw, loop=self)
@@ -539,9 +554,11 @@ class AgentLoop:
message_tool.start_turn()
history = session.get_history(max_messages=0)
+
initial_messages = self.context.build_messages(
history=history,
current_message=msg.content,
+ session_summary=pending,
media=msg.media if msg.media else None,
channel=msg.channel, chat_id=msg.chat_id,
)
@@ -645,12 +662,23 @@ class AgentLoop:
entry["content"] = filtered
elif role == "user":
if isinstance(content, str) and content.startswith(ContextBuilder._RUNTIME_CONTEXT_TAG):
- # Strip the runtime-context prefix, keep only the user text.
- parts = content.split("\n\n", 1)
- if len(parts) > 1 and parts[1].strip():
- entry["content"] = parts[1]
+ # Strip the entire runtime-context block (including any session summary).
+ # The block is bounded by _RUNTIME_CONTEXT_TAG and _RUNTIME_CONTEXT_END.
+ end_marker = ContextBuilder._RUNTIME_CONTEXT_END
+ end_pos = content.find(end_marker)
+ if end_pos >= 0:
+ after = content[end_pos + len(end_marker):].lstrip("\n")
+ if after:
+ entry["content"] = after
+ else:
+ continue
else:
- continue
+ # Fallback: no end marker found, strip the tag prefix
+ after_tag = content[len(ContextBuilder._RUNTIME_CONTEXT_TAG):].lstrip("\n")
+ if after_tag.strip():
+ entry["content"] = after_tag
+ else:
+ continue
if isinstance(content, list):
filtered = self._sanitize_persisted_blocks(content, drop_runtime=True)
if not filtered:
diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py
index 943d91855..26c5cd45f 100644
--- a/nanobot/agent/memory.py
+++ b/nanobot/agent/memory.py
@@ -374,6 +374,10 @@ class Consolidator:
weakref.WeakValueDictionary()
)
+ def get_last_history_entry(self) -> dict[str, Any] | None:
+ """Return the most recent entry from history.jsonl."""
+ return self.store._read_last_entry()
+
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py
index 5ce8b7937..9d818a9db 100644
--- a/nanobot/cli/commands.py
+++ b/nanobot/cli/commands.py
@@ -591,6 +591,7 @@ def serve(
channels_config=runtime_config.channels,
timezone=runtime_config.agents.defaults.timezone,
unified_session=runtime_config.agents.defaults.unified_session,
+ session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes,
)
model_name = runtime_config.agents.defaults.model
@@ -683,6 +684,7 @@ def gateway(
channels_config=config.channels,
timezone=config.agents.defaults.timezone,
unified_session=config.agents.defaults.unified_session,
+ session_ttl_minutes=config.agents.defaults.session_ttl_minutes,
)
# Set cron callback (needs agent)
@@ -915,6 +917,7 @@ def agent(
channels_config=config.channels,
timezone=config.agents.defaults.timezone,
unified_session=config.agents.defaults.unified_session,
+ session_ttl_minutes=config.agents.defaults.session_ttl_minutes,
)
restart_notice = consume_restart_notice_from_env()
if restart_notice and should_show_cli_restart_notice(restart_notice, session_id):
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 2d31c8bf9..8ab68d7b5 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -77,6 +77,7 @@ class AgentDefaults(Base):
reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode
timezone: str = "UTC" # IANA timezone, e.g. "Asia/Shanghai", "America/New_York"
unified_session: bool = False # Share one session across all channels (single-user multi-device)
+ session_ttl_minutes: int = Field(default=0, ge=0) # Auto /new after idle (0 = disabled)
dream: DreamConfig = Field(default_factory=DreamConfig)
diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py
index 9166acb27..df0e49842 100644
--- a/nanobot/nanobot.py
+++ b/nanobot/nanobot.py
@@ -82,6 +82,7 @@ class Nanobot:
mcp_servers=config.tools.mcp_servers,
timezone=defaults.timezone,
unified_session=defaults.unified_session,
+ session_ttl_minutes=defaults.session_ttl_minutes,
)
return cls(loop)
diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py
index 27df31405..2ed0624a2 100644
--- a/nanobot/session/manager.py
+++ b/nanobot/session/manager.py
@@ -155,6 +155,7 @@ class SessionManager:
messages = []
metadata = {}
created_at = None
+ updated_at = None
last_consolidated = 0
with open(path, encoding="utf-8") as f:
@@ -168,6 +169,7 @@ class SessionManager:
if data.get("_type") == "metadata":
metadata = data.get("metadata", {})
created_at = datetime.fromisoformat(data["created_at"]) if data.get("created_at") else None
+ updated_at = datetime.fromisoformat(data["updated_at"]) if data.get("updated_at") else None
last_consolidated = data.get("last_consolidated", 0)
else:
messages.append(data)
@@ -176,6 +178,7 @@ class SessionManager:
key=key,
messages=messages,
created_at=created_at or datetime.now(),
+ updated_at=updated_at or datetime.now(),
metadata=metadata,
last_consolidated=last_consolidated
)
diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py
new file mode 100644
index 000000000..8b26254e9
--- /dev/null
+++ b/tests/agent/test_auto_compact.py
@@ -0,0 +1,931 @@
+"""Tests for auto compact (idle TTL) feature."""
+
+import asyncio
+from datetime import datetime, timedelta
+from unittest.mock import AsyncMock, MagicMock
+from pathlib import Path
+
+import pytest
+
+from nanobot.agent.loop import AgentLoop
+from nanobot.bus.events import InboundMessage
+from nanobot.bus.queue import MessageBus
+from nanobot.config.schema import AgentDefaults
+from nanobot.command import CommandContext
+from nanobot.providers.base import LLMResponse
+
+
+def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop:
+ """Create a minimal AgentLoop for testing."""
+ bus = MessageBus()
+ provider = MagicMock()
+ provider.get_default_model.return_value = "test-model"
+ provider.estimate_prompt_tokens.return_value = (10_000, "test")
+ provider.chat_with_retry = AsyncMock(return_value=LLMResponse(content="ok", tool_calls=[]))
+ provider.generation.max_tokens = 4096
+ loop = AgentLoop(
+ bus=bus,
+ provider=provider,
+ workspace=tmp_path,
+ model="test-model",
+ context_window_tokens=128_000,
+ session_ttl_minutes=session_ttl_minutes,
+ )
+ loop.tools.get_definitions = MagicMock(return_value=[])
+ return loop
+
+
+class TestSessionTTLConfig:
+ """Test session TTL configuration."""
+
+ def test_default_ttl_is_zero(self):
+ """Default TTL should be 0 (disabled)."""
+ defaults = AgentDefaults()
+ assert defaults.session_ttl_minutes == 0
+
+ def test_custom_ttl(self):
+ """Custom TTL should be stored correctly."""
+ defaults = AgentDefaults(session_ttl_minutes=30)
+ assert defaults.session_ttl_minutes == 30
+
+
+class TestAgentLoopTTLParam:
+ """Test that AutoCompact receives and stores session_ttl_minutes."""
+
+ def test_loop_stores_ttl(self, tmp_path):
+ """AutoCompact should store the TTL value."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=25)
+ assert loop.auto_compact._ttl == 25
+
+ def test_loop_default_ttl_zero(self, tmp_path):
+ """AutoCompact default TTL should be 0 (disabled)."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=0)
+ assert loop.auto_compact._ttl == 0
+
+
+class TestAutoCompact:
+ """Test the _archive method."""
+
+ @pytest.mark.asyncio
+ async def test_is_expired_boundary(self, tmp_path):
+ """Exactly at TTL boundary should be expired (>= not >)."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ ts = datetime.now() - timedelta(minutes=15)
+ assert loop.auto_compact._is_expired(ts) is True
+ ts2 = datetime.now() - timedelta(minutes=14, seconds=59)
+ assert loop.auto_compact._is_expired(ts2) is False
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_is_expired_string_timestamp(self, tmp_path):
+ """_is_expired should parse ISO string timestamps."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ ts = (datetime.now() - timedelta(minutes=20)).isoformat()
+ assert loop.auto_compact._is_expired(ts) is True
+ assert loop.auto_compact._is_expired(None) is False
+ assert loop.auto_compact._is_expired("") is False
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_check_expired_only_archives_expired_sessions(self, tmp_path):
+ """With multiple sessions, only the expired one should be archived."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ # Expired session
+ s1 = loop.sessions.get_or_create("cli:expired")
+ s1.add_message("user", "old")
+ s1.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(s1)
+ # Active session
+ s2 = loop.sessions.get_or_create("cli:active")
+ s2.add_message("user", "recent")
+ loop.sessions.save(s2)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.auto_compact.check_expired(loop._schedule_background)
+ await asyncio.sleep(0.1)
+
+ active_after = loop.sessions.get_or_create("cli:active")
+ assert len(active_after.messages) == 1
+ assert active_after.messages[0]["content"] == "recent"
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_archives_and_clears(self, tmp_path):
+ """_archive should archive un-consolidated messages and clear session."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ for i in range(4):
+ session.add_message("user", f"msg{i}")
+ session.add_message("assistant", f"resp{i}")
+ loop.sessions.save(session)
+
+ archived_messages = []
+
+ async def _fake_archive(messages):
+ archived_messages.extend(messages)
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ await loop.auto_compact._archive("cli:test")
+
+ assert len(archived_messages) == 8
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 0
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_stores_summary(self, tmp_path):
+ """_archive should store the summary in _summaries."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "hello")
+ session.add_message("assistant", "hi there")
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
+ }
+
+ await loop.auto_compact._archive("cli:test")
+
+ entry = loop.auto_compact._summaries.get("cli:test")
+ assert entry is not None
+ assert entry[0] == "User said hello."
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 0
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_empty_session(self, tmp_path):
+ """_archive on empty session should not archive."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+
+ archive_called = False
+
+ async def _fake_archive(messages):
+ nonlocal archive_called
+ archive_called = True
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ await loop.auto_compact._archive("cli:test")
+
+ assert not archive_called
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 0
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_respects_last_consolidated(self, tmp_path):
+ """_archive should only archive un-consolidated messages."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ for i in range(10):
+ session.add_message("user", f"msg{i}")
+ session.add_message("assistant", f"resp{i}")
+ session.last_consolidated = 18
+ loop.sessions.save(session)
+
+ archived_count = 0
+
+ async def _fake_archive(messages):
+ nonlocal archived_count
+ archived_count = len(messages)
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ await loop.auto_compact._archive("cli:test")
+
+ assert archived_count == 2
+ await loop.close_mcp()
+
+
+class TestAutoCompactIdleDetection:
+ """Test idle detection triggers auto-new in _process_message."""
+
+ @pytest.mark.asyncio
+ async def test_no_auto_compact_when_ttl_disabled(self, tmp_path):
+ """No auto-new should happen when TTL is 0 (disabled)."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=0)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=30)
+ loop.sessions.save(session)
+
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg")
+ await loop._process_message(msg)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert any(m["content"] == "old message" for m in session_after.messages)
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_triggers_on_idle(self, tmp_path):
+ """Proactive auto-new archives expired session; _process_message reloads it."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archived_messages = []
+
+ async def _fake_archive(messages):
+ archived_messages.extend(messages)
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # Simulate proactive archive completing before message arrives
+ await loop.auto_compact._archive("cli:test")
+
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg")
+ await loop._process_message(msg)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert not any(m["content"] == "old message" for m in session_after.messages)
+ assert any(m["content"] == "new msg" for m in session_after.messages)
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_no_auto_compact_when_active(self, tmp_path):
+ """No auto-new should happen when session is recently active."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "recent message")
+ loop.sessions.save(session)
+
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new msg")
+ await loop._process_message(msg)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert any(m["content"] == "recent message" for m in session_after.messages)
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_does_not_affect_priority_commands(self, tmp_path):
+ """Priority commands (/stop, /restart) bypass _process_message entirely via run()."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ # Priority commands are dispatched in run() before _process_message is called.
+ # Simulate that path directly via dispatch_priority.
+ raw = "/stop"
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content=raw)
+ ctx = CommandContext(msg=msg, session=session, key="cli:test", raw=raw, loop=loop)
+ result = await loop.commands.dispatch_priority(ctx)
+ assert result is not None
+ assert "stopped" in result.content.lower() or "no active task" in result.content.lower()
+
+ # Session should be untouched since priority commands skip _process_message
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert any(m["content"] == "old message" for m in session_after.messages)
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_with_slash_new(self, tmp_path):
+ """Auto-new fires before /new dispatches; session is cleared twice but idempotent."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ for i in range(4):
+ session.add_message("user", f"msg{i}")
+ session.add_message("assistant", f"resp{i}")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="/new")
+ response = await loop._process_message(msg)
+
+ assert response is not None
+ assert "new session started" in response.content.lower()
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ # Session is empty (auto-new archived and cleared, /new cleared again)
+ assert len(session_after.messages) == 0
+ await loop.close_mcp()
+
+
+class TestAutoCompactSystemMessages:
+ """Test that auto-new also works for system messages."""
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_triggers_for_system_messages(self, tmp_path):
+ """Proactive auto-new archives expired session; system messages reload it."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message from subagent context")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # Simulate proactive archive completing before system message arrives
+ await loop.auto_compact._archive("cli:test")
+
+ msg = InboundMessage(
+ channel="system", sender_id="subagent", chat_id="cli:test",
+ content="subagent result",
+ )
+ await loop._process_message(msg)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert not any(
+ m["content"] == "old message from subagent context"
+ for m in session_after.messages
+ )
+ await loop.close_mcp()
+
+
+class TestAutoCompactEdgeCases:
+ """Edge cases for auto session new."""
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_with_nothing_summary(self, tmp_path):
+ """Auto-new should not inject when archive produces '(nothing)'."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "thanks")
+ session.add_message("assistant", "you're welcome")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ loop.provider.chat_with_retry = AsyncMock(
+ return_value=LLMResponse(content="(nothing)", tool_calls=[])
+ )
+
+ await loop.auto_compact._archive("cli:test")
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 0
+ # "(nothing)" summary should not be stored
+ assert "cli:test" not in loop.auto_compact._summaries
+
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_archive_failure_still_clears(self, tmp_path):
+ """Auto-new should clear session even if LLM archive fails (raw_archive fallback)."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "important data")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ loop.provider.chat_with_retry = AsyncMock(side_effect=Exception("API down"))
+
+ # Should not raise
+ await loop.auto_compact._archive("cli:test")
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ # Session should be cleared (archive falls back to raw dump)
+ assert len(session_after.messages) == 0
+
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_auto_compact_preserves_runtime_checkpoint_before_check(self, tmp_path):
+ """Runtime checkpoint is restored; proactive archive handles the expired session."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.metadata[AgentLoop._RUNTIME_CHECKPOINT_KEY] = {
+ "assistant_message": {"role": "assistant", "content": "interrupted response"},
+ "completed_tool_results": [],
+ "pending_tool_calls": [],
+ }
+ session.add_message("user", "previous message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archived_messages = []
+
+ async def _fake_archive(messages):
+ archived_messages.extend(messages)
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # Simulate proactive archive completing before message arrives
+ await loop.auto_compact._archive("cli:test")
+
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="continue")
+ await loop._process_message(msg)
+
+ # The checkpoint-restored message should have been archived by proactive path
+ assert len(archived_messages) >= 1
+
+ await loop.close_mcp()
+
+
+class TestAutoCompactIntegration:
+ """End-to-end test of auto session new feature."""
+
+ @pytest.mark.asyncio
+ async def test_full_lifecycle(self, tmp_path):
+ """
+ Full lifecycle: messages -> idle -> auto-new -> archive -> clear -> summary injected as runtime context.
+ """
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+
+ # Phase 1: User has a conversation
+ session.add_message("user", "I'm learning English, teach me past tense")
+ session.add_message("assistant", "Past tense is used for actions completed in the past...")
+ session.add_message("user", "Give me an example")
+ session.add_message("assistant", '"I walked to the store yesterday."')
+ loop.sessions.save(session)
+
+ # Phase 2: Time passes (simulate idle)
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ # Phase 3: User returns with a new message
+ loop.provider.chat_with_retry = AsyncMock(
+ return_value=LLMResponse(
+ content="User is learning English past tense. Example: 'I walked to the store yesterday.'",
+ tool_calls=[],
+ )
+ )
+
+ msg = InboundMessage(
+ channel="cli", sender_id="user", chat_id="test",
+ content="Let's continue, teach me present perfect",
+ )
+ response = await loop._process_message(msg)
+
+ # Phase 4: Verify
+ session_after = loop.sessions.get_or_create("cli:test")
+
+ # Old messages should be gone
+ assert not any(
+ "past tense is used" in str(m.get("content", "")) for m in session_after.messages
+ )
+
+ # Summary should NOT be persisted in session (ephemeral, one-shot)
+ assert not any(
+ "[Resumed Session]" in str(m.get("content", "")) for m in session_after.messages
+ )
+ # Runtime context end marker should NOT be persisted
+ assert not any(
+ "[/Runtime Context]" in str(m.get("content", "")) for m in session_after.messages
+ )
+
+ # Pending summary should be consumed (one-shot)
+ assert "cli:test" not in loop.auto_compact._summaries
+
+ # The new message should be processed (response exists)
+ assert response is not None
+
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_multi_paragraph_user_message_preserved(self, tmp_path):
+ """Multi-paragraph user messages must be fully preserved after auto-new."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # Simulate proactive archive completing before message arrives
+ await loop.auto_compact._archive("cli:test")
+
+ msg = InboundMessage(
+ channel="cli", sender_id="user", chat_id="test",
+ content="Paragraph one\n\nParagraph two\n\nParagraph three",
+ )
+ await loop._process_message(msg)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ user_msgs = [m for m in session_after.messages if m.get("role") == "user"]
+ assert len(user_msgs) >= 1
+ # All three paragraphs must be preserved
+ persisted = user_msgs[-1]["content"]
+ assert "Paragraph one" in persisted
+ assert "Paragraph two" in persisted
+ assert "Paragraph three" in persisted
+ # No runtime context markers in persisted message
+ assert "[Runtime Context" not in persisted
+ assert "[/Runtime Context]" not in persisted
+ await loop.close_mcp()
+
+
+class TestProactiveAutoCompact:
+ """Test proactive auto-new on idle ticks (TimeoutError path in run loop)."""
+
+ @staticmethod
+ async def _run_check_expired(loop):
+ """Helper: run check_expired via callback and wait for background tasks."""
+ loop.auto_compact.check_expired(loop._schedule_background)
+ await asyncio.sleep(0.1)
+
+ @pytest.mark.asyncio
+ async def test_no_check_when_ttl_disabled(self, tmp_path):
+ """check_expired should be a no-op when TTL is 0."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=0)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=30)
+ loop.sessions.save(session)
+
+ await self._run_check_expired(loop)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 1
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_proactive_archive_on_idle_tick(self, tmp_path):
+ """Expired session should be archived during idle tick."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.add_message("assistant", "old response")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archived_messages = []
+
+ async def _fake_archive(messages):
+ archived_messages.extend(messages)
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User chatted about old things.",
+ }
+
+ await self._run_check_expired(loop)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 0
+ assert len(archived_messages) == 2
+ entry = loop.auto_compact._summaries.get("cli:test")
+ assert entry is not None
+ assert entry[0] == "User chatted about old things."
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_no_proactive_archive_when_active(self, tmp_path):
+ """Recently active session should NOT be archived on idle tick."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "recent message")
+ loop.sessions.save(session)
+
+ await self._run_check_expired(loop)
+
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert len(session_after.messages) == 1
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_no_duplicate_archive(self, tmp_path):
+ """Should not archive the same session twice if already in progress."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archive_count = 0
+ started = asyncio.Event()
+ block_forever = asyncio.Event()
+
+ async def _slow_archive(messages):
+ nonlocal archive_count
+ archive_count += 1
+ started.set()
+ await block_forever.wait()
+ return True
+
+ loop.consolidator.archive = _slow_archive
+
+ # First call starts archiving via callback
+ loop.auto_compact.check_expired(loop._schedule_background)
+ await started.wait()
+ assert archive_count == 1
+
+ # Second call should skip (key is in _archiving)
+ loop.auto_compact.check_expired(loop._schedule_background)
+ await asyncio.sleep(0.05)
+ assert archive_count == 1
+
+ # Clean up
+ block_forever.set()
+ await asyncio.sleep(0.1)
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_proactive_archive_error_does_not_block(self, tmp_path):
+ """Proactive archive failure should be caught and not block future ticks."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _failing_archive(messages):
+ raise RuntimeError("LLM down")
+
+ loop.consolidator.archive = _failing_archive
+
+ # Should not raise
+ await self._run_check_expired(loop)
+
+ # Key should be removed from _archiving (finally block)
+ assert "cli:test" not in loop.auto_compact._archiving
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_proactive_archive_skips_empty_sessions(self, tmp_path):
+ """Proactive archive should not call LLM for sessions with no un-consolidated messages."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archive_called = False
+
+ async def _fake_archive(messages):
+ nonlocal archive_called
+ archive_called = True
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ await self._run_check_expired(loop)
+
+ assert not archive_called
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_no_reschedule_after_successful_archive(self, tmp_path):
+ """Already-archived session should NOT be re-scheduled on subsequent ticks."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "old message")
+ session.add_message("assistant", "old response")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archive_count = 0
+
+ async def _fake_archive(messages):
+ nonlocal archive_count
+ archive_count += 1
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # First tick: archives the session
+ await self._run_check_expired(loop)
+ assert archive_count == 1
+
+ # Second tick: should NOT re-schedule (updated_at is fresh after clear)
+ await self._run_check_expired(loop)
+ assert archive_count == 1 # Still 1, not re-scheduled
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_empty_skip_refreshes_updated_at_prevents_reschedule(self, tmp_path):
+ """Empty session skip refreshes updated_at, preventing immediate re-scheduling."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archive_count = 0
+
+ async def _fake_archive(messages):
+ nonlocal archive_count
+ archive_count += 1
+ return True
+
+ loop.consolidator.archive = _fake_archive
+
+ # First tick: skips (no messages), refreshes updated_at
+ await self._run_check_expired(loop)
+ assert archive_count == 0
+
+ # Second tick: should NOT re-schedule because updated_at is fresh
+ await self._run_check_expired(loop)
+ assert archive_count == 0
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_session_can_be_compacted_again_after_new_messages(self, tmp_path):
+ """After successful compact + user sends new messages + idle again, should compact again."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "first conversation")
+ session.add_message("assistant", "first response")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ archive_count = 0
+
+ async def _fake_archive(messages):
+ nonlocal archive_count
+ archive_count += 1
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ # First compact cycle
+ await loop.auto_compact._archive("cli:test")
+ assert archive_count == 1
+
+ # User returns, sends new messages
+ msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="second topic")
+ await loop._process_message(msg)
+
+ # Simulate idle again
+ loop.sessions.invalidate("cli:test")
+ session2 = loop.sessions.get_or_create("cli:test")
+ session2.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session2)
+
+ # Second compact cycle should succeed
+ await loop.auto_compact._archive("cli:test")
+ assert archive_count == 2
+ await loop.close_mcp()
+
+
+class TestSummaryPersistence:
+ """Test that summary survives restart via session metadata."""
+
+ @pytest.mark.asyncio
+ async def test_summary_persisted_in_session_metadata(self, tmp_path):
+ """After archive, _last_summary should be in session metadata."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "hello")
+ session.add_message("assistant", "hi there")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
+ }
+
+ await loop.auto_compact._archive("cli:test")
+
+ # Summary should be persisted in session metadata
+ session_after = loop.sessions.get_or_create("cli:test")
+ meta = session_after.metadata.get("_last_summary")
+ assert meta is not None
+ assert meta["text"] == "User said hello."
+ assert "last_active" in meta
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_summary_recovered_after_restart(self, tmp_path):
+ """Summary should be recovered from metadata when _summaries is empty (simulates restart)."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "hello")
+ session.add_message("assistant", "hi there")
+ last_active = datetime.now() - timedelta(minutes=20)
+ session.updated_at = last_active
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
+ }
+
+ # Archive
+ await loop.auto_compact._archive("cli:test")
+
+ # Simulate restart: clear in-memory state
+ loop.auto_compact._summaries.clear()
+ loop.sessions.invalidate("cli:test")
+
+ # prepare_session should recover summary from metadata
+ reloaded = loop.sessions.get_or_create("cli:test")
+ _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test")
+
+ assert summary is not None
+ assert "User said hello." in summary
+ assert "Inactive for" in summary
+ # Metadata should be cleaned up after consumption
+ assert "_last_summary" not in reloaded.metadata
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_metadata_cleanup_no_leak(self, tmp_path):
+ """_last_summary should be removed from metadata after being consumed."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "hello")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ await loop.auto_compact._archive("cli:test")
+
+ # Clear in-memory to force metadata path
+ loop.auto_compact._summaries.clear()
+ loop.sessions.invalidate("cli:test")
+ reloaded = loop.sessions.get_or_create("cli:test")
+
+ # First call: consumes from metadata
+ _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test")
+ assert summary is not None
+
+ # Second call: no summary (already consumed)
+ _, summary2 = loop.auto_compact.prepare_session(reloaded, "cli:test")
+ assert summary2 is None
+ assert "_last_summary" not in reloaded.metadata
+ await loop.close_mcp()
+
+ @pytest.mark.asyncio
+ async def test_metadata_cleanup_on_inmemory_path(self, tmp_path):
+ """In-memory _summaries path should also clean up _last_summary from metadata."""
+ loop = _make_loop(tmp_path, session_ttl_minutes=15)
+ session = loop.sessions.get_or_create("cli:test")
+ session.add_message("user", "hello")
+ session.updated_at = datetime.now() - timedelta(minutes=20)
+ loop.sessions.save(session)
+
+ async def _fake_archive(messages):
+ return True
+
+ loop.consolidator.archive = _fake_archive
+ loop.consolidator.get_last_history_entry = lambda: {
+ "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
+ }
+
+ await loop.auto_compact._archive("cli:test")
+
+ # Both _summaries and metadata have the summary
+ assert "cli:test" in loop.auto_compact._summaries
+ loop.sessions.invalidate("cli:test")
+ reloaded = loop.sessions.get_or_create("cli:test")
+ assert "_last_summary" in reloaded.metadata
+
+ # In-memory path is taken (no restart)
+ _, summary = loop.auto_compact.prepare_session(reloaded, "cli:test")
+ assert summary is not None
+ # Metadata should also be cleaned up
+ assert "_last_summary" not in reloaded.metadata
+ await loop.close_mcp()
From 69d60e2b063b1b5b649a97320b36d8b7612b7f8a Mon Sep 17 00:00:00 2001
From: chengyongru
Date: Fri, 10 Apr 2026 18:03:36 +0800
Subject: [PATCH 04/13] fix(agent): handle UnicodeDecodeError in
_read_last_entry
history.jsonl may contain non-UTF-8 bytes (e.g. from email channel
binary content), causing auto compact to fail when reading the last
entry for summary generation. Catch UnicodeDecodeError alongside
FileNotFoundError and JSONDecodeError.
---
nanobot/agent/memory.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py
index 26c5cd45f..e9662ff2c 100644
--- a/nanobot/agent/memory.py
+++ b/nanobot/agent/memory.py
@@ -290,7 +290,7 @@ class MemoryStore:
if not lines:
return None
return json.loads(lines[-1])
- except (FileNotFoundError, json.JSONDecodeError):
+ except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
return None
def _write_entries(self, entries: list[dict[str, Any]]) -> None:
From d03458f0346ffbc59996e265dece8520e79e974b Mon Sep 17 00:00:00 2001
From: chengyongru
Date: Fri, 10 Apr 2026 18:14:14 +0800
Subject: [PATCH 05/13] fix(agent): eliminate race condition in auto compact
summary retrieval
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
Make Consolidator.archive() return the summary string directly instead
of writing to history.jsonl then reading back via get_last_history_entry().
This eliminates a race condition where concurrent _archive calls for
different sessions could read each other's summaries from the shared
history file (cross-user context leak in multi-user deployments).
Also removes Consolidator.get_last_history_entry() — no longer needed.
---
nanobot/agent/auto_compact.py | 6 +--
nanobot/agent/memory.py | 14 +++---
tests/agent/test_auto_compact.py | 76 +++++++++-----------------------
tests/agent/test_consolidator.py | 6 +--
4 files changed, 31 insertions(+), 71 deletions(-)
diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py
index 171f5f55a..f30feac17 100644
--- a/nanobot/agent/auto_compact.py
+++ b/nanobot/agent/auto_compact.py
@@ -53,9 +53,7 @@ class AutoCompact:
return
n = len(msgs)
last_active = session.updated_at
- await self.consolidator.archive(msgs)
- entry = self.consolidator.get_last_history_entry()
- summary = (entry or {}).get("content", "")
+ summary = await self.consolidator.archive(msgs) or ""
if summary and summary != "(nothing)":
self._summaries[key] = (summary, last_active)
session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()}
@@ -71,6 +69,8 @@ class AutoCompact:
if key in self._archiving or self._is_expired(session.updated_at):
logger.info("Auto-compact: reloading session {} (archiving={})", key, key in self._archiving)
session = self.sessions.get_or_create(key)
+ # Hot path: summary from in-memory dict (process hasn't restarted).
+ # Also clean metadata copy so stale _last_summary never leaks to disk.
entry = self._summaries.pop(key, None)
if entry:
session.metadata.pop("_last_summary", None)
diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py
index e9662ff2c..04d988ee5 100644
--- a/nanobot/agent/memory.py
+++ b/nanobot/agent/memory.py
@@ -374,10 +374,6 @@ class Consolidator:
weakref.WeakValueDictionary()
)
- def get_last_history_entry(self) -> dict[str, Any] | None:
- """Return the most recent entry from history.jsonl."""
- return self.store._read_last_entry()
-
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
@@ -437,13 +433,13 @@ class Consolidator:
self._get_tool_definitions(),
)
- async def archive(self, messages: list[dict]) -> bool:
+ async def archive(self, messages: list[dict]) -> str | None:
"""Summarize messages via LLM and append to history.jsonl.
- Returns True on success (or degraded success), False if nothing to do.
+ Returns the summary text on success, None if there is nothing to archive or the LLM call fails (messages are then raw-dumped to history).
"""
if not messages:
- return False
+ return None
try:
formatted = MemoryStore._format_messages(messages)
response = await self.provider.chat_with_retry(
@@ -463,11 +459,11 @@ class Consolidator:
)
summary = response.content or "[no summary]"
self.store.append_history(summary)
- return True
+ return summary
except Exception:
logger.warning("Consolidation LLM call failed, raw-dumping to history")
self.store.raw_archive(messages)
- return True
+ return None
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
"""Loop: archive old messages until prompt fits within safe budget.
diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py
index 8b26254e9..39792e290 100644
--- a/tests/agent/test_auto_compact.py
+++ b/tests/agent/test_auto_compact.py
@@ -101,7 +101,7 @@ class TestAutoCompact:
loop.sessions.save(s2)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
loop.auto_compact.check_expired(loop._schedule_background)
@@ -126,7 +126,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
archived_messages.extend(messages)
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -147,12 +147,9 @@ class TestAutoCompact:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "User said hello."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
- }
await loop.auto_compact._archive("cli:test")
@@ -174,7 +171,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
nonlocal archive_called
archive_called = True
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -201,7 +198,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
nonlocal archived_count
archived_count = len(messages)
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -243,12 +240,9 @@ class TestAutoCompactIdleDetection:
async def _fake_archive(messages):
archived_messages.extend(messages)
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@@ -311,7 +305,7 @@ class TestAutoCompactIdleDetection:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -340,12 +334,9 @@ class TestAutoCompactSystemMessages:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# Simulate proactive archive completing before system message arrives
await loop.auto_compact._archive("cli:test")
@@ -428,12 +419,9 @@ class TestAutoCompactEdgeCases:
async def _fake_archive(messages):
archived_messages.extend(messages)
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@@ -518,12 +506,9 @@ class TestAutoCompactIntegration:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@@ -586,12 +571,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
archived_messages.extend(messages)
- return True
+ return "User chatted about old things."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User chatted about old things.",
- }
await self._run_check_expired(loop)
@@ -635,7 +617,7 @@ class TestProactiveAutoCompact:
archive_count += 1
started.set()
await block_forever.wait()
- return True
+ return "Summary."
loop.consolidator.archive = _slow_archive
@@ -688,7 +670,7 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_called
archive_called = True
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -712,12 +694,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# First tick: archives the session
await self._run_check_expired(loop)
@@ -741,7 +720,7 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
@@ -769,12 +748,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
# First compact cycle
await loop.auto_compact._archive("cli:test")
@@ -810,12 +786,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "User said hello."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
- }
await loop.auto_compact._archive("cli:test")
@@ -839,12 +812,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "User said hello."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
- }
# Archive
await loop.auto_compact._archive("cli:test")
@@ -874,12 +844,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
await loop.auto_compact._archive("cli:test")
@@ -908,12 +875,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
- return True
+ return "Summary."
loop.consolidator.archive = _fake_archive
- loop.consolidator.get_last_history_entry = lambda: {
- "cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
- }
await loop.auto_compact._archive("cli:test")
diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py
index b7989d9dd..28587e1b4 100644
--- a/tests/agent/test_consolidator.py
+++ b/tests/agent/test_consolidator.py
@@ -46,7 +46,7 @@ class TestConsolidatorSummarize:
{"role": "assistant", "content": "Done, fixed the race condition."},
]
result = await consolidator.archive(messages)
- assert result is True
+ assert result == "User fixed a bug in the auth module."
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
@@ -55,14 +55,14 @@ class TestConsolidatorSummarize:
mock_provider.chat_with_retry.side_effect = Exception("API error")
messages = [{"role": "user", "content": "hello"}]
result = await consolidator.archive(messages)
- assert result is True # always succeeds
+ assert result is None # no summary on raw dump fallback
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
assert "[RAW]" in entries[0]["content"]
async def test_summarize_skips_empty_messages(self, consolidator):
result = await consolidator.archive([])
- assert result is False
+ assert result is None
class TestConsolidatorTokenBudget:
From 1cb28b39a30cea4be4ca57e7a1997bb06e9f71b3 Mon Sep 17 00:00:00 2001
From: Xubin Ren
Date: Sat, 11 Apr 2026 07:25:50 +0000
Subject: [PATCH 06/13] feat(agent): retain recent context during auto compact
Keep a legal recent suffix in idle auto-compacted sessions so resumed chats preserve their freshest live context while older messages are summarized. Recover persisted summaries even when retained messages remain, and document the new behavior.
---
README.md | 6 +-
nanobot/agent/auto_compact.py | 51 +++++++++++---
tests/agent/test_auto_compact.py | 116 ++++++++++++++++---------------
3 files changed, 104 insertions(+), 69 deletions(-)
diff --git a/README.md b/README.md
index a9bf4b5e3..88ff35f29 100644
--- a/README.md
+++ b/README.md
@@ -1505,7 +1505,7 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
### Auto Compact
-When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the session context into a summary. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary and fresh input.
+When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input.
```json
{
@@ -1523,8 +1523,8 @@ When a user is idle for longer than a configured TTL, nanobot **proactively** co
How it works:
1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration.
-2. **Background compaction**: Expired sessions are summarized via LLM, then cleared.
-3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted).
+2. **Background compaction**: Expired sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages).
+3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted) alongside the retained recent suffix.
> [!TIP]
> The summary survives bot restarts — it's stored in session metadata and recovered on the next message.
diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/auto_compact.py
index f30feac17..47c7b5a36 100644
--- a/nanobot/agent/auto_compact.py
+++ b/nanobot/agent/auto_compact.py
@@ -3,16 +3,18 @@
from __future__ import annotations
from datetime import datetime
-from typing import TYPE_CHECKING, Callable, Coroutine
+from typing import TYPE_CHECKING, Any, Callable, Coroutine
from loguru import logger
+from nanobot.session.manager import Session, SessionManager
if TYPE_CHECKING:
from nanobot.agent.memory import Consolidator
- from nanobot.session.manager import Session, SessionManager
class AutoCompact:
+ _RECENT_SUFFIX_MESSAGES = 8
+
def __init__(self, sessions: SessionManager, consolidator: Consolidator,
session_ttl_minutes: int = 0):
self.sessions = sessions
@@ -33,6 +35,27 @@ class AutoCompact:
idle_min = int((datetime.now() - last_active).total_seconds() / 60)
return f"Inactive for {idle_min} minutes.\nPrevious conversation summary: {text}"
+ def _split_unconsolidated(
+ self, session: Session,
+ ) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
+ """Split live session tail into archiveable prefix and retained recent suffix."""
+ tail = list(session.messages[session.last_consolidated:])
+ if not tail:
+ return [], []
+
+ probe = Session(
+ key=session.key,
+ messages=tail.copy(),
+ created_at=session.created_at,
+ updated_at=session.updated_at,
+ metadata={},
+ last_consolidated=0,
+ )
+ probe.retain_recent_legal_suffix(self._RECENT_SUFFIX_MESSAGES)
+ kept = probe.messages
+ cut = len(tail) - len(kept)
+ return tail[:cut], kept
+
def check_expired(self, schedule_background: Callable[[Coroutine], None]) -> None:
for info in self.sessions.list_sessions():
key = info.get("key", "")
@@ -45,21 +68,31 @@ class AutoCompact:
try:
self.sessions.invalidate(key)
session = self.sessions.get_or_create(key)
- msgs = session.messages[session.last_consolidated:]
- if not msgs:
+ archive_msgs, kept_msgs = self._split_unconsolidated(session)
+ if not archive_msgs and not kept_msgs:
logger.debug("Auto-compact: skipping {}, no un-consolidated messages", key)
session.updated_at = datetime.now()
self.sessions.save(session)
return
- n = len(msgs)
+
last_active = session.updated_at
- summary = await self.consolidator.archive(msgs) or ""
+ summary = ""
+ if archive_msgs:
+ summary = await self.consolidator.archive(archive_msgs) or ""
if summary and summary != "(nothing)":
self._summaries[key] = (summary, last_active)
session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()}
- session.clear()
+ session.messages = kept_msgs
+ session.last_consolidated = 0
+ session.updated_at = datetime.now()
self.sessions.save(session)
- logger.info("Auto-compact: archived {} ({} messages, summary={})", key, n, bool(summary))
+ logger.info(
+ "Auto-compact: archived {} (archived={}, kept={}, summary={})",
+ key,
+ len(archive_msgs),
+ len(kept_msgs),
+ bool(summary),
+ )
except Exception:
logger.exception("Auto-compact: failed for {}", key)
finally:
@@ -75,7 +108,7 @@ class AutoCompact:
if entry:
session.metadata.pop("_last_summary", None)
return session, self._format_summary(entry[0], entry[1])
- if not session.messages and "_last_summary" in session.metadata:
+ if "_last_summary" in session.metadata:
meta = session.metadata.pop("_last_summary")
self.sessions.save(session)
return session, self._format_summary(meta["text"], datetime.fromisoformat(meta["last_active"]))
diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py
index 39792e290..8f1be03a2 100644
--- a/tests/agent/test_auto_compact.py
+++ b/tests/agent/test_auto_compact.py
@@ -35,6 +35,13 @@ def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop:
return loop
+def _add_turns(session, turns: int, *, prefix: str = "msg") -> None:
+ """Append simple user/assistant turns to a session."""
+ for i in range(turns):
+ session.add_message("user", f"{prefix} user {i}")
+ session.add_message("assistant", f"{prefix} assistant {i}")
+
+
class TestSessionTTLConfig:
"""Test session TTL configuration."""
@@ -113,13 +120,11 @@ class TestAutoCompact:
await loop.close_mcp()
@pytest.mark.asyncio
- async def test_auto_compact_archives_and_clears(self, tmp_path):
- """_archive should archive un-consolidated messages and clear session."""
+ async def test_auto_compact_archives_prefix_and_keeps_recent_suffix(self, tmp_path):
+ """_archive should summarize the old prefix and keep a recent legal suffix."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- for i in range(4):
- session.add_message("user", f"msg{i}")
- session.add_message("assistant", f"resp{i}")
+ _add_turns(session, 6)
loop.sessions.save(session)
archived_messages = []
@@ -132,9 +137,11 @@ class TestAutoCompact:
await loop.auto_compact._archive("cli:test")
- assert len(archived_messages) == 8
+ assert len(archived_messages) == 4
session_after = loop.sessions.get_or_create("cli:test")
- assert len(session_after.messages) == 0
+ assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
+ assert session_after.messages[0]["content"] == "msg user 2"
+ assert session_after.messages[-1]["content"] == "msg assistant 5"
await loop.close_mcp()
@pytest.mark.asyncio
@@ -142,8 +149,7 @@ class TestAutoCompact:
"""_archive should store the summary in _summaries."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "hello")
- session.add_message("assistant", "hi there")
+ _add_turns(session, 6, prefix="hello")
loop.sessions.save(session)
async def _fake_archive(messages):
@@ -157,7 +163,7 @@ class TestAutoCompact:
assert entry is not None
assert entry[0] == "User said hello."
session_after = loop.sessions.get_or_create("cli:test")
- assert len(session_after.messages) == 0
+ assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
await loop.close_mcp()
@pytest.mark.asyncio
@@ -187,9 +193,7 @@ class TestAutoCompact:
"""_archive should only archive un-consolidated messages."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- for i in range(10):
- session.add_message("user", f"msg{i}")
- session.add_message("assistant", f"resp{i}")
+ _add_turns(session, 14)
session.last_consolidated = 18
loop.sessions.save(session)
@@ -232,7 +236,7 @@ class TestAutoCompactIdleDetection:
"""Proactive auto-new archives expired session; _process_message reloads it."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message")
+ _add_turns(session, 6, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -251,7 +255,8 @@ class TestAutoCompactIdleDetection:
await loop._process_message(msg)
session_after = loop.sessions.get_or_create("cli:test")
- assert not any(m["content"] == "old message" for m in session_after.messages)
+ assert len(archived_messages) == 4
+ assert not any(m["content"] == "old user 0" for m in session_after.messages)
assert any(m["content"] == "new msg" for m in session_after.messages)
await loop.close_mcp()
@@ -329,7 +334,7 @@ class TestAutoCompactSystemMessages:
"""Proactive auto-new archives expired session; system messages reload it."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message from subagent context")
+ _add_turns(session, 6, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -349,7 +354,7 @@ class TestAutoCompactSystemMessages:
session_after = loop.sessions.get_or_create("cli:test")
assert not any(
- m["content"] == "old message from subagent context"
+ m["content"] == "old user 0"
for m in session_after.messages
)
await loop.close_mcp()
@@ -363,8 +368,7 @@ class TestAutoCompactEdgeCases:
"""Auto-new should not inject when archive produces '(nothing)'."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "thanks")
- session.add_message("assistant", "you're welcome")
+ _add_turns(session, 6, prefix="thanks")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -375,18 +379,18 @@ class TestAutoCompactEdgeCases:
await loop.auto_compact._archive("cli:test")
session_after = loop.sessions.get_or_create("cli:test")
- assert len(session_after.messages) == 0
+ assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
# "(nothing)" summary should not be stored
assert "cli:test" not in loop.auto_compact._summaries
await loop.close_mcp()
@pytest.mark.asyncio
- async def test_auto_compact_archive_failure_still_clears(self, tmp_path):
- """Auto-new should clear session even if LLM archive fails (raw_archive fallback)."""
+ async def test_auto_compact_archive_failure_still_keeps_recent_suffix(self, tmp_path):
+ """Auto-new should keep the recent suffix even if LLM archive falls back to raw dump."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "important data")
+ _add_turns(session, 6, prefix="important")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -396,14 +400,13 @@ class TestAutoCompactEdgeCases:
await loop.auto_compact._archive("cli:test")
session_after = loop.sessions.get_or_create("cli:test")
- # Session should be cleared (archive falls back to raw dump)
- assert len(session_after.messages) == 0
+ assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
await loop.close_mcp()
@pytest.mark.asyncio
async def test_auto_compact_preserves_runtime_checkpoint_before_check(self, tmp_path):
- """Runtime checkpoint is restored; proactive archive handles the expired session."""
+ """Short expired sessions keep recent messages; checkpoint restore still works on resume."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
session.metadata[AgentLoop._RUNTIME_CHECKPOINT_KEY] = {
@@ -429,8 +432,10 @@ class TestAutoCompactEdgeCases:
msg = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="continue")
await loop._process_message(msg)
- # The checkpoint-restored message should have been archived by proactive path
- assert len(archived_messages) >= 1
+ session_after = loop.sessions.get_or_create("cli:test")
+ assert archived_messages == []
+ assert any(m["content"] == "previous message" for m in session_after.messages)
+ assert any(m["content"] == "interrupted response" for m in session_after.messages)
await loop.close_mcp()
@@ -446,11 +451,17 @@ class TestAutoCompactIntegration:
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- # Phase 1: User has a conversation
+ # Phase 1: User has a conversation longer than the retained recent suffix
session.add_message("user", "I'm learning English, teach me past tense")
session.add_message("assistant", "Past tense is used for actions completed in the past...")
session.add_message("user", "Give me an example")
session.add_message("assistant", '"I walked to the store yesterday."')
+ session.add_message("user", "Give me another example")
+ session.add_message("assistant", '"She visited Paris last year."')
+ session.add_message("user", "Quiz me")
+ session.add_message("assistant", "What is the past tense of go?")
+ session.add_message("user", "I think it is went")
+ session.add_message("assistant", "Correct.")
loop.sessions.save(session)
# Phase 2: Time passes (simulate idle)
@@ -474,7 +485,7 @@ class TestAutoCompactIntegration:
# Phase 4: Verify
session_after = loop.sessions.get_or_create("cli:test")
- # Old messages should be gone
+ # The oldest messages should be trimmed from live session history
assert not any(
"past tense is used" in str(m.get("content", "")) for m in session_after.messages
)
@@ -497,8 +508,8 @@ class TestAutoCompactIntegration:
await loop.close_mcp()
@pytest.mark.asyncio
- async def test_multi_paragraph_user_message_preserved(self, tmp_path):
- """Multi-paragraph user messages must be fully preserved after auto-new."""
+ async def test_runtime_context_markers_not_persisted_for_multi_paragraph_turn(self, tmp_path):
+ """Auto-compact resume context must not leak runtime markers into persisted session history."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
session.add_message("user", "old message")
@@ -520,16 +531,11 @@ class TestAutoCompactIntegration:
await loop._process_message(msg)
session_after = loop.sessions.get_or_create("cli:test")
- user_msgs = [m for m in session_after.messages if m.get("role") == "user"]
- assert len(user_msgs) >= 1
- # All three paragraphs must be preserved
- persisted = user_msgs[-1]["content"]
- assert "Paragraph one" in persisted
- assert "Paragraph two" in persisted
- assert "Paragraph three" in persisted
- # No runtime context markers in persisted message
- assert "[Runtime Context" not in persisted
- assert "[/Runtime Context]" not in persisted
+ assert any(m.get("content") == "old message" for m in session_after.messages)
+ for persisted in session_after.messages:
+ content = str(persisted.get("content", ""))
+ assert "[Runtime Context" not in content
+ assert "[/Runtime Context]" not in content
await loop.close_mcp()
@@ -562,8 +568,7 @@ class TestProactiveAutoCompact:
"""Expired session should be archived during idle tick."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message")
- session.add_message("assistant", "old response")
+ _add_turns(session, 5, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -578,7 +583,7 @@ class TestProactiveAutoCompact:
await self._run_check_expired(loop)
session_after = loop.sessions.get_or_create("cli:test")
- assert len(session_after.messages) == 0
+ assert len(session_after.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
assert len(archived_messages) == 2
entry = loop.auto_compact._summaries.get("cli:test")
assert entry is not None
@@ -604,7 +609,7 @@ class TestProactiveAutoCompact:
"""Should not archive the same session twice if already in progress."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message")
+ _add_turns(session, 6, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -641,7 +646,7 @@ class TestProactiveAutoCompact:
"""Proactive archive failure should be caught and not block future ticks."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message")
+ _add_turns(session, 6, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -684,8 +689,7 @@ class TestProactiveAutoCompact:
"""Already-archived session should NOT be re-scheduled on subsequent ticks."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "old message")
- session.add_message("assistant", "old response")
+ _add_turns(session, 5, prefix="old")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -738,8 +742,7 @@ class TestProactiveAutoCompact:
"""After successful compact + user sends new messages + idle again, should compact again."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "first conversation")
- session.add_message("assistant", "first response")
+ _add_turns(session, 5, prefix="first")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -780,8 +783,7 @@ class TestSummaryPersistence:
"""After archive, _last_summary should be in session metadata."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "hello")
- session.add_message("assistant", "hi there")
+ _add_turns(session, 6, prefix="hello")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -805,8 +807,7 @@ class TestSummaryPersistence:
"""Summary should be recovered from metadata when _summaries is empty (simulates restart)."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "hello")
- session.add_message("assistant", "hi there")
+ _add_turns(session, 6, prefix="hello")
last_active = datetime.now() - timedelta(minutes=20)
session.updated_at = last_active
loop.sessions.save(session)
@@ -825,6 +826,7 @@ class TestSummaryPersistence:
# prepare_session should recover summary from metadata
reloaded = loop.sessions.get_or_create("cli:test")
+ assert len(reloaded.messages) == loop.auto_compact._RECENT_SUFFIX_MESSAGES
_, summary = loop.auto_compact.prepare_session(reloaded, "cli:test")
assert summary is not None
@@ -839,7 +841,7 @@ class TestSummaryPersistence:
"""_last_summary should be removed from metadata after being consumed."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "hello")
+ _add_turns(session, 6, prefix="hello")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
@@ -870,7 +872,7 @@ class TestSummaryPersistence:
"""In-memory _summaries path should also clean up _last_summary from metadata."""
loop = _make_loop(tmp_path, session_ttl_minutes=15)
session = loop.sessions.get_or_create("cli:test")
- session.add_message("user", "hello")
+ _add_turns(session, 6, prefix="hello")
session.updated_at = datetime.now() - timedelta(minutes=20)
loop.sessions.save(session)
From 84e840659aabc5682d3699c9466a050d82f644df Mon Sep 17 00:00:00 2001
From: Xubin Ren
Date: Sat, 11 Apr 2026 07:32:56 +0000
Subject: [PATCH 07/13] refactor(config): rename auto compact config key
Prefer the more user-friendly idleCompactAfterMinutes name for auto compact while keeping sessionTtlMinutes as a backward-compatible alias. Update tests and README to document the retained recent-context behavior and the new preferred key.
---
README.md | 13 ++++++++-----
nanobot/config/schema.py | 7 ++++++-
tests/agent/test_auto_compact.py | 17 +++++++++++++++++
3 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/README.md b/README.md
index 88ff35f29..856986754 100644
--- a/README.md
+++ b/README.md
@@ -1505,13 +1505,13 @@ MCP tools are automatically discovered and registered on startup. The LLM can us
### Auto Compact
-When a user is idle for longer than a configured TTL, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input.
+When a user is idle for longer than a configured threshold, nanobot **proactively** compresses the older part of the session context into a summary while keeping a recent legal suffix of live messages. This reduces token cost and first-token latency when the user returns — instead of re-processing a long stale context with an expired KV cache, the model receives a compact summary, the most recent live context, and fresh input.
```json
{
"agents": {
"defaults": {
- "sessionTtlMinutes": 15
+ "idleCompactAfterMinutes": 15
}
}
}
@@ -1519,15 +1519,18 @@ When a user is idle for longer than a configured TTL, nanobot **proactively** co
| Option | Default | Description |
|--------|---------|-------------|
-| `agents.defaults.sessionTtlMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction. Set to `0` to disable. Recommended: `15` — matches typical LLM KV cache expiration, so compacted sessions won't waste cache on cold entries. |
+| `agents.defaults.idleCompactAfterMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction starts. Set to `0` to disable. Recommended: `15` — close to a typical LLM KV cache expiry window, so stale sessions get compacted before the user returns. |
+
+`sessionTtlMinutes` remains accepted as a legacy alias for backward compatibility, but `idleCompactAfterMinutes` is the preferred config key going forward.
How it works:
1. **Idle detection**: On each idle tick (~1 s), checks all sessions for expiration.
-2. **Background compaction**: Expired sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages).
+2. **Background compaction**: Idle sessions summarize the older live prefix via LLM and keep the most recent legal suffix (currently 8 messages).
3. **Summary injection**: When the user returns, the summary is injected as runtime context (one-shot, not persisted) alongside the retained recent suffix.
+4. **Restart-safe resume**: The summary is also mirrored into session metadata so it can still be recovered after a process restart.
> [!TIP]
-> The summary survives bot restarts — it's stored in session metadata and recovered on the next message.
+> Think of auto compact as "summarize older context, keep the freshest live turns." It is not a hard session reset.
### Timezone
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 8ab68d7b5..67cce4470 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -77,7 +77,12 @@ class AgentDefaults(Base):
reasoning_effort: str | None = None # low / medium / high / adaptive - enables LLM thinking mode
timezone: str = "UTC" # IANA timezone, e.g. "Asia/Shanghai", "America/New_York"
unified_session: bool = False # Share one session across all channels (single-user multi-device)
- session_ttl_minutes: int = Field(default=0, ge=0) # Auto /new after idle (0 = disabled)
+ session_ttl_minutes: int = Field(
+ default=0,
+ ge=0,
+ validation_alias=AliasChoices("idleCompactAfterMinutes", "sessionTtlMinutes"),
+ serialization_alias="idleCompactAfterMinutes",
+ ) # Auto-compact idle threshold in minutes (0 = disabled)
dream: DreamConfig = Field(default_factory=DreamConfig)
diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py
index 8f1be03a2..b3462820b 100644
--- a/tests/agent/test_auto_compact.py
+++ b/tests/agent/test_auto_compact.py
@@ -55,6 +55,23 @@ class TestSessionTTLConfig:
defaults = AgentDefaults(session_ttl_minutes=30)
assert defaults.session_ttl_minutes == 30
+ def test_user_friendly_alias_is_supported(self):
+ """Config should accept idleCompactAfterMinutes as the preferred JSON key."""
+ defaults = AgentDefaults.model_validate({"idleCompactAfterMinutes": 30})
+ assert defaults.session_ttl_minutes == 30
+
+ def test_legacy_alias_is_still_supported(self):
+ """Config should still accept the old sessionTtlMinutes key for compatibility."""
+ defaults = AgentDefaults.model_validate({"sessionTtlMinutes": 30})
+ assert defaults.session_ttl_minutes == 30
+
+ def test_serializes_with_user_friendly_alias(self):
+ """Config dumps should use idleCompactAfterMinutes for JSON output."""
+ defaults = AgentDefaults(session_ttl_minutes=30)
+ data = defaults.model_dump(mode="json", by_alias=True)
+ assert data["idleCompactAfterMinutes"] == 30
+ assert "sessionTtlMinutes" not in data
+
class TestAgentLoopTTLParam:
"""Test that AutoCompact receives and stores session_ttl_minutes."""
From 5932482d01bb442e99143cabea2c0a0c272c5a1b Mon Sep 17 00:00:00 2001
From: Xubin Ren
Date: Sat, 11 Apr 2026 07:49:31 +0000
Subject: [PATCH 08/13] refactor(agent): rename auto compact module
Rename the auto compact module to autocompact.py for a cleaner path while keeping the AutoCompact type and behavior unchanged. Update the agent loop import to match.
---
nanobot/agent/{auto_compact.py => autocompact.py} | 0
nanobot/agent/loop.py | 2 +-
2 files changed, 1 insertion(+), 1 deletion(-)
rename nanobot/agent/{auto_compact.py => autocompact.py} (100%)
diff --git a/nanobot/agent/auto_compact.py b/nanobot/agent/autocompact.py
similarity index 100%
rename from nanobot/agent/auto_compact.py
rename to nanobot/agent/autocompact.py
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index 65a5a1abc..05a27349f 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -13,7 +13,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable
from loguru import logger
-from nanobot.agent.auto_compact import AutoCompact
+from nanobot.agent.autocompact import AutoCompact
from nanobot.agent.context import ContextBuilder
from nanobot.agent.hook import AgentHook, AgentHookContext, CompositeHook
from nanobot.agent.memory import Consolidator, Dream
From e0ba56808967f6bac652fa8e5ed13ac32874f2e4 Mon Sep 17 00:00:00 2001
From: weitongtong
Date: Sat, 11 Apr 2026 14:34:45 +0800
Subject: [PATCH 09/13] =?UTF-8?q?fix(cron):=20=E4=BF=AE=E5=A4=8D=E5=9B=BA?=
=?UTF-8?q?=E5=AE=9A=E9=97=B4=E9=9A=94=E4=BB=BB=E5=8A=A1=E5=9B=A0=20store?=
=?UTF-8?q?=20=E5=B9=B6=E5=8F=91=E6=9B=BF=E6=8D=A2=E5=AF=BC=E8=87=B4?=
=?UTF-8?q?=E7=9A=84=E9=87=8D=E5=A4=8D=E6=89=A7=E8=A1=8C?=
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit
_on_timer 中 await _execute_job 让出控制权期间,前端轮询触发的
list_jobs 调用 _load_store 从磁盘重新加载覆盖 self._store,
已执行任务的状态被旧值回退,导致再次触发。
引入 _timer_active 标志位,在任务执行期间阻止并发 _load_store
替换 store。同时修复 store 为空时未重新 arm timer 的问题。
Made-with: Cursor
---
nanobot/cron/service.py | 26 ++++++++++++++++++--------
1 file changed, 18 insertions(+), 8 deletions(-)
diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py
index 267613012..165ce54d7 100644
--- a/nanobot/cron/service.py
+++ b/nanobot/cron/service.py
@@ -80,6 +80,7 @@ class CronService:
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._running = False
+ self._timer_active = False
self.max_sleep_ms = max_sleep_ms
def _load_jobs(self) -> tuple[list[CronJob], int]:
@@ -171,7 +172,11 @@ class CronService:
def _load_store(self) -> CronStore:
"""Load jobs from disk. Reloads automatically if file was modified externally.
- Reload every time because it needs to merge operations on the jobs object from other instances.
+ - During _on_timer execution, return the existing store to prevent concurrent
+ _load_store calls (e.g. from list_jobs polling) from replacing it mid-execution.
"""
+ if self._timer_active and self._store:
+ return self._store
jobs, version = self._load_jobs()
self._store = CronStore(version=version, jobs=jobs)
self._merge_action()
@@ -290,18 +295,23 @@ class CronService:
"""Handle timer tick - run due jobs."""
self._load_store()
if not self._store:
+ self._arm_timer()
return
- now = _now_ms()
- due_jobs = [
- j for j in self._store.jobs
- if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
- ]
+ self._timer_active = True
+ try:
+ now = _now_ms()
+ due_jobs = [
+ j for j in self._store.jobs
+ if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
+ ]
- for job in due_jobs:
- await self._execute_job(job)
+ for job in due_jobs:
+ await self._execute_job(job)
- self._save_store()
+ self._save_store()
+ finally:
+ self._timer_active = False
self._arm_timer()
async def _execute_job(self, job: CronJob) -> None:
From 5bb7f77b80934d49432c6acc96644f33c1c7b956 Mon Sep 17 00:00:00 2001
From: Xubin Ren
Date: Sat, 11 Apr 2026 08:43:25 +0000
Subject: [PATCH 10/13] feat(tests): add regression test for timer execution to
prevent store rollback during job execution
---
tests/cron/test_cron_service.py | 39 +++++++++++++++++++++++++++++++++
1 file changed, 39 insertions(+)
diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py
index 4aa7fc06d..747f8ec81 100644
--- a/tests/cron/test_cron_service.py
+++ b/tests/cron/test_cron_service.py
@@ -329,6 +329,45 @@ async def test_external_update_preserves_run_history_records(tmp_path):
fresh._save_store()
+# ── timer race regression tests ──
+
+
+@pytest.mark.asyncio
+async def test_timer_execution_is_not_rolled_back_by_list_jobs_reload(tmp_path):
+ """list_jobs() during _on_timer should not replace the active store and re-run the same due job."""
+ store_path = tmp_path / "cron" / "jobs.json"
+ calls: list[str] = []
+
+ async def on_job(job):
+ calls.append(job.id)
+ # Simulate frontend polling list_jobs while the timer callback is mid-execution.
+ service.list_jobs(include_disabled=True)
+ await asyncio.sleep(0)
+
+ service = CronService(store_path, on_job=on_job)
+ service._running = True
+ service._load_store()
+ service._arm_timer = lambda: None
+
+ job = service.add_job(
+ name="race",
+ schedule=CronSchedule(kind="every", every_ms=60_000),
+ message="hello",
+ )
+ job.state.next_run_at_ms = max(1, int(time.time() * 1000) - 1_000)
+ service._save_store()
+
+ await service._on_timer()
+ await service._on_timer()
+
+ assert calls == [job.id]
+ loaded = service.get_job(job.id)
+ assert loaded is not None
+ assert loaded.state.last_run_at_ms is not None
+ assert loaded.state.next_run_at_ms is not None
+ assert loaded.state.next_run_at_ms > loaded.state.last_run_at_ms
+
+
# ── update_job tests ──
From d3aa209cf6967563e023701acb2d6f4b867762d9 Mon Sep 17 00:00:00 2001
From: Mike Terhar
Date: Wed, 8 Apr 2026 09:24:32 -0400
Subject: [PATCH 11/13] add kagi web search tool
---
nanobot/agent/tools/web.py | 25 +++++++++++++++++++++++++
nanobot/config/schema.py | 2 +-
2 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/nanobot/agent/tools/web.py b/nanobot/agent/tools/web.py
index 275fcf88c..38fc33d74 100644
--- a/nanobot/agent/tools/web.py
+++ b/nanobot/agent/tools/web.py
@@ -114,6 +114,8 @@ class WebSearchTool(Tool):
return await self._search_jina(query, n)
elif provider == "brave":
return await self._search_brave(query, n)
+ elif provider == "kagi":
+ return await self._search_kagi(query, n)
else:
return f"Error: unknown search provider '{provider}'"
@@ -204,6 +206,29 @@ class WebSearchTool(Tool):
logger.warning("Jina search failed ({}), falling back to DuckDuckGo", e)
return await self._search_duckduckgo(query, n)
+ async def _search_kagi(self, query: str, n: int) -> str:
+ api_key = self.config.api_key or os.environ.get("KAGI_API_KEY", "")
+ if not api_key:
+ logger.warning("KAGI_API_KEY not set, falling back to DuckDuckGo")
+ return await self._search_duckduckgo(query, n)
+ try:
+ async with httpx.AsyncClient(proxy=self.proxy) as client:
+ r = await client.get(
+ "https://kagi.com/api/v0/search",
+ params={"q": query, "limit": n},
+ headers={"Authorization": f"Bot {api_key}"},
+ timeout=10.0,
+ )
+ r.raise_for_status()
+ # t=0 items are search results; other values are related searches, etc.
+ items = [
+ {"title": d.get("title", ""), "url": d.get("url", ""), "content": d.get("snippet", "")}
+ for d in r.json().get("data", []) if d.get("t") == 0
+ ]
+ return _format_results(query, items, n)
+ except Exception as e:
+ return f"Error: {e}"
+
async def _search_duckduckgo(self, query: str, n: int) -> str:
try:
# Note: duckduckgo_search is synchronous and does its own requests
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 67cce4470..a841fe159 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -159,7 +159,7 @@ class GatewayConfig(Base):
class WebSearchConfig(Base):
"""Web search tool configuration."""
- provider: str = "duckduckgo" # brave, tavily, duckduckgo, searxng, jina
+ provider: str = "duckduckgo" # brave, tavily, duckduckgo, searxng, jina, kagi
api_key: str = ""
base_url: str = "" # SearXNG base URL
max_results: int = 5
From 74dbce3770ce180c553ae5fbe7a17a8512c17ef7 Mon Sep 17 00:00:00 2001
From: Mike Terhar
Date: Wed, 8 Apr 2026 09:25:21 -0400
Subject: [PATCH 12/13] add kagi info to README
---
README.md | 15 +++++++++++++++
1 file changed, 15 insertions(+)
diff --git a/README.md b/README.md
index 856986754..72fd62aa0 100644
--- a/README.md
+++ b/README.md
@@ -1312,6 +1312,7 @@ If you need to allow trusted private ranges such as Tailscale / CGNAT addresses,
| `brave` | `apiKey` | `BRAVE_API_KEY` | No |
| `tavily` | `apiKey` | `TAVILY_API_KEY` | No |
| `jina` | `apiKey` | `JINA_API_KEY` | Free tier (10M tokens) |
+| `kagi` | `apiKey` | `KAGI_API_KEY` | No |
| `searxng` | `baseUrl` | `SEARXNG_BASE_URL` | Yes (self-hosted) |
| `duckduckgo` (default) | — | — | Yes |
@@ -1368,6 +1369,20 @@ If you need to allow trusted private ranges such as Tailscale / CGNAT addresses,
}
```
+**Kagi:**
+```json
+{
+ "tools": {
+ "web": {
+ "search": {
+ "provider": "kagi",
+ "apiKey": "your-kagi-api-key"
+ }
+ }
+ }
+}
+```
+
**SearXNG** (self-hosted, no API key needed):
```json
{
From b959ae6d8965ca58b8dde49247137403e09ecea1 Mon Sep 17 00:00:00 2001
From: Xubin Ren
Date: Sat, 11 Apr 2026 08:48:42 +0000
Subject: [PATCH 13/13] test(web): cover Kagi search provider
Add focused coverage for the Kagi web search provider, including the request format and the DuckDuckGo fallback when no API key is configured.
---
tests/tools/test_web_search_tool.py | 38 +++++++++++++++++++++++++++++
1 file changed, 38 insertions(+)
diff --git a/tests/tools/test_web_search_tool.py b/tests/tools/test_web_search_tool.py
index e33dd7e6c..790d8adcd 100644
--- a/tests/tools/test_web_search_tool.py
+++ b/tests/tools/test_web_search_tool.py
@@ -120,6 +120,27 @@ async def test_jina_search(monkeypatch):
assert "https://jina.ai" in result
+@pytest.mark.asyncio
+async def test_kagi_search(monkeypatch):
+ async def mock_get(self, url, **kw):
+ assert "kagi.com/api/v0/search" in url
+ assert kw["headers"]["Authorization"] == "Bot kagi-key"
+ assert kw["params"] == {"q": "test", "limit": 2}
+ return _response(json={
+ "data": [
+ {"t": 0, "title": "Kagi Result", "url": "https://kagi.com", "snippet": "Premium search"},
+ {"t": 1, "list": ["ignored related search"]},
+ ]
+ })
+
+ monkeypatch.setattr(httpx.AsyncClient, "get", mock_get)
+ tool = _tool(provider="kagi", api_key="kagi-key")
+ result = await tool.execute(query="test", count=2)
+ assert "Kagi Result" in result
+ assert "https://kagi.com" in result
+ assert "ignored related search" not in result
+
+
@pytest.mark.asyncio
async def test_unknown_provider():
tool = _tool(provider="unknown")
@@ -189,6 +210,23 @@ async def test_jina_422_falls_back_to_duckduckgo(monkeypatch):
assert "DuckDuckGo fallback" in result
+@pytest.mark.asyncio
+async def test_kagi_fallback_to_duckduckgo_when_no_key(monkeypatch):
+ class MockDDGS:
+ def __init__(self, **kw):
+ pass
+
+ def text(self, query, max_results=5):
+ return [{"title": "Fallback", "href": "https://ddg.example", "body": "DuckDuckGo fallback"}]
+
+ monkeypatch.setattr("ddgs.DDGS", MockDDGS)
+ monkeypatch.delenv("KAGI_API_KEY", raising=False)
+
+ tool = _tool(provider="kagi", api_key="")
+ result = await tool.execute(query="test")
+ assert "Fallback" in result
+
+
@pytest.mark.asyncio
async def test_jina_search_uses_path_encoded_query(monkeypatch):
calls = {}