diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py index d89f0c927..a81b973e9 100644 --- a/nanobot/agent/context.py +++ b/nanobot/agent/context.py @@ -70,6 +70,8 @@ class ContextBuilder: session_summary: str | None = None, workspace: Path | None = None, include_memory_recent_history: bool = True, + session_key: str | None = None, + unified_session: bool = False, ) -> str: """Build the system prompt from identity, bootstrap files, memory, and skills.""" root = workspace or self.workspace @@ -96,7 +98,11 @@ class ContextBuilder: parts.append(render_template("agent/skills_section.md", skills_summary=skills_summary)) if include_memory_recent_history: - entries = self.memory.read_unprocessed_history(since_cursor=self.memory.get_last_dream_cursor()) + entries = self.memory.read_recent_history_for_prompt( + since_cursor=self.memory.get_last_dream_cursor(), + session_key=session_key, + unified_session=unified_session, + ) if entries: capped = entries[-self._MAX_RECENT_HISTORY:] history_text = "\n".join( @@ -196,6 +202,8 @@ class ContextBuilder: inbound_message: Any | None = None, skip_runtime_lines: bool = False, include_memory_recent_history: bool = True, + session_key: str | None = None, + unified_session: bool = False, ) -> list[dict[str, Any]]: """Build the complete message list for an LLM call.""" root = workspace or self.workspace @@ -232,6 +240,8 @@ class ContextBuilder: session_summary=session_summary, workspace=root, include_memory_recent_history=include_memory_recent_history, + session_key=session_key, + unified_session=unified_session, ), }, *history, diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index b1bde811c..3431237fa 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -9,6 +9,7 @@ import time from contextlib import AsyncExitStack, nullcontext, suppress from dataclasses import dataclass, field from enum import Enum, auto +from functools import partial from pathlib import Path from typing import TYPE_CHECKING, Any, Awaitable, Callable @@ -314,6 +315,7 @@ class AgentLoop: get_tool_definitions=self.tools.get_definitions, max_completion_tokens=provider.generation.max_tokens, consolidation_ratio=consolidation_ratio, + unified_session=unified_session, ) self.auto_compact = AutoCompact( sessions=self.sessions, @@ -610,6 +612,8 @@ class AgentLoop: runtime_state=self, inbound_message=msg, include_memory_recent_history=include_memory_recent_history, + session_key=session.key, + unified_session=self._unified_session, ) async def _dispatch_command_inline( @@ -1150,6 +1154,8 @@ class AgentLoop: runtime_state=self, inbound_message=msg, skip_runtime_lines=is_subagent, + session_key=key, + unified_session=self._unified_session, ) t_wall = time.time() final_content, _, all_msgs, stop_reason, _ = await self._run_agent_loop( @@ -1163,7 +1169,9 @@ class AgentLoop: latency_ms = max(0, int((wall_done - t_wall) * 1000)) self._save_turn(session, all_msgs, 1 + len(history), turn_latency_ms=latency_ms) self._runtime_events().record_turn_latency(key, latency_ms) - session.enforce_file_cap(on_archive=self.context.memory.raw_archive) + session.enforce_file_cap( + on_archive=partial(self.context.memory.raw_archive, session_key=key) + ) self._clear_runtime_checkpoint(session) self.sessions.save(session) self._schedule_background( @@ -1487,7 +1495,9 @@ class AgentLoop: ctx.turn_latency_ms, ) if not ctx.ephemeral: - ctx.session.enforce_file_cap(on_archive=self.context.memory.raw_archive) + ctx.session.enforce_file_cap( + on_archive=partial(self.context.memory.raw_archive, session_key=ctx.session_key) + ) self._schedule_background( self.consolidator.maybe_consolidate_by_tokens( ctx.session, diff --git a/nanobot/agent/memory.py b/nanobot/agent/memory.py index 5aedb511a..9ba60bb31 100644 --- a/nanobot/agent/memory.py +++ b/nanobot/agent/memory.py @@ -41,6 +41,8 @@ class MemoryStore: """Pure file I/O for memory files: MEMORY.md, history.jsonl, SOUL.md, USER.md.""" _DEFAULT_MAX_HISTORY = 1000 + _INTERNAL_HISTORY_SESSION_PREFIXES = ("cron:", "dream:") + _INTERNAL_HISTORY_SESSION_KEYS = {"heartbeat"} _LEGACY_ENTRY_START_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}[^\]]*)\]\s*") _LEGACY_TIMESTAMP_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2})\]\s*") _LEGACY_RAW_MESSAGE_RE = re.compile( @@ -232,7 +234,13 @@ class MemoryStore: # -- history.jsonl — append-only, JSONL format --------------------------- - def append_history(self, entry: str, *, max_chars: int | None = None) -> int: + def append_history( + self, + entry: str, + *, + max_chars: int | None = None, + session_key: str | None = None, + ) -> int: """Append *entry* to history.jsonl and return its auto-incrementing cursor. Entries are passed through `strip_think` to drop template-level leaks @@ -272,6 +280,8 @@ class MemoryStore: cursor, ) record = {"cursor": cursor, "timestamp": ts, "content": content} + if session_key: + record["session_key"] = session_key with open(self.history_file, "a", encoding="utf-8") as f: f.write(json.dumps(record, ensure_ascii=False) + "\n") self._cursor_file.write_text(str(cursor), encoding="utf-8") @@ -322,6 +332,36 @@ class MemoryStore: """Return history entries with a valid cursor > *since_cursor*.""" return [e for e, c in self._iter_valid_entries() if c > since_cursor] + @classmethod + def _is_internal_history_session(cls, session_key: str | None) -> bool: + if not session_key: + return False + return ( + session_key in cls._INTERNAL_HISTORY_SESSION_KEYS + or session_key.startswith(cls._INTERNAL_HISTORY_SESSION_PREFIXES) + ) + + def read_recent_history_for_prompt( + self, + since_cursor: int, + *, + session_key: str | None, + unified_session: bool = False, + ) -> list[dict[str, Any]]: + """Return unprocessed history entries safe to inject into a turn prompt.""" + entries = self.read_unprocessed_history(since_cursor=since_cursor) + if session_key is None: + return entries + if not unified_session: + return [e for e in entries if e.get("session_key") == session_key] + + return [ + entry + for entry in entries + if (entry_session := entry.get("session_key")) == session_key + or not self._is_internal_history_session(entry_session) + ] + def compact_history(self) -> None: """Drop oldest entries if the file exceeds *max_history_entries*.""" if self.max_history_entries <= 0: @@ -489,13 +529,20 @@ class MemoryStore: ) return "\n".join(lines) - def raw_archive(self, messages: list[dict], *, max_chars: int | None = None) -> None: + def raw_archive( + self, + messages: list[dict], + *, + max_chars: int | None = None, + session_key: str | None = None, + ) -> None: """Fallback: dump raw messages to history.jsonl without LLM summarization.""" limit = max_chars if max_chars is not None else _RAW_ARCHIVE_MAX_CHARS formatted = truncate_text(self._format_messages(messages), limit) self.append_history( f"[RAW] {len(messages)} messages\n" - f"{formatted}" + f"{formatted}", + session_key=session_key, ) logger.warning( "Memory consolidation degraded: raw-archived {} messages", len(messages) @@ -570,6 +617,7 @@ class Consolidator: get_tool_definitions: Callable[[], list[dict[str, Any]]], max_completion_tokens: int = 4096, consolidation_ratio: float = 0.5, + unified_session: bool = False, ): self.store = store self.provider = provider @@ -578,6 +626,7 @@ class Consolidator: self.context_window_tokens = context_window_tokens self.max_completion_tokens = max_completion_tokens self.consolidation_ratio = consolidation_ratio + self.unified_session = unified_session self._build_messages = build_messages self._get_tool_definitions = get_tool_definitions self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = ( @@ -685,7 +734,7 @@ class Consolidator: len(chunk), replay_max_messages, ) - summary = await self.archive(chunk) + summary = await self.archive(chunk, session_key=session.key) session.last_consolidated = end_idx self.sessions.save(session) return summary @@ -716,6 +765,8 @@ class Consolidator: sender_id=None, session_summary=summary, session_metadata=session.metadata, + session_key=session.key, + unified_session=self.unified_session, ) return estimate_prompt_tokens_chain( self.provider, @@ -743,7 +794,12 @@ class Consolidator: except Exception: return truncate_text(text, budget * 4) - async def archive(self, messages: list[dict]) -> str | None: + async def archive( + self, + messages: list[dict], + *, + session_key: str | None = None, + ) -> str | None: """Summarize messages via LLM and append to history.jsonl. Returns the summary text on success, None if nothing to archive. @@ -771,11 +827,15 @@ class Consolidator: if response.finish_reason == "error": raise RuntimeError(f"LLM returned error: {response.content}") summary = response.content or "[no summary]" - self.store.append_history(summary, max_chars=_ARCHIVE_SUMMARY_MAX_CHARS) + self.store.append_history( + summary, + max_chars=_ARCHIVE_SUMMARY_MAX_CHARS, + session_key=session_key, + ) return summary except Exception: logger.warning("Consolidation LLM call failed, raw-dumping to history") - self.store.raw_archive(messages) + self.store.raw_archive(messages, session_key=session_key) return None async def maybe_consolidate_by_tokens( @@ -858,7 +918,7 @@ class Consolidator: source, len(chunk), ) - summary = await self.archive(chunk) + summary = await self.archive(chunk, session_key=session.key) # Advance the cursor either way: on success the chunk was # summarized; on failure archive() already raw-archived it as # a breadcrumb. Re-archiving the same chunk on the next call @@ -930,7 +990,7 @@ class Consolidator: last_active = session.updated_at summary: str | None = "" if archive_msgs: - summary = await self.archive(archive_msgs) + summary = await self.archive(archive_msgs, session_key=session_key) if summary and summary != "(nothing)": session.metadata["_last_summary"] = { diff --git a/tests/agent/test_consolidator.py b/tests/agent/test_consolidator.py index 028bcbedc..61ad0109b 100644 --- a/tests/agent/test_consolidator.py +++ b/tests/agent/test_consolidator.py @@ -63,6 +63,23 @@ class TestConsolidatorSummarize: entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 1 + async def test_summarize_appends_session_key_to_history( + self, + consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.return_value = MagicMock( + content="User fixed a bug in the auth module.", + finish_reason="stop", + ) + messages = [{"role": "user", "content": "fix the auth bug"}] + + await consolidator.archive(messages, session_key="telegram:chat-1") + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "telegram:chat-1" + async def test_summarize_raw_dumps_on_llm_failure(self, consolidator, mock_provider, store): """On LLM failure, raw-dump messages to HISTORY.md.""" mock_provider.chat_with_retry.side_effect = Exception("API error") @@ -73,6 +90,20 @@ class TestConsolidatorSummarize: assert len(entries) == 1 assert "[RAW]" in entries[0]["content"] + async def test_raw_dump_fallback_appends_session_key( + self, + consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.side_effect = Exception("API error") + messages = [{"role": "user", "content": "hello"}] + + await consolidator.archive(messages, session_key="slack:chat-2") + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "slack:chat-2" + async def test_summarize_skips_empty_messages(self, consolidator): result = await consolidator.archive([]) assert result is None @@ -370,6 +401,27 @@ class TestCompactIdleSession: assert meta["text"] == "Summary of old conversation." assert "last_active" in meta + @pytest.mark.asyncio + async def test_idle_compact_writes_session_key_to_history( + self, + real_consolidator, + mock_provider, + store, + ): + mock_provider.chat_with_retry.return_value = MagicMock( + content="Summary of old conversation.", finish_reason="stop" + ) + session = real_consolidator.sessions.get_or_create("cli:test") + for i in range(10): + session.add_message("user", f"user msg {i}") + session.add_message("assistant", f"assistant msg {i}") + real_consolidator.sessions.save(session) + + await real_consolidator.compact_idle_session("cli:test", max_suffix=4) + + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "cli:test" + @pytest.mark.asyncio async def test_empty_session_refreshes_timestamp(self, real_consolidator): """Empty session with old updated_at → refreshed after call, returns ''.""" @@ -640,6 +692,12 @@ class TestRawArchiveTruncation: assert len(entries) == 1 assert "hello" in entries[0]["content"] + def test_raw_archive_preserves_session_key(self, store): + messages = [{"role": "user", "content": "hello"}] + store.raw_archive(messages, session_key="websocket:chat-1") + entries = store.read_unprocessed_history(since_cursor=0) + assert entries[0]["session_key"] == "websocket:chat-1" + def test_raw_archive_custom_max_chars(self, store): """max_chars parameter should override default limit.""" messages = [{"role": "user", "content": "a" * 200}] diff --git a/tests/agent/test_context_prompt_cache.py b/tests/agent/test_context_prompt_cache.py index bbafd4890..ac3a83bf4 100644 --- a/tests/agent/test_context_prompt_cache.py +++ b/tests/agent/test_context_prompt_cache.py @@ -2,11 +2,11 @@ from __future__ import annotations +import datetime as datetime_module import re from datetime import datetime as real_datetime from importlib.resources import files as pkg_files from pathlib import Path -import datetime as datetime_module from nanobot.agent.context import ContextBuilder @@ -156,6 +156,58 @@ def test_unprocessed_history_injected_into_system_prompt(tmp_path) -> None: assert re.search(r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}\]", prompt) +def test_recent_history_injection_is_session_scoped(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("legacy entry without session") + builder.memory.append_history("telegram history", session_key="telegram:chat-1") + builder.memory.append_history("slack history", session_key="slack:chat-2") + + prompt = builder.build_system_prompt(session_key="telegram:chat-1") + + assert "# Recent History" in prompt + assert "telegram history" in prompt + assert "slack history" not in prompt + assert "legacy entry without session" not in prompt + + +def test_recent_history_injection_unified_excludes_cron_internals(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("unified user history", session_key="unified:default") + builder.memory.append_history("channel user history", session_key="telegram:chat-1") + builder.memory.append_history("cron internal history", session_key="cron:job-1") + + prompt = builder.build_system_prompt( + session_key="unified:default", + unified_session=True, + ) + + assert "unified user history" in prompt + assert "channel user history" in prompt + assert "cron internal history" not in prompt + + +def test_cron_recent_history_can_see_own_history_and_unified_context(tmp_path) -> None: + workspace = _make_workspace(tmp_path) + builder = ContextBuilder(workspace) + + builder.memory.append_history("unified user history", session_key="unified:default") + builder.memory.append_history("own cron history", session_key="cron:job-1") + builder.memory.append_history("other cron history", session_key="cron:job-2") + + prompt = builder.build_system_prompt( + session_key="cron:job-1", + unified_session=True, + ) + + assert "unified user history" in prompt + assert "own cron history" in prompt + assert "other cron history" not in prompt + + def test_recent_history_capped_at_max(tmp_path) -> None: """Only the most recent _MAX_RECENT_HISTORY entries are injected.""" workspace = _make_workspace(tmp_path) @@ -201,7 +253,7 @@ def test_partial_dream_processing_shows_only_remainder(tmp_path) -> None: workspace = _make_workspace(tmp_path) builder = ContextBuilder(workspace) - c1 = builder.memory.append_history("old conversation about Python") + builder.memory.append_history("old conversation about Python") c2 = builder.memory.append_history("old conversation about Rust") builder.memory.append_history("recent question about Docker") builder.memory.append_history("recent question about K8s") diff --git a/tests/agent/test_memory_store.py b/tests/agent/test_memory_store.py index fda60b7c5..a9b5d1003 100644 --- a/tests/agent/test_memory_store.py +++ b/tests/agent/test_memory_store.py @@ -58,6 +58,12 @@ class TestHistoryWithCursor: data = json.loads(content) assert data["cursor"] == 1 + def test_append_history_includes_session_key_when_provided(self, store): + store.append_history("event 1", session_key="telegram:chat-1") + content = store.read_file(store.history_file) + data = json.loads(content) + assert data["session_key"] == "telegram:chat-1" + def test_cursor_persists_across_appends(self, store): store.append_history("event 1") store.append_history("event 2") @@ -106,6 +112,54 @@ class TestHistoryWithCursor: entries = store.read_unprocessed_history(since_cursor=0) assert len(entries) == 2 + def test_prompt_history_filters_to_current_session(self, store): + store.append_history("legacy entry without session") + store.append_history("telegram entry", session_key="telegram:chat-1") + store.append_history("slack entry", session_key="slack:chat-2") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="telegram:chat-1", + ) + + assert [e["content"] for e in entries] == ["telegram entry"] + assert [e["content"] for e in store.read_unprocessed_history(0)] == [ + "legacy entry without session", + "telegram entry", + "slack entry", + ] + + def test_unified_prompt_history_excludes_internal_cron_sessions(self, store): + store.append_history("legacy entry without session") + store.append_history("unified entry", session_key="unified:default") + store.append_history("telegram entry", session_key="telegram:chat-1") + store.append_history("cron internal entry", session_key="cron:job-1") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="unified:default", + unified_session=True, + ) + + assert [e["content"] for e in entries] == [ + "legacy entry without session", + "unified entry", + "telegram entry", + ] + + def test_unified_cron_prompt_history_includes_own_cron_entry(self, store): + store.append_history("unified entry", session_key="unified:default") + store.append_history("other cron entry", session_key="cron:job-2") + store.append_history("own cron entry", session_key="cron:job-1") + + entries = store.read_recent_history_for_prompt( + since_cursor=0, + session_key="cron:job-1", + unified_session=True, + ) + + assert [e["content"] for e in entries] == ["unified entry", "own cron entry"] + def test_read_unprocessed_skips_entries_without_cursor(self, store): """Regression: entries missing the cursor key should be silently skipped.""" store.history_file.write_text(