"""Append-only WebUI display transcript (JSONL), separate from agent session.""" from __future__ import annotations import base64 import binascii import json import os import re import shutil import time import uuid from pathlib import Path from typing import Any, Callable, Mapping, NamedTuple from urllib.parse import unquote, urlparse from loguru import logger from nanobot.config.paths import get_webui_dir from nanobot.session.manager import SessionManager WEBUI_TRANSCRIPT_SCHEMA_VERSION = 3 WEBUI_FORK_MARKER_EVENT = "fork_marker" _MAX_TRANSCRIPT_FILE_BYTES = 8 * 1024 * 1024 _TARGET_ACTIVE_TRANSCRIPT_BYTES = _MAX_TRANSCRIPT_FILE_BYTES // 2 _TRANSCRIPT_SEGMENT_MANIFEST_VERSION = 2 _TRANSCRIPT_ACTIVE_CHUNK_ID = "active" _TRANSCRIPT_SEGMENT_RE = re.compile(r"^\d{6}\.jsonl$") _DEFAULT_TRANSCRIPT_PAGE_LIMIT = 160 _MAX_TRANSCRIPT_PAGE_LIMIT = 1000 _WEBUI_TURN_ID_RE = re.compile(r"^[A-Za-z0-9._:-]{1,128}$") WEBUI_TURN_METADATA_KEY = "webui_turn_id" WEBUI_MESSAGE_SOURCE_METADATA_KEY = "_webui_message_source" _MARKDOWN_LOCAL_IMAGE_RE = re.compile( r"!\[([^\]]*)\]\((<[^>]+>|[^)\s]+)(\s+(?:\"[^\"]*\"|'[^']*'))?\)" ) _INLINE_MARKDOWN_IMAGE_EXTS: frozenset[str] = frozenset({ ".png", ".jpg", ".jpeg", ".webp", ".gif", ".svg", }) _INLINE_MARKDOWN_VIDEO_EXTS: frozenset[str] = frozenset({ ".mp4", ".mov", ".webm", }) _INLINE_MARKDOWN_MEDIA_EXTS = _INLINE_MARKDOWN_IMAGE_EXTS | _INLINE_MARKDOWN_VIDEO_EXTS _FILE_EDIT_TOOL_NAMES: frozenset[str] = frozenset({ "write_file", "edit_file", "apply_patch", }) _TURN_DISPLAY_EVENTS: frozenset[str] = frozenset({ "reasoning_delta", "reasoning_end", "delta", "stream_end", "message", "file_edit", "turn_end", }) def rewrite_local_markdown_images( text: str, *, workspace_path: Path, sign_path: Callable[[Path], Mapping[str, Any] | None], ) -> str: """Rewrite markdown media paths inside the workspace to signed WebUI media URLs.""" if "![" not in text: return text def resolve_url(raw_url: str) -> str | None: url = raw_url.strip() if url.startswith("<") 
def _media_kind_from_name(name: str) -> str:
    """Return ``"image"``, ``"video"`` or ``"file"`` based on the name's suffix."""
    suffix = Path(name).suffix.lower()
    if suffix in _INLINE_MARKDOWN_IMAGE_EXTS:
        return "image"
    return "video" if suffix in _INLINE_MARKDOWN_VIDEO_EXTS else "file"


def webui_transcript_path(session_key: str) -> Path:
    """Path of the active JSONL transcript for ``session_key``."""
    safe_name = SessionManager.safe_key(session_key)
    return get_webui_dir() / f"{safe_name}.jsonl"


def webui_transcript_segments_dir(session_key: str) -> Path:
    """Directory holding the rotated transcript segments for ``session_key``."""
    safe_name = SessionManager.safe_key(session_key)
    return get_webui_dir() / f"{safe_name}.segments"


def _webui_transcript_manifest_path(session_key: str) -> Path:
    """Path of ``manifest.json`` inside the session's segments directory."""
    segments_dir = webui_transcript_segments_dir(session_key)
    return segments_dir / "manifest.json"


def _legacy_webui_thread_path(session_key: str) -> Path:
    """Path of the ``.json`` file that shares the transcript's stem."""
    safe_name = SessionManager.safe_key(session_key)
    return get_webui_dir() / f"{safe_name}.json"


class _TranscriptTurnRef(NamedTuple):
    """One turn's records paired with its global turn ordinal."""

    ordinal: int
    records: list[dict[str, Any]]


class _TranscriptChunkRef(NamedTuple):
    """Summary of one chunk (a segment file or the active file)."""

    chunk_id: str
    start_ordinal: int
    turn_count: int
    user_count: int


def _record_json_line(record: dict[str, Any]) -> str:
    """Serialize ``record`` as compact JSON without ASCII escaping."""
    compact = (",", ":")
    return json.dumps(record, separators=compact, ensure_ascii=False)
def _read_transcript_file(path: Path) -> list[dict[str, Any]]:
    """Read a JSONL transcript file, keeping only well-formed object rows.

    Blank lines are skipped, lines that are not valid JSON are logged and
    skipped, and JSON values that are not objects are dropped. Bytes that are
    not valid UTF-8 are decoded with replacement characters so that one torn
    or corrupted write cannot make the whole transcript unreadable.

    Returns:
        The parsed rows in file order, or ``[]`` when the file cannot be read.
    """
    lines_out: list[dict[str, Any]] = []
    try:
        with open(path, encoding="utf-8", errors="replace") as f:
            for line_no, line in enumerate(f, start=1):
                line = line.strip()
                if not line:
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    logger.warning("bad jsonl at {} line {}", path, line_no)
                    continue
                if isinstance(obj, dict):
                    lines_out.append(obj)
    except OSError as e:
        logger.warning("read transcript failed {}: {}", path, e)
        return []
    return lines_out


def _records_bytes(records: list[dict[str, Any]]) -> int:
    """Size in bytes of ``records`` when written as JSONL (newline included)."""
    return sum(len(_record_json_line(record).encode("utf-8")) + 1 for record in records)


def _flatten_turns(turns: list[list[dict[str, Any]]]) -> list[dict[str, Any]]:
    """Concatenate per-turn record lists into one flat record list."""
    return [record for turn in turns for record in turn]


def _write_records_to_path(path: Path, rows: list[dict[str, Any]]) -> None:
    """Write ``rows`` to ``path`` as JSONL via a temp file and ``os.replace``.

    Raises:
        ValueError: If a single serialized row exceeds the transcript size cap.
    """
    path.parent.mkdir(parents=True, exist_ok=True)
    tmp_path = path.with_suffix(path.suffix + ".tmp")
    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            for row in rows:
                raw = _record_json_line(row)
                if len(raw.encode("utf-8")) > _MAX_TRANSCRIPT_FILE_BYTES:
                    raise ValueError("webui transcript line too large")
                f.write(raw + "\n")
            f.flush()
            os.fsync(f.fileno())
        os.replace(tmp_path, path)
    except BaseException:
        # Never leave a half-written temp file behind, even on interrupt.
        tmp_path.unlink(missing_ok=True)
        raise


def _segment_file_path(session_key: str, segment_id: str) -> Path:
    """Path of the segment file ``<segment_id>.jsonl`` for ``session_key``."""
    return webui_transcript_segments_dir(session_key) / f"{segment_id}.jsonl"


def _segment_ids_on_disk(session_key: str) -> list[str]:
    """Sorted ids of segment files present in the session's segments directory."""
    directory = webui_transcript_segments_dir(session_key)
    if not directory.is_dir():
        return []
    return sorted(
        path.stem
        for path in directory.iterdir()
        if path.is_file() and _TRANSCRIPT_SEGMENT_RE.fullmatch(path.name)
    )


def _segment_manifest_entry(session_key: str, segment_id: str) -> dict[str, Any]:
    """Build the manifest entry (size, turn count, user count) for one segment."""
    path = _segment_file_path(session_key, segment_id)
    lines = _read_transcript_file(path)
    return {
        "id": segment_id,
        "bytes": path.stat().st_size if path.exists() else 0,
        "turn_count": len(_split_transcript_turns(lines)),
        "user_count": sum(1 for line in lines if _is_user_transcript_row(line)),
    }
def _non_negative_int(value: Any) -> int | None:
    """Return ``value`` if it is a non-negative ``int`` (not ``bool``), else ``None``."""
    if isinstance(value, bool) or not isinstance(value, int) or value < 0:
        return None
    return value


def _normalize_manifest_entry(session_key: str, entry: Any) -> dict[str, Any] | None:
    """Validate one manifest entry against the segment file on disk.

    Returns a cleaned copy with ``id``, ``bytes``, ``turn_count`` and
    ``user_count``, or ``None`` when the entry is malformed, its segment file
    is missing, or the recorded byte size differs from the file's size.
    """
    if not isinstance(entry, dict):
        return None
    segment_id = entry.get("id")
    if not isinstance(segment_id, str) or not _TRANSCRIPT_SEGMENT_RE.fullmatch(f"{segment_id}.jsonl"):
        return None
    segment_path = _segment_file_path(session_key, segment_id)
    values = {
        key: _non_negative_int(entry.get(key))
        for key in ("bytes", "turn_count", "user_count")
    }
    if not segment_path.is_file() or values["bytes"] != segment_path.stat().st_size:
        return None
    if values["turn_count"] is None or values["user_count"] is None:
        return None
    return {
        "id": segment_id,
        "bytes": values["bytes"],
        "turn_count": values["turn_count"],
        "user_count": values["user_count"],
    }


def _write_manifest_entries(session_key: str, entries: list[dict[str, Any]]) -> None:
    """Persist already-computed manifest ``entries`` via a temp file and replace."""
    directory = webui_transcript_segments_dir(session_key)
    directory.mkdir(parents=True, exist_ok=True)
    data = {
        "version": _TRANSCRIPT_SEGMENT_MANIFEST_VERSION,
        "segments": entries,
    }
    path = _webui_transcript_manifest_path(session_key)
    tmp_path = path.with_suffix(".json.tmp")
    try:
        tmp_path.write_text(json.dumps(data, ensure_ascii=False, indent=2) + "\n", encoding="utf-8")
        os.replace(tmp_path, path)
    except BaseException:
        tmp_path.unlink(missing_ok=True)
        raise


def _write_segment_manifest(session_key: str, segment_ids: list[str]) -> None:
    """Recompute the entry of every id in ``segment_ids`` and write the manifest."""
    entries = [_segment_manifest_entry(session_key, segment_id) for segment_id in segment_ids]
    _write_manifest_entries(session_key, entries)


def _rebuild_segment_manifest(session_key: str) -> list[str]:
    """Regenerate the manifest from the segment files on disk.

    Returns the segment ids found; when there are none the manifest file is
    removed instead of being written.
    """
    segment_ids = _segment_ids_on_disk(session_key)
    if segment_ids:
        _write_segment_manifest(session_key, segment_ids)
    else:
        _webui_transcript_manifest_path(session_key).unlink(missing_ok=True)
    return segment_ids


def _rebuilt_segment_manifest_entries(session_key: str) -> list[dict[str, Any]]:
    """Regenerate the manifest and return the entries that were written.

    Each segment file is parsed once: the same entry list is both persisted
    and returned, so the result always matches what is on disk.
    """
    segment_ids = _segment_ids_on_disk(session_key)
    if not segment_ids:
        _webui_transcript_manifest_path(session_key).unlink(missing_ok=True)
        return []
    entries = [_segment_manifest_entry(session_key, segment_id) for segment_id in segment_ids]
    _write_manifest_entries(session_key, entries)
    return entries
def _read_segment_manifest_entries(session_key: str) -> list[dict[str, Any]]:
    """Load validated manifest entries, rebuilding the manifest when it is unusable.

    A rebuild is triggered when the manifest is missing, unreadable, has the
    wrong version or shape, contains an invalid entry, or lists different ids
    than the segment files on disk. Without a segments directory the result
    is an empty list.
    """
    if not webui_transcript_segments_dir(session_key).is_dir():
        return []
    manifest_path = _webui_transcript_manifest_path(session_key)
    if not manifest_path.is_file():
        return _rebuilt_segment_manifest_entries(session_key)
    try:
        data = json.loads(manifest_path.read_text(encoding="utf-8"))
        raw_segments = data.get("segments") if isinstance(data, dict) else None
        # A non-dict payload makes ``data.get`` raise AttributeError, which the
        # handler below turns into a rebuild.
        version_ok = data.get("version") == _TRANSCRIPT_SEGMENT_MANIFEST_VERSION
        if not version_ok or not isinstance(raw_segments, list):
            return _rebuilt_segment_manifest_entries(session_key)
        validated: list[dict[str, Any]] = []
        for raw_entry in raw_segments:
            clean = _normalize_manifest_entry(session_key, raw_entry)
            if clean is None:
                return _rebuilt_segment_manifest_entries(session_key)
            validated.append(clean)
        listed_ids = [item["id"] for item in validated]
        if listed_ids != _segment_ids_on_disk(session_key):
            return _rebuilt_segment_manifest_entries(session_key)
        return validated
    except (OSError, json.JSONDecodeError, TypeError, AttributeError):
        return _rebuilt_segment_manifest_entries(session_key)


def _read_segment_ids(session_key: str) -> list[str]:
    """Ids of the session's segments, in manifest order."""
    entries = _read_segment_manifest_entries(session_key)
    return [item["id"] for item in entries]


def _append_segment_turns(session_key: str, turns: list[list[dict[str, Any]]]) -> None:
    """Store ``turns`` as new numbered segment files and rewrite the manifest.

    Turns are packed in order; a new segment is started whenever adding the
    next turn would push the current one past the transcript size cap. A turn
    is never split across segments.
    """
    if not turns:
        return

    known_ids = _read_segment_ids(session_key)
    counter = int(known_ids[-1]) + 1 if known_ids else 1
    pending: list[list[dict[str, Any]]] = []
    pending_bytes = 0

    def flush_pending() -> None:
        nonlocal counter, pending, pending_bytes
        new_id = f"{counter:06d}"
        _write_records_to_path(_segment_file_path(session_key, new_id), _flatten_turns(pending))
        known_ids.append(new_id)
        counter += 1
        pending = []
        pending_bytes = 0

    for turn in turns:
        size = _records_bytes(turn)
        if pending and pending_bytes + size > _MAX_TRANSCRIPT_FILE_BYTES:
            flush_pending()
        pending.append(turn)
        pending_bytes += size
    if pending:
        flush_pending()
    _write_segment_manifest(session_key, known_ids)
def _rotate_active_transcript_if_needed(session_key: str) -> None:
    """Move older turns out of an oversized active transcript into segments.

    Nothing happens unless the active file exists, exceeds the size cap, and
    holds more than one turn. The newest turn always stays active; older turns
    stay too while the kept total fits the target size. Everything before that
    is appended as segments, then the active file is rewritten.
    """
    active_path = webui_transcript_path(session_key)
    if not active_path.is_file():
        return
    try:
        oversized = active_path.stat().st_size > _MAX_TRANSCRIPT_FILE_BYTES
    except OSError:
        return
    if not oversized:
        return

    records = _read_transcript_file(active_path)
    if not records:
        return
    turns = _split_transcript_turns(records)
    if len(turns) <= 1:
        return

    last = len(turns) - 1
    keep_from = last
    kept_bytes = 0
    for position in reversed(range(len(turns))):
        size = _records_bytes(turns[position])
        if position != last and kept_bytes + size > _TARGET_ACTIVE_TRANSCRIPT_BYTES:
            break
        keep_from = position
        kept_bytes += size

    older, recent = turns[:keep_from], turns[keep_from:]
    if not older:
        return
    _append_segment_turns(session_key, older)
    _write_records_to_path(active_path, _flatten_turns(recent))


def _chunk_ids(session_key: str) -> list[str]:
    """Chunk ids in chronological order: segments first, then the active file."""
    _rotate_active_transcript_if_needed(session_key)
    chunk_ids = _read_segment_ids(session_key)
    if webui_transcript_path(session_key).is_file():
        chunk_ids.append(_TRANSCRIPT_ACTIVE_CHUNK_ID)
    return chunk_ids


def _read_chunk_turns(session_key: str, chunk_id: str) -> list[list[dict[str, Any]]]:
    """Read one chunk and split it into turns; a missing file yields ``[]``."""
    is_active = chunk_id == _TRANSCRIPT_ACTIVE_CHUNK_ID
    chunk_path = (
        webui_transcript_path(session_key)
        if is_active
        else _segment_file_path(session_key, chunk_id)
    )
    if not chunk_path.is_file():
        return []
    return _split_transcript_turns(_read_transcript_file(chunk_path))


def _encode_page_cursor(before_turn_ordinal: int) -> str:
    """Encode a turn ordinal as an unpadded URL-safe base64 JSON cursor."""
    payload = json.dumps(
        {"before_turn": before_turn_ordinal},
        separators=(",", ":"),
        ensure_ascii=False,
    )
    token = base64.urlsafe_b64encode(payload.encode("utf-8"))
    return token.decode("ascii").rstrip("=")


def _decode_page_cursor(value: str | None) -> int | None:
    """Decode a page cursor; return ``None`` for empty or invalid cursors.

    A cursor is valid only if it decodes to a JSON object whose
    ``before_turn`` is a non-negative integer (booleans are rejected).
    """
    if not value:
        return None
    try:
        padded = value + "=" * (-len(value) % 4)
        decoded = base64.urlsafe_b64decode(padded.encode("ascii")).decode("utf-8")
        data = json.loads(decoded)
    except (binascii.Error, json.JSONDecodeError, UnicodeDecodeError, ValueError):
        return None
    if not isinstance(data, dict):
        return None
    ordinal = data.get("before_turn")
    if isinstance(ordinal, bool) or not isinstance(ordinal, int):
        return None
    return ordinal if ordinal >= 0 else None
def _coerce_page_limit(limit: int | None) -> int:
    """Clamp ``limit`` into ``[1, max page limit]``; ``None`` means the default."""
    if limit is None:
        return _DEFAULT_TRANSCRIPT_PAGE_LIMIT
    capped = min(_MAX_TRANSCRIPT_PAGE_LIMIT, int(limit))
    return max(1, capped)


def _chunk_turn_refs(session_key: str) -> list[_TranscriptChunkRef]:
    """Describe every non-empty chunk with its starting global turn ordinal.

    Segment counts come from the manifest; the active file is read and counted
    directly. Rotation is attempted first.
    """
    _rotate_active_transcript_if_needed(session_key)
    refs: list[_TranscriptChunkRef] = []
    next_ordinal = 0
    for entry in _read_segment_manifest_entries(session_key):
        segment_id = str(entry["id"])
        turns_in_segment = int(entry["turn_count"])
        if turns_in_segment <= 0:
            continue
        refs.append(
            _TranscriptChunkRef(segment_id, next_ordinal, turns_in_segment, int(entry["user_count"]))
        )
        next_ordinal += turns_in_segment

    if webui_transcript_path(session_key).is_file():
        active_turns = _read_chunk_turns(session_key, _TRANSCRIPT_ACTIVE_CHUNK_ID)
        if len(active_turns) > 0:
            active_users = sum(
                1 for turn in active_turns for row in turn if _is_user_transcript_row(row)
            )
            refs.append(
                _TranscriptChunkRef(
                    _TRANSCRIPT_ACTIVE_CHUNK_ID,
                    next_ordinal,
                    len(active_turns),
                    active_users,
                ),
            )
    return refs


def _count_user_messages_before_ordinal(
    session_key: str,
    chunks: list[_TranscriptChunkRef],
    before_ordinal: int,
) -> int:
    """Count user rows in all turns whose ordinal is below ``before_ordinal``.

    Chunks that lie wholly before the boundary contribute their recorded
    ``user_count``; only a chunk cut by the boundary is read from disk.
    """
    counted = 0
    for chunk in chunks:
        if before_ordinal <= chunk.start_ordinal:
            break
        turns_before = min(chunk.turn_count, before_ordinal - chunk.start_ordinal)
        if turns_before <= 0:
            continue
        if turns_before >= chunk.turn_count:
            counted += chunk.user_count
            continue
        chunk_turns = _read_chunk_turns(session_key, chunk.chunk_id)
        counted += sum(
            1
            for turn in chunk_turns[:turns_before]
            for row in turn
            if _is_user_transcript_row(row)
        )
    return counted


def _select_transcript_page(
    session_key: str,
    *,
    limit: int | None,
    before: str | None,
    _manifest_rebuilt: bool = False,
) -> tuple[list[dict[str, Any]], dict[str, Any]]:
    """Select the newest whole turns before a cursor, up to a message budget.

    Turns are walked newest-first and added until the replayed UI message
    count reaches the page limit. If a segment's turn count disagrees with the
    manifest, the manifest is rebuilt once and the selection restarts.

    Returns:
        ``(records, page)`` where ``records`` is chronological and ``page``
        holds ``before_cursor``, ``has_more_before``, ``loaded_message_count``
        and ``user_message_offset``.
    """
    page_limit = _coerce_page_limit(limit)
    chunks = _chunk_turn_refs(session_key)
    total_turns = sum(chunk.turn_count for chunk in chunks)
    before_ordinal = _decode_page_cursor(before)
    if before_ordinal is None:
        upper_ordinal = total_turns
    else:
        upper_ordinal = min(before_ordinal, total_turns)

    picked: list[_TranscriptTurnRef] = []
    message_total = 0
    for chunk in reversed(chunks):
        if chunk.start_ordinal >= upper_ordinal:
            continue
        local_upper = min(chunk.turn_count, upper_ordinal - chunk.start_ordinal)
        if local_upper <= 0:
            continue
        turns = _read_chunk_turns(session_key, chunk.chunk_id)
        stale_manifest = (
            chunk.chunk_id != _TRANSCRIPT_ACTIVE_CHUNK_ID
            and len(turns) != chunk.turn_count
        )
        if stale_manifest and not _manifest_rebuilt:
            _rebuild_segment_manifest(session_key)
            return _select_transcript_page(
                session_key,
                limit=limit,
                before=before,
                _manifest_rebuilt=True,
            )
        local_upper = min(local_upper, len(turns))
        for turn_index in reversed(range(local_upper)):
            turn = turns[turn_index]
            picked.append(_TranscriptTurnRef(chunk.start_ordinal + turn_index, turn))
            message_total += len(replay_transcript_to_ui_messages(turn))
            if message_total >= page_limit:
                break
        if message_total >= page_limit:
            break

    if not picked:
        return [], {
            "before_cursor": None,
            "has_more_before": False,
            "loaded_message_count": 0,
            "user_message_offset": 0,
        }

    picked.reverse()  # newest-first -> chronological
    lines = [record for ref in picked for record in ref.records]
    oldest = picked[0]
    has_more = oldest.ordinal > 0
    page = {
        "before_cursor": _encode_page_cursor(oldest.ordinal) if has_more else None,
        "has_more_before": has_more,
        # NOTE(review): always 0 here although a message total was computed
        # above; presumably the caller fills this in -- confirm.
        "loaded_message_count": 0,
        "user_message_offset": _count_user_messages_before_ordinal(
            session_key,
            chunks,
            oldest.ordinal,
        ),
    }
    return lines, page
def read_transcript_lines(session_key: str) -> list[dict[str, Any]]:
    """Return every transcript record for the session in chronological order.

    Segment chunks are read first (in chunk-id order), then the active file.
    """
    collected: list[dict[str, Any]] = []
    for chunk_id in _chunk_ids(session_key):
        source = (
            webui_transcript_path(session_key)
            if chunk_id == _TRANSCRIPT_ACTIVE_CHUNK_ID
            else _segment_file_path(session_key, chunk_id)
        )
        collected.extend(_read_transcript_file(source))
    return collected
def _write_transcript_lines(session_key: str, rows: list[dict[str, Any]]) -> None:
    """Replace the session's whole transcript with ``rows``.

    Existing transcript files and segments are deleted first; the new active
    file is then written and rotated if it is too large.
    """
    delete_webui_transcript(session_key)
    _write_records_to_path(webui_transcript_path(session_key), rows)
    _rotate_active_transcript_if_needed(session_key)


def _append_to_active_transcript(session_key: str, obj: dict[str, Any]) -> None:
    """Append one record to the active transcript and fsync it.

    Raises:
        ValueError: If the serialized record exceeds the transcript size cap.
    """
    serialized = _record_json_line(obj)
    if len(serialized.encode("utf-8")) > _MAX_TRANSCRIPT_FILE_BYTES:
        msg = "webui transcript line too large"
        raise ValueError(msg)
    target = webui_transcript_path(session_key)
    target.parent.mkdir(parents=True, exist_ok=True)
    with open(target, "a", encoding="utf-8") as handle:
        handle.write(serialized + "\n")
        handle.flush()
        os.fsync(handle.fileno())


def append_transcript_object(session_key: str, obj: dict[str, Any]) -> None:
    """Append ``obj``; after a ``turn_end`` event, rotate the file if needed."""
    _append_to_active_transcript(session_key, obj)
    if obj.get("event") != "turn_end":
        return
    _rotate_active_transcript_if_needed(session_key)


def normalize_webui_turn_id(value: Any) -> str:
    """Return the stripped ``value`` if it is a valid turn id, else a fresh UUID4."""
    if isinstance(value, str):
        trimmed = value.strip()
        if _WEBUI_TURN_ID_RE.fullmatch(trimmed):
            return trimmed
    return str(uuid.uuid4())


def webui_message_source(metadata: dict[str, Any] | None) -> dict[str, str] | None:
    """Extract the message source from ``metadata`` when its kind is ``cron``.

    Returns ``{"kind": "cron"}`` plus a stripped ``label`` when one is given,
    or ``None`` for any other or missing source.
    """
    raw = (metadata or {}).get(WEBUI_MESSAGE_SOURCE_METADATA_KEY)
    if not isinstance(raw, dict) or raw.get("kind") != "cron":
        return None
    result: dict[str, str] = {"kind": "cron"}
    label = raw.get("label")
    if isinstance(label, str) and label.strip():
        result["label"] = label.strip()
    return result


class WebUITranscriptRecorder:
    """Prepare and persist WebUI wire events without leaking UI rules into channels."""

    def __init__(self, log: Any = logger) -> None:
        self._log = log
        # (chat_id, turn_id) -> last sequence number handed out for that turn.
        self._turn_sequences: dict[tuple[str, str], int] = {}

    def client_turn_metadata(self, value: Any) -> dict[str, str]:
        """Wrap a normalized turn id in the metadata key used for turns."""
        turn_id = normalize_webui_turn_id(value)
        return {WEBUI_TURN_METADATA_KEY: turn_id}

    def prepare_event(
        self,
        chat_id: str,
        event: dict[str, Any],
        *,
        metadata: dict[str, Any] | None = None,
        phase: str | None = None,
        include_source: bool = False,
    ) -> None:
        """Annotate ``event`` in place with source and turn fields."""
        if include_source:
            source = webui_message_source(metadata)
            if source:
                event["source"] = source
        self._annotate_turn(chat_id, event, metadata, phase)

    def prepare_and_append(
        self,
        chat_id: str,
        event: dict[str, Any],
        *,
        metadata: dict[str, Any] | None = None,
        phase: str | None = None,
        include_source: bool = False,
        transcript_overrides: dict[str, Any] | None = None,
    ) -> None:
        """Prepare ``event`` in place, then persist a copy of it.

        ``transcript_overrides`` is applied to the persisted copy only, never
        to ``event`` itself.
        """
        self.prepare_event(
            chat_id,
            event,
            metadata=metadata,
            phase=phase,
            include_source=include_source,
        )
        persisted = dict(event)
        if transcript_overrides:
            persisted.update(transcript_overrides)
        self.append(chat_id, persisted)

    def append_user_message(
        self,
        chat_id: str,
        text: str,
        *,
        metadata: dict[str, Any],
        media_paths: list[str] | None = None,
        cli_apps: list[dict[str, Any]] | None = None,
        mcp_presets: list[dict[str, Any]] | None = None,
    ) -> None:
        """Persist a user message, skipping a bare ``/stop`` and empty input."""
        is_bare_stop = text.strip() == "/stop" and not media_paths
        if is_bare_stop:
            return
        payload = build_user_transcript_event(
            chat_id,
            text,
            media_paths=media_paths,
            cli_apps=cli_apps,
            mcp_presets=mcp_presets,
        )
        if payload is not None:
            self.prepare_and_append(chat_id, payload, metadata=metadata, phase="user")

    def append(self, chat_id: str, event: dict[str, Any]) -> None:
        """Persist a JSON round-tripped copy of ``event``; log failures only."""
        try:
            snapshot = json.loads(json.dumps(event, ensure_ascii=False))
            append_transcript_object(f"websocket:{chat_id}", snapshot)
        except (OSError, ValueError, TypeError) as e:
            self._log.warning("webui transcript append failed: {}", e)

    def _next_turn_seq(self, chat_id: str, turn_id: str) -> int:
        """Return the next 1-based sequence number for the given turn."""
        key = (chat_id, turn_id)
        next_seq = self._turn_sequences.get(key, 0) + 1
        self._turn_sequences[key] = next_seq
        return next_seq

    def _annotate_turn(
        self,
        chat_id: str,
        event: dict[str, Any],
        metadata: dict[str, Any] | None,
        phase: str | None,
    ) -> None:
        """Stamp ``turn_id``, ``turn_phase`` and ``turn_seq`` onto ``event``.

        Does nothing without a phase or without a non-empty string turn id in
        ``metadata``. The ``complete`` phase drops the turn's counter.
        """
        if phase is None:
            return
        turn_id = (metadata or {}).get(WEBUI_TURN_METADATA_KEY)
        if not (isinstance(turn_id, str) and turn_id):
            return
        event["turn_id"] = turn_id
        event["turn_phase"] = phase
        event["turn_seq"] = self._next_turn_seq(chat_id, turn_id)
        if phase == "complete":
            self._turn_sequences.pop((chat_id, turn_id), None)
def _chat_id_from_session_key(session_key: str) -> str | None:
    """Return the chat id of a ``websocket:<chat_id>`` key, else ``None``."""
    if not session_key.startswith("websocket:"):
        return None
    _, _, remainder = session_key.partition(":")
    chat_id = remainder.strip()
    return chat_id if chat_id else None


def _is_user_transcript_row(row: dict[str, Any]) -> bool:
    """True when the row's ``event`` or ``role`` is ``"user"``."""
    if row.get("event") == "user":
        return True
    return row.get("role") == "user"


def fork_transcript_before_user_index(
    source_key: str,
    target_key: str,
    before_user_index: int,
) -> bool:
    """Copy transcript rows before a zero-based global user-message index.

    ``before_user_index == user_count`` copies the full transcript prefix.
    WebUI uses that when forking from an assistant reply at the end of a chat.

    Fork-marker rows are never copied, and copied rows get the target's chat
    id when the target key is a websocket key. Returns ``False`` (writing
    nothing) for a negative index, an empty source, or an index beyond the
    number of user rows.
    """
    if before_user_index < 0:
        return False
    source_rows = read_transcript_lines(source_key)
    if not source_rows:
        return False

    new_chat_id = _chat_id_from_session_key(target_key)
    prefix: list[dict[str, Any]] = []
    users_seen = 0
    for row in source_rows:
        if row.get("event") == WEBUI_FORK_MARKER_EVENT:
            continue
        if _is_user_transcript_row(row):
            if users_seen == before_user_index:
                break
            users_seen += 1
        clone = json.loads(json.dumps(row, ensure_ascii=False))
        if new_chat_id is not None:
            clone["chat_id"] = new_chat_id
        prefix.append(clone)

    # Whether the loop stopped at the boundary user row or ran out of rows,
    # the fork point exists exactly when this many user rows were passed.
    if users_seen != before_user_index:
        return False
    _write_transcript_lines(target_key, prefix)
    return True


def append_fork_marker(session_key: str) -> None:
    """Mark the UI-only boundary where a WebUI fork starts accepting new turns."""
    marker = {
        "event": WEBUI_FORK_MARKER_EVENT,
        "chat_id": _chat_id_from_session_key(session_key),
    }
    append_transcript_object(session_key, marker)


def write_session_messages_as_transcript(
    target_key: str,
    messages: list[dict[str, Any]],
) -> None:
    """Write a minimal WebUI transcript from already-truncated session messages.

    User messages become ``user`` rows (with media paths and deep-copied
    ``cli_apps`` / ``mcp_presets`` when present); assistant messages with
    non-blank string content become ``message`` rows. Everything else is
    skipped.
    """
    chat_id = _chat_id_from_session_key(target_key)
    rows: list[dict[str, Any]] = []
    for msg in messages:
        role = msg.get("role")
        content = msg.get("content")
        text = content if isinstance(content, str) else ""
        if role == "user":
            row: dict[str, Any] = {"event": "user", "chat_id": chat_id, "text": text}
            media = msg.get("media")
            if isinstance(media, list) and media:
                row["media_paths"] = [str(item) for item in media if isinstance(item, str) and item]
            for extra_key in ("cli_apps", "mcp_presets"):
                extra = msg.get(extra_key)
                if isinstance(extra, list) and extra:
                    row[extra_key] = json.loads(json.dumps(extra, ensure_ascii=False))
            rows.append(row)
        elif role == "assistant" and text.strip():
            row = {"event": "message", "chat_id": chat_id, "text": text}
            media = msg.get("media")
            if isinstance(media, list) and media:
                row["media"] = [str(item) for item in media if isinstance(item, str) and item]
            rows.append(row)
    _write_transcript_lines(target_key, rows)
def delete_webui_transcript(session_key: str) -> bool:
    """Delete the session's transcript files and segments directory.

    Failures are logged and do not abort the remaining deletions.

    Returns:
        ``True`` if at least one file or the segments directory was removed.
    """
    removed_any = False
    candidates = (
        webui_transcript_path(session_key),
        _legacy_webui_thread_path(session_key),
    )
    for candidate in candidates:
        if not candidate.is_file():
            continue
        try:
            candidate.unlink()
        except OSError as e:
            logger.warning("Failed to delete webui transcript {}: {}", candidate, e)
        else:
            removed_any = True

    segments_dir = webui_transcript_segments_dir(session_key)
    if segments_dir.is_dir():
        try:
            shutil.rmtree(segments_dir)
        except OSError as e:
            logger.warning("Failed to delete webui transcript segments {}: {}", segments_dir, e)
        else:
            removed_any = True
    return removed_any


def build_user_transcript_event(
    chat_id: str,
    text: str,
    *,
    media_paths: list[Any] | None = None,
    cli_apps: list[Any] | None = None,
    mcp_presets: list[Any] | None = None,
) -> dict[str, Any] | None:
    """Build the ``user`` transcript row for a message.

    Falsy media paths are dropped and the rest stringified; ``cli_apps`` and
    ``mcp_presets`` keep only mapping items, shallow-copied to dicts. Optional
    keys are added only when non-empty.

    Returns:
        The event dict, or ``None`` when there is neither text nor media.
    """
    kept_paths = [str(item) for item in (media_paths or []) if item]
    if not (text or kept_paths):
        return None

    event: dict[str, Any] = {"event": "user", "chat_id": chat_id, "text": text}
    if kept_paths:
        event["media_paths"] = kept_paths
    for key, raw_items in (("cli_apps", cli_apps), ("mcp_presets", mcp_presets)):
        copies = [dict(item) for item in (raw_items or []) if isinstance(item, Mapping)]
        if copies:
            event[key] = copies
    return event
def _session_user_event(
    session_key: str,
    message: dict[str, Any],
) -> dict[str, Any] | None:
    """Turn a session ``user`` message into a transcript ``user`` row.

    Returns ``None`` for non-user messages and for user messages without text
    or media. The chat id is whatever follows the first ``:`` in the session
    key, or the whole key when there is no colon.
    """
    if message.get("role") != "user":
        return None

    def list_or_none(key: str) -> list[Any] | None:
        value = message.get(key)
        return value if isinstance(value, list) else None

    content = message.get("content")
    text = content if isinstance(content, str) else ""
    if ":" in session_key:
        chat_id = session_key.split(":", 1)[1]
    else:
        chat_id = session_key
    return build_user_transcript_event(
        chat_id,
        text,
        media_paths=list_or_none("media"),
        cli_apps=list_or_none("cli_apps"),
        mcp_presets=list_or_none("mcp_presets"),
    )


def _assistant_text_signature(value: Any) -> str:
    """Stripped text for string values, ``""`` for anything else."""
    if not isinstance(value, str):
        return ""
    return value.strip()


def _session_backfill_turns(
    session_key: str,
    session_messages: list[dict[str, Any]],
) -> list[tuple[dict[str, Any], tuple[str, ...]]]:
    """Pair each session user row with the assistant texts that followed it.

    A turn is emitted only when its user message produced a row and at least
    one non-blank assistant text followed before the next user message.
    """
    turns: list[tuple[dict[str, Any], tuple[str, ...]]] = []

    def emit(user_event: dict[str, Any] | None, texts: list[str]) -> None:
        if user_event is None:
            return
        signature = tuple(text for text in texts if text)
        if signature:
            turns.append((user_event, signature))

    pending_user: dict[str, Any] | None = None
    pending_texts: list[str] = []
    for message in session_messages:
        role = message.get("role")
        if role == "user":
            emit(pending_user, pending_texts)
            pending_user = _session_user_event(session_key, message)
            pending_texts = []
        elif role == "assistant" and pending_user is not None:
            text = _assistant_text_signature(message.get("content"))
            if text:
                pending_texts.append(text)
    emit(pending_user, pending_texts)
    return turns


def _split_transcript_turns(lines: list[dict[str, Any]]) -> list[list[dict[str, Any]]]:
    """Split records into turns, closing a turn after each ``turn_end`` record.

    Records after the last ``turn_end`` form a final, open turn.
    """
    turns: list[list[dict[str, Any]]] = []
    open_turn: list[dict[str, Any]] = []
    for record in lines:
        open_turn.append(record)
        if record.get("event") != "turn_end":
            continue
        turns.append(open_turn)
        open_turn = []
    if open_turn:
        turns.append(open_turn)
    return turns


def _transcript_turn_signature(records: list[dict[str, Any]]) -> tuple[str, ...]:
    """Non-blank assistant texts (excluding traces) that a turn replays to."""
    signature: list[str] = []
    for message in replay_transcript_to_ui_messages(records):
        is_answer = message.get("role") == "assistant" and message.get("kind") != "trace"
        if not is_answer:
            continue
        text = _assistant_text_signature(message.get("content"))
        if text:
            signature.append(text)
    return tuple(signature)
def _find_unique_session_turn(
    session_turns: list[tuple[dict[str, Any], tuple[str, ...]]],
    signature: tuple[str, ...],
    start: int,
) -> int | None:
    """Index of the only session turn at or after ``start`` with ``signature``.

    Returns ``None`` for an empty signature, no match, or an ambiguous match.
    """
    if not signature:
        return None
    hits = [
        index
        for index in range(start, len(session_turns))
        if session_turns[index][1] == signature
    ]
    return hits[0] if len(hits) == 1 else None


def _with_backfilled_user(
    records: list[dict[str, Any]],
    user_event: dict[str, Any],
) -> list[dict[str, Any]]:
    """Insert a copy of ``user_event`` before the turn's first display record.

    ``records`` is returned unchanged when it has no display record.
    """
    for position, record in enumerate(records):
        if record.get("event") not in _TURN_DISPLAY_EVENTS:
            continue
        head, tail = records[:position], records[position:]
        return [*head, dict(user_event), *tail]
    return records


def inject_missing_user_events_from_session(
    session_key: str,
    lines: list[dict[str, Any]],
    session_messages: list[dict[str, Any]] | None,
) -> list[dict[str, Any]]:
    """Backfill user rows for legacy WebUI transcripts that only stored assistant streams.

    Each transcript turn is matched to a session turn by its assistant-text
    signature; only unambiguous matches at or after the previous match are
    used. Turns that already contain a ``user`` row are kept as they are but
    still advance the match position.
    """
    if not lines or not session_messages:
        return lines
    session_turns = _session_backfill_turns(session_key, session_messages)
    if not session_turns:
        return lines

    result: list[dict[str, Any]] = []
    cursor = 0
    for turn in _split_transcript_turns(lines):
        already_has_user = any(record.get("event") == "user" for record in turn)
        matched = _find_unique_session_turn(
            session_turns,
            _transcript_turn_signature(turn),
            cursor,
        )
        if matched is None:
            result.extend(turn)
            continue
        if already_has_user:
            result.extend(turn)
        else:
            result.extend(_with_backfilled_user(turn, session_turns[matched][0]))
        cursor = matched + 1
    return result


def _format_tool_call_trace(call: Any) -> str | None:
    """Render a tool call as ``name(arguments)`` for display.

    The name and arguments are taken from ``call["function"]`` when present,
    otherwise from ``call`` itself. Non-blank string arguments are used
    verbatim, non-empty dict arguments are JSON-encoded, and anything else
    renders as ``name()``. Returns ``None`` when no name can be found.
    """
    if not call or not isinstance(call, dict):
        return None

    function = call.get("function")
    nested = function if isinstance(function, dict) else None

    name = nested.get("name") if nested is not None else None
    if not isinstance(name, str) or not name:
        fallback = call.get("name")
        name = fallback if isinstance(fallback, str) else ""
    if not name:
        return None

    arguments = (nested.get("arguments") if nested is not None else None) or call.get("arguments")
    if isinstance(arguments, str) and arguments.strip():
        rendered = arguments
    elif arguments and isinstance(arguments, dict):
        rendered = json.dumps(arguments, ensure_ascii=False)
    else:
        rendered = ""
    return f"{name}({rendered})"
def tool_trace_lines_from_events(events: Any) -> list[str]:
    """Format tool events as trace lines, at most one per ``call_id``.

    Only dict events whose phase is ``start``, ``end`` or ``error`` are
    considered; events without a usable name yield no line.
    """
    if not isinstance(events, list):
        return []
    trace_lines: list[str] = []
    seen_call_ids: set[str] = set()
    for event in events:
        if not event or not isinstance(event, dict):
            continue
        if event.get("phase") not in {"start", "end", "error"}:
            continue
        call_id = event.get("call_id")
        if isinstance(call_id, str) and call_id:
            if call_id in seen_call_ids:
                continue
            seen_call_ids.add(call_id)
        formatted = _format_tool_call_trace(event)
        if formatted:
            trace_lines.append(formatted)
    return trace_lines


# Later phases outrank earlier ones when events for the same call are merged.
_PHASE_RANK = {"start": 1, "end": 2, "error": 3}


def _normalize_tool_events(events: Any) -> list[dict[str, Any]]:
    """Return shallow copies of the valid tool events in ``events``.

    An event is valid when it is a non-empty dict with a known phase and a
    string name, either at top level or under ``function``.
    """
    if not isinstance(events, list):
        return []
    valid: list[dict[str, Any]] = []
    for event in events:
        if not event or not isinstance(event, dict):
            continue
        if event.get("phase") not in {"start", "end", "error"}:
            continue
        has_top_level_name = isinstance(event.get("name"), str)
        if not has_top_level_name:
            function = event.get("function")
            has_nested_name = isinstance(function, dict) and isinstance(function.get("name"), str)
            if not has_nested_name:
                continue
        valid.append(dict(event))
    return valid


def _tool_event_key(event: dict[str, Any]) -> str:
    """Identity key for merging: the call id, else the trace text, else the JSON dump."""
    call_id = event.get("call_id")
    if isinstance(call_id, str) and call_id:
        return f"call:{call_id}"
    trace = _format_tool_call_trace(event)
    if trace:
        return trace
    return json.dumps(event, sort_keys=True, ensure_ascii=False)


def _tool_event_file_edit_key(event: dict[str, Any]) -> str | None:
    """Return ``"<call_id>|<tool name>"`` for file-edit tool events, else ``None``.

    Requires a non-empty string ``call_id`` and a tool name (top level or
    under ``function``) that is one of the file-edit tools.
    """
    call_id = event.get("call_id")
    if not isinstance(call_id, str) or not call_id:
        return None
    tool_name = event.get("name")
    if not isinstance(tool_name, str) or not tool_name:
        function = event.get("function")
        tool_name = function.get("name") if isinstance(function, dict) else ""
    if isinstance(tool_name, str) and tool_name in _FILE_EDIT_TOOL_NAMES:
        return f"{call_id}|{tool_name}"
    return None
def _merge_tool_events(previous: Any, incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge ``incoming`` tool events into ``previous`` by event key.

    New keys are appended. For an existing key the incoming fields overwrite
    the stored ones unless the incoming phase ranks lower than the stored
    phase. With no usable ``previous`` the ``incoming`` list is returned as is.
    """
    if not isinstance(previous, list) or not previous:
        return incoming
    merged = [dict(event) for event in previous if isinstance(event, dict)]
    if not incoming:
        return merged

    position_of = {_tool_event_key(event): idx for idx, event in enumerate(merged)}
    for event in incoming:
        key = _tool_event_key(event)
        slot = position_of.get(key)
        if slot is None:
            position_of[key] = len(merged)
            merged.append(event)
            continue
        stored = merged[slot]
        new_rank = _PHASE_RANK.get(str(event.get("phase")), 0)
        old_rank = _PHASE_RANK.get(str(stored.get("phase")), 0)
        if new_rank >= old_rank:
            merged[slot] = {**stored, **event}
    return merged


def _file_edit_key(edit: dict[str, Any]) -> str:
    """Key of a file edit: ``call_id|tool`` when a call id exists, else ``tool|path``."""
    tool = str(edit.get("tool") or "")
    call_id = str(edit.get("call_id") or "")
    if not call_id:
        return f"{tool}|{edit.get('path') or ''}"
    return f"{call_id}|{tool}"


def _message_has_file_edit_for_tool_event(
    message: dict[str, Any],
    event: dict[str, Any],
) -> bool:
    """True when ``message["fileEdits"]`` holds an edit with the event's file-edit key."""
    wanted = _tool_event_file_edit_key(event)
    if not wanted:
        return False
    edits = message.get("fileEdits")
    if not isinstance(edits, list):
        return False
    for edit in edits:
        if isinstance(edit, dict) and _file_edit_key(edit) == wanted:
            return True
    return False


def _filter_covered_file_edit_tool_events(
    messages: list[dict[str, Any]],
    events: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Drop events already represented by a file edit on any of ``messages``."""
    if not events:
        return events
    remaining: list[dict[str, Any]] = []
    for event in events:
        covered = any(
            _message_has_file_edit_for_tool_event(message, event) for message in messages
        )
        if not covered:
            remaining.append(event)
    return remaining


def _strip_covered_file_edit_tool_hints(
    message: dict[str, Any],
    edits: list[dict[str, Any]],
) -> dict[str, Any]:
    """Remove tool events (and their trace lines) that ``edits`` now cover.

    Returns ``message`` itself when nothing is removed. Otherwise returns a
    copy with filtered ``traces``, ``content`` set to the last remaining trace
    (or ``""``), and ``toolEvents`` reduced to the remaining dict events or
    dropped when none remain.
    """
    covered_keys = {_file_edit_key(edit) for edit in edits if isinstance(edit, dict)}
    events = message.get("toolEvents")
    if not covered_keys or not isinstance(events, list):
        return message

    remaining_events: list[dict[str, Any]] = []
    dropped_lines: set[str] = set()
    removed_any = False
    for event in events:
        if not isinstance(event, dict):
            continue
        key = _tool_event_file_edit_key(event)
        if key and key in covered_keys:
            removed_any = True
            dropped_lines.update(tool_trace_lines_from_events([event]))
        else:
            remaining_events.append(event)
    if not removed_any:
        return message

    raw_traces = message.get("traces")
    if isinstance(raw_traces, list):
        old_traces = [trace for trace in raw_traces if isinstance(trace, str)]
    else:
        # Without a trace list, a non-empty string content is treated as the only trace.
        content = message.get("content")
        old_traces = [content] if isinstance(content, str) and content else []
    new_traces = [trace for trace in old_traces if trace not in dropped_lines]

    updated = {
        **message,
        "traces": new_traces,
        "content": new_traces[-1] if new_traces else "",
    }
    if remaining_events:
        updated["toolEvents"] = remaining_events
    else:
        updated.pop("toolEvents", None)
    return updated
isinstance(events, list): return message kept_events: list[dict[str, Any]] = [] removed_trace_lines: set[str] = set() changed = False for event in events: if not isinstance(event, dict): continue key = _tool_event_file_edit_key(event) if key and key in incoming_keys: changed = True removed_trace_lines.update(tool_trace_lines_from_events([event])) continue kept_events.append(event) if not changed: return message raw_traces = message.get("traces") if isinstance(raw_traces, list): previous_traces = [trace for trace in raw_traces if isinstance(trace, str)] else: content = message.get("content") previous_traces = [content] if isinstance(content, str) and content else [] next_traces = [trace for trace in previous_traces if trace not in removed_trace_lines] next_message = { **message, "traces": next_traces, "content": next_traces[-1] if next_traces else "", } if kept_events: next_message["toolEvents"] = kept_events else: next_message.pop("toolEvents", None) return next_message def _merge_unique_tool_trace_lines( previous_traces: list[str], lines: list[str], ) -> tuple[list[str], bool]: seen_lines = set(previous_traces) traces = list(previous_traces) added = False for line in lines: if line in seen_lines: continue seen_lines.add(line) traces.append(line) added = True return traces, added def _media_from_signed_urls(value: Any) -> list[dict[str, Any]]: media: list[dict[str, Any]] = [] urls = value if isinstance(value, list) else [] for m in urls: if isinstance(m, dict) and m.get("url"): name = str(m.get("name") or "") media.append( { "kind": _media_kind_from_name(name), "url": str(m["url"]), "name": name, }, ) return media def replay_transcript_to_ui_messages( lines: list[dict[str, Any]], *, augment_user_media: Callable[[list[str]], list[dict[str, Any]]] | None = None, augment_assistant_media: Callable[[list[str]], list[dict[str, Any]]] | None = None, augment_assistant_text: Callable[[str], str] | None = None, ) -> list[dict[str, Any]]: """Fold JSONL records into 
``UIMessage``-shaped dicts for the WebUI. Mirrors the core fold in ``useNanobotStream.ts`` (delta, reasoning, message+kind, turn_end). ``augment_user_media`` maps persisted filesystem paths to ``{url, name?}`` / attachment dicts the client expects. Assistant media gets a separate hook so replay can re-sign outbound attachments after a gateway restart instead of reusing stale process-local signed URLs. """ messages: list[dict[str, Any]] = [] buffer_message_id: str | None = None buffer_parts: list[str] = [] suppress_until_turn_end = False active_activity_segment_id: str | None = None active_file_edit_segment_id: str | None = None activity_segment_counter = 0 _ts_base = int(time.time() * 1000) closed_turn_ids: set[str] = set() replay_turn_aliases: dict[str, str] = {} def _new_id(prefix: str, idx: int) -> str: return f"{prefix}-{idx}-{uuid.uuid4().hex[:8]}" def _new_activity_segment(*, activate: bool = True) -> str: nonlocal active_activity_segment_id, activity_segment_counter activity_segment_counter += 1 segment_id = f"activity-{activity_segment_counter}" if activate: active_activity_segment_id = segment_id return segment_id def _turn_fields(rec: dict[str, Any], fallback_phase: str | None = None) -> dict[str, Any]: fields: dict[str, Any] = {} turn_id = rec.get("turn_id") if isinstance(turn_id, str) and turn_id: if turn_id in closed_turn_ids: fields["turnId"] = replay_turn_aliases.setdefault( turn_id, f"{turn_id}:replay:{idx}", ) else: fields["turnId"] = turn_id phase = rec.get("turn_phase") if isinstance(phase, str) and phase: fields["turnPhase"] = phase elif fallback_phase: fields["turnPhase"] = fallback_phase seq = rec.get("turn_seq") if isinstance(seq, (int, float)): fields["turnSeq"] = int(seq) return fields def _source_fields(rec: dict[str, Any]) -> dict[str, Any]: source = rec.get("source") if not isinstance(source, dict) or source.get("kind") != "cron": return {} out: dict[str, Any] = {"source": {"kind": "cron"}} label = source.get("label") if 
isinstance(label, str) and label.strip(): out["source"]["label"] = label.strip() return out def _same_turn(message: dict[str, Any], turn_fields: dict[str, Any]) -> bool: turn_id = turn_fields.get("turnId") message_turn_id = message.get("turnId") return not turn_id or not message_turn_id or turn_id == message_turn_id def _ensure_activity_segment() -> str: return active_activity_segment_id or _new_activity_segment() def close_activity_for_answer() -> None: nonlocal active_activity_segment_id, active_file_edit_segment_id active_activity_segment_id = None active_file_edit_segment_id = None def close_file_edit_phase_before_activity() -> None: nonlocal active_activity_segment_id, active_file_edit_segment_id if active_file_edit_segment_id: active_activity_segment_id = None active_file_edit_segment_id = None def attach_reasoning_chunk( prev: list[dict[str, Any]], chunk: str, idx: int, turn_fields: dict[str, Any] | None = None, ) -> None: turn_fields = turn_fields or {} for i in range(len(prev) - 1, -1, -1): candidate = prev[i] if candidate.get("role") == "user": break if candidate.get("kind") == "trace": break if candidate.get("role") != "assistant": continue if not _same_turn(candidate, turn_fields): break content = str(candidate.get("content") or "") has_answer = len(content) > 0 if ( candidate.get("reasoningStreaming") or candidate.get("reasoning") is not None or has_answer or candidate.get("isStreaming") ): prev[i] = { **candidate, "reasoning": (str(candidate.get("reasoning") or "")) + chunk, "reasoningStreaming": True, "activitySegmentId": candidate.get("activitySegmentId") or _ensure_activity_segment(), **turn_fields, } return if not has_answer and candidate.get("isStreaming"): prev[i] = { **candidate, "reasoning": chunk, "reasoningStreaming": True, "activitySegmentId": candidate.get("activitySegmentId") or _ensure_activity_segment(), **turn_fields, } return break segment = _ensure_activity_segment() prev.append( { "id": _new_id("as", idx), "role": "assistant", 
"content": "", "isStreaming": True, "reasoning": chunk, "reasoningStreaming": True, "activitySegmentId": segment, **turn_fields, "createdAt": _ts_base + idx, }, ) def find_active_placeholder( prev: list[dict[str, Any]], turn_fields: dict[str, Any] | None = None, ) -> str | None: turn_fields = turn_fields or {} last = prev[-1] if prev else None if not last: return None if last.get("role") != "assistant" or last.get("kind") == "trace": return None if str(last.get("content") or ""): return None if not last.get("isStreaming"): return None if not _same_turn(last, turn_fields): return None return str(last.get("id")) def demote_interrupted_assistant(segment: str) -> None: nonlocal buffer_message_id, buffer_parts for i in range(len(messages) - 1, -1, -1): candidate = messages[i] if candidate.get("role") == "user": break content = candidate.get("content") if ( candidate.get("role") != "assistant" or candidate.get("kind") == "trace" or not candidate.get("isStreaming") or not isinstance(content, str) or not content.strip() or candidate.get("media") ): continue reasoning_parts = [ part for part in (candidate.get("reasoning"), content) if isinstance(part, str) and part.strip() ] messages[i] = { **candidate, "content": "", "reasoning": "\n\n".join(reasoning_parts), "reasoningStreaming": False, "isStreaming": False, "activitySegmentId": candidate.get("activitySegmentId") or segment, } if buffer_message_id == candidate.get("id"): buffer_message_id = None buffer_parts = [] return def close_reasoning(prev: list[dict[str, Any]]) -> None: for i in range(len(prev) - 1, -1, -1): if prev[i].get("reasoningStreaming"): prev[i] = {**prev[i], "reasoningStreaming": False} return def is_reasoning_only_placeholder(m: dict[str, Any]) -> bool: return ( m.get("role") == "assistant" and m.get("kind") != "trace" and not str(m.get("content") or "").strip() and bool(m.get("reasoning")) and not m.get("reasoningStreaming") and not m.get("media") ) def is_tool_trace_at(index: int) -> bool: m = 
messages[index] if 0 <= index < len(messages) else None return bool(m and m.get("kind") == "trace") def prune_reasoning_only() -> None: nonlocal messages kept: list[dict[str, Any]] = [] for i, m in enumerate(messages): if is_reasoning_only_placeholder(m) and not is_tool_trace_at(i + 1): continue kept.append(m) messages = kept def stamp_latency(latency_ms: int) -> None: for i in range(len(messages) - 1, -1, -1): if messages[i].get("role") == "assistant" and messages[i].get("kind") != "trace": messages[i] = { **messages[i], "latencyMs": latency_ms, "isStreaming": False, } return def absorb_complete(extra: dict[str, Any], idx: int) -> None: nonlocal active_activity_segment_id, active_file_edit_segment_id last = messages[-1] if messages else None if last and is_reasoning_only_placeholder(last) and _same_turn(last, extra): messages[-1] = { **last, **extra, "isStreaming": False, "reasoningStreaming": False, } else: messages.append( { "id": _new_id("as", idx), "role": "assistant", "createdAt": _ts_base + idx, **extra, }, ) active_activity_segment_id = None active_file_edit_segment_id = None def find_file_edit_trace_index( segment: str | None, edits: list[dict[str, Any]], ) -> int | None: incoming_keys = {_file_edit_key(edit) for edit in edits if isinstance(edit, dict)} for i in range(len(messages) - 1, -1, -1): candidate = messages[i] if candidate.get("role") == "user": break if candidate.get("kind") != "trace": continue if segment and candidate.get("activitySegmentId") == segment: return i existing_edits = candidate.get("fileEdits") if isinstance(existing_edits, list): for existing in existing_edits: if isinstance(existing, dict) and _file_edit_key(existing) in incoming_keys: return i existing_tool_events = candidate.get("toolEvents") if isinstance(existing_tool_events, list): for event in existing_tool_events: if not isinstance(event, dict): continue key = _tool_event_file_edit_key(event) if key and key in incoming_keys: return i return None def upsert_file_edits( 
edits: list[dict[str, Any]], idx: int, turn_fields: dict[str, Any] | None = None, ) -> None: nonlocal active_file_edit_segment_id turn_fields = turn_fields or {} if not edits: return segment = active_file_edit_segment_id if not segment: segment = _new_activity_segment(activate=False) active_file_edit_segment_id = segment demote_interrupted_assistant(segment) target_index = find_file_edit_trace_index(segment, edits) if target_index is not None: last = messages[target_index] segment = str(last.get("activitySegmentId") or segment or _new_activity_segment(activate=False)) active_file_edit_segment_id = segment last = _strip_covered_file_edit_tool_hints(last, edits) else: if not segment: segment = _new_activity_segment(activate=False) active_file_edit_segment_id = segment messages.append( { "id": _new_id("tr", idx), "role": "tool", "kind": "trace", "content": "", "traces": [], "fileEdits": [], "activitySegmentId": segment, **turn_fields, "createdAt": _ts_base + idx, }, ) target_index = len(messages) - 1 last = messages[target_index] if not segment: segment = _new_activity_segment(activate=False) active_file_edit_segment_id = segment existing = list(last.get("fileEdits") or []) index_by_key = { _file_edit_key(edit): pos for pos, edit in enumerate(existing) if isinstance(edit, dict) } for edit in edits: if not isinstance(edit, dict): continue key = _file_edit_key(edit) if key in index_by_key: pos = index_by_key[key] merged = {**existing[pos], **edit} if edit.get("path") and not edit.get("pending"): merged.pop("pending", None) existing[pos] = merged else: index_by_key[key] = len(existing) existing.append(dict(edit)) messages[target_index] = { **last, "fileEdits": existing, "activitySegmentId": last.get("activitySegmentId") or segment, **turn_fields, } for idx, rec in enumerate(lines): ev = rec.get("event") if ev == "user": active_activity_segment_id = None active_file_edit_segment_id = None text = rec.get("text") text_s = text if isinstance(text, str) else "" media_paths = 
rec.get("media_paths") paths: list[str] = [] if isinstance(media_paths, list): paths = [str(p) for p in media_paths if p] media_att: list[dict[str, Any]] | None = None if paths and augment_user_media is not None: media_att = augment_user_media(paths) row: dict[str, Any] = { "id": _new_id("u", idx), "role": "user", "content": text_s, **_turn_fields(rec, "user"), "createdAt": _ts_base + idx, } if media_att: row["media"] = media_att if all(m.get("kind") == "image" for m in media_att): row["images"] = [{"url": m.get("url"), "name": m.get("name")} for m in media_att] cli_apps = rec.get("cli_apps") if isinstance(cli_apps, list) and cli_apps: row["cliApps"] = [dict(app) for app in cli_apps if isinstance(app, dict)] mcp_presets = rec.get("mcp_presets") if isinstance(mcp_presets, list) and mcp_presets: row["mcpPresets"] = [ dict(preset) for preset in mcp_presets if isinstance(preset, dict) ] messages.append(row) continue if ev == "file_edit": raw_edits = rec.get("edits") if isinstance(raw_edits, list): upsert_file_edits( [e for e in raw_edits if isinstance(e, dict)], idx, _turn_fields(rec, "activity"), ) continue if ev == "delta": if suppress_until_turn_end: continue chunk = rec.get("text") if not isinstance(chunk, str): continue close_activity_for_answer() turn_fields = _turn_fields(rec, "answer") adopted = find_active_placeholder(messages, turn_fields) if buffer_message_id is None else None if buffer_message_id is None: if adopted: buffer_message_id = adopted else: buffer_message_id = _new_id("buf", idx) messages.append( { "id": buffer_message_id, "role": "assistant", "content": "", "isStreaming": True, **_turn_fields(rec, "answer"), "createdAt": _ts_base + idx, }, ) buffer_parts.append(chunk) combined = "".join(buffer_parts) for i, m in enumerate(messages): if m.get("id") == buffer_message_id: messages[i] = { **m, "content": combined, "isStreaming": True, **_turn_fields(rec, "answer"), } break continue if ev == "stream_end": if suppress_until_turn_end: buffer_message_id 
= None buffer_parts = [] continue final_text = rec.get("text") if isinstance(final_text, str): if buffer_message_id is None: buffer_message_id = _new_id("buf", idx) messages.append( { "id": buffer_message_id, "role": "assistant", "content": final_text, "isStreaming": True, **_turn_fields(rec, "answer"), "createdAt": _ts_base + idx, }, ) else: for i, m in enumerate(messages): if m.get("id") == buffer_message_id: messages[i] = { **m, "content": final_text, "isStreaming": True, **_turn_fields(rec, "answer"), } break buffer_message_id = None buffer_parts = [] continue if ev == "reasoning_delta": if suppress_until_turn_end: continue chunk = rec.get("text") if not isinstance(chunk, str) or not chunk: continue close_file_edit_phase_before_activity() attach_reasoning_chunk(messages, chunk, idx, _turn_fields(rec, "reasoning")) continue if ev == "reasoning_end": if suppress_until_turn_end: continue close_reasoning(messages) continue if ev == "message": if suppress_until_turn_end and rec.get("kind") in ( "tool_hint", "progress", "reasoning", ): continue kind = rec.get("kind") if kind == "reasoning": line = rec.get("text") if not isinstance(line, str) or not line: continue close_file_edit_phase_before_activity() attach_reasoning_chunk(messages, line, idx, _turn_fields(rec, "reasoning")) close_reasoning(messages) continue if kind in ("tool_hint", "progress"): structured_events = _normalize_tool_events(rec.get("tool_events")) visible_structured_events = _filter_covered_file_edit_tool_events(messages, structured_events) structured = tool_trace_lines_from_events(visible_structured_events) text = rec.get("text") if structured: trace_lines = structured elif structured_events: trace_lines = [] elif isinstance(text, str) and text: trace_lines = [text] else: trace_lines = [] if not trace_lines: continue segment = _ensure_activity_segment() demote_interrupted_assistant(segment) last = messages[-1] if messages else None if ( last and last.get("kind") == "trace" and not 
last.get("isStreaming") and (last.get("activitySegmentId") in (None, segment)) ): prev_traces = list(last.get("traces") or [last.get("content")]) if structured: merged_traces, added = _merge_unique_tool_trace_lines(prev_traces, structured) if not added and not visible_structured_events: continue else: merged_traces = prev_traces + trace_lines merged = { **last, "traces": merged_traces, "content": merged_traces[-1], "toolEvents": _merge_tool_events(last.get("toolEvents"), visible_structured_events) if visible_structured_events else last.get("toolEvents"), "activitySegmentId": last.get("activitySegmentId") or segment, **_turn_fields(rec, "activity"), } messages[-1] = merged else: messages.append( { "id": _new_id("tr", idx), "role": "tool", "kind": "trace", "content": trace_lines[-1], "traces": trace_lines, **({"toolEvents": visible_structured_events} if visible_structured_events else {}), "activitySegmentId": segment, **_turn_fields(rec, "activity"), "createdAt": _ts_base + idx, }, ) continue buffer_message_id = None buffer_parts = [] text = rec.get("text") content_s = text if isinstance(text, str) else "" media: list[dict[str, Any]] = [] raw_media = rec.get("media") raw_media_list = raw_media if isinstance(raw_media, list) else [] media_paths = [path for path in raw_media_list if isinstance(path, str) and path] if media_paths and augment_assistant_media is not None: media = augment_assistant_media(media_paths) if not media and (not media_paths or augment_assistant_media is None): media = _media_from_signed_urls(rec.get("media_urls")) extra: dict[str, Any] = {"content": content_s} if media: extra["media"] = media lat = rec.get("latency_ms") if isinstance(lat, (int, float)) and lat >= 0: extra["latencyMs"] = int(lat) extra.update(_turn_fields(rec, "answer")) extra.update(_source_fields(rec)) absorb_complete(extra, idx) if media: suppress_until_turn_end = True continue if ev == "turn_end": suppress_until_turn_end = False active_activity_segment_id = None 
active_file_edit_segment_id = None turn_id = rec.get("turn_id") if isinstance(turn_id, str) and turn_id: if turn_id in replay_turn_aliases: replay_turn_aliases.pop(turn_id, None) else: closed_turn_ids.add(turn_id) for i, m in enumerate(messages): if m.get("isStreaming"): messages[i] = {**m, "isStreaming": False} prune_reasoning_only() lat = rec.get("latency_ms") if isinstance(lat, (int, float)) and lat >= 0: stamp_latency(int(lat)) buffer_message_id = None buffer_parts = [] continue for i, m in enumerate(messages): if ( augment_assistant_text is not None and m.get("role") == "assistant" and m.get("kind") != "trace" and isinstance(m.get("content"), str) ): messages[i] = {**m, "content": augment_assistant_text(m["content"])} m.pop("isStreaming", None) m.pop("reasoningStreaming", None) return messages def fork_boundary_message_count(lines: list[dict[str, Any]]) -> int | None: """Return the replayed UI message count before the first fork marker, if any.""" for idx, rec in enumerate(lines): if rec.get("event") != WEBUI_FORK_MARKER_EVENT: continue return len(replay_transcript_to_ui_messages(lines[:idx])) return None def build_webui_thread_response( session_key: str, *, augment_user_media: Callable[[list[str]], list[dict[str, Any]]] | None = None, augment_assistant_media: Callable[[list[str]], list[dict[str, Any]]] | None = None, augment_assistant_text: Callable[[str], str] | None = None, session_messages: list[dict[str, Any]] | None = None, limit: int | None = None, direction: str | None = None, before: str | None = None, ) -> dict[str, Any] | None: """Return a payload compatible with ``WebuiThreadPersistedPayload``.""" paginated = limit is not None or direction is not None or before is not None page: dict[str, Any] | None = None if paginated: lines, page = _select_transcript_page(session_key, limit=limit, before=before) else: lines = read_transcript_lines(session_key) if not lines: return None lines = inject_missing_user_events_from_session(session_key, lines, 
session_messages) fork_boundary = fork_boundary_message_count(lines) msgs = replay_transcript_to_ui_messages( lines, augment_user_media=augment_user_media, augment_assistant_media=augment_assistant_media, augment_assistant_text=augment_assistant_text, ) payload = { "schemaVersion": WEBUI_TRANSCRIPT_SCHEMA_VERSION, "sessionKey": session_key, "messages": msgs, } if page is not None: page["loaded_message_count"] = len(msgs) payload["page"] = page if fork_boundary is not None: payload["fork_boundary_message_count"] = fork_boundary return payload