"""Git-backed version control for memory files, using dulwich.""" from __future__ import annotations import io import time from dataclasses import dataclass from pathlib import Path from loguru import logger @dataclass class CommitInfo: sha: str # Short SHA (8 chars) message: str timestamp: str # Formatted datetime def format(self, diff: str = "") -> str: """Format this commit for display, optionally with a diff.""" header = f"## {self.message.splitlines()[0]}\n`{self.sha}` — {self.timestamp}\n" if diff: return f"{header}\n```diff\n{diff}\n```" return f"{header}\n(no file changes)" class GitStore: """Git-backed version control for memory files.""" def __init__(self, workspace: Path, tracked_files: list[str]): self._workspace = workspace self._tracked_files = tracked_files def is_initialized(self) -> bool: """Check if the git repo has been initialized.""" return (self._workspace / ".git").is_dir() # -- init ------------------------------------------------------------------ def init(self) -> bool: """Initialize a git repo if not already initialized. Creates .gitignore and makes an initial commit. Returns True if a new repo was created, False if already exists. """ if self.is_initialized(): return False try: from dulwich import porcelain porcelain.init(str(self._workspace)) # Write .gitignore gitignore = self._workspace / ".gitignore" gitignore.write_text(self._build_gitignore(), encoding="utf-8") # Ensure tracked files exist (touch them if missing) so the initial # commit has something to track. for rel in self._tracked_files: p = self._workspace / rel p.parent.mkdir(parents=True, exist_ok=True) if not p.exists(): p.write_text("", encoding="utf-8") # Initial commit porcelain.add(str(self._workspace), paths=[".gitignore"] + self._tracked_files) porcelain.commit( str(self._workspace), message=b"init: nanobot memory store", author=b"nanobot ", committer=b"nanobot ", ) logger.info("Git store initialized at {}", self._workspace) return True except Exception: logger.warning("Git store init failed for {}", self._workspace) return False # -- daily operations ------------------------------------------------------ def auto_commit(self, message: str) -> str | None: """Stage tracked memory files and commit if there are changes. Returns the short commit SHA, or None if nothing to commit. """ if not self.is_initialized(): return None try: from dulwich import porcelain # .gitignore excludes everything except tracked files, # so any staged/unstaged change must be in our files. st = porcelain.status(str(self._workspace)) if not st.unstaged and not any(st.staged.values()): return None msg_bytes = message.encode("utf-8") if isinstance(message, str) else message porcelain.add(str(self._workspace), paths=self._tracked_files) sha_bytes = porcelain.commit( str(self._workspace), message=msg_bytes, author=b"nanobot ", committer=b"nanobot ", ) if sha_bytes is None: return None sha = sha_bytes.hex()[:8] logger.debug("Git auto-commit: {} ({})", sha, message) return sha except Exception: logger.warning("Git auto-commit failed: {}", message) return None # -- internal helpers ------------------------------------------------------ def _resolve_sha(self, short_sha: str) -> bytes | None: """Resolve a short SHA prefix to the full SHA bytes.""" try: from dulwich.repo import Repo with Repo(str(self._workspace)) as repo: try: sha = repo.refs[b"HEAD"] except KeyError: return None while sha: if sha.hex().startswith(short_sha): return sha commit = repo[sha] if commit.type_name != b"commit": break sha = commit.parents[0] if commit.parents else None return None except Exception: return None def _build_gitignore(self) -> str: """Generate .gitignore content from tracked files.""" dirs: set[str] = set() for f in self._tracked_files: parent = str(Path(f).parent) if parent != ".": dirs.add(parent) lines = ["/*"] for d in sorted(dirs): lines.append(f"!{d}/") for f in self._tracked_files: lines.append(f"!{f}") lines.append("!.gitignore") return "\n".join(lines) + "\n" # -- query ----------------------------------------------------------------- def log(self, max_entries: int = 20) -> list[CommitInfo]: """Return simplified commit log.""" if not self.is_initialized(): return [] try: from dulwich.repo import Repo entries: list[CommitInfo] = [] with Repo(str(self._workspace)) as repo: try: head = repo.refs[b"HEAD"] except KeyError: return [] sha = head while sha and len(entries) < max_entries: commit = repo[sha] if commit.type_name != b"commit": break ts = time.strftime( "%Y-%m-%d %H:%M", time.localtime(commit.commit_time), ) msg = commit.message.decode("utf-8", errors="replace").strip() entries.append(CommitInfo( sha=sha.hex()[:8], message=msg, timestamp=ts, )) sha = commit.parents[0] if commit.parents else None return entries except Exception: logger.warning("Git log failed") return [] def diff_commits(self, sha1: str, sha2: str) -> str: """Show diff between two commits.""" if not self.is_initialized(): return "" try: from dulwich import porcelain full1 = self._resolve_sha(sha1) full2 = self._resolve_sha(sha2) if not full1 or not full2: return "" out = io.BytesIO() porcelain.diff( str(self._workspace), commit=full1, commit2=full2, outstream=out, ) return out.getvalue().decode("utf-8", errors="replace") except Exception: logger.warning("Git diff_commits failed") return "" def find_commit(self, short_sha: str, max_entries: int = 20) -> CommitInfo | None: """Find a commit by short SHA prefix match.""" for c in self.log(max_entries=max_entries): if c.sha.startswith(short_sha): return c return None def show_commit_diff(self, short_sha: str, max_entries: int = 20) -> tuple[CommitInfo, str] | None: """Find a commit and return it with its diff vs the parent.""" commits = self.log(max_entries=max_entries) for i, c in enumerate(commits): if c.sha.startswith(short_sha): if i + 1 < len(commits): diff = self.diff_commits(commits[i + 1].sha, c.sha) else: diff = "" return c, diff return None # -- restore --------------------------------------------------------------- def revert(self, commit: str) -> str | None: """Revert (undo) the changes introduced by the given commit. Restores all tracked memory files to the state at the commit's parent, then creates a new commit recording the revert. Returns the new commit SHA, or None on failure. """ if not self.is_initialized(): return None try: from dulwich.repo import Repo full_sha = self._resolve_sha(commit) if not full_sha: logger.warning("Git revert: SHA not found: {}", commit) return None with Repo(str(self._workspace)) as repo: commit_obj = repo[full_sha] if commit_obj.type_name != b"commit": return None if not commit_obj.parents: logger.warning("Git revert: cannot revert root commit {}", commit) return None # Use the parent's tree — this undoes the commit's changes parent_obj = repo[commit_obj.parents[0]] tree = repo[parent_obj.tree] restored: list[str] = [] for filepath in self._tracked_files: content = self._read_blob_from_tree(repo, tree, filepath) if content is not None: dest = self._workspace / filepath dest.write_text(content, encoding="utf-8") restored.append(filepath) if not restored: return None # Commit the restored state msg = f"revert: undo {commit}" return self.auto_commit(msg) except Exception: logger.warning("Git revert failed for {}", commit) return None @staticmethod def _read_blob_from_tree(repo, tree, filepath: str) -> str | None: """Read a blob's content from a tree object by walking path parts.""" parts = Path(filepath).parts current = tree for part in parts: try: entry = current[part.encode()] except KeyError: return None obj = repo[entry[1]] if obj.type_name == b"blob": return obj.data.decode("utf-8", errors="replace") if obj.type_name == b"tree": current = obj else: return None return None