fix(agent): prevent history.jsonl bloat from raw_archive and stuck consolidation

Root cause: when consolidation LLM fails, raw_archive() dumped full message
content (~1MB) into history.jsonl with no size limit. Since build_system_prompt()
injects history.jsonl into every system prompt, all subsequent LLM calls exceeded
the 200K context window with error 1261.

Additionally, _cap_consolidation_boundary's 60-message cap caused consolidation
to get stuck on sessions with long tool chains (200+ iterations), triggering
the raw_archive fallback in the first place.

Three-layer fix:
- Remove _cap_consolidation_boundary: let pick_consolidation_boundary drive
  chunk sizing based solely on token budget
- Truncate archive() input: use tiktoken to cap formatted text to the model's
  input token budget before sending to consolidation LLM
- Truncate raw_archive() output: cap history.jsonl entries at 16K chars
This commit is contained in:
chengyongru 2026-04-24 01:30:03 +08:00 committed by chengyongru
parent da0ebc64fb
commit 93bcb0a649
2 changed files with 108 additions and 42 deletions

View File

@ -6,6 +6,7 @@ import asyncio
import json import json
import re import re
import weakref import weakref
import tiktoken
from datetime import datetime from datetime import datetime
from pathlib import Path from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable, Iterator from typing import TYPE_CHECKING, Any, Callable, Iterator
@ -13,7 +14,7 @@ from typing import TYPE_CHECKING, Any, Callable, Iterator
from loguru import logger from loguru import logger
from nanobot.utils.prompt_templates import render_template from nanobot.utils.prompt_templates import render_template
from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain, strip_think from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain, strip_think, truncate_text
from nanobot.agent.runner import AgentRunSpec, AgentRunner from nanobot.agent.runner import AgentRunSpec, AgentRunner
from nanobot.agent.tools.registry import ToolRegistry from nanobot.agent.tools.registry import ToolRegistry
@ -373,11 +374,13 @@ class MemoryStore:
) )
return "\n".join(lines) return "\n".join(lines)
def raw_archive(self, messages: list[dict]) -> None: def raw_archive(self, messages: list[dict], *, max_chars: int | None = None) -> None:
"""Fallback: dump raw messages to history.jsonl without LLM summarization.""" """Fallback: dump raw messages to history.jsonl without LLM summarization."""
limit = max_chars if max_chars is not None else _RAW_ARCHIVE_MAX_CHARS
formatted = truncate_text(self._format_messages(messages), limit)
self.append_history( self.append_history(
f"[RAW] {len(messages)} messages\n" f"[RAW] {len(messages)} messages\n"
f"{self._format_messages(messages)}" f"{formatted}"
) )
logger.warning( logger.warning(
"Memory consolidation degraded: raw-archived {} messages", len(messages) "Memory consolidation degraded: raw-archived {} messages", len(messages)
@ -390,11 +393,13 @@ class MemoryStore:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_RAW_ARCHIVE_MAX_CHARS = 16_000 # cap raw_archive entries to avoid bloating history.jsonl
class Consolidator: class Consolidator:
"""Lightweight consolidation: summarizes evicted messages into history.jsonl.""" """Lightweight consolidation: summarizes evicted messages into history.jsonl."""
_MAX_CONSOLIDATION_ROUNDS = 5 _MAX_CONSOLIDATION_ROUNDS = 5
_MAX_CHUNK_MESSAGES = 60 # hard cap per consolidation round
_SAFETY_BUFFER = 1024 # extra headroom for tokenizer estimation drift _SAFETY_BUFFER = 1024 # extra headroom for tokenizer estimation drift
@ -447,22 +452,6 @@ class Consolidator:
return last_boundary return last_boundary
def _cap_consolidation_boundary(
self,
session: Session,
end_idx: int,
) -> int | None:
"""Clamp the chunk size without breaking the user-turn boundary."""
start = session.last_consolidated
if end_idx - start <= self._MAX_CHUNK_MESSAGES:
return end_idx
capped_end = start + self._MAX_CHUNK_MESSAGES
for idx in range(capped_end, start, -1):
if session.messages[idx].get("role") == "user":
return idx
return None
def estimate_session_prompt_tokens( def estimate_session_prompt_tokens(
self, self,
session: Session, session: Session,
@ -486,6 +475,25 @@ class Consolidator:
self._get_tool_definitions(), self._get_tool_definitions(),
) )
@property
def _input_token_budget(self) -> int:
"""Available input token budget for consolidation LLM."""
return self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER
def _truncate_to_token_budget(self, text: str) -> str:
"""Truncate text so it fits within the consolidation LLM's token budget."""
budget = self._input_token_budget
if budget <= 0:
return truncate_text(text, _RAW_ARCHIVE_MAX_CHARS)
try:
enc = tiktoken.get_encoding("cl100k_base")
tokens = enc.encode(text)
if len(tokens) <= budget:
return text
return enc.decode(tokens[:budget]) + "\n... (truncated)"
except Exception:
return truncate_text(text, budget * 4)
async def archive(self, messages: list[dict]) -> str | None: async def archive(self, messages: list[dict]) -> str | None:
"""Summarize messages via LLM and append to history.jsonl. """Summarize messages via LLM and append to history.jsonl.
@ -495,6 +503,7 @@ class Consolidator:
return None return None
try: try:
formatted = MemoryStore._format_messages(messages) formatted = MemoryStore._format_messages(messages)
formatted = self._truncate_to_token_budget(formatted)
response = await self.provider.chat_with_retry( response = await self.provider.chat_with_retry(
model=self.model, model=self.model,
messages=[ messages=[
@ -536,7 +545,7 @@ class Consolidator:
lock = self.get_lock(session.key) lock = self.get_lock(session.key)
async with lock: async with lock:
budget = self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER budget = self._input_token_budget
target = budget // 2 target = budget // 2
try: try:
estimated, source = self.estimate_session_prompt_tokens( estimated, source = self.estimate_session_prompt_tokens(
@ -575,14 +584,6 @@ class Consolidator:
break break
end_idx = boundary[0] end_idx = boundary[0]
end_idx = self._cap_consolidation_boundary(session, end_idx)
if end_idx is None:
logger.debug(
"Token consolidation: no capped boundary for {} (round {})",
session.key,
round_num,
)
break
chunk = session.messages[session.last_consolidated:end_idx] chunk = session.messages[session.last_consolidated:end_idx]
if not chunk: if not chunk:

View File

@ -4,7 +4,7 @@ import pytest
import asyncio import asyncio
from unittest.mock import AsyncMock, MagicMock, patch from unittest.mock import AsyncMock, MagicMock, patch
from nanobot.agent.memory import Consolidator, MemoryStore from nanobot.agent.memory import Consolidator, MemoryStore, _RAW_ARCHIVE_MAX_CHARS
@pytest.fixture @pytest.fixture
@ -117,8 +117,8 @@ class TestConsolidatorTokenBudget:
await consolidator.maybe_consolidate_by_tokens(session) await consolidator.maybe_consolidate_by_tokens(session)
consolidator.archive.assert_not_called() consolidator.archive.assert_not_called()
async def test_chunk_cap_preserves_user_turn_boundary(self, consolidator): async def test_large_chunk_archived_without_cap(self, consolidator):
"""Chunk cap should rewind to the last user boundary within the cap.""" """Without chunk cap, the full range from pick_consolidation_boundary is archived."""
consolidator._SAFETY_BUFFER = 0 consolidator._SAFETY_BUFFER = 0
session = MagicMock() session = MagicMock()
session.last_consolidated = 0 session.last_consolidated = 0
@ -133,19 +133,19 @@ class TestConsolidatorTokenBudget:
consolidator.estimate_session_prompt_tokens = MagicMock( consolidator.estimate_session_prompt_tokens = MagicMock(
side_effect=[(1200, "tiktoken"), (400, "tiktoken")] side_effect=[(1200, "tiktoken"), (400, "tiktoken")]
) )
consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999)) # Use real pick_consolidation_boundary — it will find boundary at idx=50
# (user message at 50, token budget met)
consolidator.archive = AsyncMock(return_value=True) consolidator.archive = AsyncMock(return_value=True)
await consolidator.maybe_consolidate_by_tokens(session) await consolidator.maybe_consolidate_by_tokens(session)
archived_chunk = consolidator.archive.await_args.args[0] archived_chunk = consolidator.archive.await_args.args[0]
assert len(archived_chunk) == 50 # pick_consolidation_boundary returns (50, tokens) — user turn at idx 50
assert archived_chunk[0]["content"] == "m0" assert archived_chunk[0]["content"] == "m0"
assert archived_chunk[-1]["content"] == "m49" assert session.last_consolidated > 0
assert session.last_consolidated == 50
async def test_chunk_cap_skips_when_no_user_boundary_within_cap(self, consolidator): async def test_boundary_respected_when_no_intermediate_user_turn(self, consolidator):
"""If the cap would cut mid-turn, consolidation should skip that round.""" """When boundary points past a long tool chain, the full chunk is archived."""
consolidator._SAFETY_BUFFER = 0 consolidator._SAFETY_BUFFER = 0
session = MagicMock() session = MagicMock()
session.last_consolidated = 0 session.last_consolidated = 0
@ -157,11 +157,76 @@ class TestConsolidatorTokenBudget:
} }
for i in range(70) for i in range(70)
] ]
consolidator.estimate_session_prompt_tokens = MagicMock(return_value=(1200, "tiktoken")) consolidator.estimate_session_prompt_tokens = MagicMock(
consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999)) side_effect=[(1200, "tiktoken"), (400, "tiktoken")]
)
consolidator.archive = AsyncMock(return_value=True) consolidator.archive = AsyncMock(return_value=True)
await consolidator.maybe_consolidate_by_tokens(session) await consolidator.maybe_consolidate_by_tokens(session)
consolidator.archive.assert_not_awaited() consolidator.archive.assert_awaited_once()
assert session.last_consolidated == 0 # pick_consolidation_boundary finds the only boundary at idx=61
assert session.last_consolidated == 61
class TestRawArchiveTruncation:
"""raw_archive() must cap entry size to avoid bloating history.jsonl."""
def test_raw_archive_truncates_large_content(self, store):
"""Large messages should be truncated to _RAW_ARCHIVE_MAX_CHARS."""
big = "x" * 50_000
messages = [{"role": "user", "content": big}]
store.raw_archive(messages)
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
assert len(entries[0]["content"]) < 50_000
assert "[RAW]" in entries[0]["content"]
def test_raw_archive_preserves_small_content(self, store):
"""Small messages should not be truncated."""
messages = [{"role": "user", "content": "hello"}]
store.raw_archive(messages)
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
assert "hello" in entries[0]["content"]
def test_raw_archive_custom_max_chars(self, store):
"""max_chars parameter should override default limit."""
messages = [{"role": "user", "content": "a" * 200}]
store.raw_archive(messages, max_chars=100)
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries[0]["content"]) < 200
class TestArchiveTruncation:
"""archive() must truncate formatted text before sending to consolidation LLM."""
async def test_archive_truncates_large_formatted_text(self, consolidator, mock_provider, store):
"""Large formatted text should be truncated to token budget before LLM call."""
# context_window_tokens=1000, max_completion_tokens=100, _SAFETY_BUFFER=1024
# budget = 1000 - 100 - 1024 = -124 → fallback via truncate_text(budget*4)
big_messages = [{"role": "user", "content": "x" * 100_000}]
mock_provider.chat_with_retry.return_value = MagicMock(
content="Summary of large input.", finish_reason="stop"
)
await consolidator.archive(big_messages)
call_args = mock_provider.chat_with_retry.call_args
user_content = call_args.kwargs["messages"][1]["content"]
# Should be significantly shorter than 100K
assert len(user_content) < 50_000
async def test_archive_truncates_with_small_token_budget(self, consolidator, mock_provider, store):
"""Small context window: truncation uses actual tokenizer count."""
consolidator.context_window_tokens = 500
big_messages = [{"role": "user", "content": "word " * 50_000}]
mock_provider.chat_with_retry.return_value = MagicMock(
content="Summary.", finish_reason="stop"
)
await consolidator.archive(big_messages)
sent_messages = mock_provider.chat_with_retry.call_args.kwargs["messages"]
user_content = sent_messages[1]["content"]
# budget = 500 - 100 - 1024 = negative, fallback char-based
# Should be truncated
assert len(user_content) < 250_000