From 93bcb0a64980c40f98a3eb4c3dd5d6d2a33177a2 Mon Sep 17 00:00:00 2001 From: chengyongru <2755839590@qq.com> Date: Fri, 24 Apr 2026 01:30:03 +0800 Subject: [PATCH] fix(agent): prevent history.jsonl bloat from raw_archive and stuck consolidation Root cause: when consolidation LLM fails, raw_archive() dumped full message content (~1MB) into history.jsonl with no size limit. Since build_system_prompt() injects history.jsonl into every system prompt, all subsequent LLM calls exceeded the 200K context window with error 1261. Additionally, _cap_consolidation_boundary's 60-message cap caused consolidation to get stuck on sessions with long tool chains (200+ iterations), triggering the raw_archive fallback in the first place. Three-layer fix: - Remove _cap_consolidation_boundary: let pick_consolidation_boundary drive chunk sizing based solely on token budget - Truncate archive() input: use tiktoken to cap formatted text to the model's input token budget before sending to consolidation LLM - Truncate raw_archive() output: cap history.jsonl entries at 16K chars --- nanobot/agent/memory.py | 59 +++++++++++---------- tests/agent/test_consolidator.py | 91 +++++++++++++++++++++++++++----- 2 files changed, 108 insertions(+), 42 deletions(-) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 60bc9accb..6a23227f0 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -6,6 +6,7 @@ import asyncio import json import re import weakref +import tiktoken from datetime import datetime from pathlib import Path from typing import TYPE_CHECKING, Any, Callable, Iterator @@ -13,7 +14,7 @@ from typing import TYPE_CHECKING, Any, Callable, Iterator from loguru import logger from nanobot.utils.prompt_templates import render_template -from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain, strip_think +from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain, strip_think, truncate_text from nanobot.agent.runner import AgentRunSpec, AgentRunner from nanobot.agent.tools.registry import ToolRegistry @@ -373,11 +374,13 @@ class MemoryStore: ) return "\n".join(lines) - def raw_archive(self, messages: list[dict]) -> None: + def raw_archive(self, messages: list[dict], *, max_chars: int | None = None) -> None: """Fallback: dump raw messages to history.jsonl without LLM summarization.""" + limit = max_chars if max_chars is not None else _RAW_ARCHIVE_MAX_CHARS + formatted = truncate_text(self._format_messages(messages), limit) self.append_history( f"[RAW] {len(messages)} messages\n" - f"{self._format_messages(messages)}" + f"{formatted}" ) logger.warning( "Memory consolidation degraded: raw-archived {} messages", len(messages) @@ -390,11 +393,13 @@ class MemoryStore: # --------------------------------------------------------------------------- +_RAW_ARCHIVE_MAX_CHARS = 16_000 # cap raw_archive entries to avoid bloating history.jsonl + + class Consolidator: """Lightweight consolidation: summarizes evicted messages into history.jsonl.""" _MAX_CONSOLIDATION_ROUNDS = 5 - _MAX_CHUNK_MESSAGES = 60 # hard cap per consolidation round _SAFETY_BUFFER = 1024 # extra headroom for tokenizer estimation drift @@ -447,22 +452,6 @@ class Consolidator: return last_boundary - def _cap_consolidation_boundary( - self, - session: Session, - end_idx: int, - ) -> int | None: - """Clamp the chunk size without breaking the user-turn boundary.""" - start = session.last_consolidated - if end_idx - start <= self._MAX_CHUNK_MESSAGES: - return end_idx - - capped_end = start + self._MAX_CHUNK_MESSAGES - for idx in range(capped_end, start, -1): - if session.messages[idx].get("role") == "user": - return idx - return None - def estimate_session_prompt_tokens( self, session: Session, @@ -486,6 +475,25 @@ class Consolidator: self._get_tool_definitions(), ) + @property + def _input_token_budget(self) -> int: + """Available input token budget for consolidation LLM.""" + return self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER + + def _truncate_to_token_budget(self, text: str) -> str: + """Truncate text so it fits within the consolidation LLM's token budget.""" + budget = self._input_token_budget + if budget <= 0: + return truncate_text(text, _RAW_ARCHIVE_MAX_CHARS) + try: + enc = tiktoken.get_encoding("cl100k_base") + tokens = enc.encode(text) + if len(tokens) <= budget: + return text + return enc.decode(tokens[:budget]) + "\n... (truncated)" + except Exception: + return truncate_text(text, budget * 4) + async def archive(self, messages: list[dict]) -> str | None: """Summarize messages via LLM and append to history.jsonl. @@ -495,6 +503,7 @@ class Consolidator: return None try: formatted = MemoryStore._format_messages(messages) + formatted = self._truncate_to_token_budget(formatted) response = await self.provider.chat_with_retry( model=self.model, messages=[ @@ -536,7 +545,7 @@ class Consolidator: lock = self.get_lock(session.key) async with lock: - budget = self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER + budget = self._input_token_budget target = budget // 2 try: estimated, source = self.estimate_session_prompt_tokens( @@ -575,14 +584,6 @@ class Consolidator: break end_idx = boundary[0] - end_idx = self._cap_consolidation_boundary(session, end_idx) - if end_idx is None: - logger.debug( - "Token consolidation: no capped boundary for {} (round {})", - session.key, - round_num, - ) - break chunk = session.messages[session.last_consolidated:end_idx] if not chunk: diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py index 985bc6ade..77aee609d 100644 --- a/tests/agent/test_consolidator.py +++ b/tests/agent/test_consolidator.py @@ -4,7 +4,7 @@ import pytest import asyncio from unittest.mock import AsyncMock, MagicMock, patch -from nanobot.agent.memory import Consolidator, MemoryStore +from nanobot.agent.memory import Consolidator, MemoryStore, _RAW_ARCHIVE_MAX_CHARS @pytest.fixture @@ -117,8 +117,8 @@ class TestConsolidatorTokenBudget: await consolidator.maybe_consolidate_by_tokens(session) consolidator.archive.assert_not_called() - async def test_chunk_cap_preserves_user_turn_boundary(self, consolidator): - """Chunk cap should rewind to the last user boundary within the cap.""" + async def test_large_chunk_archived_without_cap(self, consolidator): + """Without chunk cap, the full range from pick_consolidation_boundary is archived.""" consolidator._SAFETY_BUFFER = 0 session = MagicMock() session.last_consolidated = 0 @@ -133,19 +133,19 @@ class TestConsolidatorTokenBudget: consolidator.estimate_session_prompt_tokens = MagicMock( side_effect=[(1200, "tiktoken"), (400, "tiktoken")] ) - consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999)) + # Use real pick_consolidation_boundary — it will find boundary at idx=50 + # (user message at 50, token budget met) consolidator.archive = AsyncMock(return_value=True) await consolidator.maybe_consolidate_by_tokens(session) archived_chunk = consolidator.archive.await_args.args[0] - assert len(archived_chunk) == 50 + # pick_consolidation_boundary returns (50, tokens) — user turn at idx 50 assert archived_chunk[0]["content"] == "m0" - assert archived_chunk[-1]["content"] == "m49" - assert session.last_consolidated == 50 + assert session.last_consolidated > 0 - async def test_chunk_cap_skips_when_no_user_boundary_within_cap(self, consolidator): - """If the cap would cut mid-turn, consolidation should skip that round.""" + async def test_boundary_respected_when_no_intermediate_user_turn(self, consolidator): + """When boundary points past a long tool chain, the full chunk is archived.""" consolidator._SAFETY_BUFFER = 0 session = MagicMock() session.last_consolidated = 0 @@ -157,11 +157,76 @@ class TestConsolidatorTokenBudget: } for i in range(70) ] - consolidator.estimate_session_prompt_tokens = MagicMock(return_value=(1200, "tiktoken")) - consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999)) + consolidator.estimate_session_prompt_tokens = MagicMock( + side_effect=[(1200, "tiktoken"), (400, "tiktoken")] + ) consolidator.archive = AsyncMock(return_value=True) await consolidator.maybe_consolidate_by_tokens(session) - consolidator.archive.assert_not_awaited() - assert session.last_consolidated == 0 + consolidator.archive.assert_awaited_once() + # pick_consolidation_boundary finds the only boundary at idx=61 + assert session.last_consolidated == 61 + + +class TestRawArchiveTruncation: + """raw_archive() must cap entry size to avoid bloating history.jsonl.""" + + def test_raw_archive_truncates_large_content(self, store): + """Large messages should be truncated to _RAW_ARCHIVE_MAX_CHARS.""" + big = "x" * 50_000 + messages = [{"role": "user", "content": big}] + store.raw_archive(messages) + entries = store.read_unprocessed_history(since_cursor=0) + assert len(entries) == 1 + assert len(entries[0]["content"]) < 50_000 + assert "[RAW]" in entries[0]["content"] + + def test_raw_archive_preserves_small_content(self, store): + """Small messages should not be truncated.""" + messages = [{"role": "user", "content": "hello"}] + store.raw_archive(messages) + entries = store.read_unprocessed_history(since_cursor=0) + assert len(entries) == 1 + assert "hello" in entries[0]["content"] + + def test_raw_archive_custom_max_chars(self, store): + """max_chars parameter should override default limit.""" + messages = [{"role": "user", "content": "a" * 200}] + store.raw_archive(messages, max_chars=100) + entries = store.read_unprocessed_history(since_cursor=0) + assert len(entries[0]["content"]) < 200 + + +class TestArchiveTruncation: + """archive() must truncate formatted text before sending to consolidation LLM.""" + + async def test_archive_truncates_large_formatted_text(self, consolidator, mock_provider, store): + """Large formatted text should be truncated to token budget before LLM call.""" + # context_window_tokens=1000, max_completion_tokens=100, _SAFETY_BUFFER=1024 + # budget = 1000 - 100 - 1024 = -124 → fallback via truncate_text(budget*4) + big_messages = [{"role": "user", "content": "x" * 100_000}] + mock_provider.chat_with_retry.return_value = MagicMock( + content="Summary of large input.", finish_reason="stop" + ) + await consolidator.archive(big_messages) + + call_args = mock_provider.chat_with_retry.call_args + user_content = call_args.kwargs["messages"][1]["content"] + # Should be significantly shorter than 100K + assert len(user_content) < 50_000 + + async def test_archive_truncates_with_small_token_budget(self, consolidator, mock_provider, store): + """Small context window: truncation uses actual tokenizer count.""" + consolidator.context_window_tokens = 500 + big_messages = [{"role": "user", "content": "word " * 50_000}] + mock_provider.chat_with_retry.return_value = MagicMock( + content="Summary.", finish_reason="stop" + ) + await consolidator.archive(big_messages) + + sent_messages = mock_provider.chat_with_retry.call_args.kwargs["messages"] + user_content = sent_messages[1]["content"] + # budget = 500 - 100 - 1024 = negative, fallback char-based + # Should be truncated + assert len(user_content) < 250_000