From ac226d66f951ccdbd294435f940224ca98d82627 Mon Sep 17 00:00:00 2001 From: 04cb <0x04cb@gmail.com> Date: Tue, 2 Jun 2026 08:12:24 +0800 Subject: [PATCH] fix(memory): serialize cursor allocation in append_history (#4081) --- nanobot/agent/memory.py | 27 ++++++++++++++++----------- tests/agent/test_memory_store.py | 27 +++++++++++++++++++++++++++ 2 files changed, 43 insertions(+), 11 deletions(-) diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index da7a7cb65..fc5480b97 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -6,6 +6,7 @@ import asyncio import json import os import re +import threading import weakref from contextlib import suppress from datetime import datetime @@ -61,6 +62,7 @@ class MemoryStore: self._dream_cursor_file = self.memory_dir / ".dream_cursor" self._corruption_logged = False # rate-limit non-int cursor warning self._oversize_logged = False # rate-limit oversized-entry warning + self._append_lock = threading.Lock() # serialize cursor allocation + append self._git = GitStore(workspace, tracked_files=[ "SOUL.md", "USER.md", "memory/MEMORY.md", "memory/.dream_cursor", ]) @@ -248,7 +250,6 @@ class MemoryStore: large writes (e.g. an LLM echoing its input back as a "summary"). """ limit = max_chars if max_chars is not None else _HISTORY_ENTRY_HARD_CAP - cursor = self._next_cursor() ts = datetime.now().strftime("%Y-%m-%d %H:%M") raw = entry.rstrip() if len(raw) > limit: @@ -262,16 +263,20 @@ class MemoryStore: ) raw = truncate_text(raw, limit) content = strip_think(raw) - if raw and not content: - logger.debug( - "history entry {} stripped to empty (likely template leak); " - "persisting empty content to avoid re-polluting context", - cursor, - ) - record = {"cursor": cursor, "timestamp": ts, "content": content} - with open(self.history_file, "a", encoding="utf-8") as f: - f.write(json.dumps(record, ensure_ascii=False) + "\n") - self._cursor_file.write_text(str(cursor), encoding="utf-8") + # Cursor allocation and the append must be atomic: concurrent writers + # could otherwise read the same current cursor and emit duplicates. + with self._append_lock: + cursor = self._next_cursor() + if raw and not content: + logger.debug( + "history entry {} stripped to empty (likely template leak); " + "persisting empty content to avoid re-polluting context", + cursor, + ) + record = {"cursor": cursor, "timestamp": ts, "content": content} + with open(self.history_file, "a", encoding="utf-8") as f: + f.write(json.dumps(record, ensure_ascii=False) + "\n") + self._cursor_file.write_text(str(cursor), encoding="utf-8") return cursor @staticmethod diff --git a/tests/agent/test_memory_store.py b/tests/agent/test_memory_store.py index 4f58e9e37..fda60b7c5 100644 --- a/tests/agent/test_memory_store.py +++ b/tests/agent/test_memory_store.py @@ -129,6 +129,33 @@ class TestHistoryWithCursor: cursor = store.append_history("new event") assert cursor == 1 + def test_append_history_allocates_unique_cursors_under_concurrent_writes(self, store): + """Regression: concurrent appends must not allocate duplicate cursors.""" + import threading + + writers = 16 + start = threading.Barrier(writers) + cursors: list[int] = [] + lock = threading.Lock() + + def worker(i): + start.wait() + c = store.append_history(f"event {i}") + with lock: + cursors.append(c) + + threads = [threading.Thread(target=worker, args=(i,)) for i in range(writers)] + for t in threads: + t.start() + for t in threads: + t.join() + + assert len(cursors) == writers + assert len(set(cursors)) == writers, f"duplicate cursors: {sorted(cursors)}" + assert sorted(cursors) == list(range(1, writers + 1)) + persisted = store.read_unprocessed_history(since_cursor=0) + assert sorted(e["cursor"] for e in persisted) == list(range(1, writers + 1)) + def test_compact_history_drops_oldest(self, tmp_path): store = MemoryStore(tmp_path, max_history_entries=2) store.append_history("event 1")