From 999552b998b4dd8611348e233163b7179f269d16 Mon Sep 17 00:00:00 2001 From: Xubin Ren <52506698+Re-bin@users.noreply.github.com> Date: Wed, 10 Jun 2026 20:02:22 +0800 Subject: [PATCH] perf(webui): index session list metadata --- nanobot/session/manager.py | 261 +++++++++++++++----- tests/agent/test_session_manager_history.py | 42 ++++ 2 files changed, 236 insertions(+), 67 deletions(-) diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 73fb52cec..235a0241f 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -31,6 +31,8 @@ _TOOL_CALL_ECHO_RE = re.compile(r'^\s*(?:generate_image|message)\([^)]*\)\s*$') _SESSION_PREVIEW_MAX_CHARS = 120 _SESSION_LIST_PREVIEW_MAX_RECORDS = 200 _SESSION_LIST_PREVIEW_MAX_CHARS = 1_000_000 +_SESSION_LIST_INDEX_VERSION = 1 +_SESSION_LIST_INDEX_FILENAME = ".session_index.json" _FORK_VOLATILE_METADATA_KEYS = { "goal_state", "pending_user_turn", @@ -97,6 +99,29 @@ def _metadata_title(metadata: Any) -> str: return strip_think(title) +def _session_list_preview_from_messages(messages: list[dict[str, Any]]) -> str: + preview = "" + fallback_preview = "" + scanned_records = 0 + scanned_chars = 0 + for item in messages: + scanned_records += 1 + scanned_chars += len(json.dumps(item, ensure_ascii=False)) + 1 + if ( + scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS + or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS + ): + break + text = _message_preview_text(item) + if not text: + continue + if item.get("role") == "user": + return text + if not fallback_preview and item.get("role") == "assistant": + fallback_preview = text + return preview or fallback_preview + + @dataclass class Session: """A conversation session.""" @@ -414,6 +439,162 @@ class SessionManager: """Legacy global session path (~/.nanobot/sessions/).""" return self.legacy_sessions_dir / f"{self.safe_key(key)}.jsonl" + def _session_index_path(self) -> Path: + return self.sessions_dir / _SESSION_LIST_INDEX_FILENAME + + @staticmethod + def _session_file_signature(path: Path) -> dict[str, int]: + stat = path.stat() + return {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size} + + def _indexed_row_for_session(self, session: Session, path: Path) -> dict[str, Any]: + signature = self._session_file_signature(path) + return { + "key": session.key, + "created_at": session.created_at.isoformat(), + "updated_at": session.updated_at.isoformat(), + "title": _metadata_title(session.metadata), + "preview": _session_list_preview_from_messages(session.messages), + "file": path.name, + "mtime_ns": signature["mtime_ns"], + "size": signature["size"], + } + + def _public_session_index_row(self, row: dict[str, Any]) -> dict[str, Any]: + return { + "key": row.get("key"), + "created_at": row.get("created_at"), + "updated_at": row.get("updated_at"), + "title": row.get("title", ""), + "preview": row.get("preview", ""), + "path": str(self.sessions_dir / str(row.get("file", ""))), + } + + def _read_session_index_rows_unchecked(self) -> list[dict[str, Any]] | None: + path = self._session_index_path() + if not path.is_file(): + return None + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(data, dict) or data.get("version") != _SESSION_LIST_INDEX_VERSION: + return None + rows = data.get("sessions") + if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): + return None + return rows + + def _write_session_index_rows(self, rows: list[dict[str, Any]]) -> None: + path = self._session_index_path() + tmp_path = path.with_suffix(".json.tmp") + data = {"version": _SESSION_LIST_INDEX_VERSION, "sessions": rows} + try: + tmp_path.write_text(json.dumps(data, ensure_ascii=False) + "\n", encoding="utf-8") + os.replace(tmp_path, path) + except BaseException: + tmp_path.unlink(missing_ok=True) + raise + + def _update_session_index(self, row: dict[str, Any]) -> None: + try: + rows = self._read_session_index_rows_unchecked() or [] + rows = [existing for existing in rows if existing.get("file") != row.get("file")] + rows.append(row) + self._write_session_index_rows(rows) + except Exception as e: + logger.debug("Failed to update session list index: {}", e) + + def _remove_session_index_row(self, file_name: str) -> None: + try: + rows = self._read_session_index_rows_unchecked() + if not rows: + return + kept = [row for row in rows if row.get("file") != file_name] + if len(kept) == len(rows): + return + self._write_session_index_rows(kept) + except Exception as e: + logger.debug("Failed to remove session from list index: {}", e) + + def _read_valid_session_index(self) -> list[dict[str, Any]] | None: + rows = self._read_session_index_rows_unchecked() + if rows is None: + return None + paths = sorted(self.sessions_dir.glob("*.jsonl")) + by_file = {row.get("file"): row for row in rows if isinstance(row.get("file"), str)} + if set(by_file) != {path.name for path in paths}: + return None + public_rows: list[dict[str, Any]] = [] + for path in paths: + row = by_file.get(path.name) + if row is None: + return None + if not all(isinstance(row.get(key), str) for key in ("key", "created_at", "updated_at")): + return None + if not isinstance(row.get("title", ""), str) or not isinstance(row.get("preview", ""), str): + return None + try: + signature = self._session_file_signature(path) + except OSError: + return None + if row.get("mtime_ns") != signature["mtime_ns"] or row.get("size") != signature["size"]: + return None + public_rows.append(self._public_session_index_row(row)) + return public_rows + + def _session_index_row_from_file(self, path: Path) -> dict[str, Any] | None: + fallback_key = path.stem.replace("_", ":", 1) + try: + with open(path, encoding="utf-8") as f: + first_line = f.readline().strip() + if not first_line: + return None + data = json.loads(first_line) + if data.get("_type") != "metadata": + return None + preview = "" + fallback_preview = "" + scanned_records = 0 + scanned_chars = 0 + for line in f: + if not line.strip(): + continue + scanned_records += 1 + scanned_chars += len(line) + if ( + scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS + or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS + ): + break + item = json.loads(line) + if item.get("_type") == "metadata": + continue + text = _message_preview_text(item) + if not text: + continue + if item.get("role") == "user": + preview = text + break + if not fallback_preview and item.get("role") == "assistant": + fallback_preview = text + signature = self._session_file_signature(path) + return { + "key": data.get("key") or fallback_key, + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + "title": _metadata_title(data.get("metadata", {})), + "preview": preview or fallback_preview, + "file": path.name, + "mtime_ns": signature["mtime_ns"], + "size": signature["size"], + } + except Exception: + repaired = self._repair(fallback_key) + if repaired is None: + return None + return self._indexed_row_for_session(repaired, path) + def get_or_create(self, key: str) -> Session: """ Get an existing session or create a new one. @@ -600,6 +781,7 @@ class SessionManager: raise self._cache[session.key] = session + self._update_session_index(self._indexed_row_for_session(session, path)) def flush_all(self) -> int: """Re-save every cached session with fsync for durable shutdown. @@ -632,6 +814,7 @@ class SessionManager: return False try: path.unlink() + self._remove_session_index_row(path.name) return True except OSError as e: logger.warning("Failed to delete session file {}: {}", path, e) @@ -743,72 +926,16 @@ class SessionManager: Returns: List of session info dicts. """ - sessions = [] - - for path in self.sessions_dir.glob("*.jsonl"): - fallback_key = path.stem.replace("_", ":", 1) + sessions = self._read_valid_session_index() + if sessions is None: + indexed_rows = [ + row + for path in self.sessions_dir.glob("*.jsonl") + if (row := self._session_index_row_from_file(path)) is not None + ] try: - # Read the metadata line and a small preview for WebUI/session lists. - with open(path, encoding="utf-8") as f: - first_line = f.readline().strip() - if first_line: - data = json.loads(first_line) - if data.get("_type") == "metadata": - key = data.get("key") or path.stem.replace("_", ":", 1) - metadata = data.get("metadata", {}) - title = _metadata_title(metadata) - preview = "" - fallback_preview = "" - scanned_records = 0 - scanned_chars = 0 - for line in f: - if not line.strip(): - continue - scanned_records += 1 - scanned_chars += len(line) - if ( - scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS - or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS - ): - break - item = json.loads(line) - if item.get("_type") == "metadata": - continue - text = _message_preview_text(item) - if not text: - continue - if item.get("role") == "user": - preview = text - break - if not fallback_preview and item.get("role") == "assistant": - fallback_preview = text - preview = preview or fallback_preview - sessions.append({ - "key": key, - "created_at": data.get("created_at"), - "updated_at": data.get("updated_at"), - "title": title, - "preview": preview, - "path": str(path) - }) - except Exception: - repaired = self._repair(fallback_key) - if repaired is not None: - sessions.append({ - "key": repaired.key, - "created_at": repaired.created_at.isoformat(), - "updated_at": repaired.updated_at.isoformat(), - "title": _metadata_title(repaired.metadata), - "preview": next( - ( - text - for msg in repaired.messages - if (text := _message_preview_text(msg)) - ), - "", - ), - "path": str(path) - }) - continue - + self._write_session_index_rows(indexed_rows) + except Exception as e: + logger.debug("Failed to write session list index: {}", e) + sessions = [self._public_session_index_row(row) for row in indexed_rows] return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True) diff --git a/tests/agent/test_session_manager_history.py b/tests/agent/test_session_manager_history.py index 3441c4833..58be41bde 100644 --- a/tests/agent/test_session_manager_history.py +++ b/tests/agent/test_session_manager_history.py @@ -96,6 +96,48 @@ def test_list_sessions_bounds_preview_scan(tmp_path): assert rows[0]["preview"] == "assistant trace 0" +def test_list_sessions_reuses_valid_index_without_scanning_files(tmp_path, monkeypatch): + manager = SessionManager(tmp_path) + session = manager.get_or_create("websocket:indexed") + session.add_message("user", "indexed preview") + manager.save(session) + + assert manager.list_sessions()[0]["preview"] == "indexed preview" + + def fail_scan(path): + raise AssertionError(f"unexpected session file scan: {path}") + + monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan) + + rows = manager.list_sessions() + + assert rows[0]["key"] == "websocket:indexed" + assert rows[0]["preview"] == "indexed preview" + + +def test_list_sessions_index_updates_on_save_and_delete(tmp_path, monkeypatch): + manager = SessionManager(tmp_path) + session = manager.get_or_create("websocket:index-refresh") + session.add_message("user", "before") + manager.save(session) + session.messages.clear() + session.add_message("user", "after") + session.metadata["title"] = "fresh title" + manager.save(session) + + def fail_scan(path): + raise AssertionError(f"unexpected session file scan: {path}") + + monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan) + + rows = manager.list_sessions() + assert rows[0]["title"] == "fresh title" + assert rows[0]["preview"] == "after" + + assert manager.delete_session("websocket:index-refresh") is True + assert manager.list_sessions() == [] + + # --- Original regression test (from PR 2075) --- def test_get_history_drops_orphan_tool_results_when_window_cuts_tool_calls():