mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-11 13:43:37 +00:00
128 lines
4.8 KiB
Python
128 lines
4.8 KiB
Python
"""Tests for the lightweight Consolidator — append-only to HISTORY.md."""
|
|
|
|
import pytest
|
|
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from nanobot.agent.memory import Consolidator, MemoryStore
|
|
|
|
|
|
@pytest.fixture
|
|
def store(tmp_path):
|
|
return MemoryStore(tmp_path)
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_provider():
|
|
p = MagicMock()
|
|
p.chat_with_retry = AsyncMock()
|
|
return p
|
|
|
|
|
|
@pytest.fixture
|
|
def consolidator(store, mock_provider):
|
|
sessions = MagicMock()
|
|
sessions.save = MagicMock()
|
|
return Consolidator(
|
|
store=store,
|
|
provider=mock_provider,
|
|
model="test-model",
|
|
sessions=sessions,
|
|
context_window_tokens=1000,
|
|
build_messages=MagicMock(return_value=[]),
|
|
get_tool_definitions=MagicMock(return_value=[]),
|
|
max_completion_tokens=100,
|
|
)
|
|
|
|
|
|
class TestConsolidatorSummarize:
|
|
async def test_summarize_appends_to_history(self, consolidator, mock_provider, store):
|
|
"""Consolidator should call LLM to summarize, then append to HISTORY.md."""
|
|
mock_provider.chat_with_retry.return_value = MagicMock(
|
|
content="User fixed a bug in the auth module."
|
|
)
|
|
messages = [
|
|
{"role": "user", "content": "fix the auth bug"},
|
|
{"role": "assistant", "content": "Done, fixed the race condition."},
|
|
]
|
|
result = await consolidator.archive(messages)
|
|
assert result is True
|
|
entries = store.read_unprocessed_history(since_cursor=0)
|
|
assert len(entries) == 1
|
|
|
|
async def test_summarize_raw_dumps_on_llm_failure(self, consolidator, mock_provider, store):
|
|
"""On LLM failure, raw-dump messages to HISTORY.md."""
|
|
mock_provider.chat_with_retry.side_effect = Exception("API error")
|
|
messages = [{"role": "user", "content": "hello"}]
|
|
result = await consolidator.archive(messages)
|
|
assert result is True # always succeeds
|
|
entries = store.read_unprocessed_history(since_cursor=0)
|
|
assert len(entries) == 1
|
|
assert "[RAW]" in entries[0]["content"]
|
|
|
|
async def test_summarize_skips_empty_messages(self, consolidator):
|
|
result = await consolidator.archive([])
|
|
assert result is False
|
|
|
|
|
|
class TestConsolidatorTokenBudget:
|
|
async def test_prompt_below_threshold_does_not_consolidate(self, consolidator):
|
|
"""No consolidation when tokens are within budget."""
|
|
session = MagicMock()
|
|
session.last_consolidated = 0
|
|
session.messages = [{"role": "user", "content": "hi"}]
|
|
session.key = "test:key"
|
|
consolidator.estimate_session_prompt_tokens = MagicMock(return_value=(100, "tiktoken"))
|
|
consolidator.archive = AsyncMock(return_value=True)
|
|
await consolidator.maybe_consolidate_by_tokens(session)
|
|
consolidator.archive.assert_not_called()
|
|
|
|
async def test_chunk_cap_preserves_user_turn_boundary(self, consolidator):
|
|
"""Chunk cap should rewind to the last user boundary within the cap."""
|
|
consolidator._SAFETY_BUFFER = 0
|
|
session = MagicMock()
|
|
session.last_consolidated = 0
|
|
session.key = "test:key"
|
|
session.messages = [
|
|
{
|
|
"role": "user" if i in {0, 50, 61} else "assistant",
|
|
"content": f"m{i}",
|
|
}
|
|
for i in range(70)
|
|
]
|
|
consolidator.estimate_session_prompt_tokens = MagicMock(
|
|
side_effect=[(1200, "tiktoken"), (400, "tiktoken")]
|
|
)
|
|
consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999))
|
|
consolidator.archive = AsyncMock(return_value=True)
|
|
|
|
await consolidator.maybe_consolidate_by_tokens(session)
|
|
|
|
archived_chunk = consolidator.archive.await_args.args[0]
|
|
assert len(archived_chunk) == 50
|
|
assert archived_chunk[0]["content"] == "m0"
|
|
assert archived_chunk[-1]["content"] == "m49"
|
|
assert session.last_consolidated == 50
|
|
|
|
async def test_chunk_cap_skips_when_no_user_boundary_within_cap(self, consolidator):
|
|
"""If the cap would cut mid-turn, consolidation should skip that round."""
|
|
consolidator._SAFETY_BUFFER = 0
|
|
session = MagicMock()
|
|
session.last_consolidated = 0
|
|
session.key = "test:key"
|
|
session.messages = [
|
|
{
|
|
"role": "user" if i in {0, 61} else "assistant",
|
|
"content": f"m{i}",
|
|
}
|
|
for i in range(70)
|
|
]
|
|
consolidator.estimate_session_prompt_tokens = MagicMock(return_value=(1200, "tiktoken"))
|
|
consolidator.pick_consolidation_boundary = MagicMock(return_value=(61, 999))
|
|
consolidator.archive = AsyncMock(return_value=True)
|
|
|
|
await consolidator.maybe_consolidate_by_tokens(session)
|
|
|
|
consolidator.archive.assert_not_awaited()
|
|
assert session.last_consolidated == 0
|