diff --git a/docs/configuration.md b/docs/configuration.md index 153cbc959..b04a98222 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -717,7 +717,10 @@ When a user is idle for longer than a configured threshold, nanobot **proactivel { "agents": { "defaults": { - "idleCompactAfterMinutes": 15 + "idleCompactAfterMinutes": 15, + "sessionHistoryMaxMessages": 120, + "sessionHistoryMaxTokens": 0, + "sessionFileMaxMessages": 2000 } } } @@ -726,6 +729,9 @@ When a user is idle for longer than a configured threshold, nanobot **proactivel | Option | Default | Description | |--------|---------|-------------| | `agents.defaults.idleCompactAfterMinutes` | `0` (disabled) | Minutes of idle time before auto-compaction starts. Set to `0` to disable. Recommended: `15` — close to a typical LLM KV cache expiry window, so stale sessions get compacted before the user returns. | +| `agents.defaults.sessionHistoryMaxMessages` | `120` | Per-turn max number of session messages included in prompt replay. Set to `0` for unlimited history. | +| `agents.defaults.sessionHistoryMaxTokens` | `0` (auto) | Per-turn token budget for replay history. `0` auto-derives a budget from context window and output reserve; set a positive number to force a fixed token cap. | +| `agents.defaults.sessionFileMaxMessages` | `2000` | Hard cap for on-disk `sessions/*.jsonl` message count. When exceeded, old prefixes are raw-archived into `memory/history.jsonl` and trimmed from the session file. Set to `0` to disable. | `sessionTtlMinutes` remains accepted as a legacy alias for backward compatibility, but `idleCompactAfterMinutes` is the preferred config key going forward. diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 3c893c38b..06e0a3ad8 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -192,6 +192,9 @@ class AgentLoop: timezone: str | None = None, session_ttl_minutes: int = 0, consolidation_ratio: float = 0.5, + session_history_max_messages: int | None = None, + session_history_max_tokens: int | None = None, + session_file_max_messages: int | None = None, hooks: list[AgentHook] | None = None, unified_session: bool = False, disabled_skills: list[str] | None = None, @@ -224,6 +227,21 @@ class AgentLoop: if max_tool_result_chars is not None else defaults.max_tool_result_chars ) + self.session_history_max_messages = ( + session_history_max_messages + if session_history_max_messages is not None + else defaults.session_history_max_messages + ) + self.session_history_max_tokens = ( + session_history_max_tokens + if session_history_max_tokens is not None + else defaults.session_history_max_tokens + ) + self.session_file_max_messages = ( + session_file_max_messages + if session_file_max_messages is not None + else defaults.session_file_max_messages + ) self.provider_retry_mode = provider_retry_mode self.web_config = web_config or WebToolsConfig() self.exec_config = exec_config or ExecToolConfig() @@ -452,6 +470,49 @@ class AgentLoop: return UNIFIED_SESSION_KEY return msg.session_key + def _history_token_budget(self) -> int: + """Resolve token budget for session history replay.""" + if self.session_history_max_tokens > 0: + return self.session_history_max_tokens + if self.context_window_tokens <= 0: + return 0 + max_output = getattr(getattr(self.provider, "generation", None), "max_tokens", 4096) + try: + reserved_output = int(max_output) + except (TypeError, ValueError): + reserved_output = 4096 + budget = self.context_window_tokens - max(1, reserved_output) - 1024 + if budget > 0: + return budget + return max(128, self.context_window_tokens // 2) + + def _enforce_session_file_cap(self, session: Session) -> None: + """Bound session.jsonl growth by archiving and trimming old prefixes.""" + limit = self.session_file_max_messages + if limit <= 0 or len(session.messages) <= limit: + return + + before = list(session.messages) + before_last_consolidated = session.last_consolidated + before_count = len(before) + session.retain_recent_legal_suffix(limit) + dropped_count = before_count - len(session.messages) + if dropped_count <= 0: + return + + dropped = before[:dropped_count] + already_consolidated = min(before_last_consolidated, dropped_count) + archive_chunk = dropped[already_consolidated:] + if archive_chunk: + self.context.memory.raw_archive(archive_chunk) + logger.info( + "Session file cap hit for {}: dropped {}, raw-archived {}, kept {}", + session.key, + dropped_count, + len(archive_chunk), + len(session.messages), + ) + async def _run_agent_loop( self, initial_messages: list[dict], @@ -832,7 +893,10 @@ class AgentLoop: if is_subagent and self._persist_subagent_followup(session, msg): self.sessions.save(session) self._set_tool_context(channel, chat_id, msg.metadata.get("message_id")) - history = session.get_history(max_messages=0) + history = session.get_history( + max_messages=self.session_history_max_messages, + max_tokens=self._history_token_budget(), + ) current_role = "assistant" if is_subagent else "user" # Subagent content is already in `history` above; passing it again @@ -851,6 +915,7 @@ class AgentLoop: pending_queue=pending_queue, ) self._save_turn(session, all_msgs, 1 + len(history)) + self._enforce_session_file_cap(session) self._clear_runtime_checkpoint(session) self.sessions.save(session) self._schedule_background(self.consolidator.maybe_consolidate_by_tokens(session)) @@ -901,7 +966,10 @@ class AgentLoop: if isinstance(message_tool, MessageTool): message_tool.start_turn() - history = session.get_history(max_messages=0) + history = session.get_history( + max_messages=self.session_history_max_messages, + max_tokens=self._history_token_budget(), + ) pending_ask_id = pending_ask_user_id(history) if pending_ask_id: @@ -987,6 +1055,7 @@ class AgentLoop: # Skip the already-persisted user message when saving the turn save_skip = 1 + len(history) + (1 if user_persisted_early else 0) self._save_turn(session, all_msgs, save_skip) + self._enforce_session_file_cap(session) self._clear_pending_user_turn(session) self._clear_runtime_checkpoint(session) self.sessions.save(session) diff --git a/nanobot/cli/commands.py b/nanobot/cli/commands.py index ce88ece58..c351440a1 100644 --- a/nanobot/cli/commands.py +++ b/nanobot/cli/commands.py @@ -538,6 +538,9 @@ def serve( disabled_skills=runtime_config.agents.defaults.disabled_skills, session_ttl_minutes=runtime_config.agents.defaults.session_ttl_minutes, consolidation_ratio=runtime_config.agents.defaults.consolidation_ratio, + session_history_max_messages=runtime_config.agents.defaults.session_history_max_messages, + session_history_max_tokens=runtime_config.agents.defaults.session_history_max_tokens, + session_file_max_messages=runtime_config.agents.defaults.session_file_max_messages, tools_config=runtime_config.tools, ) @@ -651,6 +654,9 @@ def _run_gateway( disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, consolidation_ratio=config.agents.defaults.consolidation_ratio, + session_history_max_messages=config.agents.defaults.session_history_max_messages, + session_history_max_tokens=config.agents.defaults.session_history_max_tokens, + session_file_max_messages=config.agents.defaults.session_file_max_messages, tools_config=config.tools, provider_snapshot_loader=load_provider_snapshot, provider_signature=provider_snapshot.signature, @@ -1028,6 +1034,9 @@ def agent( disabled_skills=config.agents.defaults.disabled_skills, session_ttl_minutes=config.agents.defaults.session_ttl_minutes, consolidation_ratio=config.agents.defaults.consolidation_ratio, + session_history_max_messages=config.agents.defaults.session_history_max_messages, + session_history_max_tokens=config.agents.defaults.session_history_max_tokens, + session_file_max_messages=config.agents.defaults.session_file_max_messages, tools_config=config.tools, ) restart_notice = consume_restart_notice_from_env() diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py index e1f91aeb0..3feed668e 100644 --- a/nanobot/config/schema.py +++ b/nanobot/config/schema.py @@ -97,6 +97,18 @@ class AgentDefaults(Base): validation_alias=AliasChoices("consolidationRatio"), serialization_alias="consolidationRatio", ) # Consolidation target ratio (0.5 = 50% of budget retained after compression) + session_history_max_messages: int = Field( + default=120, + ge=0, + ) # Per-turn session history window for prompt replay (0 = unlimited) + session_history_max_tokens: int = Field( + default=0, + ge=0, + ) # Per-turn token budget for replay history (0 = auto based on context window) + session_file_max_messages: int = Field( + default=2000, + ge=0, + ) # Hard cap for on-disk session.jsonl messages (0 = disabled) dream: DreamConfig = Field(default_factory=DreamConfig) diff --git a/nanobot/nanobot.py b/nanobot/nanobot.py index d2bff97d7..53ec718d2 100644 --- a/nanobot/nanobot.py +++ b/nanobot/nanobot.py @@ -85,6 +85,9 @@ class Nanobot: disabled_skills=defaults.disabled_skills, session_ttl_minutes=defaults.session_ttl_minutes, consolidation_ratio=defaults.consolidation_ratio, + session_history_max_messages=defaults.session_history_max_messages, + session_history_max_tokens=defaults.session_history_max_tokens, + session_file_max_messages=defaults.session_file_max_messages, tools_config=config.tools, ) return cls(loop) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index ddcfdea14..8d98bf8e6 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -12,6 +12,7 @@ from loguru import logger from nanobot.config.paths import get_legacy_sessions_dir from nanobot.utils.helpers import ( + estimate_message_tokens, ensure_dir, find_legal_message_start, image_placeholder_text, @@ -41,8 +42,17 @@ class Session: self.messages.append(msg) self.updated_at = datetime.now() - def get_history(self, max_messages: int = 500) -> list[dict[str, Any]]: - """Return unconsolidated messages for LLM input, aligned to a legal tool-call boundary.""" + def get_history( + self, + max_messages: int = 500, + *, + max_tokens: int = 0, + ) -> list[dict[str, Any]]: + """Return unconsolidated messages for LLM input. + + History is sliced by message count first (``max_messages``), then by + token budget from the tail (``max_tokens``) when provided. + """ unconsolidated = self.messages[self.last_consolidated:] sliced = unconsolidated[-max_messages:] @@ -80,6 +90,38 @@ class Session: if key in message: entry[key] = message[key] out.append(entry) + + if max_tokens > 0 and out: + kept: list[dict[str, Any]] = [] + used = 0 + for message in reversed(out): + tokens = estimate_message_tokens(message) + if kept and used + tokens > max_tokens: + break + kept.append(message) + used += tokens + kept.reverse() + + # Keep history aligned to the first visible user turn. + first_user = next((i for i, m in enumerate(kept) if m.get("role") == "user"), None) + if first_user is not None: + kept = kept[first_user:] + else: + # Tight token budgets can otherwise leave assistant-only tails. + # If a user turn exists in the unsliced output, recover the + # nearest one even if it slightly exceeds the token budget. + recovered_user = next( + (i for i in range(len(out) - 1, -1, -1) if out[i].get("role") == "user"), + None, + ) + if recovered_user is not None: + kept = out[recovered_user:] + + # And keep a legal tool-call boundary at the front. + start = find_legal_message_start(kept) + if start: + kept = kept[start:] + out = kept return out def clear(self) -> None: @@ -89,26 +131,42 @@ class Session: self.updated_at = datetime.now() def retain_recent_legal_suffix(self, max_messages: int) -> None: - """Keep a legal recent suffix, mirroring get_history boundary rules.""" + """Keep a legal recent suffix constrained by a hard message cap.""" if max_messages <= 0: self.clear() return if len(self.messages) <= max_messages: return - start_idx = max(0, len(self.messages) - max_messages) + retained = list(self.messages[-max_messages:]) - # If the cutoff lands mid-turn, extend backward to the nearest user turn. - while start_idx > 0 and self.messages[start_idx].get("role") != "user": - start_idx -= 1 - - retained = self.messages[start_idx:] + # Prefer starting at a user turn when one exists within the tail. + first_user = next((i for i, m in enumerate(retained) if m.get("role") == "user"), None) + if first_user is not None: + retained = retained[first_user:] + else: + # If the tail is assistant/tool-only, anchor to the latest user in + # the full session and take a capped forward window from there. + latest_user = next( + (i for i in range(len(self.messages) - 1, -1, -1) + if self.messages[i].get("role") == "user"), + None, + ) + if latest_user is not None: + retained = list(self.messages[latest_user: latest_user + max_messages]) # Mirror get_history(): avoid persisting orphan tool results at the front. start = find_legal_message_start(retained) if start: retained = retained[start:] + # Hard-cap guarantee: never keep more than max_messages. + if len(retained) > max_messages: + retained = retained[-max_messages:] + start = find_legal_message_start(retained) + if start: + retained = retained[start:] + dropped = len(self.messages) - len(retained) self.messages = retained self.last_consolidated = max(0, self.last_consolidated - dropped) diff --git a/tests/agent/test_auto_compact.py b/tests/agent/test_auto_compact.py index 1f6886ed0..91ca09e6c 100644 --- a/tests/agent/test_auto_compact.py +++ b/tests/agent/test_auto_compact.py @@ -2,8 +2,8 @@ import asyncio from datetime import datetime, timedelta -from unittest.mock import AsyncMock, MagicMock from pathlib import Path +from unittest.mock import AsyncMock, MagicMock import pytest @@ -15,7 +15,13 @@ from nanobot.command import CommandContext from nanobot.providers.base import LLMResponse -def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop: +def _make_loop( + tmp_path: Path, + session_ttl_minutes: int = 15, + session_history_max_messages: int | None = None, + session_history_max_tokens: int | None = None, + session_file_max_messages: int | None = None, +) -> AgentLoop: """Create a minimal AgentLoop for testing.""" bus = MessageBus() provider = MagicMock() @@ -30,6 +36,9 @@ def _make_loop(tmp_path: Path, session_ttl_minutes: int = 15) -> AgentLoop: model="test-model", context_window_tokens=128_000, session_ttl_minutes=session_ttl_minutes, + session_history_max_messages=session_history_max_messages, + session_history_max_tokens=session_history_max_tokens, + session_file_max_messages=session_file_max_messages, ) loop.tools.get_definitions = MagicMock(return_value=[]) return loop @@ -72,6 +81,34 @@ class TestSessionTTLConfig: assert data["idleCompactAfterMinutes"] == 30 assert "sessionTtlMinutes" not in data + def test_default_session_history_window(self): + """Session history replay should be capped by default.""" + defaults = AgentDefaults() + assert defaults.session_history_max_messages == 120 + + def test_default_session_history_token_budget_auto(self): + defaults = AgentDefaults() + assert defaults.session_history_max_tokens == 0 + + def test_default_session_file_cap(self): + defaults = AgentDefaults() + assert defaults.session_file_max_messages == 2000 + + def test_serializes_session_history_window(self): + """Config should expose sessionHistoryMaxMessages in JSON output.""" + defaults = AgentDefaults(session_history_max_messages=64) + data = defaults.model_dump(mode="json", by_alias=True) + assert data["sessionHistoryMaxMessages"] == 64 + + def test_serializes_history_token_budget_and_file_cap(self): + defaults = AgentDefaults( + session_history_max_tokens=2048, + session_file_max_messages=1024, + ) + data = defaults.model_dump(mode="json", by_alias=True) + assert data["sessionHistoryMaxTokens"] == 2048 + assert data["sessionFileMaxMessages"] == 1024 + class TestAgentLoopTTLParam: """Test that AutoCompact receives and stores session_ttl_minutes.""" @@ -86,6 +123,109 @@ class TestAgentLoopTTLParam: loop = _make_loop(tmp_path, session_ttl_minutes=0) assert loop.auto_compact._ttl == 0 + def test_loop_stores_history_window(self, tmp_path): + """AgentLoop should store configured session history max_messages.""" + loop = _make_loop(tmp_path, session_history_max_messages=42) + assert loop.session_history_max_messages == 42 + + def test_loop_stores_history_token_budget(self, tmp_path): + loop = _make_loop(tmp_path, session_history_max_tokens=2048) + assert loop.session_history_max_tokens == 2048 + + def test_loop_stores_session_file_cap(self, tmp_path): + loop = _make_loop(tmp_path, session_file_max_messages=512) + assert loop.session_file_max_messages == 512 + + @pytest.mark.asyncio + async def test_process_message_reads_history_with_configured_cap(self, tmp_path): + """_process_message should use session_history_max_messages, not unlimited history.""" + loop = _make_loop(tmp_path, session_history_max_messages=7) + session = loop.sessions.get_or_create("cli:direct") + session.get_history = MagicMock(return_value=[]) + loop.context.build_messages = MagicMock(return_value=[]) + loop._run_agent_loop = AsyncMock(return_value=("ok", [], [], "stop", False)) + loop._save_turn = MagicMock() + + msg = InboundMessage( + channel="cli", + sender_id="u1", + chat_id="direct", + content="hello", + ) + await loop._process_message(msg) + session.get_history.assert_called_once() + kwargs = session.get_history.call_args.kwargs + assert kwargs["max_messages"] == 7 + assert isinstance(kwargs.get("max_tokens"), int) + + @pytest.mark.asyncio + async def test_process_message_reads_history_with_token_budget(self, tmp_path): + loop = _make_loop( + tmp_path, + session_history_max_messages=7, + session_history_max_tokens=333, + ) + session = loop.sessions.get_or_create("cli:direct") + session.get_history = MagicMock(return_value=[]) + loop.context.build_messages = MagicMock(return_value=[]) + loop._run_agent_loop = AsyncMock(return_value=("ok", [], [], "stop", False)) + loop._save_turn = MagicMock() + + msg = InboundMessage( + channel="cli", + sender_id="u1", + chat_id="direct", + content="hello", + ) + await loop._process_message(msg) + session.get_history.assert_called_once_with(max_messages=7, max_tokens=333) + + @pytest.mark.asyncio + async def test_session_file_cap_archives_and_trims_old_messages(self, tmp_path): + loop = _make_loop(tmp_path, session_file_max_messages=6) + loop.context.memory.raw_archive = MagicMock() + + for i in range(4): + msg = InboundMessage( + channel="cli", + sender_id="u1", + chat_id="direct", + content=f"hello {i}", + ) + await loop._process_message(msg) + + session = loop.sessions.get_or_create("cli:direct") + assert len(session.messages) <= 6 + assert loop.context.memory.raw_archive.called + + def test_session_file_cap_skips_raw_archive_when_dropped_prefix_is_already_consolidated(self, tmp_path): + loop = _make_loop(tmp_path, session_file_max_messages=4) + loop.context.memory.raw_archive = MagicMock() + session = loop.sessions.get_or_create("cli:direct") + for i in range(8): + session.add_message("user", f"u{i}") + session.last_consolidated = 6 + + loop._enforce_session_file_cap(session) + + assert len(session.messages) <= 4 + loop.context.memory.raw_archive.assert_not_called() + + def test_session_file_cap_archives_only_unconsolidated_part_of_dropped_prefix(self, tmp_path): + loop = _make_loop(tmp_path, session_file_max_messages=4) + loop.context.memory.raw_archive = MagicMock() + session = loop.sessions.get_or_create("cli:direct") + for i in range(8): + session.add_message("user", f"u{i}") + session.last_consolidated = 2 + + loop._enforce_session_file_cap(session) + + assert len(session.messages) <= 4 + loop.context.memory.raw_archive.assert_called_once() + archived = loop.context.memory.raw_archive.call_args.args[0] + assert [m["content"] for m in archived] == ["u2", "u3"] + class TestAutoCompact: """Test the _archive method.""" diff --git a/tests/agent/test_session_manager_history.py b/tests/agent/test_session_manager_history.py index 8b4d0740e..e169372dd 100644 --- a/tests/agent/test_session_manager_history.py +++ b/tests/agent/test_session_manager_history.py @@ -269,3 +269,66 @@ def test_get_history_ignores_media_kwarg_on_non_user_rows(): # List content is passed through verbatim — the synthesizer only # rewrites plain-string content. assert history[0]["content"] == [{"type": "text", "text": "structured"}] + + +def test_get_history_respects_max_tokens(monkeypatch): + session = Session(key="test:token-cap") + session.messages.extend( + [ + {"role": "user", "content": "u1"}, + {"role": "assistant", "content": "a1"}, + {"role": "user", "content": "u2"}, + {"role": "assistant", "content": "a2"}, + {"role": "user", "content": "u3"}, + {"role": "assistant", "content": "a3"}, + ] + ) + + token_map = {"u1": 50, "a1": 50, "u2": 50, "a2": 50, "u3": 50, "a3": 50} + monkeypatch.setattr( + "nanobot.session.manager.estimate_message_tokens", + lambda message: token_map.get(message.get("content"), 0), + ) + + history = session.get_history(max_messages=500, max_tokens=120) + assert [m["content"] for m in history] == ["u3", "a3"] + + +def test_get_history_recovers_user_when_token_slice_would_be_assistant_only(monkeypatch): + session = Session(key="test:assistant-only-slice") + session.messages.extend( + [ + {"role": "user", "content": "u1"}, + {"role": "assistant", "content": "a1"}, + {"role": "user", "content": "u2"}, + {"role": "assistant", "content": "a2"}, + ] + ) + token_map = {"u1": 100, "a1": 100, "u2": 100, "a2": 100} + monkeypatch.setattr( + "nanobot.session.manager.estimate_message_tokens", + lambda message: token_map.get(message.get("content"), 0), + ) + + history = session.get_history(max_messages=500, max_tokens=100) + assert [m["content"] for m in history] == ["u2", "a2"] + + +def test_retain_recent_legal_suffix_hard_cap_with_long_non_user_chain(): + session = Session(key="test:hard-cap-chain") + session.messages.append({"role": "user", "content": "u0"}) + session.messages.append( + { + "role": "assistant", + "content": None, + "tool_calls": [ + {"id": "c1", "type": "function", "function": {"name": "x", "arguments": "{}"}} + ], + } + ) + for i in range(12): + session.messages.append({"role": "assistant", "content": f"a{i}"}) + + session.retain_recent_legal_suffix(6) + + assert len(session.messages) <= 6