"""Interactive onboarding questionnaire for nanobot.""" import json import types from dataclasses import dataclass from functools import lru_cache from typing import Any, Literal, NamedTuple, get_args, get_origin try: import questionary except ModuleNotFoundError: # pragma: no cover - exercised in environments without wizard deps questionary = None from loguru import logger from pydantic import BaseModel from rich.console import Console from rich.panel import Panel from rich.table import Table from nanobot.cli.models import ( format_token_count, get_model_context_limit, get_model_suggestions, ) from nanobot.config.loader import get_config_path, load_config from nanobot.config.schema import Config, ModelPresetConfig console = Console() @dataclass class OnboardResult: """Result of an onboarding session.""" config: Config should_save: bool # --- Field Hints for Select Fields --- # Maps field names to (choices, hint_text) # To add a new select field with hints, add an entry: # "field_name": (["choice1", "choice2", ...], "hint text for the field") _SELECT_FIELD_HINTS: dict[str, tuple[list[str], str]] = { "reasoning_effort": ( ["low", "medium", "high"], "low / medium / high - enables LLM thinking mode", ), } # --- Key Bindings for Navigation --- _BACK_PRESSED = object() # Sentinel value for back navigation # Cache of model-preset names populated at runtime so that field handlers can # offer existing presets as choices (e.g. AgentDefaults.model_preset). # # Lifecycle: populated by _sync_preset_cache(config), which must be called # after every config mutation that changes model_presets (add, delete, edit). # Cleared between tests via _MODEL_PRESET_CACHE.clear(). In long-running # processes (gateway) the cache is refreshed each time the preset management # screen is entered, so staleness is bounded by user interaction. _MODEL_PRESET_CACHE: set[str] = set() def _get_questionary(): """Return questionary or raise a clear error when wizard deps are unavailable.""" if questionary is None: raise RuntimeError( "Interactive onboarding requires the optional 'questionary' dependency. " "Install project dependencies and rerun with --wizard." ) return questionary def _select_with_back( prompt: str, choices: list[str], default: str | None = None ) -> str | None | object: """Select with Escape/Left arrow support for going back. Args: prompt: The prompt text to display. choices: List of choices to select from. Must not be empty. default: The default choice to pre-select. If not in choices, first item is used. Returns: _BACK_PRESSED sentinel if user pressed Escape or Left arrow The selected choice string if user confirmed None if user cancelled (Ctrl+C) """ from prompt_toolkit.application import Application from prompt_toolkit.key_binding import KeyBindings from prompt_toolkit.keys import Keys from prompt_toolkit.layout import Layout from prompt_toolkit.layout.containers import HSplit, Window from prompt_toolkit.layout.controls import FormattedTextControl from prompt_toolkit.styles import Style # Validate choices if not choices: logger.warning("Empty choices list provided to _select_with_back") return None # Find default index selected_index = 0 if default and default in choices: selected_index = choices.index(default) # State holder for the result state: dict[str, str | None | object] = {"result": None} # Build menu items (uses closure over selected_index) def get_menu_text(): items = [] for i, choice in enumerate(choices): if i == selected_index: items.append(("class:selected", f"> {choice}\n")) else: items.append(("", f" {choice}\n")) return items # Create layout menu_control = FormattedTextControl(get_menu_text) menu_window = Window(content=menu_control, height=len(choices)) prompt_control = FormattedTextControl(lambda: [("class:question", f"> {prompt}")]) prompt_window = Window(content=prompt_control, height=1) layout = Layout(HSplit([prompt_window, menu_window])) # Key bindings bindings = KeyBindings() @bindings.add(Keys.Up) def _up(event): nonlocal selected_index selected_index = (selected_index - 1) % len(choices) event.app.invalidate() @bindings.add(Keys.Down) def _down(event): nonlocal selected_index selected_index = (selected_index + 1) % len(choices) event.app.invalidate() @bindings.add(Keys.Enter) def _enter(event): state["result"] = choices[selected_index] event.app.exit() @bindings.add("escape") def _escape(event): state["result"] = _BACK_PRESSED event.app.exit() @bindings.add(Keys.Left) def _left(event): state["result"] = _BACK_PRESSED event.app.exit() @bindings.add(Keys.ControlC) def _ctrl_c(event): state["result"] = None event.app.exit() # Style style = Style.from_dict({ "selected": "fg:green bold", "question": "fg:cyan", }) app = Application(layout=layout, key_bindings=bindings, style=style) try: app.run() except Exception: logger.exception("Error in select prompt") return None return state["result"] # --- Type Introspection --- class FieldTypeInfo(NamedTuple): """Result of field type introspection.""" type_name: str inner_type: Any def _get_field_type_info(field_info) -> FieldTypeInfo: """Extract field type info from Pydantic field.""" annotation = field_info.annotation if annotation is None: return FieldTypeInfo("str", None) origin = get_origin(annotation) args = get_args(annotation) if origin is types.UnionType: non_none_args = [a for a in args if a is not type(None)] if len(non_none_args) == 1: annotation = non_none_args[0] origin = get_origin(annotation) args = get_args(annotation) _simple_types: dict[type, str] = {bool: "bool", int: "int", float: "float"} if origin is list or (hasattr(origin, "__name__") and origin.__name__ == "List"): return FieldTypeInfo("list", args[0] if args else str) if origin is dict or (hasattr(origin, "__name__") and origin.__name__ == "Dict"): return FieldTypeInfo("dict", None) for py_type, name in _simple_types.items(): if annotation is py_type: return FieldTypeInfo(name, None) if isinstance(annotation, type) and issubclass(annotation, BaseModel): return FieldTypeInfo("model", annotation) if origin is Literal: return FieldTypeInfo("literal", list(args)) return FieldTypeInfo("str", None) def _get_field_display_name(field_key: str, field_info) -> str: """Get display name for a field.""" if field_info and field_info.description: return field_info.description name = field_key suffix_map = { "_s": " (seconds)", "_ms": " (ms)", "_url": " URL", "_path": " Path", "_id": " ID", "_key": " Key", "_token": " Token", } for suffix, replacement in suffix_map.items(): if name.endswith(suffix): name = name[: -len(suffix)] + replacement break return name.replace("_", " ").title() # --- Sensitive Field Masking --- _SENSITIVE_KEYWORDS = frozenset({"api_key", "token", "secret", "password", "credentials"}) def _is_sensitive_field(field_name: str) -> bool: """Check if a field name indicates sensitive content.""" return any(kw in field_name.lower() for kw in _SENSITIVE_KEYWORDS) def _mask_value(value: str) -> str: """Mask a sensitive value, showing only the last 4 characters.""" if len(value) <= 4: return "****" return "*" * (len(value) - 4) + value[-4:] # --- Value Formatting --- def _format_value(value: Any, rich: bool = True, field_name: str = "") -> str: """Single recursive entry point for safe value display. Handles any depth.""" if value is None or value == "" or value == {} or value == []: return "[dim]not set[/dim]" if rich else "[not set]" if _is_sensitive_field(field_name) and isinstance(value, str): masked = _mask_value(value) return f"[dim]{masked}[/dim]" if rich else masked if isinstance(value, BaseModel): parts = [] for fname, _finfo in type(value).model_fields.items(): fval = getattr(value, fname, None) formatted = _format_value(fval, rich=False, field_name=fname) if formatted != "[not set]": parts.append(f"{fname}={formatted}") return ", ".join(parts) if parts else ("[dim]not set[/dim]" if rich else "[not set]") if isinstance(value, list): return ", ".join(str(v) for v in value) if isinstance(value, dict): # Handle dicts containing BaseModel instances parts = [] for k, v in value.items(): formatted = _format_value(v, rich=False, field_name=str(k)) parts.append(f"{k}: {formatted}") return ", ".join(parts) if parts else ("[dim]not set[/dim]" if rich else "[not set]") return str(value) def _format_value_for_input(value: Any, field_type: str) -> str: """Format a value for use as input default.""" if value is None or value == "": return "" if field_type == "list" and isinstance(value, list): return ",".join(str(v) for v in value) if field_type == "dict" and isinstance(value, dict): return json.dumps(value) return str(value) def _validate_field_constraint(value: Any, field_info) -> str | None: """Validate a value against Pydantic Field constraints. Returns an error message string if validation fails, None if valid. Uses attribute-based detection to handle Pydantic v2 internal types. """ if field_info is None or not hasattr(field_info, "metadata"): return None for m in field_info.metadata: if hasattr(m, "ge") and isinstance(value, (int, float)): if value < m.ge: return f"Value must be >= {m.ge}" if hasattr(m, "gt") and isinstance(value, (int, float)): if value <= m.gt: return f"Value must be > {m.gt}" if hasattr(m, "le") and isinstance(value, (int, float)): if value > m.le: return f"Value must be <= {m.le}" if hasattr(m, "lt") and isinstance(value, (int, float)): if value >= m.lt: return f"Value must be < {m.lt}" if hasattr(m, "min_length") and hasattr(value, "__len__"): if len(value) < m.min_length: return f"Length must be >= {m.min_length}" if hasattr(m, "max_length") and hasattr(value, "__len__"): if len(value) > m.max_length: return f"Length must be <= {m.max_length}" return None def _get_constraint_hint(field_info) -> str: """Derive a human-readable constraint hint from field metadata. Returns a string like "(0-10)" or "(>= 0)" to append to field display names. """ if field_info is None or not hasattr(field_info, "metadata"): return "" ge_val = None le_val = None for m in field_info.metadata: if hasattr(m, "ge"): ge_val = m.ge if hasattr(m, "le"): le_val = m.le if ge_val is not None and le_val is not None: return f" ({ge_val}-{le_val})" if ge_val is not None: return f" (>= {ge_val})" if le_val is not None: return f" (<= {le_val})" return "" # --- Rich UI Components --- def _show_config_panel(display_name: str, model: BaseModel, fields: list) -> None: """Display current configuration as a rich table.""" table = Table(show_header=False, box=None, padding=(0, 2)) table.add_column("Field", style="cyan") table.add_column("Value") for fname, field_info in fields: value = getattr(model, fname, None) display = _get_field_display_name(fname, field_info) formatted = _format_value(value, rich=True, field_name=fname) table.add_row(display, formatted) console.print(Panel(table, title=f"[bold]{display_name}[/bold]", border_style="blue")) def _show_main_menu_header() -> None: """Display the main menu header.""" from nanobot import __logo__, __version__ console.print() # Use Align.CENTER for the single line of text from rich.align import Align console.print( Align.center(f"{__logo__} [bold cyan]nanobot[{__version__}][/bold cyan]") ) console.print() def _show_section_header(title: str, subtitle: str = "") -> None: """Display a section header.""" console.print() if subtitle: console.print( Panel(f"[dim]{subtitle}[/dim]", title=f"[bold]{title}[/bold]", border_style="blue") ) else: console.print(Panel("", title=f"[bold]{title}[/bold]", border_style="blue")) # --- Input Handlers --- def _input_bool(display_name: str, current: bool | None) -> bool | None: """Get boolean input via confirm dialog.""" return _get_questionary().confirm( display_name, default=bool(current) if current is not None else False, ).ask() def _input_text(display_name: str, current: Any, field_type: str, field_info=None) -> Any: """Get text input and parse based on field type.""" default = _format_value_for_input(current, field_type) value = _get_questionary().text(f"{display_name}:", default=default).ask() if value is None: return None if field_type == "int": try: parsed = int(value) except ValueError: console.print("[yellow]! Invalid number format, value not saved[/yellow]") return None if field_info: error = _validate_field_constraint(parsed, field_info) if error: console.print(f"[yellow]! {error}, value not saved[/yellow]") return None return parsed elif field_type == "float": try: parsed = float(value) except ValueError: console.print("[yellow]! Invalid number format, value not saved[/yellow]") return None if field_info: error = _validate_field_constraint(parsed, field_info) if error: console.print(f"[yellow]! {error}, value not saved[/yellow]") return None return parsed elif field_type == "list": return [v.strip() for v in value.split(",") if v.strip()] elif field_type == "dict": try: return json.loads(value) except json.JSONDecodeError: console.print("[yellow]! Invalid JSON format, value not saved[/yellow]") return None return value def _input_with_existing( display_name: str, current: Any, field_type: str, field_info=None ) -> Any: """Handle input with 'keep existing' option for non-empty values.""" has_existing = current is not None and current != "" and current != {} and current != [] if has_existing and not isinstance(current, list): choice = _get_questionary().select( display_name, choices=["Enter new value", "Keep existing value"], default="Keep existing value", ).ask() if choice == "Keep existing value" or choice is None: return None return _input_text(display_name, current, field_type, field_info=field_info) # --- Pydantic Model Configuration --- def _get_current_provider(model: BaseModel) -> str: """Get the current provider setting from a model (if available).""" if hasattr(model, "provider"): return getattr(model, "provider", "auto") or "auto" return "auto" def _input_model_with_autocomplete( display_name: str, current: Any, provider: str ) -> str | None: """Get model input with autocomplete suggestions. """ from prompt_toolkit.completion import Completer, Completion default = str(current) if current else "" class DynamicModelCompleter(Completer): """Completer that dynamically fetches model suggestions.""" def __init__(self, provider_name: str): self.provider = provider_name def get_completions(self, document, complete_event): text = document.text_before_cursor suggestions = get_model_suggestions(text, provider=self.provider, limit=50) for model in suggestions: # Skip if model doesn't contain the typed text if text.lower() not in model.lower(): continue yield Completion( model, start_position=-len(text), display=model, ) value = _get_questionary().autocomplete( f"{display_name}:", choices=[""], # Placeholder, actual completions from completer completer=DynamicModelCompleter(provider), default=default, qmark=">", ).ask() return value if value is not None else None def _input_context_window_with_recommendation( display_name: str, current: Any, model_obj: BaseModel ) -> int | None: """Get context window input with option to fetch recommended value.""" current_val = current if current else "" choices = ["Enter new value"] if current_val: choices.append("Keep existing value") choices.append("[?] Get recommended value") choice = _get_questionary().select( display_name, choices=choices, default="Enter new value", ).ask() if choice is None: return None if choice == "Keep existing value": return None if choice == "[?] Get recommended value": # Get the model name from the model object model_name = getattr(model_obj, "model", None) if not model_name: console.print("[yellow]! Please configure the model field first[/yellow]") return None provider = _get_current_provider(model_obj) context_limit = get_model_context_limit(model_name, provider) if context_limit: console.print(f"[green]+ Recommended context window: {format_token_count(context_limit)} tokens[/green]") return context_limit else: console.print("[yellow]! Could not fetch model info, please enter manually[/yellow]") # Fall through to manual input # Manual input value = _get_questionary().text( f"{display_name}:", default=str(current_val) if current_val else "", ).ask() if value is None or value == "": return None try: return int(value) except ValueError: console.print("[yellow]! Invalid number format, value not saved[/yellow]") return None def _handle_model_field( working_model: BaseModel, field_name: str, field_display: str, current_value: Any ) -> None: """Handle the 'model' field with autocomplete and context-window auto-fill.""" provider = _get_current_provider(working_model) new_value = _input_model_with_autocomplete(field_display, current_value, provider) if new_value is not None and new_value != current_value: setattr(working_model, field_name, new_value) _try_auto_fill_context_window(working_model, new_value) def _handle_context_window_field( working_model: BaseModel, field_name: str, field_display: str, current_value: Any ) -> None: """Handle context_window_tokens with recommendation lookup.""" new_value = _input_context_window_with_recommendation( field_display, current_value, working_model ) if new_value is not None: setattr(working_model, field_name, new_value) def _handle_model_preset_field( working_model: BaseModel, field_name: str, field_display: str, current_value: Any ) -> None: """Handle the 'model_preset' field with a list of existing presets.""" # model_preset lives on AgentDefaults, but the preset list is on Config. # We can't easily access Config here, so we read from the global config # via a module-level cache set by _configure_model_presets / run_onboard. preset_names = sorted(_MODEL_PRESET_CACHE) choices = ["(clear/unset)"] + preset_names default_choice = str(current_value) if current_value else "(clear/unset)" new_value = _select_with_back(field_display, choices, default=default_choice) if new_value is _BACK_PRESSED: return if new_value == "(clear/unset)": setattr(working_model, field_name, None) elif new_value is not None: setattr(working_model, field_name, new_value) def _handle_provider_field( working_model: BaseModel, field_name: str, field_display: str, current_value: Any ) -> None: """Handle the 'provider' field with a list of registered providers.""" provider_names = sorted(_get_provider_names().keys()) choices = ["auto"] + provider_names default_choice = str(current_value) if current_value else "auto" new_value = _select_with_back(field_display, choices, default=default_choice) if new_value is _BACK_PRESSED: return if new_value is not None: setattr(working_model, field_name, new_value) def _handle_fallback_presets_field( working_model: BaseModel, field_name: str, field_display: str, current_value: Any ) -> None: """Handle the 'fallback_presets' field with preset-aware multi-select.""" items: list[str] = list(current_value) if isinstance(current_value, list) else [] preset_names = sorted(_MODEL_PRESET_CACHE) while True: console.clear() console.print(f"[bold]{field_display}[/bold]") if items: for idx, item in enumerate(items, 1): console.print(f" {idx}. {item}") else: console.print(" [dim](empty)[/dim]") console.print() choices = ["[+] Add preset"] if items: choices.append("[-] Remove last") choices.append("[X] Clear all") choices.append("[Done]") choices.append("<- Back") answer = _get_questionary().select( "Manage fallback chain:", choices=choices, qmark=">", ).ask() if answer is None or answer == "<- Back": return if answer == "[Done]": setattr(working_model, field_name, items) return if answer == "[+] Add preset": if not preset_names: console.print("[yellow]! No presets defined yet.[/yellow]") _get_questionary().press_any_key_to_continue().ask() continue add_choices = [p for p in preset_names if p not in items] if not add_choices: console.print("[yellow]! All presets already added.[/yellow]") _get_questionary().press_any_key_to_continue().ask() continue picked = _select_with_back("Select preset:", add_choices) if picked is _BACK_PRESSED or picked is None: continue items.append(picked) elif answer == "[-] Remove last" and items: items.pop() elif answer == "[X] Clear all" and items: items.clear() _FIELD_HANDLERS: dict[str, Any] = { "model": _handle_model_field, "context_window_tokens": _handle_context_window_field, "model_preset": _handle_model_preset_field, "provider": _handle_provider_field, "fallback_presets": _handle_fallback_presets_field, } def _is_str_or_none(annotation: Any) -> bool: """Check whether a field annotation is ``str | None`` (or ``Optional[str]``).""" origin = get_origin(annotation) if origin is None: return False args = get_args(annotation) return str in args and type(None) in args def _configure_pydantic_model( model: BaseModel, display_name: str, *, skip_fields: set[str] | None = None, ) -> BaseModel | None: """Configure a Pydantic model interactively. Returns the updated model only when the user explicitly selects "Done". Back and cancel actions discard the section draft. """ skip_fields = skip_fields or set() working_model = model.model_copy(deep=True) fields = [ (name, info) for name, info in type(working_model).model_fields.items() if name not in skip_fields ] if not fields: console.print(f"[dim]{display_name}: No configurable fields[/dim]") return working_model def get_choices() -> list[str]: items = [] for fname, finfo in fields: value = getattr(working_model, fname, None) display = _get_field_display_name(fname, finfo) formatted = _format_value(value, rich=False, field_name=fname) items.append(f"{display}: {formatted}") return items + ["[Done]"] last_field_name: str | None = None while True: console.clear() _show_config_panel(display_name, working_model, fields) choices = get_choices() default_choice = None if last_field_name: for idx, (fname, _) in enumerate(fields): if fname == last_field_name: default_choice = choices[idx] break answer = _select_with_back( "Select field to configure:", choices, default=default_choice ) if answer is _BACK_PRESSED or answer is None: return None if answer == "[Done]": return working_model field_idx = next((i for i, c in enumerate(choices) if c == answer), -1) if field_idx < 0 or field_idx >= len(fields): return None last_field_name = fields[field_idx][0] field_name, field_info = fields[field_idx] current_value = getattr(working_model, field_name, None) ftype = _get_field_type_info(field_info) field_display = _get_field_display_name(field_name, field_info) + _get_constraint_hint(field_info) # Nested Pydantic model - recurse if ftype.type_name == "model": nested = current_value created = nested is None if nested is None and ftype.inner_type: nested = ftype.inner_type() if nested and isinstance(nested, BaseModel): updated = _configure_pydantic_model(nested, field_display) if updated is not None: setattr(working_model, field_name, updated) elif created: setattr(working_model, field_name, None) continue # Registered special-field handlers handler = _FIELD_HANDLERS.get(field_name) if handler: handler(working_model, field_name, field_display, current_value) continue # Select fields with hints (e.g. reasoning_effort) if field_name in _SELECT_FIELD_HINTS: choices_list, hint = _SELECT_FIELD_HINTS[field_name] select_choices = choices_list + ["(clear/unset)"] console.print(f"[dim] Hint: {hint}[/dim]") new_value = _select_with_back( field_display, select_choices, default=current_value or select_choices[0] ) if new_value is _BACK_PRESSED: continue if new_value == "(clear/unset)": setattr(working_model, field_name, None) elif new_value is not None: setattr(working_model, field_name, new_value) continue # Generic field input if ftype.type_name == "literal" and ftype.inner_type: select_choices = [str(v) for v in ftype.inner_type] default_choice = str(current_value) if current_value in ftype.inner_type else select_choices[0] new_value = _select_with_back(field_display, select_choices, default=default_choice) if new_value is _BACK_PRESSED: continue if new_value is not None: setattr(working_model, field_name, new_value) continue if ftype.type_name == "bool": new_value = _input_bool(field_display, current_value) else: new_value = _input_with_existing(field_display, current_value, ftype.type_name, field_info=field_info) if new_value is not None: # Normalize empty string to None for optional string fields so that # clearing an api_key / api_base actually removes the value. if new_value == "" and _is_str_or_none(field_info.annotation): new_value = None setattr(working_model, field_name, new_value) def _try_auto_fill_context_window(model: BaseModel, new_model_name: str) -> None: """Try to auto-fill context_window_tokens if it's at default value. Note: This function imports AgentDefaults from nanobot.config.schema to get the default context_window_tokens value. If the schema changes, this coupling needs to be updated accordingly. """ # Check if context_window_tokens field exists if not hasattr(model, "context_window_tokens"): return current_context = getattr(model, "context_window_tokens", None) # Check if current value is the default (65536) # We only auto-fill if the user hasn't changed it from default from nanobot.config.schema import AgentDefaults default_context = AgentDefaults.model_fields["context_window_tokens"].default if current_context != default_context: return # User has customized it, don't override provider = _get_current_provider(model) context_limit = get_model_context_limit(new_model_name, provider) if context_limit: setattr(model, "context_window_tokens", context_limit) console.print(f"[green]+ Auto-filled context window: {format_token_count(context_limit)} tokens[/green]") else: console.print("[dim](i) Could not auto-fill context window (model not in database)[/dim]") # --- Model Preset Configuration --- def _sync_preset_cache(config: Config) -> None: """Synchronise the module-level preset name cache from config.""" _MODEL_PRESET_CACHE.clear() _MODEL_PRESET_CACHE.update(config.model_presets.keys()) def _configure_model_presets(config: Config) -> None: """Configure model presets (CRUD).""" _sync_preset_cache(config) def get_preset_choices() -> list[str]: choices: list[str] = [] for name, preset in config.model_presets.items(): choices.append(f"{name} ({preset.model})") choices.append("[+] Add new preset") choices.append("<- Back") return choices last_preset_name: str | None = None while True: try: console.clear() _show_section_header( "Model Presets", "Create, edit or delete named model presets for quick switching", ) choices = get_preset_choices() default_choice = None if last_preset_name: for c in choices: if c.startswith(last_preset_name + " ("): default_choice = c break answer = _select_with_back( "Select preset:", choices, default=default_choice ) if answer is _BACK_PRESSED or answer is None or answer == "<- Back": break assert isinstance(answer, str) if answer == "[+] Add new preset": name_input = _get_questionary().text( "Preset name:", validate=lambda t: True if t and t.strip() else "Name cannot be empty", ).ask() if not name_input: continue name = name_input.strip() if name in config.model_presets: console.print(f"[yellow]! Preset '{name}' already exists[/yellow]") _pause() continue new_preset = ModelPresetConfig(model="") updated = _configure_pydantic_model(new_preset, f"New Preset: {name}") if updated is not None: config.model_presets[name] = updated _sync_preset_cache(config) last_preset_name = name continue # Editing / deleting an existing preset # Extract preset name from "name (model)" format preset_name = answer.split(" (", 1)[0] preset = config.model_presets.get(preset_name) if preset is None: continue last_preset_name = preset_name choices = ["Edit", "Cancel"] if preset_name != "default": choices.insert(1, "Delete") action = _select_with_back( f"Preset: {preset_name}", choices, default="Edit", ) if action is _BACK_PRESSED or action == "Cancel" or action is None: continue if action == "Delete": confirm = _get_questionary().confirm( f"Delete preset '{preset_name}'?", default=False, ).ask() if confirm: del config.model_presets[preset_name] _sync_preset_cache(config) last_preset_name = None continue if action == "Edit": updated = _configure_pydantic_model(preset, f"Edit Preset: {preset_name}") if updated is not None: config.model_presets[preset_name] = updated _sync_preset_cache(config) except KeyboardInterrupt: console.print("\n[dim]Returning to main menu...[/dim]") break # --- Provider Configuration --- @lru_cache(maxsize=1) def _get_provider_info() -> dict[str, tuple[str, bool, bool, str]]: """Get provider info from registry (cached).""" from nanobot.providers.registry import PROVIDERS return { spec.name: ( spec.display_name or spec.name, spec.is_gateway, spec.is_local, spec.default_api_base, ) for spec in PROVIDERS if not spec.is_oauth } def _get_provider_names() -> dict[str, str]: """Get provider display names.""" info = _get_provider_info() return {name: data[0] for name, data in info.items() if name} def _configure_provider(config: Config, provider_name: str) -> None: """Configure a single LLM provider.""" provider_config = getattr(config.providers, provider_name, None) if provider_config is None: console.print(f"[red]Unknown provider: {provider_name}[/red]") return display_name = _get_provider_names().get(provider_name, provider_name) info = _get_provider_info() default_api_base = info.get(provider_name, (None, None, None, None))[3] if default_api_base and not provider_config.api_base: provider_config.api_base = default_api_base updated_provider = _configure_pydantic_model( provider_config, display_name, ) if updated_provider is not None: setattr(config.providers, provider_name, updated_provider) def _configure_providers(config: Config) -> None: """Configure LLM providers.""" def get_provider_choices() -> list[str]: """Build provider choices with config status indicators.""" choices = [] for name, display in _get_provider_names().items(): provider = getattr(config.providers, name, None) if provider and provider.api_key: choices.append(f"{display} *") else: choices.append(display) return choices + ["<- Back"] last_provider_key: str | None = None while True: try: console.clear() _show_section_header("LLM Providers", "Select a provider to configure API key and endpoint") choices = get_provider_choices() default_choice = None if last_provider_key: display = _get_provider_names().get(last_provider_key) if display: for c in choices: if c.replace(" *", "") == display: default_choice = c break answer = _select_with_back( "Select provider:", choices, default=default_choice ) if answer is _BACK_PRESSED or answer is None or answer == "<- Back": break # Type guard: answer is now guaranteed to be a string assert isinstance(answer, str) # Extract provider name from choice (remove " *" suffix if present) provider_name = answer.replace(" *", "") # Find the actual provider key from display names for name, display in _get_provider_names().items(): if display == provider_name: last_provider_key = name _configure_provider(config, name) break except KeyboardInterrupt: console.print("\n[dim]Returning to main menu...[/dim]") break # --- Channel Configuration --- @lru_cache(maxsize=1) def _get_channel_info() -> dict[str, tuple[str, type[BaseModel]]]: """Get channel info (display name + config class) from channel modules.""" import importlib from nanobot.channels.registry import discover_all result: dict[str, tuple[str, type[BaseModel]]] = {} for name, channel_cls in discover_all().items(): try: mod = importlib.import_module(f"nanobot.channels.{name}") config_name = channel_cls.__name__.replace("Channel", "Config") config_cls = getattr(mod, config_name, None) if config_cls and isinstance(config_cls, type) and issubclass(config_cls, BaseModel): display_name = getattr(channel_cls, "display_name", name.capitalize()) result[name] = (display_name, config_cls) except Exception: logger.warning("Failed to load channel module: {}", name) return result def _get_channel_names() -> dict[str, str]: """Get channel display names.""" return {name: info[0] for name, info in _get_channel_info().items()} def _get_channel_config_class(channel: str) -> type[BaseModel] | None: """Get channel config class.""" entry = _get_channel_info().get(channel) return entry[1] if entry else None def _configure_channel(config: Config, channel_name: str) -> None: """Configure a single channel.""" channel_dict = getattr(config.channels, channel_name, None) if channel_dict is None: channel_dict = {} setattr(config.channels, channel_name, channel_dict) display_name = _get_channel_names().get(channel_name, channel_name) config_cls = _get_channel_config_class(channel_name) if config_cls is None: console.print(f"[red]No configuration class found for {display_name}[/red]") return model = config_cls.model_validate(channel_dict) if channel_dict else config_cls() updated_channel = _configure_pydantic_model( model, display_name, ) if updated_channel is not None: new_dict = updated_channel.model_dump(by_alias=True, exclude_none=True) setattr(config.channels, channel_name, new_dict) def _configure_channels(config: Config) -> None: """Configure chat channels.""" channel_names = list(_get_channel_names().keys()) choices = channel_names + ["<- Back"] last_choice: str | None = None while True: try: console.clear() _show_section_header("Chat Channels", "Select a channel to configure connection settings") answer = _select_with_back( "Select channel:", choices, default=last_choice ) if answer is _BACK_PRESSED or answer is None or answer == "<- Back": break # Type guard: answer is now guaranteed to be a string assert isinstance(answer, str) last_choice = answer _configure_channel(config, answer) except KeyboardInterrupt: console.print("\n[dim]Returning to main menu...[/dim]") break # --- General Settings --- _SETTINGS_SECTIONS: dict[str, tuple[str, str, set[str] | None]] = { "Agent Settings": ("Agent Defaults", "Configure default model, temperature, and behavior", None), "Channel Common": ("Channel Common", "Configure cross-channel behavior: progress, tool hints, retries", None), "API Server": ("API Server", "Configure OpenAI-compatible API endpoint", None), "Gateway": ("Gateway Settings", "Configure server host, port, and heartbeat", None), "Tools": ("Tools Settings", "Configure web search, shell exec, and other tools", {"mcp_servers"}), } _SETTINGS_GETTER = { "Agent Settings": lambda c: c.agents.defaults, "Channel Common": lambda c: c.channels, "API Server": lambda c: c.api, "Gateway": lambda c: c.gateway, "Tools": lambda c: c.tools, } _SETTINGS_SETTER = { "Agent Settings": lambda c, v: setattr(c.agents, "defaults", v), "Channel Common": lambda c, v: setattr(c, "channels", v), "API Server": lambda c, v: setattr(c, "api", v), "Gateway": lambda c, v: setattr(c, "gateway", v), "Tools": lambda c, v: setattr(c, "tools", v), } def _configure_general_settings(config: Config, section: str) -> None: """Configure a general settings section (header + model edit + writeback).""" meta = _SETTINGS_SECTIONS.get(section) if not meta: return display_name, subtitle, skip = meta model = _SETTINGS_GETTER[section](config) updated = _configure_pydantic_model(model, display_name, skip_fields=skip) if updated is not None: _SETTINGS_SETTER[section](config, updated) # --- Summary --- def _summarize_model(obj: BaseModel) -> list[tuple[str, str]]: """Recursively summarize a Pydantic model. Returns list of (field, value) tuples.""" items: list[tuple[str, str]] = [] for field_name, field_info in type(obj).model_fields.items(): value = getattr(obj, field_name, None) if value is None or value == "" or value == {} or value == []: continue display = _get_field_display_name(field_name, field_info) ftype = _get_field_type_info(field_info) if ftype.type_name == "model" and isinstance(value, BaseModel): for nested_field, nested_value in _summarize_model(value): items.append((f"{display}.{nested_field}", nested_value)) continue formatted = _format_value(value, rich=False, field_name=field_name) if formatted != "[not set]": items.append((display, formatted)) return items def _print_summary_panel(rows: list[tuple[str, str]], title: str) -> None: """Build a two-column summary panel and print it.""" if not rows: return table = Table(show_header=False, box=None, padding=(0, 2)) table.add_column("Setting", style="cyan") table.add_column("Value") for field, value in rows: table.add_row(field, value) console.print(Panel(table, title=f"[bold]{title}[/bold]", border_style="blue")) def _show_summary(config: Config) -> None: """Display configuration summary using rich.""" console.print() # Providers provider_rows = [] for name, display in _get_provider_names().items(): provider = getattr(config.providers, name, None) status = "[green]configured[/green]" if (provider and provider.api_key) else "[dim]not configured[/dim]" provider_rows.append((display, status)) _print_summary_panel(provider_rows, "LLM Providers") # Channels channel_rows = [] for name, display in _get_channel_names().items(): channel = getattr(config.channels, name, None) if channel: enabled = ( channel.get("enabled", False) if isinstance(channel, dict) else getattr(channel, "enabled", False) ) status = "[green]enabled[/green]" if enabled else "[dim]disabled[/dim]" else: status = "[dim]not configured[/dim]" channel_rows.append((display, status)) _print_summary_panel(channel_rows, "Chat Channels") # Model Presets preset_rows = [] for name, preset in config.model_presets.items(): preset_rows.append((name, f"{preset.model} (ctx={preset.context_window_tokens})")) _print_summary_panel(preset_rows, "Model Presets") # Settings sections for title, model in [ ("Agent Settings", config.agents.defaults), ("Channel Common", config.channels), ("API Server", config.api), ("Gateway", config.gateway), ("Tools", config.tools), ]: _print_summary_panel(_summarize_model(model), title) _pause() def _pause() -> None: """Pause for user acknowledgement before clearing the screen.""" _get_questionary().text("Press Enter to continue...", default="").ask() # --- Main Entry Point --- def _has_unsaved_changes(original: Config, current: Config) -> bool: """Return True when the onboarding session has committed changes.""" return original.model_dump(by_alias=True) != current.model_dump(by_alias=True) def _prompt_main_menu_exit(has_unsaved_changes: bool) -> str: """Resolve how to leave the main menu.""" if not has_unsaved_changes: return "discard" answer = _get_questionary().select( "You have unsaved changes. What would you like to do?", choices=[ "[S] Save and Exit", "[X] Exit Without Saving", "[R] Resume Editing", ], default="[R] Resume Editing", qmark=">", ).ask() if answer == "[S] Save and Exit": return "save" if answer == "[X] Exit Without Saving": return "discard" return "resume" def run_onboard(initial_config: Config | None = None) -> OnboardResult: """Run the interactive onboarding questionnaire. Args: initial_config: Optional pre-loaded config to use as starting point. If None, loads from config file or creates new default. """ _get_questionary() if initial_config is not None: base_config = initial_config.model_copy(deep=True) else: config_path = get_config_path() if config_path.exists(): base_config = load_config() else: base_config = Config() original_config = base_config.model_copy(deep=True) config = base_config.model_copy(deep=True) _sync_preset_cache(config) last_main_choice: str | None = None while True: console.clear() _show_main_menu_header() try: answer = _get_questionary().select( "What would you like to configure?", choices=[ "[P] LLM Provider", "[M] Model Presets", "[C] Chat Channel", "[H] Channel Common", "[A] Agent Settings", "[I] API Server", "[G] Gateway", "[T] Tools", "[V] View Configuration Summary", "[S] Save and Exit", "[X] Exit Without Saving", ], default=last_main_choice, qmark=">", ).ask() except KeyboardInterrupt: answer = None if answer is None: action = _prompt_main_menu_exit(_has_unsaved_changes(original_config, config)) if action == "save": return OnboardResult(config=config, should_save=True) if action == "discard": return OnboardResult(config=original_config, should_save=False) continue _menu_dispatch = { "[P] LLM Provider": lambda: _configure_providers(config), "[M] Model Presets": lambda: _configure_model_presets(config), "[C] Chat Channel": lambda: _configure_channels(config), "[H] Channel Common": lambda: _configure_general_settings(config, "Channel Common"), "[A] Agent Settings": lambda: _configure_general_settings(config, "Agent Settings"), "[I] API Server": lambda: _configure_general_settings(config, "API Server"), "[G] Gateway": lambda: _configure_general_settings(config, "Gateway"), "[T] Tools": lambda: _configure_general_settings(config, "Tools"), "[V] View Configuration Summary": lambda: _show_summary(config), } if answer == "[S] Save and Exit": return OnboardResult(config=config, should_save=True) if answer == "[X] Exit Without Saving": return OnboardResult(config=original_config, should_save=False) action_fn = _menu_dispatch.get(answer) if action_fn: last_main_choice = answer action_fn()