Scope prompt recent history by session

Fixes #4259
This commit is contained in:
chengyongru 2026-06-10 16:00:35 +08:00 committed by Xubin Ren
parent aee656eb9f
commit bfc6febddc
6 changed files with 258 additions and 14 deletions

View File

@ -70,6 +70,8 @@ class ContextBuilder:
session_summary: str | None = None,
workspace: Path | None = None,
include_memory_recent_history: bool = True,
session_key: str | None = None,
unified_session: bool = False,
) -> str:
"""Build the system prompt from identity, bootstrap files, memory, and skills."""
root = workspace or self.workspace
@ -96,7 +98,11 @@ class ContextBuilder:
parts.append(render_template("agent/skills_section.md", skills_summary=skills_summary))
if include_memory_recent_history:
entries = self.memory.read_unprocessed_history(since_cursor=self.memory.get_last_dream_cursor())
entries = self.memory.read_recent_history_for_prompt(
since_cursor=self.memory.get_last_dream_cursor(),
session_key=session_key,
unified_session=unified_session,
)
if entries:
capped = entries[-self._MAX_RECENT_HISTORY:]
history_text = "\n".join(
@ -196,6 +202,8 @@ class ContextBuilder:
inbound_message: Any | None = None,
skip_runtime_lines: bool = False,
include_memory_recent_history: bool = True,
session_key: str | None = None,
unified_session: bool = False,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
root = workspace or self.workspace
@ -232,6 +240,8 @@ class ContextBuilder:
session_summary=session_summary,
workspace=root,
include_memory_recent_history=include_memory_recent_history,
session_key=session_key,
unified_session=unified_session,
),
},
*history,

View File

@ -9,6 +9,7 @@ import time
from contextlib import AsyncExitStack, nullcontext, suppress
from dataclasses import dataclass, field
from enum import Enum, auto
from functools import partial
from pathlib import Path
from typing import TYPE_CHECKING, Any, Awaitable, Callable
@ -314,6 +315,7 @@ class AgentLoop:
get_tool_definitions=self.tools.get_definitions,
max_completion_tokens=provider.generation.max_tokens,
consolidation_ratio=consolidation_ratio,
unified_session=unified_session,
)
self.auto_compact = AutoCompact(
sessions=self.sessions,
@ -610,6 +612,8 @@ class AgentLoop:
runtime_state=self,
inbound_message=msg,
include_memory_recent_history=include_memory_recent_history,
session_key=session.key,
unified_session=self._unified_session,
)
async def _dispatch_command_inline(
@ -1150,6 +1154,8 @@ class AgentLoop:
runtime_state=self,
inbound_message=msg,
skip_runtime_lines=is_subagent,
session_key=key,
unified_session=self._unified_session,
)
t_wall = time.time()
final_content, _, all_msgs, stop_reason, _ = await self._run_agent_loop(
@ -1163,7 +1169,9 @@ class AgentLoop:
latency_ms = max(0, int((wall_done - t_wall) * 1000))
self._save_turn(session, all_msgs, 1 + len(history), turn_latency_ms=latency_ms)
self._runtime_events().record_turn_latency(key, latency_ms)
session.enforce_file_cap(on_archive=self.context.memory.raw_archive)
session.enforce_file_cap(
on_archive=partial(self.context.memory.raw_archive, session_key=key)
)
self._clear_runtime_checkpoint(session)
self.sessions.save(session)
self._schedule_background(
@ -1487,7 +1495,9 @@ class AgentLoop:
ctx.turn_latency_ms,
)
if not ctx.ephemeral:
ctx.session.enforce_file_cap(on_archive=self.context.memory.raw_archive)
ctx.session.enforce_file_cap(
on_archive=partial(self.context.memory.raw_archive, session_key=ctx.session_key)
)
self._schedule_background(
self.consolidator.maybe_consolidate_by_tokens(
ctx.session,

View File

@ -41,6 +41,8 @@ class MemoryStore:
"""Pure file I/O for memory files: MEMORY.md, history.jsonl, SOUL.md, USER.md."""
_DEFAULT_MAX_HISTORY = 1000
_INTERNAL_HISTORY_SESSION_PREFIXES = ("cron:", "dream:")
_INTERNAL_HISTORY_SESSION_KEYS = {"heartbeat"}
_LEGACY_ENTRY_START_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}[^\]]*)\]\s*")
_LEGACY_TIMESTAMP_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2})\]\s*")
_LEGACY_RAW_MESSAGE_RE = re.compile(
@ -232,7 +234,13 @@ class MemoryStore:
# -- history.jsonl — append-only, JSONL format ---------------------------
def append_history(self, entry: str, *, max_chars: int | None = None) -> int:
def append_history(
self,
entry: str,
*,
max_chars: int | None = None,
session_key: str | None = None,
) -> int:
"""Append *entry* to history.jsonl and return its auto-incrementing cursor.
Entries are passed through `strip_think` to drop template-level leaks
@ -272,6 +280,8 @@ class MemoryStore:
cursor,
)
record = {"cursor": cursor, "timestamp": ts, "content": content}
if session_key:
record["session_key"] = session_key
with open(self.history_file, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
self._cursor_file.write_text(str(cursor), encoding="utf-8")
@ -322,6 +332,36 @@ class MemoryStore:
"""Return history entries with a valid cursor > *since_cursor*."""
return [e for e, c in self._iter_valid_entries() if c > since_cursor]
@classmethod
def _is_internal_history_session(cls, session_key: str | None) -> bool:
if not session_key:
return False
return (
session_key in cls._INTERNAL_HISTORY_SESSION_KEYS
or session_key.startswith(cls._INTERNAL_HISTORY_SESSION_PREFIXES)
)
def read_recent_history_for_prompt(
self,
since_cursor: int,
*,
session_key: str | None,
unified_session: bool = False,
) -> list[dict[str, Any]]:
"""Return unprocessed history entries safe to inject into a turn prompt."""
entries = self.read_unprocessed_history(since_cursor=since_cursor)
if session_key is None:
return entries
if not unified_session:
return [e for e in entries if e.get("session_key") == session_key]
return [
entry
for entry in entries
if (entry_session := entry.get("session_key")) == session_key
or not self._is_internal_history_session(entry_session)
]
def compact_history(self) -> None:
"""Drop oldest entries if the file exceeds *max_history_entries*."""
if self.max_history_entries <= 0:
@ -489,13 +529,20 @@ class MemoryStore:
)
return "\n".join(lines)
def raw_archive(self, messages: list[dict], *, max_chars: int | None = None) -> None:
def raw_archive(
self,
messages: list[dict],
*,
max_chars: int | None = None,
session_key: str | None = None,
) -> None:
"""Fallback: dump raw messages to history.jsonl without LLM summarization."""
limit = max_chars if max_chars is not None else _RAW_ARCHIVE_MAX_CHARS
formatted = truncate_text(self._format_messages(messages), limit)
self.append_history(
f"[RAW] {len(messages)} messages\n"
f"{formatted}"
f"{formatted}",
session_key=session_key,
)
logger.warning(
"Memory consolidation degraded: raw-archived {} messages", len(messages)
@ -570,6 +617,7 @@ class Consolidator:
get_tool_definitions: Callable[[], list[dict[str, Any]]],
max_completion_tokens: int = 4096,
consolidation_ratio: float = 0.5,
unified_session: bool = False,
):
self.store = store
self.provider = provider
@ -578,6 +626,7 @@ class Consolidator:
self.context_window_tokens = context_window_tokens
self.max_completion_tokens = max_completion_tokens
self.consolidation_ratio = consolidation_ratio
self.unified_session = unified_session
self._build_messages = build_messages
self._get_tool_definitions = get_tool_definitions
self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = (
@ -685,7 +734,7 @@ class Consolidator:
len(chunk),
replay_max_messages,
)
summary = await self.archive(chunk)
summary = await self.archive(chunk, session_key=session.key)
session.last_consolidated = end_idx
self.sessions.save(session)
return summary
@ -716,6 +765,8 @@ class Consolidator:
sender_id=None,
session_summary=summary,
session_metadata=session.metadata,
session_key=session.key,
unified_session=self.unified_session,
)
return estimate_prompt_tokens_chain(
self.provider,
@ -743,7 +794,12 @@ class Consolidator:
except Exception:
return truncate_text(text, budget * 4)
async def archive(self, messages: list[dict]) -> str | None:
async def archive(
self,
messages: list[dict],
*,
session_key: str | None = None,
) -> str | None:
"""Summarize messages via LLM and append to history.jsonl.
Returns the summary text on success, None if nothing to archive.
@ -771,11 +827,15 @@ class Consolidator:
if response.finish_reason == "error":
raise RuntimeError(f"LLM returned error: {response.content}")
summary = response.content or "[no summary]"
self.store.append_history(summary, max_chars=_ARCHIVE_SUMMARY_MAX_CHARS)
self.store.append_history(
summary,
max_chars=_ARCHIVE_SUMMARY_MAX_CHARS,
session_key=session_key,
)
return summary
except Exception:
logger.warning("Consolidation LLM call failed, raw-dumping to history")
self.store.raw_archive(messages)
self.store.raw_archive(messages, session_key=session_key)
return None
async def maybe_consolidate_by_tokens(
@ -858,7 +918,7 @@ class Consolidator:
source,
len(chunk),
)
summary = await self.archive(chunk)
summary = await self.archive(chunk, session_key=session.key)
# Advance the cursor either way: on success the chunk was
# summarized; on failure archive() already raw-archived it as
# a breadcrumb. Re-archiving the same chunk on the next call
@ -930,7 +990,7 @@ class Consolidator:
last_active = session.updated_at
summary: str | None = ""
if archive_msgs:
summary = await self.archive(archive_msgs)
summary = await self.archive(archive_msgs, session_key=session_key)
if summary and summary != "(nothing)":
session.metadata["_last_summary"] = {

View File

@ -63,6 +63,23 @@ class TestConsolidatorSummarize:
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 1
async def test_summarize_appends_session_key_to_history(
self,
consolidator,
mock_provider,
store,
):
mock_provider.chat_with_retry.return_value = MagicMock(
content="User fixed a bug in the auth module.",
finish_reason="stop",
)
messages = [{"role": "user", "content": "fix the auth bug"}]
await consolidator.archive(messages, session_key="telegram:chat-1")
entries = store.read_unprocessed_history(since_cursor=0)
assert entries[0]["session_key"] == "telegram:chat-1"
async def test_summarize_raw_dumps_on_llm_failure(self, consolidator, mock_provider, store):
"""On LLM failure, raw-dump messages to HISTORY.md."""
mock_provider.chat_with_retry.side_effect = Exception("API error")
@ -73,6 +90,20 @@ class TestConsolidatorSummarize:
assert len(entries) == 1
assert "[RAW]" in entries[0]["content"]
async def test_raw_dump_fallback_appends_session_key(
self,
consolidator,
mock_provider,
store,
):
mock_provider.chat_with_retry.side_effect = Exception("API error")
messages = [{"role": "user", "content": "hello"}]
await consolidator.archive(messages, session_key="slack:chat-2")
entries = store.read_unprocessed_history(since_cursor=0)
assert entries[0]["session_key"] == "slack:chat-2"
async def test_summarize_skips_empty_messages(self, consolidator):
result = await consolidator.archive([])
assert result is None
@ -370,6 +401,27 @@ class TestCompactIdleSession:
assert meta["text"] == "Summary of old conversation."
assert "last_active" in meta
@pytest.mark.asyncio
async def test_idle_compact_writes_session_key_to_history(
self,
real_consolidator,
mock_provider,
store,
):
mock_provider.chat_with_retry.return_value = MagicMock(
content="Summary of old conversation.", finish_reason="stop"
)
session = real_consolidator.sessions.get_or_create("cli:test")
for i in range(10):
session.add_message("user", f"user msg {i}")
session.add_message("assistant", f"assistant msg {i}")
real_consolidator.sessions.save(session)
await real_consolidator.compact_idle_session("cli:test", max_suffix=4)
entries = store.read_unprocessed_history(since_cursor=0)
assert entries[0]["session_key"] == "cli:test"
@pytest.mark.asyncio
async def test_empty_session_refreshes_timestamp(self, real_consolidator):
"""Empty session with old updated_at → refreshed after call, returns ''."""
@ -640,6 +692,12 @@ class TestRawArchiveTruncation:
assert len(entries) == 1
assert "hello" in entries[0]["content"]
def test_raw_archive_preserves_session_key(self, store):
messages = [{"role": "user", "content": "hello"}]
store.raw_archive(messages, session_key="websocket:chat-1")
entries = store.read_unprocessed_history(since_cursor=0)
assert entries[0]["session_key"] == "websocket:chat-1"
def test_raw_archive_custom_max_chars(self, store):
"""max_chars parameter should override default limit."""
messages = [{"role": "user", "content": "a" * 200}]

View File

@ -2,11 +2,11 @@
from __future__ import annotations
import datetime as datetime_module
import re
from datetime import datetime as real_datetime
from importlib.resources import files as pkg_files
from pathlib import Path
import datetime as datetime_module
from nanobot.agent.context import ContextBuilder
@ -156,6 +156,58 @@ def test_unprocessed_history_injected_into_system_prompt(tmp_path) -> None:
assert re.search(r"\[\d{4}-\d{2}-\d{2} \d{2}:\d{2}\]", prompt)
def test_recent_history_injection_is_session_scoped(tmp_path) -> None:
workspace = _make_workspace(tmp_path)
builder = ContextBuilder(workspace)
builder.memory.append_history("legacy entry without session")
builder.memory.append_history("telegram history", session_key="telegram:chat-1")
builder.memory.append_history("slack history", session_key="slack:chat-2")
prompt = builder.build_system_prompt(session_key="telegram:chat-1")
assert "# Recent History" in prompt
assert "telegram history" in prompt
assert "slack history" not in prompt
assert "legacy entry without session" not in prompt
def test_recent_history_injection_unified_excludes_cron_internals(tmp_path) -> None:
workspace = _make_workspace(tmp_path)
builder = ContextBuilder(workspace)
builder.memory.append_history("unified user history", session_key="unified:default")
builder.memory.append_history("channel user history", session_key="telegram:chat-1")
builder.memory.append_history("cron internal history", session_key="cron:job-1")
prompt = builder.build_system_prompt(
session_key="unified:default",
unified_session=True,
)
assert "unified user history" in prompt
assert "channel user history" in prompt
assert "cron internal history" not in prompt
def test_cron_recent_history_can_see_own_history_and_unified_context(tmp_path) -> None:
workspace = _make_workspace(tmp_path)
builder = ContextBuilder(workspace)
builder.memory.append_history("unified user history", session_key="unified:default")
builder.memory.append_history("own cron history", session_key="cron:job-1")
builder.memory.append_history("other cron history", session_key="cron:job-2")
prompt = builder.build_system_prompt(
session_key="cron:job-1",
unified_session=True,
)
assert "unified user history" in prompt
assert "own cron history" in prompt
assert "other cron history" not in prompt
def test_recent_history_capped_at_max(tmp_path) -> None:
"""Only the most recent _MAX_RECENT_HISTORY entries are injected."""
workspace = _make_workspace(tmp_path)
@ -201,7 +253,7 @@ def test_partial_dream_processing_shows_only_remainder(tmp_path) -> None:
workspace = _make_workspace(tmp_path)
builder = ContextBuilder(workspace)
c1 = builder.memory.append_history("old conversation about Python")
builder.memory.append_history("old conversation about Python")
c2 = builder.memory.append_history("old conversation about Rust")
builder.memory.append_history("recent question about Docker")
builder.memory.append_history("recent question about K8s")

View File

@ -58,6 +58,12 @@ class TestHistoryWithCursor:
data = json.loads(content)
assert data["cursor"] == 1
def test_append_history_includes_session_key_when_provided(self, store):
store.append_history("event 1", session_key="telegram:chat-1")
content = store.read_file(store.history_file)
data = json.loads(content)
assert data["session_key"] == "telegram:chat-1"
def test_cursor_persists_across_appends(self, store):
store.append_history("event 1")
store.append_history("event 2")
@ -106,6 +112,54 @@ class TestHistoryWithCursor:
entries = store.read_unprocessed_history(since_cursor=0)
assert len(entries) == 2
def test_prompt_history_filters_to_current_session(self, store):
store.append_history("legacy entry without session")
store.append_history("telegram entry", session_key="telegram:chat-1")
store.append_history("slack entry", session_key="slack:chat-2")
entries = store.read_recent_history_for_prompt(
since_cursor=0,
session_key="telegram:chat-1",
)
assert [e["content"] for e in entries] == ["telegram entry"]
assert [e["content"] for e in store.read_unprocessed_history(0)] == [
"legacy entry without session",
"telegram entry",
"slack entry",
]
def test_unified_prompt_history_excludes_internal_cron_sessions(self, store):
store.append_history("legacy entry without session")
store.append_history("unified entry", session_key="unified:default")
store.append_history("telegram entry", session_key="telegram:chat-1")
store.append_history("cron internal entry", session_key="cron:job-1")
entries = store.read_recent_history_for_prompt(
since_cursor=0,
session_key="unified:default",
unified_session=True,
)
assert [e["content"] for e in entries] == [
"legacy entry without session",
"unified entry",
"telegram entry",
]
def test_unified_cron_prompt_history_includes_own_cron_entry(self, store):
store.append_history("unified entry", session_key="unified:default")
store.append_history("other cron entry", session_key="cron:job-2")
store.append_history("own cron entry", session_key="cron:job-1")
entries = store.read_recent_history_for_prompt(
since_cursor=0,
session_key="cron:job-1",
unified_session=True,
)
assert [e["content"] for e in entries] == ["unified entry", "own cron entry"]
def test_read_unprocessed_skips_entries_without_cursor(self, store):
"""Regression: entries missing the cursor key should be silently skipped."""
store.history_file.write_text(