# Mirror of https://github.com/HKUDS/nanobot.git (synced 2026-04-05 19:02:38 +00:00).
"""Tests for the restructured MemoryStore — pure file I/O layer."""
|
||
|
||
from datetime import datetime
|
||
import json
|
||
from pathlib import Path
|
||
|
||
import pytest
|
||
|
||
from nanobot.agent.memory import MemoryStore
|
||
|
||
|
||
@pytest.fixture
def store(tmp_path):
    """Provide a MemoryStore rooted in a throwaway temporary workspace."""
    return MemoryStore(tmp_path)
class TestMemoryStoreBasicIO:
    """Round-trip coverage for the plain file-backed read/write helpers."""

    def test_read_memory_returns_empty_when_missing(self, store):
        # A fresh workspace has no memory file; reads yield "" rather than raising.
        assert store.read_memory() == ""

    def test_write_and_read_memory(self, store):
        payload = "hello"
        store.write_memory(payload)
        assert store.read_memory() == payload

    def test_read_soul_returns_empty_when_missing(self, store):
        assert store.read_soul() == ""

    def test_write_and_read_soul(self, store):
        payload = "soul content"
        store.write_soul(payload)
        assert store.read_soul() == payload

    def test_read_user_returns_empty_when_missing(self, store):
        assert store.read_user() == ""

    def test_write_and_read_user(self, store):
        payload = "user content"
        store.write_user(payload)
        assert store.read_user() == payload

    def test_get_memory_context_returns_empty_when_missing(self, store):
        assert store.get_memory_context() == ""

    def test_get_memory_context_returns_formatted_content(self, store):
        # The context wraps stored memory in a labelled section.
        store.write_memory("important fact")
        context = store.get_memory_context()
        assert "Long-term Memory" in context
        assert "important fact" in context
class TestHistoryWithCursor:
    """Cursor bookkeeping for the append-only JSONL history log."""

    def test_append_history_returns_cursor(self, store):
        # Cursors are 1-based and increase by one per appended event.
        assert store.append_history("event 1") == 1
        assert store.append_history("event 2") == 2

    def test_append_history_includes_cursor_in_file(self, store):
        store.append_history("event 1")
        raw = store.read_file(store.history_file)
        record = json.loads(raw)
        assert record["cursor"] == 1

    def test_cursor_persists_across_appends(self, store):
        for text in ("event 1", "event 2"):
            store.append_history(text)
        assert store.append_history("event 3") == 3

    def test_read_unprocessed_history(self, store):
        for text in ("event 1", "event 2", "event 3"):
            store.append_history(text)
        # since_cursor is exclusive: entry 1 is considered processed.
        unread = store.read_unprocessed_history(since_cursor=1)
        assert len(unread) == 2
        assert unread[0]["cursor"] == 2

    def test_read_unprocessed_history_returns_all_when_cursor_zero(self, store):
        for text in ("event 1", "event 2"):
            store.append_history(text)
        assert len(store.read_unprocessed_history(since_cursor=0)) == 2

    def test_compact_history_drops_oldest(self, tmp_path):
        # A tiny cap forces compaction to discard all but the newest entries.
        small_store = MemoryStore(tmp_path, max_history_entries=2)
        for i in range(1, 6):
            small_store.append_history(f"event {i}")
        small_store.compact_history()
        survivors = small_store.read_unprocessed_history(since_cursor=0)
        assert len(survivors) == 2
        assert survivors[0]["cursor"] in {4, 5}
class TestDreamCursor:
    """Persistence of the dream-processing cursor."""

    def test_initial_cursor_is_zero(self, store):
        assert store.get_last_dream_cursor() == 0

    def test_set_and_get_cursor(self, store):
        store.set_last_dream_cursor(5)
        assert store.get_last_dream_cursor() == 5

    def test_cursor_persists(self, store):
        store.set_last_dream_cursor(3)
        # A second store over the same workspace must observe the saved value.
        reloaded = MemoryStore(store.workspace)
        assert reloaded.get_last_dream_cursor() == 3
class TestLegacyHistoryMigration:
    """Migration of legacy memory/HISTORY.md files into cursored JSONL."""

    @staticmethod
    def _seed_legacy(tmp_path, text):
        """Create memory/HISTORY.md holding *text*; return (memory_dir, legacy_file)."""
        memory_dir = tmp_path / "memory"
        memory_dir.mkdir()
        legacy_file = memory_dir / "HISTORY.md"
        legacy_file.write_text(text, encoding="utf-8")
        return memory_dir, legacy_file

    @staticmethod
    def _backup_mtime_stamp(memory_dir):
        """Fallback timestamp string derived from the migration backup's mtime."""
        mtime = (memory_dir / "HISTORY.md.bak").stat().st_mtime
        return datetime.fromtimestamp(mtime).strftime("%Y-%m-%d %H:%M")

    def test_read_unprocessed_history_handles_entries_without_cursor(self, store):
        """A pre-existing JSONL row with an explicit cursor round-trips intact."""
        line = '{"cursor": 1, "timestamp": "2026-03-30 14:30", "content": "Old event"}\n'
        store.history_file.write_text(line, encoding="utf-8")
        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 1
        assert entries[0]["cursor"] == 1

    def test_migrates_legacy_history_md_preserving_partial_entries(self, tmp_path):
        legacy_content = (
            "[2026-04-01 10:00] User prefers dark mode.\n\n"
            "[2026-04-01 10:05] [RAW] 2 messages\n"
            "[2026-04-01 10:04] USER: hello\n"
            "[2026-04-01 10:04] ASSISTANT: hi\n\n"
            "Legacy chunk without timestamp.\n"
            "Keep whatever content we can recover.\n"
        )
        memory_dir, legacy_file = self._seed_legacy(tmp_path, legacy_content)

        # Migration happens as a side effect of constructing the store.
        store = MemoryStore(tmp_path)
        fallback = self._backup_mtime_stamp(memory_dir)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert [entry["cursor"] for entry in entries] == [1, 2, 3]
        assert entries[0]["timestamp"] == "2026-04-01 10:00"
        assert entries[0]["content"] == "User prefers dark mode."
        assert entries[1]["timestamp"] == "2026-04-01 10:05"
        assert entries[1]["content"].startswith("[RAW] 2 messages")
        assert "USER: hello" in entries[1]["content"]
        assert entries[2]["timestamp"] == fallback
        assert entries[2]["content"].startswith("Legacy chunk without timestamp.")
        # Both cursors advance to the last migrated entry.
        assert store.read_file(store._cursor_file).strip() == "3"
        assert store.read_file(store._dream_cursor_file).strip() == "3"
        # The original file is replaced by a byte-identical .bak backup.
        assert not legacy_file.exists()
        assert (memory_dir / "HISTORY.md.bak").read_text(encoding="utf-8") == legacy_content

    def test_migrates_consecutive_entries_without_blank_lines(self, tmp_path):
        legacy_content = (
            "[2026-04-01 10:00] First event.\n"
            "[2026-04-01 10:01] Second event.\n"
            "[2026-04-01 10:02] Third event.\n"
        )
        self._seed_legacy(tmp_path, legacy_content)

        store = MemoryStore(tmp_path)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 3
        assert [entry["content"] for entry in entries] == [
            "First event.",
            "Second event.",
            "Third event.",
        ]

    def test_raw_archive_stays_single_entry_while_following_events_split(self, tmp_path):
        legacy_content = (
            "[2026-04-01 10:05] [RAW] 2 messages\n"
            "[2026-04-01 10:04] USER: hello\n"
            "[2026-04-01 10:04] ASSISTANT: hi\n"
            "[2026-04-01 10:06] Normal event after raw block.\n"
        )
        self._seed_legacy(tmp_path, legacy_content)

        store = MemoryStore(tmp_path)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 2
        # The [RAW] block absorbs its transcript lines into one entry.
        assert entries[0]["content"].startswith("[RAW] 2 messages")
        assert "USER: hello" in entries[0]["content"]
        assert entries[1]["content"] == "Normal event after raw block."

    def test_nonstandard_date_headers_still_start_new_entries(self, tmp_path):
        legacy_content = (
            "[2026-03-25–2026-04-02] Multi-day summary.\n"
            "[2026-03-26/27] Cross-day summary.\n"
        )
        memory_dir, _ = self._seed_legacy(tmp_path, legacy_content)

        store = MemoryStore(tmp_path)
        fallback = self._backup_mtime_stamp(memory_dir)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 2
        # Unparseable headers keep the bracketed prefix and fall back to mtime.
        assert entries[0]["timestamp"] == fallback
        assert entries[0]["content"] == "[2026-03-25–2026-04-02] Multi-day summary."
        assert entries[1]["timestamp"] == fallback
        assert entries[1]["content"] == "[2026-03-26/27] Cross-day summary."

    def test_existing_history_jsonl_skips_legacy_migration(self, tmp_path):
        memory_dir = tmp_path / "memory"
        memory_dir.mkdir()
        (memory_dir / "history.jsonl").write_text(
            '{"cursor": 7, "timestamp": "2026-04-01 12:00", "content": "existing"}\n',
            encoding="utf-8",
        )
        legacy_file = memory_dir / "HISTORY.md"
        legacy_file.write_text("[2026-04-01 10:00] legacy\n\n", encoding="utf-8")

        store = MemoryStore(tmp_path)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 1
        assert entries[0]["cursor"] == 7
        assert entries[0]["content"] == "existing"
        # No migration ran: legacy file untouched, no backup created.
        assert legacy_file.exists()
        assert not (memory_dir / "HISTORY.md.bak").exists()

    def test_migrates_legacy_history_with_invalid_utf8_bytes(self, tmp_path):
        memory_dir = tmp_path / "memory"
        memory_dir.mkdir()
        # \xff is not valid UTF-8; migration must tolerate it rather than crash.
        (memory_dir / "HISTORY.md").write_bytes(
            b"[2026-04-01 10:00] Broken \xff data still needs migration.\n\n"
        )

        store = MemoryStore(tmp_path)

        entries = store.read_unprocessed_history(since_cursor=0)
        assert len(entries) == 1
        assert entries[0]["timestamp"] == "2026-04-01 10:00"
        assert "Broken" in entries[0]["content"]
        assert "migration." in entries[0]["content"]