nanobot/nanobot/agent/memory.py
chengyongru 35f3084c03 feat(dream): per-line age annotations + dedup-aware prompt + max_iter=15
Three improvements to Dream's memory consolidation:

1. Per-line git-blame age annotations: MEMORY.md lines get `← Nd` suffixes
   (N>14) from dulwich annotate. SOUL.md/USER.md excluded as permanent.
   LLM uses content judgment, not just age, to decide what to prune.

2. Dedup-aware Phase 1 prompt: reframed as dual-task (extract facts +
   deduplicate existing files) with explicit redundancy patterns to scan for.
   Validated through 20 experiments (exp-002 prompt + max_iter=15 was best,
   averaging -1643 chars/5.4% compression per run).

3. Phase 1 analysis as commit body: dream git commits now include the full
   Phase 1 analysis for transparency via /dream-log.

4. max_iterations raised from 10 to 15: 30% improvement over 10 with no
   risk; 20 showed diminishing returns (exp-020: -701 vs exp-017: -1643).
2026-04-17 13:45:38 +08:00

804 lines
31 KiB
Python

"""Memory system: pure file I/O store, lightweight Consolidator, and Dream processor."""
from __future__ import annotations
import asyncio
import json
import re
import weakref
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING, Any, Callable
from loguru import logger
from nanobot.utils.prompt_templates import render_template
from nanobot.utils.helpers import ensure_dir, estimate_message_tokens, estimate_prompt_tokens_chain, strip_think
from nanobot.agent.runner import AgentRunSpec, AgentRunner
from nanobot.agent.tools.registry import ToolRegistry
from nanobot.utils.gitstore import GitStore
if TYPE_CHECKING:
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
# ---------------------------------------------------------------------------
# MemoryStore — pure file I/O layer
# ---------------------------------------------------------------------------
class MemoryStore:
"""Pure file I/O for memory files: MEMORY.md, history.jsonl, SOUL.md, USER.md."""
_DEFAULT_MAX_HISTORY = 1000
_LEGACY_ENTRY_START_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2}[^\]]*)\]\s*")
_LEGACY_TIMESTAMP_RE = re.compile(r"^\[(\d{4}-\d{2}-\d{2} \d{2}:\d{2})\]\s*")
_LEGACY_RAW_MESSAGE_RE = re.compile(
r"^\[\d{4}-\d{2}-\d{2}[^\]]*\]\s+[A-Z][A-Z0-9_]*(?:\s+\[tools:\s*[^\]]+\])?:"
)
def __init__(self, workspace: Path, max_history_entries: int = _DEFAULT_MAX_HISTORY):
self.workspace = workspace
self.max_history_entries = max_history_entries
self.memory_dir = ensure_dir(workspace / "memory")
self.memory_file = self.memory_dir / "MEMORY.md"
self.history_file = self.memory_dir / "history.jsonl"
self.legacy_history_file = self.memory_dir / "HISTORY.md"
self.soul_file = workspace / "SOUL.md"
self.user_file = workspace / "USER.md"
self._cursor_file = self.memory_dir / ".cursor"
self._dream_cursor_file = self.memory_dir / ".dream_cursor"
self._git = GitStore(workspace, tracked_files=[
"SOUL.md", "USER.md", "memory/MEMORY.md",
])
self._maybe_migrate_legacy_history()
@property
def git(self) -> GitStore:
return self._git
# -- generic helpers -----------------------------------------------------
@staticmethod
def read_file(path: Path) -> str:
try:
return path.read_text(encoding="utf-8")
except FileNotFoundError:
return ""
def _maybe_migrate_legacy_history(self) -> None:
"""One-time upgrade from legacy HISTORY.md to history.jsonl.
The migration is best-effort and prioritizes preserving as much content
as possible over perfect parsing.
"""
if not self.legacy_history_file.exists():
return
if self.history_file.exists() and self.history_file.stat().st_size > 0:
return
try:
legacy_text = self.legacy_history_file.read_text(
encoding="utf-8",
errors="replace",
)
except OSError:
logger.exception("Failed to read legacy HISTORY.md for migration")
return
entries = self._parse_legacy_history(legacy_text)
try:
if entries:
self._write_entries(entries)
last_cursor = entries[-1]["cursor"]
self._cursor_file.write_text(str(last_cursor), encoding="utf-8")
# Default to "already processed" so upgrades do not replay the
# user's entire historical archive into Dream on first start.
self._dream_cursor_file.write_text(str(last_cursor), encoding="utf-8")
backup_path = self._next_legacy_backup_path()
self.legacy_history_file.replace(backup_path)
logger.info(
"Migrated legacy HISTORY.md to history.jsonl ({} entries)",
len(entries),
)
except Exception:
logger.exception("Failed to migrate legacy HISTORY.md")
def _parse_legacy_history(self, text: str) -> list[dict[str, Any]]:
normalized = text.replace("\r\n", "\n").replace("\r", "\n").strip()
if not normalized:
return []
fallback_timestamp = self._legacy_fallback_timestamp()
entries: list[dict[str, Any]] = []
chunks = self._split_legacy_history_chunks(normalized)
for cursor, chunk in enumerate(chunks, start=1):
timestamp = fallback_timestamp
content = chunk
match = self._LEGACY_TIMESTAMP_RE.match(chunk)
if match:
timestamp = match.group(1)
remainder = chunk[match.end():].lstrip()
if remainder:
content = remainder
entries.append({
"cursor": cursor,
"timestamp": timestamp,
"content": content,
})
return entries
def _split_legacy_history_chunks(self, text: str) -> list[str]:
lines = text.split("\n")
chunks: list[str] = []
current: list[str] = []
saw_blank_separator = False
for line in lines:
if saw_blank_separator and line.strip() and current:
chunks.append("\n".join(current).strip())
current = [line]
saw_blank_separator = False
continue
if self._should_start_new_legacy_chunk(line, current):
chunks.append("\n".join(current).strip())
current = [line]
saw_blank_separator = False
continue
current.append(line)
saw_blank_separator = not line.strip()
if current:
chunks.append("\n".join(current).strip())
return [chunk for chunk in chunks if chunk]
def _should_start_new_legacy_chunk(self, line: str, current: list[str]) -> bool:
if not current:
return False
if not self._LEGACY_ENTRY_START_RE.match(line):
return False
if self._is_raw_legacy_chunk(current) and self._LEGACY_RAW_MESSAGE_RE.match(line):
return False
return True
def _is_raw_legacy_chunk(self, lines: list[str]) -> bool:
first_nonempty = next((line for line in lines if line.strip()), "")
match = self._LEGACY_TIMESTAMP_RE.match(first_nonempty)
if not match:
return False
return first_nonempty[match.end():].lstrip().startswith("[RAW]")
def _legacy_fallback_timestamp(self) -> str:
try:
return datetime.fromtimestamp(
self.legacy_history_file.stat().st_mtime,
).strftime("%Y-%m-%d %H:%M")
except OSError:
return datetime.now().strftime("%Y-%m-%d %H:%M")
def _next_legacy_backup_path(self) -> Path:
candidate = self.memory_dir / "HISTORY.md.bak"
suffix = 2
while candidate.exists():
candidate = self.memory_dir / f"HISTORY.md.bak.{suffix}"
suffix += 1
return candidate
# -- MEMORY.md (long-term facts) -----------------------------------------
def read_memory(self) -> str:
return self.read_file(self.memory_file)
def write_memory(self, content: str) -> None:
self.memory_file.write_text(content, encoding="utf-8")
# -- SOUL.md -------------------------------------------------------------
def read_soul(self) -> str:
return self.read_file(self.soul_file)
def write_soul(self, content: str) -> None:
self.soul_file.write_text(content, encoding="utf-8")
# -- USER.md -------------------------------------------------------------
def read_user(self) -> str:
return self.read_file(self.user_file)
def write_user(self, content: str) -> None:
self.user_file.write_text(content, encoding="utf-8")
# -- context injection (used by context.py) ------------------------------
def get_memory_context(self) -> str:
long_term = self.read_memory()
return f"## Long-term Memory\n{long_term}" if long_term else ""
# -- history.jsonl — append-only, JSONL format ---------------------------
def append_history(self, entry: str) -> int:
"""Append *entry* to history.jsonl and return its auto-incrementing cursor."""
cursor = self._next_cursor()
ts = datetime.now().strftime("%Y-%m-%d %H:%M")
record = {"cursor": cursor, "timestamp": ts, "content": strip_think(entry.rstrip()) or entry.rstrip()}
with open(self.history_file, "a", encoding="utf-8") as f:
f.write(json.dumps(record, ensure_ascii=False) + "\n")
self._cursor_file.write_text(str(cursor), encoding="utf-8")
return cursor
def _next_cursor(self) -> int:
"""Read the current cursor counter and return next value."""
if self._cursor_file.exists():
try:
return int(self._cursor_file.read_text(encoding="utf-8").strip()) + 1
except (ValueError, OSError):
pass
# Fallback: read last line's cursor from the JSONL file.
last = self._read_last_entry()
if last and last.get("cursor"):
return last["cursor"] + 1
return 1
def read_unprocessed_history(self, since_cursor: int) -> list[dict[str, Any]]:
"""Return history entries with cursor > *since_cursor*."""
return [e for e in self._read_entries() if e.get("cursor", 0) > since_cursor]
def compact_history(self) -> None:
"""Drop oldest entries if the file exceeds *max_history_entries*."""
if self.max_history_entries <= 0:
return
entries = self._read_entries()
if len(entries) <= self.max_history_entries:
return
kept = entries[-self.max_history_entries:]
self._write_entries(kept)
# -- JSONL helpers -------------------------------------------------------
def _read_entries(self) -> list[dict[str, Any]]:
"""Read all entries from history.jsonl."""
entries: list[dict[str, Any]] = []
try:
with open(self.history_file, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
entries.append(json.loads(line))
except json.JSONDecodeError:
continue
except FileNotFoundError:
pass
return entries
def _read_last_entry(self) -> dict[str, Any] | None:
"""Read the last entry from the JSONL file efficiently."""
try:
with open(self.history_file, "rb") as f:
f.seek(0, 2)
size = f.tell()
if size == 0:
return None
read_size = min(size, 4096)
f.seek(size - read_size)
data = f.read().decode("utf-8")
lines = [l for l in data.split("\n") if l.strip()]
if not lines:
return None
return json.loads(lines[-1])
except (FileNotFoundError, json.JSONDecodeError, UnicodeDecodeError):
return None
def _write_entries(self, entries: list[dict[str, Any]]) -> None:
"""Overwrite history.jsonl with the given entries."""
with open(self.history_file, "w", encoding="utf-8") as f:
for entry in entries:
f.write(json.dumps(entry, ensure_ascii=False) + "\n")
# -- dream cursor --------------------------------------------------------
def get_last_dream_cursor(self) -> int:
if self._dream_cursor_file.exists():
try:
return int(self._dream_cursor_file.read_text(encoding="utf-8").strip())
except (ValueError, OSError):
pass
return 0
def set_last_dream_cursor(self, cursor: int) -> None:
self._dream_cursor_file.write_text(str(cursor), encoding="utf-8")
# -- message formatting utility ------------------------------------------
@staticmethod
def _format_messages(messages: list[dict]) -> str:
lines = []
for message in messages:
if not message.get("content"):
continue
tools = f" [tools: {', '.join(message['tools_used'])}]" if message.get("tools_used") else ""
lines.append(
f"[{message.get('timestamp', '?')[:16]}] {message['role'].upper()}{tools}: {message['content']}"
)
return "\n".join(lines)
def raw_archive(self, messages: list[dict]) -> None:
"""Fallback: dump raw messages to history.jsonl without LLM summarization."""
self.append_history(
f"[RAW] {len(messages)} messages\n"
f"{self._format_messages(messages)}"
)
logger.warning(
"Memory consolidation degraded: raw-archived {} messages", len(messages)
)
# ---------------------------------------------------------------------------
# Consolidator — lightweight token-budget triggered consolidation
# ---------------------------------------------------------------------------
class Consolidator:
"""Lightweight consolidation: summarizes evicted messages into history.jsonl."""
_MAX_CONSOLIDATION_ROUNDS = 5
_MAX_CHUNK_MESSAGES = 60 # hard cap per consolidation round
_SAFETY_BUFFER = 1024 # extra headroom for tokenizer estimation drift
def __init__(
self,
store: MemoryStore,
provider: LLMProvider,
model: str,
sessions: SessionManager,
context_window_tokens: int,
build_messages: Callable[..., list[dict[str, Any]]],
get_tool_definitions: Callable[[], list[dict[str, Any]]],
max_completion_tokens: int = 4096,
):
self.store = store
self.provider = provider
self.model = model
self.sessions = sessions
self.context_window_tokens = context_window_tokens
self.max_completion_tokens = max_completion_tokens
self._build_messages = build_messages
self._get_tool_definitions = get_tool_definitions
self._locks: weakref.WeakValueDictionary[str, asyncio.Lock] = (
weakref.WeakValueDictionary()
)
def get_lock(self, session_key: str) -> asyncio.Lock:
"""Return the shared consolidation lock for one session."""
return self._locks.setdefault(session_key, asyncio.Lock())
def pick_consolidation_boundary(
self,
session: Session,
tokens_to_remove: int,
) -> tuple[int, int] | None:
"""Pick a user-turn boundary that removes enough old prompt tokens."""
start = session.last_consolidated
if start >= len(session.messages) or tokens_to_remove <= 0:
return None
removed_tokens = 0
last_boundary: tuple[int, int] | None = None
for idx in range(start, len(session.messages)):
message = session.messages[idx]
if idx > start and message.get("role") == "user":
last_boundary = (idx, removed_tokens)
if removed_tokens >= tokens_to_remove:
return last_boundary
removed_tokens += estimate_message_tokens(message)
return last_boundary
def _cap_consolidation_boundary(
self,
session: Session,
end_idx: int,
) -> int | None:
"""Clamp the chunk size without breaking the user-turn boundary."""
start = session.last_consolidated
if end_idx - start <= self._MAX_CHUNK_MESSAGES:
return end_idx
capped_end = start + self._MAX_CHUNK_MESSAGES
for idx in range(capped_end, start, -1):
if session.messages[idx].get("role") == "user":
return idx
return None
def estimate_session_prompt_tokens(self, session: Session) -> tuple[int, str]:
"""Estimate current prompt size for the normal session history view."""
history = session.get_history(max_messages=0)
channel, chat_id = (session.key.split(":", 1) if ":" in session.key else (None, None))
probe_messages = self._build_messages(
history=history,
current_message="[token-probe]",
channel=channel,
chat_id=chat_id,
)
return estimate_prompt_tokens_chain(
self.provider,
self.model,
probe_messages,
self._get_tool_definitions(),
)
async def archive(self, messages: list[dict]) -> str | None:
"""Summarize messages via LLM and append to history.jsonl.
Returns the summary text on success, None if nothing to archive.
"""
if not messages:
return None
try:
formatted = MemoryStore._format_messages(messages)
response = await self.provider.chat_with_retry(
model=self.model,
messages=[
{
"role": "system",
"content": render_template(
"agent/consolidator_archive.md",
strip=True,
),
},
{"role": "user", "content": formatted},
],
tools=None,
tool_choice=None,
)
summary = response.content or "[no summary]"
self.store.append_history(summary)
return summary
except Exception:
logger.warning("Consolidation LLM call failed, raw-dumping to history")
self.store.raw_archive(messages)
return None
async def maybe_consolidate_by_tokens(self, session: Session) -> None:
"""Loop: archive old messages until prompt fits within safe budget.
The budget reserves space for completion tokens and a safety buffer
so the LLM request never exceeds the context window.
"""
if not session.messages or self.context_window_tokens <= 0:
return
lock = self.get_lock(session.key)
async with lock:
budget = self.context_window_tokens - self.max_completion_tokens - self._SAFETY_BUFFER
target = budget // 2
try:
estimated, source = self.estimate_session_prompt_tokens(session)
except Exception:
logger.exception("Token estimation failed for {}", session.key)
estimated, source = 0, "error"
if estimated <= 0:
return
if estimated < budget:
unconsolidated_count = len(session.messages) - session.last_consolidated
logger.debug(
"Token consolidation idle {}: {}/{} via {}, msgs={}",
session.key,
estimated,
self.context_window_tokens,
source,
unconsolidated_count,
)
return
for round_num in range(self._MAX_CONSOLIDATION_ROUNDS):
if estimated <= target:
return
boundary = self.pick_consolidation_boundary(session, max(1, estimated - target))
if boundary is None:
logger.debug(
"Token consolidation: no safe boundary for {} (round {})",
session.key,
round_num,
)
return
end_idx = boundary[0]
end_idx = self._cap_consolidation_boundary(session, end_idx)
if end_idx is None:
logger.debug(
"Token consolidation: no capped boundary for {} (round {})",
session.key,
round_num,
)
return
chunk = session.messages[session.last_consolidated:end_idx]
if not chunk:
return
logger.info(
"Token consolidation round {} for {}: {}/{} via {}, chunk={} msgs",
round_num,
session.key,
estimated,
self.context_window_tokens,
source,
len(chunk),
)
if not await self.archive(chunk):
return
session.last_consolidated = end_idx
self.sessions.save(session)
try:
estimated, source = self.estimate_session_prompt_tokens(session)
except Exception:
logger.exception("Token estimation failed for {}", session.key)
estimated, source = 0, "error"
if estimated <= 0:
return
# ---------------------------------------------------------------------------
# Dream — heavyweight cron-scheduled memory consolidation
# ---------------------------------------------------------------------------
class Dream:
"""Two-phase memory processor: analyze history.jsonl, then edit files via AgentRunner.
Phase 1 produces an analysis summary (plain LLM call).
Phase 2 delegates to AgentRunner with read_file / edit_file tools so the
LLM can make targeted, incremental edits instead of replacing entire files.
"""
def __init__(
self,
store: MemoryStore,
provider: LLMProvider,
model: str,
max_batch_size: int = 20,
max_iterations: int = 10,
max_tool_result_chars: int = 16_000,
):
self.store = store
self.provider = provider
self.model = model
self.max_batch_size = max_batch_size
self.max_iterations = max_iterations
self.max_tool_result_chars = max_tool_result_chars
self._runner = AgentRunner(provider)
self._tools = self._build_tools()
# -- tool registry -------------------------------------------------------
def _build_tools(self) -> ToolRegistry:
"""Build a minimal tool registry for the Dream agent."""
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
from nanobot.agent.tools.filesystem import EditFileTool, ReadFileTool, WriteFileTool
tools = ToolRegistry()
workspace = self.store.workspace
# Allow reading builtin skills for reference during skill creation
extra_read = [BUILTIN_SKILLS_DIR] if BUILTIN_SKILLS_DIR.exists() else None
tools.register(ReadFileTool(
workspace=workspace,
allowed_dir=workspace,
extra_allowed_dirs=extra_read,
))
tools.register(EditFileTool(workspace=workspace, allowed_dir=workspace))
# write_file resolves relative paths from workspace root, but can only
# write under skills/ so the prompt can safely use skills/<name>/SKILL.md.
skills_dir = workspace / "skills"
skills_dir.mkdir(parents=True, exist_ok=True)
tools.register(WriteFileTool(workspace=workspace, allowed_dir=skills_dir))
return tools
# -- skill listing --------------------------------------------------------
def _list_existing_skills(self) -> list[str]:
"""List existing skills as 'name — description' for dedup context."""
import re as _re
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
_DESC_RE = _re.compile(r"^description:\s*(.+)$", _re.MULTILINE | _re.IGNORECASE)
entries: dict[str, str] = {}
for base in (self.store.workspace / "skills", BUILTIN_SKILLS_DIR):
if not base.exists():
continue
for d in base.iterdir():
if not d.is_dir():
continue
skill_md = d / "SKILL.md"
if not skill_md.exists():
continue
# Prefer workspace skills over builtin (same name)
if d.name in entries and base == BUILTIN_SKILLS_DIR:
continue
content = skill_md.read_text(encoding="utf-8")[:500]
m = _DESC_RE.search(content)
desc = m.group(1).strip() if m else "(no description)"
entries[d.name] = desc
return [f"{name}{desc}" for name, desc in sorted(entries.items())]
# -- main entry ----------------------------------------------------------
def _annotate_with_ages(self, content: str) -> str:
"""Append per-line age suffixes to MEMORY.md content.
Each non-blank line gets a suffix like ``← 30d`` indicating how
many days since it was last modified. Lines ≤14 days old get no
suffix. Returns the original content unchanged if git is unavailable.
SOUL.md and USER.md are never annotated.
"""
file_path = "memory/MEMORY.md"
try:
ages = self.store.git.line_ages(file_path)
except Exception:
logger.debug("line_ages failed for {}", file_path)
return content
if not ages:
return content
had_trailing = content.endswith("\n")
lines = content.splitlines()
annotated: list[str] = []
for i, line in enumerate(lines):
if not line.strip() or i >= len(ages):
annotated.append(line)
continue
d = ages[i].age_days
if d > 14:
annotated.append(f"{line} \u2190 {d}d")
else:
annotated.append(line)
result = "\n".join(annotated)
if had_trailing:
result += "\n"
return result
async def run(self) -> bool:
"""Process unprocessed history entries. Returns True if work was done."""
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
last_cursor = self.store.get_last_dream_cursor()
entries = self.store.read_unprocessed_history(since_cursor=last_cursor)
if not entries:
return False
batch = entries[: self.max_batch_size]
logger.info(
"Dream: processing {} entries (cursor {}{}), batch={}",
len(entries), last_cursor, batch[-1]["cursor"], len(batch),
)
# Build history text for LLM
history_text = "\n".join(
f"[{e['timestamp']}] {e['content']}" for e in batch
)
# Current file contents + per-line age annotations
current_date = datetime.now().strftime("%Y-%m-%d")
raw_memory = self.store.read_memory() or "(empty)"
current_memory = self._annotate_with_ages(raw_memory)
current_soul = self.store.read_soul() or "(empty)"
current_user = self.store.read_user() or "(empty)"
file_context = (
f"## Current Date\n{current_date}\n\n"
f"## Current MEMORY.md ({len(current_memory)} chars)\n{current_memory}\n\n"
f"## Current SOUL.md ({len(current_soul)} chars)\n{current_soul}\n\n"
f"## Current USER.md ({len(current_user)} chars)\n{current_user}"
)
# Phase 1: Analyze (no skills list — dedup is Phase 2's job)
phase1_prompt = (
f"## Conversation History\n{history_text}\n\n{file_context}"
)
try:
phase1_response = await self.provider.chat_with_retry(
model=self.model,
messages=[
{
"role": "system",
"content": render_template("agent/dream_phase1.md", strip=True),
},
{"role": "user", "content": phase1_prompt},
],
tools=None,
tool_choice=None,
)
analysis = phase1_response.content or ""
logger.debug("Dream Phase 1 analysis ({} chars): {}", len(analysis), analysis[:500])
except Exception:
logger.exception("Dream Phase 1 failed")
return False
# Phase 2: Delegate to AgentRunner with read_file / edit_file
existing_skills = self._list_existing_skills()
skills_section = ""
if existing_skills:
skills_section = (
"\n\n## Existing Skills\n"
+ "\n".join(f"- {s}" for s in existing_skills)
)
phase2_prompt = f"## Analysis Result\n{analysis}\n\n{file_context}{skills_section}"
tools = self._tools
skill_creator_path = BUILTIN_SKILLS_DIR / "skill-creator" / "SKILL.md"
messages: list[dict[str, Any]] = [
{
"role": "system",
"content": render_template(
"agent/dream_phase2.md",
strip=True,
skill_creator_path=str(skill_creator_path),
),
},
{"role": "user", "content": phase2_prompt},
]
try:
result = await self._runner.run(AgentRunSpec(
initial_messages=messages,
tools=tools,
model=self.model,
max_iterations=self.max_iterations,
max_tool_result_chars=self.max_tool_result_chars,
fail_on_tool_error=False,
))
logger.debug(
"Dream Phase 2 complete: stop_reason={}, tool_events={}",
result.stop_reason, len(result.tool_events),
)
for ev in (result.tool_events or []):
logger.info("Dream tool_event: name={}, status={}, detail={}", ev.get("name"), ev.get("status"), ev.get("detail", "")[:200])
except Exception:
logger.exception("Dream Phase 2 failed")
result = None
# Build changelog from tool events
changelog: list[str] = []
if result and result.tool_events:
for event in result.tool_events:
if event["status"] == "ok":
changelog.append(f"{event['name']}: {event['detail']}")
# Advance cursor — always, to avoid re-processing Phase 1
new_cursor = batch[-1]["cursor"]
self.store.set_last_dream_cursor(new_cursor)
self.store.compact_history()
if result and result.stop_reason == "completed":
logger.info(
"Dream done: {} change(s), cursor advanced to {}",
len(changelog), new_cursor,
)
else:
reason = result.stop_reason if result else "exception"
logger.warning(
"Dream incomplete ({}): cursor advanced to {}",
reason, new_cursor,
)
# Git auto-commit (only when there are actual changes)
if changelog and self.store.git.is_initialized():
ts = batch[-1]["timestamp"]
summary = f"dream: {ts}, {len(changelog)} change(s)"
commit_msg = f"{summary}\n\n{analysis.strip()}"
sha = self.store.git.auto_commit(commit_msg)
if sha:
logger.info("Dream commit: {}", sha)
return True