"""Tests for the restructured MemoryStore — pure file I/O layer.""" from datetime import datetime import json from pathlib import Path import pytest from nanobot.agent.memory import MemoryStore @pytest.fixture def store(tmp_path): return MemoryStore(tmp_path) class TestMemoryStoreBasicIO: def test_read_memory_returns_empty_when_missing(self, store): assert store.read_memory() == "" def test_write_and_read_memory(self, store): store.write_memory("hello") assert store.read_memory() == "hello" def test_read_soul_returns_empty_when_missing(self, store): assert store.read_soul() == "" def test_write_and_read_soul(self, store): store.write_soul("soul content") assert store.read_soul() == "soul content" def test_read_user_returns_empty_when_missing(self, store): assert store.read_user() == "" def test_write_and_read_user(self, store): store.write_user("user content") assert store.read_user() == "user content" def test_get_memory_context_returns_empty_when_missing(self, store): assert store.get_memory_context() == "" def test_get_memory_context_returns_formatted_content(self, store): store.write_memory("important fact") ctx = store.get_memory_context() assert "Long-term Memory" in ctx assert "important fact" in ctx class TestHistoryWithCursor: def test_append_history_returns_cursor(self, store): cursor = store.append_history("event 1") assert cursor == 1 cursor2 = store.append_history("event 2") assert cursor2 == 2 def test_append_history_includes_cursor_in_file(self, store): store.append_history("event 1") content = store.read_file(store.history_file) data = json.loads(content) assert data["cursor"] == 1 def test_cursor_persists_across_appends(self, store): store.append_history("event 1") store.append_history("event 2") cursor = store.append_history("event 3") assert cursor == 3 def test_read_unprocessed_history(self, store): store.append_history("event 1") store.append_history("event 2") store.append_history("event 3") entries = store.read_unprocessed_history(since_cursor=1) assert len(entries) == 2 assert entries[0]["cursor"] == 2 def test_read_unprocessed_history_returns_all_when_cursor_zero(self, store): store.append_history("event 1") store.append_history("event 2") entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 def test_compact_history_drops_oldest(self, tmp_path): store = MemoryStore(tmp_path, max_history_entries=2) store.append_history("event 1") store.append_history("event 2") store.append_history("event 3") store.append_history("event 4") store.append_history("event 5") store.compact_history() entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 assert entries[0]["cursor"] in {4, 5} class TestDreamCursor: def test_initial_cursor_is_zero(self, store): assert store.get_last_dream_cursor() == 0 def test_set_and_get_cursor(self, store): store.set_last_dream_cursor(5) assert store.get_last_dream_cursor() == 5 def test_cursor_persists(self, store): store.set_last_dream_cursor(3) store2 = MemoryStore(store.workspace) assert store2.get_last_dream_cursor() == 3 class TestLegacyHistoryMigration: def test_read_unprocessed_history_handles_entries_without_cursor(self, store): """JSONL entries with cursor=1 are correctly parsed and returned.""" store.history_file.write_text( '{"cursor": 1, "timestamp": "2026-03-30 14:30", "content": "Old event"}\n', encoding="utf-8") entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 assert entries[0]["cursor"] == 1 def test_migrates_legacy_history_md_preserving_partial_entries(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() legacy_file = memory_dir / "HISTORY.md" legacy_content = ( "[2026-04-01 10:00] User prefers dark mode.\n\n" "[2026-04-01 10:05] [RAW] 2 messages\n" "[2026-04-01 10:04] USER: hello\n" "[2026-04-01 10:04] ASSISTANT: hi\n\n" "Legacy chunk without timestamp.\n" "Keep whatever content we can recover.\n" ) legacy_file.write_text(legacy_content, encoding="utf-8") store = MemoryStore(tmp_path) fallback_timestamp = datetime.fromtimestamp( (memory_dir / "HISTORY.md.bak").stat().st_mtime, ).strftime("%Y-%m-%d %H:%M") entries = store.read_unprocessed_history(since_cursor=0) assert [entry["cursor"] for entry in entries] == [1, 2, 3] assert entries[0]["timestamp"] == "2026-04-01 10:00" assert entries[0]["content"] == "User prefers dark mode." assert entries[1]["timestamp"] == "2026-04-01 10:05" assert entries[1]["content"].startswith("[RAW] 2 messages") assert "USER: hello" in entries[1]["content"] assert entries[2]["timestamp"] == fallback_timestamp assert entries[2]["content"].startswith("Legacy chunk without timestamp.") assert store.read_file(store._cursor_file).strip() == "3" assert store.read_file(store._dream_cursor_file).strip() == "3" assert not legacy_file.exists() assert (memory_dir / "HISTORY.md.bak").read_text(encoding="utf-8") == legacy_content def test_migrates_consecutive_entries_without_blank_lines(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() legacy_file = memory_dir / "HISTORY.md" legacy_content = ( "[2026-04-01 10:00] First event.\n" "[2026-04-01 10:01] Second event.\n" "[2026-04-01 10:02] Third event.\n" ) legacy_file.write_text(legacy_content, encoding="utf-8") store = MemoryStore(tmp_path) entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 3 assert [entry["content"] for entry in entries] == [ "First event.", "Second event.", "Third event.", ] def test_raw_archive_stays_single_entry_while_following_events_split(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() legacy_file = memory_dir / "HISTORY.md" legacy_content = ( "[2026-04-01 10:05] [RAW] 2 messages\n" "[2026-04-01 10:04] USER: hello\n" "[2026-04-01 10:04] ASSISTANT: hi\n" "[2026-04-01 10:06] Normal event after raw block.\n" ) legacy_file.write_text(legacy_content, encoding="utf-8") store = MemoryStore(tmp_path) entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 assert entries[0]["content"].startswith("[RAW] 2 messages") assert "USER: hello" in entries[0]["content"] assert entries[1]["content"] == "Normal event after raw block." def test_nonstandard_date_headers_still_start_new_entries(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() legacy_file = memory_dir / "HISTORY.md" legacy_content = ( "[2026-03-25–2026-04-02] Multi-day summary.\n" "[2026-03-26/27] Cross-day summary.\n" ) legacy_file.write_text(legacy_content, encoding="utf-8") store = MemoryStore(tmp_path) fallback_timestamp = datetime.fromtimestamp( (memory_dir / "HISTORY.md.bak").stat().st_mtime, ).strftime("%Y-%m-%d %H:%M") entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 assert entries[0]["timestamp"] == fallback_timestamp assert entries[0]["content"] == "[2026-03-25–2026-04-02] Multi-day summary." assert entries[1]["timestamp"] == fallback_timestamp assert entries[1]["content"] == "[2026-03-26/27] Cross-day summary." def test_existing_history_jsonl_skips_legacy_migration(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() history_file = memory_dir / "history.jsonl" history_file.write_text( '{"cursor": 7, "timestamp": "2026-04-01 12:00", "content": "existing"}\n', encoding="utf-8", ) legacy_file = memory_dir / "HISTORY.md" legacy_file.write_text("[2026-04-01 10:00] legacy\n\n", encoding="utf-8") store = MemoryStore(tmp_path) entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 assert entries[0]["cursor"] == 7 assert entries[0]["content"] == "existing" assert legacy_file.exists() assert not (memory_dir / "HISTORY.md.bak").exists() def test_empty_history_jsonl_still_allows_legacy_migration(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() history_file = memory_dir / "history.jsonl" history_file.write_text("", encoding="utf-8") legacy_file = memory_dir / "HISTORY.md" legacy_file.write_text("[2026-04-01 10:00] legacy\n\n", encoding="utf-8") store = MemoryStore(tmp_path) entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 assert entries[0]["cursor"] == 1 assert entries[0]["timestamp"] == "2026-04-01 10:00" assert entries[0]["content"] == "legacy" assert not legacy_file.exists() assert (memory_dir / "HISTORY.md.bak").exists() def test_migrates_legacy_history_with_invalid_utf8_bytes(self, tmp_path): memory_dir = tmp_path / "memory" memory_dir.mkdir() legacy_file = memory_dir / "HISTORY.md" legacy_file.write_bytes( b"[2026-04-01 10:00] Broken \xff data still needs migration.\n\n" ) store = MemoryStore(tmp_path) entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 assert entries[0]["timestamp"] == "2026-04-01 10:00" assert "Broken" in entries[0]["content"] assert "migration." in entries[0]["content"]