nanobot/nanobot/utils/gitstore.py

308 lines
11 KiB
Python

"""Git-backed version control for memory files, using dulwich."""
from __future__ import annotations
import io
import time
from dataclasses import dataclass
from pathlib import Path
from loguru import logger
@dataclass
class CommitInfo:
    """Display-oriented summary of a single commit."""

    sha: str  # Short SHA (8 chars)
    message: str
    timestamp: str  # Formatted datetime

    def format(self, diff: str = "") -> str:
        """Format this commit for display, optionally with a diff.

        Args:
            diff: Unified diff text to embed in a fenced ``diff`` block. When
                empty, a "(no file changes)" note is emitted instead.

        Returns:
            A Markdown snippet: a heading with the first line of the commit
            message, the short SHA and timestamp, then the diff or the note.
        """
        # ``"".splitlines()`` is an empty list, so indexing ``[0]`` would raise
        # IndexError for a commit with an empty message; fall back to a
        # placeholder title instead.
        title = next(iter(self.message.splitlines()), "") or "(no message)"
        header = f"## {title}\n`{self.sha}` — {self.timestamp}\n"
        if diff:
            return f"{header}\n```diff\n{diff}\n```"
        return f"{header}\n(no file changes)"
class GitStore:
    """Git-backed version control for memory files.

    Wraps a dulwich repository rooted at ``workspace`` and only ever stages
    the files listed in ``tracked_files`` (paths relative to the workspace).
    All public operations are best-effort: failures are logged and reported
    through the return value (``False`` / ``None`` / empty) rather than raised.
    """

    # Identity recorded as both author and committer on every commit we create.
    _IDENTITY = b"nanobot <nanobot@dream>"
    # Number of hex characters exposed as the "short SHA".
    _SHORT_SHA_LEN = 8

    def __init__(self, workspace: Path, tracked_files: list[str]):
        """Remember the workspace root and the relative paths to version."""
        self._workspace = workspace
        self._tracked_files = tracked_files

    def is_initialized(self) -> bool:
        """Check if the git repo has been initialized."""
        return (self._workspace / ".git").is_dir()

    # -- init ------------------------------------------------------------------

    def init(self) -> bool:
        """Initialize a git repo if not already initialized.

        Creates .gitignore, touches any missing tracked file, and makes an
        initial commit.

        Returns:
            True if a new repo was created, False if one already exists or
            initialization failed (the failure is logged).
        """
        if self.is_initialized():
            return False

        try:
            from dulwich import porcelain

            porcelain.init(str(self._workspace))

            # Write .gitignore
            gitignore = self._workspace / ".gitignore"
            gitignore.write_text(self._build_gitignore(), encoding="utf-8")

            # Ensure tracked files exist (touch them if missing) so the initial
            # commit has something to track.
            for rel in self._tracked_files:
                p = self._workspace / rel
                p.parent.mkdir(parents=True, exist_ok=True)
                if not p.exists():
                    p.write_text("", encoding="utf-8")

            # Initial commit
            porcelain.add(
                str(self._workspace),
                paths=[".gitignore", *self._tracked_files],
            )
            porcelain.commit(
                str(self._workspace),
                message=b"init: nanobot memory store",
                author=self._IDENTITY,
                committer=self._IDENTITY,
            )
            logger.info("Git store initialized at {}", self._workspace)
            return True
        except Exception:
            # Best-effort: keep going without version control, but record why.
            logger.opt(exception=True).warning(
                "Git store init failed for {}", self._workspace
            )
            return False

    # -- daily operations ------------------------------------------------------

    def auto_commit(self, message: str) -> str | None:
        """Stage tracked memory files and commit if there are changes.

        Args:
            message: Commit message (``str``; ``bytes`` is passed through).

        Returns:
            The short commit SHA, or None if the repo is not initialized,
            there is nothing to commit, or the commit failed (logged).
        """
        if not self.is_initialized():
            return None

        try:
            from dulwich import porcelain

            # Only staged/unstaged changes are inspected; untracked files are
            # deliberately ignored, and only the tracked files are staged below.
            st = porcelain.status(str(self._workspace))
            if not st.unstaged and not any(st.staged.values()):
                return None

            msg_bytes = message.encode("utf-8") if isinstance(message, str) else message
            porcelain.add(str(self._workspace), paths=self._tracked_files)
            sha_bytes = porcelain.commit(
                str(self._workspace),
                message=msg_bytes,
                author=self._IDENTITY,
                committer=self._IDENTITY,
            )
            if sha_bytes is None:
                return None
            sha = self._short_sha(sha_bytes)
            logger.debug("Git auto-commit: {} ({})", sha, message)
            return sha
        except Exception:
            logger.opt(exception=True).warning("Git auto-commit failed: {}", message)
            return None

    # -- internal helpers ------------------------------------------------------

    @staticmethod
    def _hex_sha(sha) -> str:
        """Return the full hex string for an object id.

        dulwich hands out object ids as ASCII hex ``bytes`` (40 chars for
        SHA-1), so they must be *decoded*, not ``.hex()``-ed (which would
        hex-encode the ASCII characters themselves). Raw binary digests
        (20/32 bytes) and ``str`` ids are accepted as well.
        """
        if isinstance(sha, str):
            return sha
        if len(sha) in (20, 32):
            return sha.hex()
        return sha.decode("ascii")

    @classmethod
    def _short_sha(cls, sha) -> str:
        """Return the short display form of an object id."""
        return cls._hex_sha(sha)[: cls._SHORT_SHA_LEN]

    @staticmethod
    def _legacy_hex(hex_sha: str) -> str:
        """Return the id as older versions displayed it (hex of the ASCII hex).

        Kept only so identifiers emitted before the encoding fix still resolve.
        """
        return hex_sha.encode("ascii").hex()

    @staticmethod
    def _normalize_prefix(short_sha: str) -> str:
        """Strip whitespace and lowercase a user-supplied SHA prefix."""
        return (short_sha or "").strip().lower()

    @classmethod
    def _find_index(cls, commits: list[CommitInfo], short_sha: str) -> int | None:
        """Return the index of the first commit whose SHA starts with the prefix.

        Real SHA prefixes win; the legacy encoding is only tried when no
        commit matches the prefix directly.
        """
        prefix = cls._normalize_prefix(short_sha)
        for i, c in enumerate(commits):
            if c.sha.startswith(prefix):
                return i
        for i, c in enumerate(commits):
            if cls._legacy_hex(c.sha).startswith(prefix):
                return i
        return None

    def _resolve_sha(self, short_sha: str) -> bytes | None:
        """Resolve a short SHA prefix to the full SHA bytes.

        Walks first-parent history from HEAD and returns the id of the first
        commit whose hex SHA starts with ``short_sha``. If none matches, the
        first commit matching the legacy (double-encoded) form is returned.
        Returns None when nothing matches or the repo cannot be read.
        """
        prefix = self._normalize_prefix(short_sha)
        try:
            from dulwich.repo import Repo

            legacy_match = None
            with Repo(str(self._workspace)) as repo:
                try:
                    sha = repo.refs[b"HEAD"]
                except KeyError:
                    return None

                while sha:
                    hex_sha = self._hex_sha(sha)
                    if hex_sha.startswith(prefix):
                        return sha
                    if legacy_match is None and self._legacy_hex(hex_sha).startswith(prefix):
                        legacy_match = sha
                    commit = repo[sha]
                    if commit.type_name != b"commit":
                        break
                    sha = commit.parents[0] if commit.parents else None
            return legacy_match
        except Exception:
            logger.opt(exception=True).debug(
                "Git SHA resolution failed for {}", short_sha
            )
            return None

    def _build_gitignore(self) -> str:
        """Generate .gitignore content from tracked files.

        Ignores every top-level entry, then re-includes each ancestor
        directory of a tracked file (git cannot re-include a file whose parent
        directory is still excluded), the tracked files, and .gitignore itself.
        """
        dirs: set[str] = set()
        for f in self._tracked_files:
            for ancestor in Path(f).parents:
                # Path(".") (and a filesystem root) have an empty name; skip them.
                if ancestor.name:
                    dirs.add(ancestor.as_posix())

        lines = ["/*"]
        # Sorted order lists a directory before its sub-directories.
        lines.extend(f"!{d}/" for d in sorted(dirs))
        lines.extend(f"!{f}" for f in self._tracked_files)
        lines.append("!.gitignore")
        return "\n".join(lines) + "\n"

    # -- query -----------------------------------------------------------------

    def log(self, max_entries: int = 20) -> list[CommitInfo]:
        """Return simplified commit log.

        Follows first-parent history from HEAD, newest first, and stops after
        ``max_entries`` commits. Returns an empty list when the repo is not
        initialized, has no HEAD, or reading fails (logged).
        """
        if not self.is_initialized():
            return []

        try:
            from dulwich.repo import Repo

            entries: list[CommitInfo] = []
            with Repo(str(self._workspace)) as repo:
                try:
                    head = repo.refs[b"HEAD"]
                except KeyError:
                    return []

                sha = head
                while sha and len(entries) < max_entries:
                    commit = repo[sha]
                    if commit.type_name != b"commit":
                        break
                    ts = time.strftime(
                        "%Y-%m-%d %H:%M",
                        time.localtime(commit.commit_time),
                    )
                    msg = commit.message.decode("utf-8", errors="replace").strip()
                    entries.append(CommitInfo(
                        sha=self._short_sha(sha),
                        message=msg,
                        timestamp=ts,
                    ))
                    sha = commit.parents[0] if commit.parents else None

            return entries
        except Exception:
            logger.opt(exception=True).warning("Git log failed")
            return []

    def diff_commits(self, sha1: str, sha2: str) -> str:
        """Show diff between two commits.

        Both arguments are SHA prefixes. Returns the diff text, or an empty
        string if the repo is not initialized, either prefix cannot be
        resolved, or diffing fails (logged).
        """
        if not self.is_initialized():
            return ""

        try:
            from dulwich import porcelain

            full1 = self._resolve_sha(sha1)
            full2 = self._resolve_sha(sha2)
            if not full1 or not full2:
                return ""

            out = io.BytesIO()
            porcelain.diff(
                str(self._workspace),
                commit=full1,
                commit2=full2,
                outstream=out,
            )
            return out.getvalue().decode("utf-8", errors="replace")
        except Exception:
            logger.opt(exception=True).warning("Git diff_commits failed")
            return ""

    def find_commit(self, short_sha: str, max_entries: int = 20) -> CommitInfo | None:
        """Find a commit by short SHA prefix match.

        Only the newest ``max_entries`` commits are searched.
        """
        commits = self.log(max_entries=max_entries)
        idx = self._find_index(commits, short_sha)
        return None if idx is None else commits[idx]

    def show_commit_diff(self, short_sha: str, max_entries: int = 20) -> tuple[CommitInfo, str] | None:
        """Find a commit and return it with its diff vs the parent.

        Returns None when no commit among the newest ``max_entries`` matches.
        The diff is empty for a commit without a parent.
        """
        limit = max(max_entries, 0)
        # Fetch one extra entry so the oldest commit inside the search window
        # can still be diffed against its parent.
        commits = self.log(max_entries=limit + 1)
        idx = self._find_index(commits[:limit], short_sha)
        if idx is None:
            return None

        target = commits[idx]
        if idx + 1 < len(commits):
            diff = self.diff_commits(commits[idx + 1].sha, target.sha)
        else:
            diff = ""
        return target, diff

    # -- restore ---------------------------------------------------------------

    def revert(self, commit: str) -> str | None:
        """Revert (undo) the changes introduced by the given commit.

        Restores all tracked memory files to the state at the commit's parent,
        then creates a new commit recording the revert.

        Args:
            commit: SHA prefix of the commit to undo. Must not be empty.

        Returns:
            The new commit's short SHA, or None on failure (unknown SHA, root
            commit, nothing restored, nothing changed, or an error).
        """
        if not self.is_initialized():
            return None

        # An empty prefix would match HEAD and silently undo the latest commit.
        if not self._normalize_prefix(commit):
            logger.warning("Git revert: empty commit id")
            return None

        try:
            from dulwich.repo import Repo

            full_sha = self._resolve_sha(commit)
            if not full_sha:
                logger.warning("Git revert: SHA not found: {}", commit)
                return None

            with Repo(str(self._workspace)) as repo:
                commit_obj = repo[full_sha]
                if commit_obj.type_name != b"commit":
                    return None

                if not commit_obj.parents:
                    logger.warning("Git revert: cannot revert root commit {}", commit)
                    return None

                # Use the parent's tree — this undoes the commit's changes
                parent_obj = repo[commit_obj.parents[0]]
                tree = repo[parent_obj.tree]

                restored: list[str] = []
                for filepath in self._tracked_files:
                    content = self._read_blob_from_tree(repo, tree, filepath)
                    if content is None:
                        continue
                    dest = self._workspace / filepath
                    dest.parent.mkdir(parents=True, exist_ok=True)
                    dest.write_text(content, encoding="utf-8")
                    restored.append(filepath)

            if not restored:
                return None

            # Commit the restored state
            msg = f"revert: undo {commit}"
            return self.auto_commit(msg)
        except Exception:
            logger.opt(exception=True).warning("Git revert failed for {}", commit)
            return None

    @staticmethod
    def _read_blob_from_tree(repo, tree, filepath: str) -> str | None:
        """Read a blob's content from a tree object by walking path parts.

        Returns the blob decoded as UTF-8 (invalid bytes replaced), or None if
        any component is missing, the path ends on a directory, or a file is
        found before the path is exhausted.
        """
        parts = Path(filepath).parts
        last = len(parts) - 1
        current = tree
        for i, part in enumerate(parts):
            try:
                entry = current[part.encode()]
            except KeyError:
                return None
            obj = repo[entry[1]]
            if obj.type_name == b"blob":
                # A blob is only the requested file when it is the final
                # component; "file.md/child" must not yield file.md's content.
                if i != last:
                    return None
                return obj.data.decode("utf-8", errors="replace")
            if obj.type_name != b"tree":
                return None
            current = obj
        return None