diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py index 235a0241f..890b25c20 100644 --- a/nanobot/session/manager.py +++ b/nanobot/session/manager.py @@ -31,8 +31,6 @@ _TOOL_CALL_ECHO_RE = re.compile(r'^\s*(?:generate_image|message)\([^)]*\)\s*$') _SESSION_PREVIEW_MAX_CHARS = 120 _SESSION_LIST_PREVIEW_MAX_RECORDS = 200 _SESSION_LIST_PREVIEW_MAX_CHARS = 1_000_000 -_SESSION_LIST_INDEX_VERSION = 1 -_SESSION_LIST_INDEX_FILENAME = ".session_index.json" _FORK_VOLATILE_METADATA_KEYS = { "goal_state", "pending_user_turn", @@ -99,29 +97,6 @@ def _metadata_title(metadata: Any) -> str: return strip_think(title) -def _session_list_preview_from_messages(messages: list[dict[str, Any]]) -> str: - preview = "" - fallback_preview = "" - scanned_records = 0 - scanned_chars = 0 - for item in messages: - scanned_records += 1 - scanned_chars += len(json.dumps(item, ensure_ascii=False)) + 1 - if ( - scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS - or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS - ): - break - text = _message_preview_text(item) - if not text: - continue - if item.get("role") == "user": - return text - if not fallback_preview and item.get("role") == "assistant": - fallback_preview = text - return preview or fallback_preview - - @dataclass class Session: """A conversation session.""" @@ -439,162 +414,6 @@ class SessionManager: """Legacy global session path (~/.nanobot/sessions/).""" return self.legacy_sessions_dir / f"{self.safe_key(key)}.jsonl" - def _session_index_path(self) -> Path: - return self.sessions_dir / _SESSION_LIST_INDEX_FILENAME - - @staticmethod - def _session_file_signature(path: Path) -> dict[str, int]: - stat = path.stat() - return {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size} - - def _indexed_row_for_session(self, session: Session, path: Path) -> dict[str, Any]: - signature = self._session_file_signature(path) - return { - "key": session.key, - "created_at": session.created_at.isoformat(), - "updated_at": session.updated_at.isoformat(), - "title": _metadata_title(session.metadata), - "preview": _session_list_preview_from_messages(session.messages), - "file": path.name, - "mtime_ns": signature["mtime_ns"], - "size": signature["size"], - } - - def _public_session_index_row(self, row: dict[str, Any]) -> dict[str, Any]: - return { - "key": row.get("key"), - "created_at": row.get("created_at"), - "updated_at": row.get("updated_at"), - "title": row.get("title", ""), - "preview": row.get("preview", ""), - "path": str(self.sessions_dir / str(row.get("file", ""))), - } - - def _read_session_index_rows_unchecked(self) -> list[dict[str, Any]] | None: - path = self._session_index_path() - if not path.is_file(): - return None - try: - data = json.loads(path.read_text(encoding="utf-8")) - except (OSError, json.JSONDecodeError): - return None - if not isinstance(data, dict) or data.get("version") != _SESSION_LIST_INDEX_VERSION: - return None - rows = data.get("sessions") - if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): - return None - return rows - - def _write_session_index_rows(self, rows: list[dict[str, Any]]) -> None: - path = self._session_index_path() - tmp_path = path.with_suffix(".json.tmp") - data = {"version": _SESSION_LIST_INDEX_VERSION, "sessions": rows} - try: - tmp_path.write_text(json.dumps(data, ensure_ascii=False) + "\n", encoding="utf-8") - os.replace(tmp_path, path) - except BaseException: - tmp_path.unlink(missing_ok=True) - raise - - def _update_session_index(self, row: dict[str, Any]) -> None: - try: - rows = self._read_session_index_rows_unchecked() or [] - rows = [existing for existing in rows if existing.get("file") != row.get("file")] - rows.append(row) - self._write_session_index_rows(rows) - except Exception as e: - logger.debug("Failed to update session list index: {}", e) - - def _remove_session_index_row(self, file_name: str) -> None: - try: - rows = self._read_session_index_rows_unchecked() - if not rows: - return - kept = [row for row in rows if row.get("file") != file_name] - if len(kept) == len(rows): - return - self._write_session_index_rows(kept) - except Exception as e: - logger.debug("Failed to remove session from list index: {}", e) - - def _read_valid_session_index(self) -> list[dict[str, Any]] | None: - rows = self._read_session_index_rows_unchecked() - if rows is None: - return None - paths = sorted(self.sessions_dir.glob("*.jsonl")) - by_file = {row.get("file"): row for row in rows if isinstance(row.get("file"), str)} - if set(by_file) != {path.name for path in paths}: - return None - public_rows: list[dict[str, Any]] = [] - for path in paths: - row = by_file.get(path.name) - if row is None: - return None - if not all(isinstance(row.get(key), str) for key in ("key", "created_at", "updated_at")): - return None - if not isinstance(row.get("title", ""), str) or not isinstance(row.get("preview", ""), str): - return None - try: - signature = self._session_file_signature(path) - except OSError: - return None - if row.get("mtime_ns") != signature["mtime_ns"] or row.get("size") != signature["size"]: - return None - public_rows.append(self._public_session_index_row(row)) - return public_rows - - def _session_index_row_from_file(self, path: Path) -> dict[str, Any] | None: - fallback_key = path.stem.replace("_", ":", 1) - try: - with open(path, encoding="utf-8") as f: - first_line = f.readline().strip() - if not first_line: - return None - data = json.loads(first_line) - if data.get("_type") != "metadata": - return None - preview = "" - fallback_preview = "" - scanned_records = 0 - scanned_chars = 0 - for line in f: - if not line.strip(): - continue - scanned_records += 1 - scanned_chars += len(line) - if ( - scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS - or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS - ): - break - item = json.loads(line) - if item.get("_type") == "metadata": - continue - text = _message_preview_text(item) - if not text: - continue - if item.get("role") == "user": - preview = text - break - if not fallback_preview and item.get("role") == "assistant": - fallback_preview = text - signature = self._session_file_signature(path) - return { - "key": data.get("key") or fallback_key, - "created_at": data.get("created_at"), - "updated_at": data.get("updated_at"), - "title": _metadata_title(data.get("metadata", {})), - "preview": preview or fallback_preview, - "file": path.name, - "mtime_ns": signature["mtime_ns"], - "size": signature["size"], - } - except Exception: - repaired = self._repair(fallback_key) - if repaired is None: - return None - return self._indexed_row_for_session(repaired, path) - def get_or_create(self, key: str) -> Session: """ Get an existing session or create a new one. @@ -781,7 +600,6 @@ class SessionManager: raise self._cache[session.key] = session - self._update_session_index(self._indexed_row_for_session(session, path)) def flush_all(self) -> int: """Re-save every cached session with fsync for durable shutdown. @@ -814,7 +632,6 @@ class SessionManager: return False try: path.unlink() - self._remove_session_index_row(path.name) return True except OSError as e: logger.warning("Failed to delete session file {}: {}", path, e) @@ -926,16 +743,75 @@ class SessionManager: Returns: List of session info dicts. """ - sessions = self._read_valid_session_index() - if sessions is None: - indexed_rows = [ - row - for path in self.sessions_dir.glob("*.jsonl") - if (row := self._session_index_row_from_file(path)) is not None - ] + sessions = [] + + for path in self.sessions_dir.glob("*.jsonl"): + fallback_key = path.stem.replace("_", ":", 1) try: - self._write_session_index_rows(indexed_rows) - except Exception as e: - logger.debug("Failed to write session list index: {}", e) - sessions = [self._public_session_index_row(row) for row in indexed_rows] + # Read the metadata line and a small preview for session lists. + with open(path, encoding="utf-8") as f: + first_line = f.readline().strip() + if first_line: + data = json.loads(first_line) + if data.get("_type") == "metadata": + key = data.get("key") or path.stem.replace("_", ":", 1) + metadata = data.get("metadata", {}) + title = _metadata_title(metadata) + preview = "" + fallback_preview = "" + scanned_records = 0 + scanned_chars = 0 + for line in f: + if not line.strip(): + continue + scanned_records += 1 + scanned_chars += len(line) + if ( + scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS + or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS + ): + break + item = json.loads(line) + if item.get("_type") == "metadata": + continue + text = _message_preview_text(item) + if not text: + continue + if item.get("role") == "user": + preview = text + break + if not fallback_preview and item.get("role") == "assistant": + fallback_preview = text + preview = preview or fallback_preview + sessions.append( + { + "key": key, + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + "title": title, + "preview": preview, + "path": str(path), + } + ) + except Exception: + repaired = self._repair(fallback_key) + if repaired is not None: + sessions.append( + { + "key": repaired.key, + "created_at": repaired.created_at.isoformat(), + "updated_at": repaired.updated_at.isoformat(), + "title": _metadata_title(repaired.metadata), + "preview": next( + ( + text + for msg in repaired.messages + if (text := _message_preview_text(msg)) + ), + "", + ), + "path": str(path), + } + ) + continue return sorted(sessions, key=lambda x: x.get("updated_at", ""), reverse=True) diff --git a/nanobot/webui/session_list_index.py b/nanobot/webui/session_list_index.py new file mode 100644 index 000000000..082ce5300 --- /dev/null +++ b/nanobot/webui/session_list_index.py @@ -0,0 +1,219 @@ +"""Cache-only WebUI session list index. + +The core ``SessionManager`` owns durable conversation history. This module owns +the WebUI sidebar optimization so core session writes stay independent from UI +presentation caches. +""" + +from __future__ import annotations + +import json +import os +from pathlib import Path +from typing import Any + +from loguru import logger + +from nanobot.session.manager import ( + _SESSION_LIST_PREVIEW_MAX_CHARS, + _SESSION_LIST_PREVIEW_MAX_RECORDS, + Session, + SessionManager, + _message_preview_text, + _metadata_title, +) + +_INDEX_VERSION = 1 +_INDEX_FILENAME = ".webui_session_index.json" + + +def list_webui_sessions(session_manager: SessionManager) -> list[dict[str, Any]]: + """Return session rows for the WebUI sidebar, backed by a rebuildable cache.""" + rows, changed = _reconcile_index(session_manager) + if changed: + try: + _write_index_rows(session_manager.sessions_dir, rows) + except Exception as e: + logger.debug("Failed to write WebUI session list index: {}", e) + sessions = [_public_row(session_manager.sessions_dir, row) for row in rows] + return sorted(sessions, key=lambda row: row.get("updated_at", ""), reverse=True) + + +def _reconcile_index(session_manager: SessionManager) -> tuple[list[dict[str, Any]], bool]: + existing_rows = _read_index_rows(session_manager.sessions_dir) + existing_by_file = { + row.get("file"): row + for row in existing_rows or [] + if isinstance(row.get("file"), str) + } + paths = sorted(session_manager.sessions_dir.glob("*.jsonl")) + rows: list[dict[str, Any]] = [] + changed = existing_rows is None + + for path in paths: + row = existing_by_file.get(path.name) + if row is not None and _indexed_row_matches_file(row, path): + rows.append(row) + continue + + changed = True + scanned = _scan_session_row(session_manager, path) + if scanned is not None: + rows.append(scanned) + + if set(existing_by_file) != {path.name for path in paths}: + changed = True + if existing_rows is not None and rows != existing_rows: + changed = True + return rows, changed + + +def _index_path(sessions_dir: Path) -> Path: + return sessions_dir / _INDEX_FILENAME + + +def _read_index_rows(sessions_dir: Path) -> list[dict[str, Any]] | None: + path = _index_path(sessions_dir) + if not path.is_file(): + return None + try: + data = json.loads(path.read_text(encoding="utf-8")) + except (OSError, json.JSONDecodeError): + return None + if not isinstance(data, dict) or data.get("version") != _INDEX_VERSION: + return None + rows = data.get("sessions") + if not isinstance(rows, list) or not all(isinstance(row, dict) for row in rows): + return None + return rows + + +def _write_index_rows(sessions_dir: Path, rows: list[dict[str, Any]]) -> None: + path = _index_path(sessions_dir) + tmp_path = path.with_suffix(".json.tmp") + data = {"version": _INDEX_VERSION, "sessions": rows} + try: + tmp_path.write_text(json.dumps(data, ensure_ascii=False) + "\n", encoding="utf-8") + os.replace(tmp_path, path) + except BaseException: + tmp_path.unlink(missing_ok=True) + raise + + +def _file_signature(path: Path) -> dict[str, int]: + stat = path.stat() + return {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size} + + +def _indexed_row_matches_file(row: dict[str, Any], path: Path) -> bool: + if not all(isinstance(row.get(key), str) for key in ("key", "created_at", "updated_at")): + return False + if not isinstance(row.get("title", ""), str) or not isinstance(row.get("preview", ""), str): + return False + if row.get("file") != path.name: + return False + try: + signature = _file_signature(path) + except OSError: + return False + return row.get("mtime_ns") == signature["mtime_ns"] and row.get("size") == signature["size"] + + +def _public_row(sessions_dir: Path, row: dict[str, Any]) -> dict[str, Any]: + return { + "key": row.get("key"), + "created_at": row.get("created_at"), + "updated_at": row.get("updated_at"), + "title": row.get("title", ""), + "preview": row.get("preview", ""), + "path": str(sessions_dir / str(row.get("file", ""))), + } + + +def _preview_from_messages(messages: list[dict[str, Any]]) -> str: + fallback_preview = "" + scanned_records = 0 + scanned_chars = 0 + for item in messages: + scanned_records += 1 + scanned_chars += len(json.dumps(item, ensure_ascii=False)) + 1 + if ( + scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS + or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS + ): + break + text = _message_preview_text(item) + if not text: + continue + if item.get("role") == "user": + return text + if not fallback_preview and item.get("role") == "assistant": + fallback_preview = text + return fallback_preview + + +def _indexed_row_for_session(session: Session, path: Path) -> dict[str, Any]: + signature = _file_signature(path) + return { + "key": session.key, + "created_at": session.created_at.isoformat(), + "updated_at": session.updated_at.isoformat(), + "title": _metadata_title(session.metadata), + "preview": _preview_from_messages(session.messages), + "file": path.name, + "mtime_ns": signature["mtime_ns"], + "size": signature["size"], + } + + +def _scan_session_row(session_manager: SessionManager, path: Path) -> dict[str, Any] | None: + fallback_key = path.stem.replace("_", ":", 1) + try: + with open(path, encoding="utf-8") as f: + first_line = f.readline().strip() + if not first_line: + return None + data = json.loads(first_line) + if data.get("_type") != "metadata": + return None + preview = "" + fallback_preview = "" + scanned_records = 0 + scanned_chars = 0 + for line in f: + if not line.strip(): + continue + scanned_records += 1 + scanned_chars += len(line) + if ( + scanned_records > _SESSION_LIST_PREVIEW_MAX_RECORDS + or scanned_chars > _SESSION_LIST_PREVIEW_MAX_CHARS + ): + break + item = json.loads(line) + if item.get("_type") == "metadata": + continue + text = _message_preview_text(item) + if not text: + continue + if item.get("role") == "user": + preview = text + break + if not fallback_preview and item.get("role") == "assistant": + fallback_preview = text + signature = _file_signature(path) + return { + "key": data.get("key") or fallback_key, + "created_at": data.get("created_at"), + "updated_at": data.get("updated_at"), + "title": _metadata_title(data.get("metadata", {})), + "preview": preview or fallback_preview, + "file": path.name, + "mtime_ns": signature["mtime_ns"], + "size": signature["size"], + } + except Exception: + repaired = session_manager._repair(fallback_key) + if repaired is None: + return None + return _indexed_row_for_session(repaired, path) diff --git a/nanobot/webui/ws_http.py b/nanobot/webui/ws_http.py index f04642e04..101b309fe 100644 --- a/nanobot/webui/ws_http.py +++ b/nanobot/webui/ws_http.py @@ -62,6 +62,7 @@ from nanobot.webui.http_utils import ( ) from nanobot.webui.media_gateway import WebUIMediaGateway from nanobot.webui.session_automations import session_automations_payload +from nanobot.webui.session_list_index import list_webui_sessions from nanobot.webui.sidebar_state import ( read_webui_sidebar_state, write_webui_sidebar_state, @@ -323,7 +324,7 @@ class GatewayHTTPHandler: return _http_error(401, "Unauthorized") if self.session_manager is None: return _http_error(503, "session manager unavailable") - sessions = self.session_manager.list_sessions() + sessions = list_webui_sessions(self.session_manager) from nanobot.session.webui_turns import websocket_turn_wall_started_at cleaned = [] diff --git a/tests/agent/test_session_manager_history.py b/tests/agent/test_session_manager_history.py index 58be41bde..3441c4833 100644 --- a/tests/agent/test_session_manager_history.py +++ b/tests/agent/test_session_manager_history.py @@ -96,48 +96,6 @@ def test_list_sessions_bounds_preview_scan(tmp_path): assert rows[0]["preview"] == "assistant trace 0" -def test_list_sessions_reuses_valid_index_without_scanning_files(tmp_path, monkeypatch): - manager = SessionManager(tmp_path) - session = manager.get_or_create("websocket:indexed") - session.add_message("user", "indexed preview") - manager.save(session) - - assert manager.list_sessions()[0]["preview"] == "indexed preview" - - def fail_scan(path): - raise AssertionError(f"unexpected session file scan: {path}") - - monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan) - - rows = manager.list_sessions() - - assert rows[0]["key"] == "websocket:indexed" - assert rows[0]["preview"] == "indexed preview" - - -def test_list_sessions_index_updates_on_save_and_delete(tmp_path, monkeypatch): - manager = SessionManager(tmp_path) - session = manager.get_or_create("websocket:index-refresh") - session.add_message("user", "before") - manager.save(session) - session.messages.clear() - session.add_message("user", "after") - session.metadata["title"] = "fresh title" - manager.save(session) - - def fail_scan(path): - raise AssertionError(f"unexpected session file scan: {path}") - - monkeypatch.setattr(manager, "_session_index_row_from_file", fail_scan) - - rows = manager.list_sessions() - assert rows[0]["title"] == "fresh title" - assert rows[0]["preview"] == "after" - - assert manager.delete_session("websocket:index-refresh") is True - assert manager.list_sessions() == [] - - # --- Original regression test (from PR 2075) --- def test_get_history_drops_orphan_tool_results_when_window_cuts_tool_calls(): diff --git a/tests/channels/test_websocket_channel.py b/tests/channels/test_websocket_channel.py index cf6a15455..b8ee27a76 100644 --- a/tests/channels/test_websocket_channel.py +++ b/tests/channels/test_websocket_channel.py @@ -2618,15 +2618,16 @@ def test_parse_envelope_rejects_legacy_and_garbage() -> None: assert _parse_envelope('{"type":123}') is None -def test_sessions_list_includes_active_run_started_at() -> None: +def test_sessions_list_includes_active_run_started_at(monkeypatch) -> None: from websockets.datastructures import Headers from websockets.http11 import Request from nanobot.session import webui_turns as wth + from nanobot.webui import ws_http as ws_http_module bus = MagicMock() session_manager = MagicMock() - session_manager.list_sessions.return_value = [ + sessions = [ { "key": "websocket:chat-1", "created_at": "2026-05-19T10:00:00Z", @@ -2641,6 +2642,7 @@ def test_sessions_list_includes_active_run_started_at() -> None: "updated_at": "2026-05-19T10:01:00Z", }, ] + monkeypatch.setattr(ws_http_module, "list_webui_sessions", lambda _session_manager: sessions) channel = WebSocketChannel( {"enabled": True, "allowFrom": ["*"]}, bus, diff --git a/tests/webui/test_session_list_index.py b/tests/webui/test_session_list_index.py new file mode 100644 index 000000000..aea32b3e7 --- /dev/null +++ b/tests/webui/test_session_list_index.py @@ -0,0 +1,75 @@ +from __future__ import annotations + +from pathlib import Path + +import nanobot.webui.session_list_index as session_list_index +from nanobot.session.manager import SessionManager + + +def test_webui_session_list_reuses_valid_index_without_scanning_files( + tmp_path: Path, + monkeypatch, +) -> None: + manager = SessionManager(tmp_path) + session = manager.get_or_create("websocket:indexed") + session.add_message("user", "indexed preview") + manager.save(session) + + assert list_webui_sessions(manager)[0]["preview"] == "indexed preview" + + def fail_scan(session_manager: SessionManager, path: Path) -> None: + raise AssertionError(f"unexpected session file scan: {path}") + + monkeypatch.setattr(session_list_index, "_scan_session_row", fail_scan) + + rows = list_webui_sessions(manager) + + assert rows[0]["key"] == "websocket:indexed" + assert rows[0]["preview"] == "indexed preview" + + +def test_webui_session_list_rescans_only_changed_file(tmp_path: Path, monkeypatch) -> None: + manager = SessionManager(tmp_path) + first = manager.get_or_create("websocket:first") + first.add_message("user", "first") + manager.save(first) + second = manager.get_or_create("websocket:second") + second.add_message("user", "second before") + manager.save(second) + + assert {row["preview"] for row in list_webui_sessions(manager)} == {"first", "second before"} + + second.messages.clear() + second.add_message("user", "second after") + manager.save(second) + + original_scan = session_list_index._scan_session_row + scanned: list[str] = [] + + def record_scan(session_manager: SessionManager, path: Path) -> dict | None: + scanned.append(path.name) + return original_scan(session_manager, path) + + monkeypatch.setattr(session_list_index, "_scan_session_row", record_scan) + + rows = list_webui_sessions(manager) + + assert scanned == [manager._get_session_path("websocket:second").name] + assert {row["preview"] for row in rows} == {"first", "second after"} + + +def test_webui_session_list_drops_deleted_index_rows(tmp_path: Path) -> None: + manager = SessionManager(tmp_path) + session = manager.get_or_create("websocket:deleted") + session.add_message("user", "gone") + manager.save(session) + + assert list_webui_sessions(manager)[0]["key"] == "websocket:deleted" + + assert manager.delete_session("websocket:deleted") is True + + assert list_webui_sessions(manager) == [] + + +def list_webui_sessions(manager: SessionManager) -> list[dict]: + return session_list_index.list_webui_sessions(manager)