fix(agent): eliminate race condition in auto compact summary retrieval

Make Consolidator.archive() return the summary string directly instead
of writing to history.jsonl then reading back via get_last_history_entry().
This eliminates a race condition where concurrent _archive calls for
different sessions could read each other's summaries from the shared
history file (cross-user context leak in multi-user deployments).

Also removes Consolidator.get_last_history_entry() — no longer needed.
This commit is contained in:
chengyongru 2026-04-10 18:14:14 +08:00 committed by Xubin Ren
parent 69d60e2b06
commit d03458f034
4 changed files with 31 additions and 71 deletions

View File

@ -53,9 +53,7 @@ class AutoCompact:
return
n = len(msgs)
last_active = session.updated_at
await self.consolidator.archive(msgs)
entry = self.consolidator.get_last_history_entry()
summary = (entry or {}).get("content", "")
summary = await self.consolidator.archive(msgs) or ""
if summary and summary != "(nothing)":
self._summaries[key] = (summary, last_active)
session.metadata["_last_summary"] = {"text": summary, "last_active": last_active.isoformat()}
@ -71,6 +69,8 @@ class AutoCompact:
if key in self._archiving or self._is_expired(session.updated_at):
logger.info("Auto-compact: reloading session {} (archiving={})", key, key in self._archiving)
session = self.sessions.get_or_create(key)
# Hot path: summary from in-memory dict (process hasn't restarted).
# Also clean metadata copy so stale _last_summary never leaks to disk.
entry = self._summaries.pop(key, None)
if entry:
session.metadata.pop("_last_summary", None)

View File

@ -374,10 +374,6 @@ class Consolidator:
weakref.WeakValueDictionary()
)
def get_last_history_entry(self) -> dict[str, Any] | None:
"""Return the most recent entry from history.jsonl."""
return self.store._read_last_entry()
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
@ -437,13 +433,13 @@ class Consolidator:
self._get_tool_definitions(),
)
async def archive(self, messages: list[dict]) -> bool:
async def archive(self, messages: list[dict]) -> str | None:
"""Summarize messages via LLM and append to history.jsonl.
Returns True on success (or degraded success), False if nothing to do.
Returns the summary text on success, None if nothing to archive.
"""
if not messages:
return False
return None
try:
formatted = MemoryStore._format_messages(messages)
response = await self.provider.chat_with_retry(
@ -463,11 +459,11 @@ class Consolidator:
)
summary = response.content or "[no summary]"
self.store.append_history(summary)
return True
return summary
except Exception:
logger.warning("Consolidation LLM call failed, raw-dumping to history")
self.store.raw_archive(messages)
return True
return None
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
"""Loop: archive old messages until prompt fits within safe budget.

View File

@ -101,7 +101,7 @@ class TestAutoCompact:
loop.sessions.save(s2)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.auto_compact.check_expired(loop._schedule_background)
@ -126,7 +126,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
archived_messages.extend(messages)
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -147,12 +147,9 @@ class TestAutoCompact:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "User said hello."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
}
await loop.auto_compact._archive("cli:test")
@ -174,7 +171,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
nonlocal archive_called
archive_called = True
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -201,7 +198,7 @@ class TestAutoCompact:
async def _fake_archive(messages):
nonlocal archived_count
archived_count = len(messages)
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -243,12 +240,9 @@ class TestAutoCompactIdleDetection:
async def _fake_archive(messages):
archived_messages.extend(messages)
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@ -311,7 +305,7 @@ class TestAutoCompactIdleDetection:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -340,12 +334,9 @@ class TestAutoCompactSystemMessages:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# Simulate proactive archive completing before system message arrives
await loop.auto_compact._archive("cli:test")
@ -428,12 +419,9 @@ class TestAutoCompactEdgeCases:
async def _fake_archive(messages):
archived_messages.extend(messages)
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@ -518,12 +506,9 @@ class TestAutoCompactIntegration:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# Simulate proactive archive completing before message arrives
await loop.auto_compact._archive("cli:test")
@ -586,12 +571,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
archived_messages.extend(messages)
return True
return "User chatted about old things."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User chatted about old things.",
}
await self._run_check_expired(loop)
@ -635,7 +617,7 @@ class TestProactiveAutoCompact:
archive_count += 1
started.set()
await block_forever.wait()
return True
return "Summary."
loop.consolidator.archive = _slow_archive
@ -688,7 +670,7 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_called
archive_called = True
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -712,12 +694,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# First tick: archives the session
await self._run_check_expired(loop)
@ -741,7 +720,7 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
return True
return "Summary."
loop.consolidator.archive = _fake_archive
@ -769,12 +748,9 @@ class TestProactiveAutoCompact:
async def _fake_archive(messages):
nonlocal archive_count
archive_count += 1
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
# First compact cycle
await loop.auto_compact._archive("cli:test")
@ -810,12 +786,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "User said hello."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
}
await loop.auto_compact._archive("cli:test")
@ -839,12 +812,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "User said hello."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "User said hello.",
}
# Archive
await loop.auto_compact._archive("cli:test")
@ -874,12 +844,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
await loop.auto_compact._archive("cli:test")
@ -908,12 +875,9 @@ class TestSummaryPersistence:
loop.sessions.save(session)
async def _fake_archive(messages):
return True
return "Summary."
loop.consolidator.archive = _fake_archive
loop.consolidator.get_last_history_entry = lambda: {
"cursor": 1, "timestamp": "2026-01-01 00:00", "content": "Summary.",
}
await loop.auto_compact._archive("cli:test")

View File

@ -46,7 +46,7 @@ class TestConsolidatorSummarize:
{"role": "assistant", "content": "Done, fixed the race condition."},
]
result = await consolidator.archive(messages)
assert result is True
assert result == "User fixed a bug in the auth module."
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
@ -55,14 +55,14 @@ class TestConsolidatorSummarize:
mock_provider.chat_with_retry.side_effect = Exception("API error")
messages = [{"role": "user", "content": "hello"}]
result = await consolidator.archive(messages)
assert result is True # always succeeds
assert result is None # no summary on raw dump fallback
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
assert "[RAW]" in entries[0]["content"]
async def test_summarize_skips_empty_messages(self, consolidator):
result = await consolidator.archive([])
assert result is False
assert result is None
class TestConsolidatorTokenBudget: