perf(webui): index session list metadata

This commit is contained in:
Xubin Ren 2026-06-10 20:02:22 +08:00
parent 603feef3aa
commit 999552b998
2 changed files with 236 additions and 67 deletions

View File

@ -31,6 +31,8 @@ _TOOL_CALL_ECHO_RE = re.compile(r'^\s*(?:generate_image|message)\([^)]*\)\s*$')
_SESSION_PREVIEW_MAX_CHARS = 120
_SESSION_LIST_PREVIEW_MAX_RECORDS = 200
_SESSION_LIST_PREVIEW_MAX_CHARS = 1_000_000
_SESSION_LIST_INDEX_VERSION = 1
_SESSION_LIST_INDEX_FILENAME = ".session_index.json"
_FORK_VOLATILE_METADATA_KEYS = {
"goal_state",
"pending_user_turn",
@ -97,6 +99,29 @@ def _metadata_title(metadata: Any) -> str:
return strip_think(title)
def _session_list_preview_from_messages(messages: list[dict[str, Any]]) -> str:
preview = ""
fallback_preview = ""
scanned_records = 0
scanned_chars = 0
for item in messages:
scanned_records += 1
scanned_chars += len(json.dumps(item, ensure_ascii=False)) + 1
if (
scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS
or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS
):
break
text = _message_preview_text(item)
if not text:
continue
if item.get("role") == "user":
return text
if not fallback_preview and item.get("role") == "assistant":
fallback_preview = text
return preview or fallback_preview
@dataclass
class Session:
"""A conversation session."""
@ -414,6 +439,162 @@ class SessionManager:
"""Legacy global session path (~/.nanobot/sessions/)."""
return self.legacy_sessions_dir / f"{self.safe_key(key)}.jsonl"
def _session_index_path(self) -> Path:
return self.sessions_dir / _SESSION_LIST_INDEX_FILENAME
@staticmethod
def _session_file_signature(path: Path) -> dict[str, int]:
stat = path.stat()
return {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
def _indexed_row_for_session(self, session: Session, path: Path) -> dict[str, Any]:
signature = self._session_file_signature(path)
return {
"key": session.key,
"created_at": session.created_at.isoformat(),
"updated_at": session.updated_at.isoformat(),
"title": _metadata_title(session.metadata),
"preview": _session_list_preview_from_messages(session.messages),
"file": path.name,
"mtime_ns": signature["mtime_ns"],
"size": signature["size"],
}
def _public_session_index_row(self, row: dict[str, Any]) -> dict[str, Any]:
return {
"key": row.get("key"),
"created_at": row.get("created_at"),
"updated_at": row.get("updated_at"),
"title": row.get("title", ""),
"preview": row.get("preview", ""),
"path": str(self.sessions_dir / str(row.get("file", ""))),
}
def _read_session_index_rows_unchecked(self) -> list[dict[str, Any]] | None:
path = self._session_index_path()
if not path.is_file():
return None
try:
data = json.loads(path.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
if not isinstance(data, dict) or data.get("version") != _SESSION_LIST_INDEX_VERSION:
return None
rows = data.get("sessions")
if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows):
return None
return rows
def _write_session_index_rows(self, rows: list[dict[str, Any]]) -> None:
path = self._session_index_path()
tmp_path = path.with_suffix(".json.tmp")
data = {"version": _SESSION_LIST_INDEX_VERSION, "sessions": rows}
try:
tmp_path.write_text(json.dumps(data, ensure_ascii=False) + "\n", encoding="utf-8")
os.replace(tmp_path, path)
except BaseException:
tmp_path.unlink(missing_ok=True)
raise
def _update_session_index(self, row: dict[str, Any]) -> None:
try:
rows = self._read_session_index_rows_unchecked() or []
rows = [existing for existing in rows if existing.get("file") != row.get("file")]
rows.append(row)
self._write_session_index_rows(rows)
except Exception as e:
logger.debug("Failed to update session list index: {}", e)
def _remove_session_index_row(self, file_name: str) -> None:
try:
rows = self._read_session_index_rows_unchecked()
if not rows:
return
kept = [row for row in rows if row.get("file") != file_name]
if len(kept) == len(rows):
return
self._write_session_index_rows(kept)
except Exception as e:
logger.debug("Failed to remove session from list index: {}", e)
def _read_valid_session_index(self) -> list[dict[str, Any]] | None:
rows = self._read_session_index_rows_unchecked()
if rows is None:
return None
paths = sorted(self.sessions_dir.glob("*.jsonl"))
by_file = {row.get("file"): row for row in rows if isinstance(row.get("file"), str)}
if set(by_file) != {path.name for path in paths}:
return None
public_rows: list[dict[str, Any]] = []
for path in paths:
row = by_file.get(path.name)
if row is None:
return None
if not all(isinstance(row.get(key), str) for key in ("key", "created_at", "updated_at")):
return None
if not isinstance(row.get("title", ""), str) or not isinstance(row.get("preview", ""), str):
return None
try:
signature = self._session_file_signature(path)
except OSError:
return None
if row.get("mtime_ns") != signature["mtime_ns"] or row.get("size") != signature["size"]:
return None
public_rows.append(self._public_session_index_row(row))
return public_rows
def _session_index_row_from_file(self, path: Path) -> dict[str, Any] | None:
fallback_key = path.stem.replace("_", ":", 1)
try:
with open(path, encoding="utf-8") as f:
first_line = f.readline().strip()
if not first_line:
return None
data = json.loads(first_line)
if data.get("_type") != "metadata":
return None
preview = ""
fallback_preview = ""
scanned_records = 0
scanned_chars = 0
for line in f:
if not line.strip():
continue
scanned_records += 1
scanned_chars += len(line)
if (
scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS
or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS
):
break
item = json.loads(line)
if item.get("_type") == "metadata":
continue
text = _message_preview_text(item)
if not text:
continue
if item.get("role") == "user":
preview = text
break
if not fallback_preview and item.get("role") == "assistant":
fallback_preview = text
signature = self._session_file_signature(path)
return {
"key": data.get("key") or fallback_key,
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"title": _metadata_title(data.get("metadata", {})),
"preview": preview or fallback_preview,
"file": path.name,
"mtime_ns": signature["mtime_ns"],
"size": signature["size"],
}
except Exception:
repaired = self._repair(fallback_key)
if repaired is None:
return None
return self._indexed_row_for_session(repaired, path)
def get_or_create(self, key: str) -> Session:
"""
Get an existing session or create a new one.
@ -600,6 +781,7 @@ class SessionManager:
raise
self._cache[session.key] = session
self._update_session_index(self._indexed_row_for_session(session, path))
def flush_all(self) -> int:
"""Re-save every cached session with fsync for durable shutdown.
@ -632,6 +814,7 @@ class SessionManager:
return False
try:
path.unlink()
self._remove_session_index_row(path.name)
return True
except OSError as e:
logger.warning("Failed to delete session file {}: {}", path, e)
@ -743,72 +926,16 @@ class SessionManager:
Returns:
List of session info dicts.
"""
sessions = []
for path in self.sessions_dir.glob("*.jsonl"):
fallback_key = path.stem.replace("_", ":", 1)
sessions = self._read_valid_session_index()
if sessions is None:
indexed_rows = [
row
for path in self.sessions_dir.glob("*.jsonl")
if (row := self._session_index_row_from_file(path)) is not None
]
try:
# Read the metadata line and a small preview for WebUI/session lists.
with open(path, encoding="utf-8") as f:
first_line = f.readline().strip()
if first_line:
data = json.loads(first_line)
if data.get("_type") == "metadata":
key = data.get("key") or path.stem.replace("_", ":", 1)
metadata = data.get("metadata", {})
title = _metadata_title(metadata)
preview = ""
fallback_preview = ""
scanned_records = 0
scanned_chars = 0
for line in f:
if not line.strip():
continue
scanned_records += 1
scanned_chars += len(line)
if (
scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS
or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS
):
break
item = json.loads(line)
if item.get("_type") == "metadata":
continue
text = _message_preview_text(item)
if not text:
continue
if item.get("role") == "user":
preview = text
break
if not fallback_preview and item.get("role") == "assistant":
fallback_preview = text
preview = preview or fallback_preview
sessions.append({
"key": key,
"created_at": data.get("created_at"),
"updated_at": data.get("updated_at"),
"title": title,
"preview": preview,
"path": str(path)
})
except Exception:
repaired = self._repair(fallback_key)
if repaired is not None:
sessions.append({
"key": repaired.key,
"created_at": repaired.created_at.isoformat(),
"updated_at": repaired.updated_at.isoformat(),
"title": _metadata_title(repaired.metadata),
"preview": next(
(
text
for msg in repaired.messages
if (text := _message_preview_text(msg))
),
"",
),
"path": str(path)
})
continue
self._write_session_index_rows(indexed_rows)
except Exception as e:
logger.debug("Failed to write session list index: {}", e)
sessions = [self._public_session_index_row(row) for row in indexed_rows]
return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True)

View File

@ -96,6 +96,48 @@ def test_list_sessions_bounds_preview_scan(tmp_path):
assert rows[0]["preview"] == "assistant trace 0"
def test_list_sessions_reuses_valid_index_without_scanning_files(tmp_path, monkeypatch):
manager = SessionManager(tmp_path)
session = manager.get_or_create("websocket:indexed")
session.add_message("user", "indexed preview")
manager.save(session)
assert manager.list_sessions()[0]["preview"] == "indexed preview"
def fail_scan(path):
raise AssertionError(f"unexpected session file scan: {path}")
monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan)
rows = manager.list_sessions()
assert rows[0]["key"] == "websocket:indexed"
assert rows[0]["preview"] == "indexed preview"
def test_list_sessions_index_updates_on_save_and_delete(tmp_path, monkeypatch):
manager = SessionManager(tmp_path)
session = manager.get_or_create("websocket:index-refresh")
session.add_message("user", "before")
manager.save(session)
session.messages.clear()
session.add_message("user", "after")
session.metadata["title"] = "fresh title"
manager.save(session)
def fail_scan(path):
raise AssertionError(f"unexpected session file scan: {path}")
monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan)
rows = manager.list_sessions()
assert rows[0]["title"] == "fresh title"
assert rows[0]["preview"] == "after"
assert manager.delete_session("websocket:index-refresh") is True
assert manager.list_sessions() == []
# --- Original regression test (from PR 2075) ---
def test_get_history_drops_orphan_tool_results_when_window_cuts_tool_calls():