diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index fd233bfa3..167f02284 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -769,6 +769,74 @@ class Consolidator: # the summary injection strategy with AutoCompact._archive(). self._persist_last_summary(session, last_summary) + async def compact_idle_session( + self, + session_key: str, + max_suffix: int = 8, + ) -> str | None: + """Hard-truncate an idle session under the consolidation lock. + + Used by AutoCompact so all session mutation goes through a single + lock-protected path. Returns the summary text on success, ``None`` + if the LLM failed (raw_archive fallback), or ``""`` if there was + nothing to archive. + """ + lock = self.get_lock(session_key) + async with lock: + self.sessions.invalidate(session_key) + session = self.sessions.get_or_create(session_key) + + tail = list(session.messages[session.last_consolidated:]) + if not tail: + session.updated_at = datetime.now() + self.sessions.save(session) + return "" + + probe = Session( + key=session.key, + messages=tail.copy(), + created_at=session.created_at, + updated_at=session.updated_at, + metadata={}, + last_consolidated=0, + ) + probe.retain_recent_legal_suffix(max_suffix) + kept = probe.messages + cut = len(tail) - len(kept) + archive_msgs = tail[:cut] + + if not archive_msgs and not kept: + session.updated_at = datetime.now() + self.sessions.save(session) + return "" + + last_active = session.updated_at + summary: str | None = "" + if archive_msgs: + summary = await self.archive(archive_msgs) + + if summary and summary != "(nothing)": + session.metadata["_last_summary"] = { + "text": summary, + "last_active": last_active.isoformat(), + } + + session.messages = kept + session.last_consolidated = 0 + session.updated_at = datetime.now() + self.sessions.save(session) + + if archive_msgs: + logger.info( + "Idle-session compact for {}: archived={}, kept={}, summary={}", + session_key, + len(archive_msgs), + len(kept), + bool(summary), + ) + + return summary + # --------------------------------------------------------------------------- # Dream — heavyweight cron-scheduled memory consolidation diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py index 64ef9a886..49888b8a1 100644 --- a/tests/agent/test_consolidator.py +++ b/tests/agent/test_consolidator.py @@ -299,6 +299,168 @@ class TestConsolidatorTokenBudget: assert session.last_consolidated == 61 +class TestCompactIdleSession: + """Tests for Consolidator.compact_idle_session — lock-protected idle truncation.""" + + @pytest.fixture + def real_consolidator(self, store, mock_provider): + """Create a Consolidator with a real SessionManager (not a mock).""" + from nanobot.session.manager import SessionManager + + sessions = SessionManager(store.workspace) + return Consolidator( + store=store, + provider=mock_provider, + model="test-model", + sessions=sessions, + context_window_tokens=1000, + build_messages=MagicMock(return_value=[]), + get_tool_definitions=MagicMock(return_value=[]), + max_completion_tokens=100, + ) + + @pytest.mark.asyncio + async def test_archives_prefix_keeps_suffix(self, real_consolidator, mock_provider): + """20 user/assistant turns → compact with max_suffix=8 → messages ≤ 8, + last_consolidated=0, _last_summary stored.""" + mock_provider.chat_with_retry.return_value = MagicMock( + content="Summary of old conversation.", finish_reason="stop" + ) + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:test") + for i in range(20): + session.add_message("user", f"user msg {i}") + session.add_message("assistant", f"assistant msg {i}") + sessions.save(session) + + result = await real_consolidator.compact_idle_session("cli:test", max_suffix=8) + assert result == "Summary of old conversation." + + reloaded = sessions.get_or_create("cli:test") + assert len(reloaded.messages) <= 8 + assert reloaded.last_consolidated == 0 + meta = reloaded.metadata.get("_last_summary") + assert meta is not None + assert meta["text"] == "Summary of old conversation." + assert "last_active" in meta + + @pytest.mark.asyncio + async def test_empty_session_refreshes_timestamp(self, real_consolidator): + """Empty session with old updated_at → refreshed after call, returns ''.""" + from datetime import datetime, timedelta + + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:empty") + old_ts = datetime.now() - timedelta(hours=2) + session.updated_at = old_ts + sessions.save(session) + + result = await real_consolidator.compact_idle_session("cli:empty") + assert result == "" + + reloaded = sessions.get_or_create("cli:empty") + assert reloaded.updated_at > old_ts + + @pytest.mark.asyncio + async def test_nothing_summary_not_stored(self, real_consolidator, mock_provider): + """LLM returns '(nothing)' → _last_summary NOT in metadata.""" + mock_provider.chat_with_retry.return_value = MagicMock( + content="(nothing)", finish_reason="stop" + ) + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:nothing") + for i in range(10): + session.add_message("user", f"u{i}") + session.add_message("assistant", f"a{i}") + sessions.save(session) + + result = await real_consolidator.compact_idle_session("cli:nothing", max_suffix=4) + assert result == "(nothing)" + + reloaded = sessions.get_or_create("cli:nothing") + assert "_last_summary" not in reloaded.metadata + + @pytest.mark.asyncio + async def test_llm_failure_still_truncates(self, real_consolidator, mock_provider, store): + """LLM raises RuntimeError → raw_archive fires, session still truncated, returns None.""" + mock_provider.chat_with_retry.side_effect = RuntimeError("LLM unavailable") + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:fail") + for i in range(10): + session.add_message("user", f"u{i}") + session.add_message("assistant", f"a{i}") + sessions.save(session) + + result = await real_consolidator.compact_idle_session("cli:fail", max_suffix=4) + assert result is None + + # raw_archive should have been called (history.jsonl gets an entry) + entries = store.read_unprocessed_history(since_cursor=0) + assert any("[RAW]" in e["content"] for e in entries) + + # Session should still be truncated + reloaded = sessions.get_or_create("cli:fail") + assert len(reloaded.messages) <= 4 + + @pytest.mark.asyncio + async def test_respects_last_consolidated(self, real_consolidator, mock_provider): + """30 turns with last_consolidated=50 → only unconsolidated tail considered.""" + mock_provider.chat_with_retry.return_value = MagicMock( + content="Tail summary.", finish_reason="stop" + ) + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:offset") + for i in range(30): + session.add_message("user", f"u{i}") + session.add_message("assistant", f"a{i}") + session.last_consolidated = 50 # Only 10 messages unconsolidated + sessions.save(session) + + result = await real_consolidator.compact_idle_session("cli:offset", max_suffix=4) + assert result == "Tail summary." + + # Verify only the unconsolidated tail was processed: + # 10 unconsolidated messages (50-59), keep suffix of 4 → archive 6 + archived_call = mock_provider.chat_with_retry.call_args + user_content = archived_call.kwargs["messages"][1]["content"] + # Should contain only tail messages, not early ones + assert "u0" not in user_content + assert "u25" in user_content or "a25" in user_content + + @pytest.mark.asyncio + async def test_acquires_consolidation_lock(self, real_consolidator, mock_provider): + """Verify lock is held during execution.""" + import asyncio + + # Use a slow LLM response to ensure the lock is held while we check + started = asyncio.Event() + + async def slow_chat(**kwargs): + started.set() + await asyncio.sleep(0.1) + return MagicMock(content="Summary.", finish_reason="stop") + + mock_provider.chat_with_retry = slow_chat + + sessions = real_consolidator.sessions + session = sessions.get_or_create("cli:lock") + for i in range(10): + session.add_message("user", f"u{i}") + session.add_message("assistant", f"a{i}") + sessions.save(session) + + lock = real_consolidator.get_lock("cli:lock") + assert not lock.locked() + + task = asyncio.ensure_future( + real_consolidator.compact_idle_session("cli:lock", max_suffix=4) + ) + await started.wait() + assert lock.locked() + await task + assert not lock.locked() + + class TestRawArchiveTruncation: """raw_archive() must cap entry size to avoid bloating history.jsonl."""