mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-20 08:32:25 +00:00
feat(consolidator): add compact_idle_session method with lock-protected truncation
Add Consolidator.compact_idle_session(session_key, max_suffix=8) that performs hard-truncation of idle sessions under the per-session consolidation lock. This is the single lock-protected path for AutoCompact to use instead of modifying session state directly, fixing the race condition between AutoCompact and Consolidator. Behavior: - Acquires per-session consolidation lock - Invalidates cache and reloads fresh from disk - Splits unconsolidated tail into archive prefix and retained suffix - Archives prefix via LLM (with raw_archive fallback on failure) - Persists _last_summary in session metadata on success - Returns summary text, None on LLM failure, or '' if nothing to archive Tests: 6 new tests covering prefix archival, empty session timestamp refresh, (nothing) summary exclusion, LLM failure fallback, last_consolidated offset, and lock acquisition verification.
This commit is contained in:
parent
bf8a6e35fd
commit
48d35bd2d9
@ -769,6 +769,74 @@ class Consolidator:
|
|||||||
# the summary injection strategy with AutoCompact._archive().
|
# the summary injection strategy with AutoCompact._archive().
|
||||||
self._persist_last_summary(session, last_summary)
|
self._persist_last_summary(session, last_summary)
|
||||||
|
|
||||||
|
async def compact_idle_session(
|
||||||
|
self,
|
||||||
|
session_key: str,
|
||||||
|
max_suffix: int = 8,
|
||||||
|
) -> str | None:
|
||||||
|
"""Hard-truncate an idle session under the consolidation lock.
|
||||||
|
|
||||||
|
Used by AutoCompact so all session mutation goes through a single
|
||||||
|
lock-protected path. Returns the summary text on success, ``None``
|
||||||
|
if the LLM failed (raw_archive fallback), or ``""`` if there was
|
||||||
|
nothing to archive.
|
||||||
|
"""
|
||||||
|
lock = self.get_lock(session_key)
|
||||||
|
async with lock:
|
||||||
|
self.sessions.invalidate(session_key)
|
||||||
|
session = self.sessions.get_or_create(session_key)
|
||||||
|
|
||||||
|
tail = list(session.messages[session.last_consolidated:])
|
||||||
|
if not tail:
|
||||||
|
session.updated_at = datetime.now()
|
||||||
|
self.sessions.save(session)
|
||||||
|
return ""
|
||||||
|
|
||||||
|
probe = Session(
|
||||||
|
key=session.key,
|
||||||
|
messages=tail.copy(),
|
||||||
|
created_at=session.created_at,
|
||||||
|
updated_at=session.updated_at,
|
||||||
|
metadata={},
|
||||||
|
last_consolidated=0,
|
||||||
|
)
|
||||||
|
probe.retain_recent_legal_suffix(max_suffix)
|
||||||
|
kept = probe.messages
|
||||||
|
cut = len(tail) - len(kept)
|
||||||
|
archive_msgs = tail[:cut]
|
||||||
|
|
||||||
|
if not archive_msgs and not kept:
|
||||||
|
session.updated_at = datetime.now()
|
||||||
|
self.sessions.save(session)
|
||||||
|
return ""
|
||||||
|
|
||||||
|
last_active = session.updated_at
|
||||||
|
summary: str | None = ""
|
||||||
|
if archive_msgs:
|
||||||
|
summary = await self.archive(archive_msgs)
|
||||||
|
|
||||||
|
if summary and summary != "(nothing)":
|
||||||
|
session.metadata["_last_summary"] = {
|
||||||
|
"text": summary,
|
||||||
|
"last_active": last_active.isoformat(),
|
||||||
|
}
|
||||||
|
|
||||||
|
session.messages = kept
|
||||||
|
session.last_consolidated = 0
|
||||||
|
session.updated_at = datetime.now()
|
||||||
|
self.sessions.save(session)
|
||||||
|
|
||||||
|
if archive_msgs:
|
||||||
|
logger.info(
|
||||||
|
"Idle-session compact for {}: archived={}, kept={}, summary={}",
|
||||||
|
session_key,
|
||||||
|
len(archive_msgs),
|
||||||
|
len(kept),
|
||||||
|
bool(summary),
|
||||||
|
)
|
||||||
|
|
||||||
|
return summary
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
# ---------------------------------------------------------------------------
|
||||||
# Dream — heavyweight cron-scheduled memory consolidation
|
# Dream — heavyweight cron-scheduled memory consolidation
|
||||||
|
|||||||
@ -299,6 +299,168 @@ class TestConsolidatorTokenBudget:
|
|||||||
assert session.last_consolidated == 61
|
assert session.last_consolidated == 61
|
||||||
|
|
||||||
|
|
||||||
|
class TestCompactIdleSession:
|
||||||
|
"""Tests for Consolidator.compact_idle_session — lock-protected idle truncation."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def real_consolidator(self, store, mock_provider):
|
||||||
|
"""Create a Consolidator with a real SessionManager (not a mock)."""
|
||||||
|
from nanobot.session.manager import SessionManager
|
||||||
|
|
||||||
|
sessions = SessionManager(store.workspace)
|
||||||
|
return Consolidator(
|
||||||
|
store=store,
|
||||||
|
provider=mock_provider,
|
||||||
|
model="test-model",
|
||||||
|
sessions=sessions,
|
||||||
|
context_window_tokens=1000,
|
||||||
|
build_messages=MagicMock(return_value=[]),
|
||||||
|
get_tool_definitions=MagicMock(return_value=[]),
|
||||||
|
max_completion_tokens=100,
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_archives_prefix_keeps_suffix(self, real_consolidator, mock_provider):
|
||||||
|
"""20 user/assistant turns → compact with max_suffix=8 → messages ≤ 8,
|
||||||
|
last_consolidated=0, _last_summary stored."""
|
||||||
|
mock_provider.chat_with_retry.return_value = MagicMock(
|
||||||
|
content="Summary of old conversation.", finish_reason="stop"
|
||||||
|
)
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:test")
|
||||||
|
for i in range(20):
|
||||||
|
session.add_message("user", f"user msg {i}")
|
||||||
|
session.add_message("assistant", f"assistant msg {i}")
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
result = await real_consolidator.compact_idle_session("cli:test", max_suffix=8)
|
||||||
|
assert result == "Summary of old conversation."
|
||||||
|
|
||||||
|
reloaded = sessions.get_or_create("cli:test")
|
||||||
|
assert len(reloaded.messages) <= 8
|
||||||
|
assert reloaded.last_consolidated == 0
|
||||||
|
meta = reloaded.metadata.get("_last_summary")
|
||||||
|
assert meta is not None
|
||||||
|
assert meta["text"] == "Summary of old conversation."
|
||||||
|
assert "last_active" in meta
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_empty_session_refreshes_timestamp(self, real_consolidator):
|
||||||
|
"""Empty session with old updated_at → refreshed after call, returns ''."""
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:empty")
|
||||||
|
old_ts = datetime.now() - timedelta(hours=2)
|
||||||
|
session.updated_at = old_ts
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
result = await real_consolidator.compact_idle_session("cli:empty")
|
||||||
|
assert result == ""
|
||||||
|
|
||||||
|
reloaded = sessions.get_or_create("cli:empty")
|
||||||
|
assert reloaded.updated_at > old_ts
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_nothing_summary_not_stored(self, real_consolidator, mock_provider):
|
||||||
|
"""LLM returns '(nothing)' → _last_summary NOT in metadata."""
|
||||||
|
mock_provider.chat_with_retry.return_value = MagicMock(
|
||||||
|
content="(nothing)", finish_reason="stop"
|
||||||
|
)
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:nothing")
|
||||||
|
for i in range(10):
|
||||||
|
session.add_message("user", f"u{i}")
|
||||||
|
session.add_message("assistant", f"a{i}")
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
result = await real_consolidator.compact_idle_session("cli:nothing", max_suffix=4)
|
||||||
|
assert result == "(nothing)"
|
||||||
|
|
||||||
|
reloaded = sessions.get_or_create("cli:nothing")
|
||||||
|
assert "_last_summary" not in reloaded.metadata
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_llm_failure_still_truncates(self, real_consolidator, mock_provider, store):
|
||||||
|
"""LLM raises RuntimeError → raw_archive fires, session still truncated, returns None."""
|
||||||
|
mock_provider.chat_with_retry.side_effect = RuntimeError("LLM unavailable")
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:fail")
|
||||||
|
for i in range(10):
|
||||||
|
session.add_message("user", f"u{i}")
|
||||||
|
session.add_message("assistant", f"a{i}")
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
result = await real_consolidator.compact_idle_session("cli:fail", max_suffix=4)
|
||||||
|
assert result is None
|
||||||
|
|
||||||
|
# raw_archive should have been called (history.jsonl gets an entry)
|
||||||
|
entries = store.read_unprocessed_history(since_cursor=0)
|
||||||
|
assert any("[RAW]" in e["content"] for e in entries)
|
||||||
|
|
||||||
|
# Session should still be truncated
|
||||||
|
reloaded = sessions.get_or_create("cli:fail")
|
||||||
|
assert len(reloaded.messages) <= 4
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_respects_last_consolidated(self, real_consolidator, mock_provider):
|
||||||
|
"""30 turns with last_consolidated=50 → only unconsolidated tail considered."""
|
||||||
|
mock_provider.chat_with_retry.return_value = MagicMock(
|
||||||
|
content="Tail summary.", finish_reason="stop"
|
||||||
|
)
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:offset")
|
||||||
|
for i in range(30):
|
||||||
|
session.add_message("user", f"u{i}")
|
||||||
|
session.add_message("assistant", f"a{i}")
|
||||||
|
session.last_consolidated = 50 # Only 10 messages unconsolidated
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
result = await real_consolidator.compact_idle_session("cli:offset", max_suffix=4)
|
||||||
|
assert result == "Tail summary."
|
||||||
|
|
||||||
|
# Verify only the unconsolidated tail was processed:
|
||||||
|
# 10 unconsolidated messages (50-59), keep suffix of 4 → archive 6
|
||||||
|
archived_call = mock_provider.chat_with_retry.call_args
|
||||||
|
user_content = archived_call.kwargs["messages"][1]["content"]
|
||||||
|
# Should contain only tail messages, not early ones
|
||||||
|
assert "u0" not in user_content
|
||||||
|
assert "u25" in user_content or "a25" in user_content
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_acquires_consolidation_lock(self, real_consolidator, mock_provider):
|
||||||
|
"""Verify lock is held during execution."""
|
||||||
|
import asyncio
|
||||||
|
|
||||||
|
# Use a slow LLM response to ensure the lock is held while we check
|
||||||
|
started = asyncio.Event()
|
||||||
|
|
||||||
|
async def slow_chat(**kwargs):
|
||||||
|
started.set()
|
||||||
|
await asyncio.sleep(0.1)
|
||||||
|
return MagicMock(content="Summary.", finish_reason="stop")
|
||||||
|
|
||||||
|
mock_provider.chat_with_retry = slow_chat
|
||||||
|
|
||||||
|
sessions = real_consolidator.sessions
|
||||||
|
session = sessions.get_or_create("cli:lock")
|
||||||
|
for i in range(10):
|
||||||
|
session.add_message("user", f"u{i}")
|
||||||
|
session.add_message("assistant", f"a{i}")
|
||||||
|
sessions.save(session)
|
||||||
|
|
||||||
|
lock = real_consolidator.get_lock("cli:lock")
|
||||||
|
assert not lock.locked()
|
||||||
|
|
||||||
|
task = asyncio.ensure_future(
|
||||||
|
real_consolidator.compact_idle_session("cli:lock", max_suffix=4)
|
||||||
|
)
|
||||||
|
await started.wait()
|
||||||
|
assert lock.locked()
|
||||||
|
await task
|
||||||
|
assert not lock.locked()
|
||||||
|
|
||||||
|
|
||||||
class TestRawArchiveTruncation:
|
class TestRawArchiveTruncation:
|
||||||
"""raw_archive() must cap entry size to avoid bloating history.jsonl."""
|
"""raw_archive() must cap entry size to avoid bloating history.jsonl."""
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user