+ {app.display_name} +
+ + {app.category} + ++ {app.requires + ? `${tx("settings.cliApps.requires", "Requires")}: ${app.requires}` + : app.description || tx("settings.cliApps.noDescription", "No description available.")} +
+From e2d00ffc8fab8cb73a89823d02dc95fc9a14549f Mon Sep 17 00:00:00 2001
From: Xubin Ren <52506698+Re-bin@users.noreply.github.com>
Date: Fri, 22 May 2026 22:25:12 +0800
Subject: [PATCH] feat: add CLI Apps settings MVP
---
nanobot/agent/context.py | 7 +-
nanobot/agent/loop.py | 8 +-
nanobot/agent/tools/cli_apps.py | 127 +++
nanobot/channels/websocket.py | 52 ++
nanobot/cli_apps/__init__.py | 13 +
nanobot/cli_apps/service.py | 828 ++++++++++++++++++
nanobot/cli_apps/utils.py | 62 ++
nanobot/config/schema.py | 4 +
nanobot/session/manager.py | 17 +
nanobot/templates/agent/tool_contract.md | 7 +
nanobot/webui/cli_apps_api.py | 93 ++
nanobot/webui/transcript.py | 59 +-
tests/agent/test_context_builder.py | 15 +
tests/agent/test_session_manager_history.py | 25 +
.../channels/test_websocket_envelope_media.py | 37 +
tests/channels/test_websocket_http_routes.py | 69 ++
tests/cli_apps/test_service.py | 378 ++++++++
tests/cli_apps/test_tool.py | 121 +++
tests/cli_apps/test_utils.py | 64 ++
tests/tools/test_tool_loader.py | 2 +
tests/utils/test_webui_transcript.py | 44 +
webui/src/components/CliAppMentionText.tsx | 148 ++++
webui/src/components/MessageBubble.tsx | 42 +-
.../src/components/settings/SettingsView.tsx | 585 ++++++++++++-
.../thread/AgentActivityCluster.tsx | 378 +++++++-
.../src/components/thread/ThreadComposer.tsx | 405 ++++++++-
.../src/components/thread/ThreadMessages.tsx | 6 +-
webui/src/components/thread/ThreadShell.tsx | 42 +-
.../src/components/thread/ThreadViewport.tsx | 5 +-
webui/src/globals.css | 46 +
webui/src/hooks/useNanobotStream.ts | 16 +-
webui/src/i18n/locales/en/common.json | 55 ++
webui/src/i18n/locales/zh-CN/common.json | 55 ++
webui/src/lib/api.ts | 19 +
webui/src/lib/nanobot-client.ts | 4 +-
webui/src/lib/tool-traces.ts | 55 +-
webui/src/lib/types.ts | 52 ++
.../src/tests/agent-activity-cluster.test.tsx | 80 +-
webui/src/tests/api.test.ts | 29 +
webui/src/tests/app-layout.test.tsx | 4 +
webui/src/tests/message-bubble.test.tsx | 84 +-
webui/src/tests/nanobot-client.test.ts | 43 +
webui/src/tests/thread-composer.test.tsx | 178 +++-
webui/src/tests/useNanobotStream.test.tsx | 52 ++
44 files changed, 4338 insertions(+), 77 deletions(-)
create mode 100644 nanobot/agent/tools/cli_apps.py
create mode 100644 nanobot/cli_apps/__init__.py
create mode 100644 nanobot/cli_apps/service.py
create mode 100644 nanobot/cli_apps/utils.py
create mode 100644 nanobot/webui/cli_apps_api.py
create mode 100644 tests/cli_apps/test_service.py
create mode 100644 tests/cli_apps/test_tool.py
create mode 100644 tests/cli_apps/test_utils.py
create mode 100644 webui/src/components/CliAppMentionText.tsx
diff --git a/nanobot/agent/context.py b/nanobot/agent/context.py
index 82ebfab65..4ad2124e7 100644
--- a/nanobot/agent/context.py
+++ b/nanobot/agent/context.py
@@ -156,9 +156,14 @@ class ContextBuilder:
sender_id: str | None = None,
session_summary: str | None = None,
session_metadata: Mapping[str, Any] | None = None,
+ current_runtime_lines: Sequence[str] | None = None,
) -> list[dict[str, Any]]:
"""Build the complete message list for an LLM call."""
- extra = goal_state_runtime_lines(session_metadata)
+ extra = [
+ *goal_state_runtime_lines(session_metadata),
+ ]
+ if current_runtime_lines:
+ extra.extend(line for line in current_runtime_lines if line)
runtime_ctx = self._build_runtime_context(
channel,
chat_id,
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index abc6449de..b9e459af1 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -28,6 +28,7 @@ from nanobot.agent.tools.registry import ToolRegistry
from nanobot.agent.tools.self import MyTool
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
+from nanobot.cli_apps import utils as cli_app_utils
from nanobot.command import CommandContext, CommandRouter, register_builtin_commands
from nanobot.config.schema import AgentDefaults, ModelPresetConfig
from nanobot.providers.base import LLMProvider
@@ -59,7 +60,6 @@ if TYPE_CHECKING:
UNIFIED_SESSION_KEY = "unified:default"
-
class TurnState(Enum):
RESTORE = auto()
COMPACT = auto()
@@ -568,7 +568,7 @@ class AgentLoop:
media_paths = [p for p in (msg.media or []) if isinstance(p, str) and p]
has_text = isinstance(msg.content, str) and msg.content.strip()
if has_text or media_paths:
- extra: dict[str, Any] = {"media": list(media_paths)} if media_paths else {}
+ extra: dict[str, Any] = ({"media": list(media_paths)} if media_paths else {}) | cli_app_utils.session_extra(msg.metadata)
extra.update(kwargs)
text = msg.content if isinstance(msg.content, str) else ""
session.add_message("user", text, **extra)
@@ -593,7 +593,7 @@ class AgentLoop:
chat_id=self._runtime_chat_id(msg),
sender_id=msg.sender_id,
session_summary=pending_summary,
- session_metadata=session.metadata,
+ session_metadata=session.metadata, current_runtime_lines=cli_app_utils.runtime_lines(msg, self.context.workspace),
)
async def _dispatch_command_inline(
@@ -1058,7 +1058,7 @@ class AgentLoop:
current_role=current_role,
sender_id=msg.sender_id,
session_summary=pending,
- session_metadata=session.metadata,
+ session_metadata=session.metadata, current_runtime_lines=cli_app_utils.runtime_lines(msg, self.context.workspace, skip=is_subagent),
)
t_wall = time.time()
final_content, _, all_msgs, stop_reason, _ = await self._run_agent_loop(
diff --git a/nanobot/agent/tools/cli_apps.py b/nanobot/agent/tools/cli_apps.py
new file mode 100644
index 000000000..eb410ecbf
--- /dev/null
+++ b/nanobot/agent/tools/cli_apps.py
@@ -0,0 +1,127 @@
+"""Controlled runner for installed CLI Apps."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any
+
+from pydantic import Field
+
+from nanobot.agent.tools.base import Tool, tool_parameters
+from nanobot.agent.tools.schema import ArraySchema, BooleanSchema, IntegerSchema, StringSchema, tool_parameters_schema
+from nanobot.cli_apps import CliAppError, CliAppManager, CliAppsRuntimeConfig
+from nanobot.config.schema import Base
+
+
+class CliAppsToolConfig(Base):
+ """CLI Apps tool configuration."""
+
+ enable: bool = True
+ install_timeout: int = Field(default=300, ge=1, le=3600)
+ run_timeout: int = Field(default=60, ge=1, le=600)
+ catalog_ttl_seconds: int = Field(default=3600, ge=60, le=86_400)
+
+
+@tool_parameters(
+ tool_parameters_schema(
+ required=["name"],
+ name=StringSchema("Installed CLI app registry name, for example gimp, safari, or obsidian."),
+ args=ArraySchema(
+ StringSchema("One command-line argument."),
+ description="Arguments to pass to the CLI entry point. Do not include the entry point itself.",
+ nullable=True,
+ ),
+ json=BooleanSchema(
+ description="Whether to prepend --json when supported by the CLI.",
+ default=False,
+ nullable=True,
+ ),
+ working_dir=StringSchema("Optional working directory for the CLI call.", nullable=True),
+ timeout=IntegerSchema(
+ description="Timeout in seconds for this CLI call.",
+ minimum=1,
+ maximum=600,
+ nullable=True,
+ ),
+ )
+)
+class CliAppsTool(Tool):
+ """Run an installed CLI-Anything or public CLI app through a controlled argv subprocess."""
+
+ config_key = "cli_apps"
+ _scopes = {"core", "subagent"}
+
+ @classmethod
+ def config_cls(cls):
+ return CliAppsToolConfig
+
+ @classmethod
+ def enabled(cls, ctx: Any) -> bool:
+ return ctx.config.cli_apps.enable
+
+ @classmethod
+ def create(cls, ctx: Any) -> Tool:
+ cfg = ctx.config.cli_apps
+ return cls(
+ workspace=Path(ctx.workspace),
+ restrict_to_workspace=ctx.config.restrict_to_workspace,
+ runtime=CliAppsRuntimeConfig(
+ install_timeout=cfg.install_timeout,
+ run_timeout=cfg.run_timeout,
+ catalog_ttl_seconds=cfg.catalog_ttl_seconds,
+ ),
+ )
+
+ def __init__(
+ self,
+ *,
+ workspace: Path,
+ restrict_to_workspace: bool = False,
+ runtime: CliAppsRuntimeConfig | None = None,
+ ) -> None:
+ self.workspace = workspace
+ self.restrict_to_workspace = restrict_to_workspace
+ self.runtime = runtime or CliAppsRuntimeConfig()
+
+ @property
+ def name(self) -> str:
+ return "run_cli_app"
+
+ @property
+ def description(self) -> str:
+ try:
+ installed = CliAppManager(workspace=self.workspace, runtime=self.runtime).installed_names()
+ except Exception:
+ installed = []
+ installed_note = (
+ f" Installed Settings CLI Apps: {', '.join(installed)}."
+ if installed
+ else " No Settings CLI Apps are currently installed."
+ )
+ return (
+ "Run a CLI App that the user explicitly installed in Settings or attached as @app. "
+ "Do not use this for ordinary system CLIs such as git, gh, python, npm, or brew; "
+ "unknown names are rejected. Execution uses argv, not shell."
+ + installed_note
+ )
+
+ async def execute(
+ self,
+ name: str,
+ args: list[str] | None = None,
+ json: bool | None = False,
+ working_dir: str | None = None,
+ timeout: int | None = None,
+ ) -> str:
+ manager = CliAppManager(workspace=self.workspace, runtime=self.runtime)
+ try:
+ return manager.run(
+ name,
+ args=args or [],
+ json_output=bool(json),
+ working_dir=working_dir,
+ timeout=timeout,
+ restrict_to_workspace=self.restrict_to_workspace,
+ )
+ except CliAppError as exc:
+ return f"Error: {exc.message}"
diff --git a/nanobot/channels/websocket.py b/nanobot/channels/websocket.py
index 11818ea90..139e0060e 100644
--- a/nanobot/channels/websocket.py
+++ b/nanobot/channels/websocket.py
@@ -52,6 +52,11 @@ from nanobot.webui.settings_api import (
update_provider_settings,
update_web_search_settings,
)
+from nanobot.webui.cli_apps_api import (
+ cli_apps_action,
+ cli_apps_payload,
+ normalize_cli_app_mentions,
+)
from nanobot.webui.sidebar_state import (
read_webui_sidebar_state,
write_webui_sidebar_state,
@@ -653,6 +658,21 @@ class WebSocketChannel(BaseChannel):
if got == "/api/settings/image-generation/update":
return self._handle_settings_image_generation_update(request)
+ if got == "/api/settings/cli-apps":
+ return self._handle_settings_cli_apps(request)
+
+ if got == "/api/settings/cli-apps/install":
+ return await self._handle_settings_cli_apps_action(request, "install")
+
+ if got == "/api/settings/cli-apps/update":
+ return await self._handle_settings_cli_apps_action(request, "update")
+
+ if got == "/api/settings/cli-apps/uninstall":
+ return await self._handle_settings_cli_apps_action(request, "uninstall")
+
+ if got == "/api/settings/cli-apps/test":
+ return await self._handle_settings_cli_apps_action(request, "test")
+
m = re.match(r"^/api/sessions/([^/]+)/messages$", got)
if m:
return self._handle_session_messages(request, m.group(1))
@@ -874,6 +894,32 @@ class WebSocketChannel(BaseChannel):
return _http_error(e.status, e.message)
return _http_json_response(self._with_settings_restart_state(payload, section="image"))
+ def _handle_settings_cli_apps(self, request: WsRequest) -> Response:
+ if not self._check_api_token(request):
+ return _http_error(401, "Unauthorized")
+ try:
+ payload = cli_apps_payload()
+ except Exception:
+ self.logger.exception("failed to load CLI Apps payload")
+ return _http_error(500, "failed to load CLI Apps")
+ return _http_json_response(payload)
+
+ async def _handle_settings_cli_apps_action(self, request: WsRequest, action: str) -> Response:
+ if not self._check_api_token(request):
+ return _http_error(401, "Unauthorized")
+ query = _parse_query(request.path)
+ try:
+ payload = await asyncio.to_thread(cli_apps_action, action, query)
+ except WebUISettingsError as e:
+ return _http_error(e.status, e.message)
+ except Exception as e:
+ status = getattr(e, "status", 500)
+ message = getattr(e, "message", str(e))
+ if status >= 500:
+ self.logger.exception("CLI Apps action '{}' failed", action)
+ return _http_error(status, message)
+ return _http_json_response(payload)
+
@staticmethod
def _is_websocket_channel_session_key(key: str) -> bool:
"""True when *key* is a ``websocket:…`` session exposed on this HTTP surface."""
@@ -961,6 +1007,9 @@ class WebSocketChannel(BaseChannel):
}
if media:
user_obj["media_paths"] = list(media)
+ cli_apps = meta.get("cli_apps")
+ if isinstance(cli_apps, list) and cli_apps:
+ user_obj["cli_apps"] = cli_apps
self._try_append_webui_transcript(chat_id, user_obj)
await super()._handle_message(
sender_id,
@@ -1421,6 +1470,9 @@ class WebSocketChannel(BaseChannel):
metadata: dict[str, Any] = {"remote": getattr(connection, "remote_address", None)}
if envelope.get("webui") is True:
metadata["webui"] = True
+ cli_apps = normalize_cli_app_mentions(envelope.get("cli_apps"))
+ if cli_apps:
+ metadata["cli_apps"] = cli_apps
image_generation = envelope.get("image_generation")
if isinstance(image_generation, dict) and image_generation.get("enabled") is True:
aspect_ratio = image_generation.get("aspect_ratio")
diff --git a/nanobot/cli_apps/__init__.py b/nanobot/cli_apps/__init__.py
new file mode 100644
index 000000000..9ca70839c
--- /dev/null
+++ b/nanobot/cli_apps/__init__.py
@@ -0,0 +1,13 @@
+"""CLI Apps integration helpers."""
+
+from nanobot.cli_apps.service import (
+ CliAppError,
+ CliAppManager,
+ CliAppsRuntimeConfig,
+)
+
+__all__ = [
+ "CliAppError",
+ "CliAppManager",
+ "CliAppsRuntimeConfig",
+]
diff --git a/nanobot/cli_apps/service.py b/nanobot/cli_apps/service.py
new file mode 100644
index 000000000..5ad257982
--- /dev/null
+++ b/nanobot/cli_apps/service.py
@@ -0,0 +1,828 @@
+"""CLI-Anything catalog, install state, and safe CLI execution."""
+
+from __future__ import annotations
+
+import json
+import os
+import re
+import shlex
+import shutil
+import subprocess
+import sys
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Any
+from urllib.parse import urlparse
+
+import httpx
+
+from nanobot.config.paths import get_runtime_subdir
+
+CLI_ANYTHING_REGISTRY_URL = "https://hkuds.github.io/CLI-Anything/registry.json"
+CLI_ANYTHING_PUBLIC_REGISTRY_URL = "https://hkuds.github.io/CLI-Anything/public_registry.json"
+CLI_ANYTHING_RAW_BASE = "https://raw.githubusercontent.com/HKUDS/CLI-Anything/main"
+CLI_ANYTHING_RAW_SKILLS_BASE = f"{CLI_ANYTHING_RAW_BASE}/skills/"
+
+_MAX_TOOL_OUTPUT_CHARS = 12_000
+_SAFE_NAME_RE = re.compile(r"[^a-z0-9_-]+")
+_MENTION_RE = re.compile(r"(^|[\s([{])@([a-z0-9_-]+)\b", re.IGNORECASE)
+_SHELL_META_CHARS = ("|", "&&", "||", ";", "$(", "`", ">", "<")
+
+
+class CliAppError(ValueError):
+ """User-facing CLI Apps failure."""
+
+ def __init__(self, message: str, *, status: int = 400) -> None:
+ super().__init__(message)
+ self.message = message
+ self.status = status
+
+
+@dataclass(slots=True)
+class CliAppsRuntimeConfig:
+ """Runtime knobs for CLI Apps."""
+
+ install_timeout: int = 300
+ run_timeout: int = 60
+ catalog_ttl_seconds: int = 3600
+
+
+_BRANDS: dict[str, tuple[str, str]] = {
+ "1password-cli": ("1password", "#3B66BC"),
+ "audacity": ("audacity", "#0000CC"),
+ "blender": ("blender", "#E87D0D"),
+ "browser": ("googlechrome", "#4285F4"),
+ "calibre": ("calibre", "#45B29D"),
+ "chromadb": ("chroma", "#FFDE2D"),
+ "comfyui": ("comfyui", "#111827"),
+ "contentful": ("contentful", "#2478CC"),
+ "dify": ("dify", "#155EEF"),
+ "drawio": ("diagramsdotnet", "#F08705"),
+ "elevenlabs": ("elevenlabs", "#000000"),
+ "eth2-quickstart": ("ethereum", "#627EEA"),
+ "firefly-iii": ("fireflyiii", "#CD5029"),
+ "freecad": ("freecad", "#418FDE"),
+ "generate-veo-video": ("googlegemini", "#8E75B2"),
+ "gimp": ("gimp", "#5C5543"),
+ "godot": ("godotengine", "#478CBF"),
+ "hacker-feeds-cli": ("rss", "#FFA500"),
+ "inkscape": ("inkscape", "#000000"),
+ "intelwatch": ("intel", "#0071C5"),
+ "iterm2": ("iterm2", "#000000"),
+ "jimeng": ("bytedance", "#3C8CFF"),
+ "kdenlive": ("kdenlive", "#527EB2"),
+ "krita": ("krita", "#3BABFF"),
+ "libreoffice": ("libreoffice", "#18A303"),
+ "mailchimp": ("mailchimp", "#FFE01B"),
+ "mermaid": ("mermaid", "#FF3670"),
+ "minimax": ("minimax", "#111827"),
+ "musescore": ("musescore", "#1A70B8"),
+ "n8n": ("n8n", "#EA4B71"),
+ "notebooklm": ("googlenotebooklm", "#4285F4"),
+ "obs-studio": ("obsstudio", "#302E31"),
+ "obsidian": ("obsidian", "#7C3AED"),
+ "ollama": ("ollama", "#000000"),
+ "pm2": ("pm2", "#2B037A"),
+ "qgis": ("qgis", "#589632"),
+ "safari": ("safari", "#006CFF"),
+ "sanity": ("sanity", "#F03E2F"),
+ "sentry": ("sentry", "#362D59"),
+ "sketch": ("sketch", "#F7B500"),
+ "shopify": ("shopify", "#7AB55C"),
+ "nsight-graphics": ("nvidia", "#76B900"),
+ "unrealinsights": ("unrealengine", "#0E1128"),
+ "ueatelier": ("unrealengine", "#0E1128"),
+ "ve-twini": ("x", "#000000"),
+ "wecom": ("wechat", "#07C160"),
+ "suno": ("suno", "#000000"),
+ "lldb": ("llvm", "#262D3A"),
+ "android-cli": ("android", "#3DDC84"),
+ "adguardhome": ("adguard", "#68BC71"),
+ "zotero": ("zotero", "#CC2936"),
+ "zoom": ("zoom", "#0B5CFF"),
+}
+
+_BRAND_DOMAINS: dict[str, tuple[str, str]] = {
+ "3mf": ("3mf.io", "#00A1DE"),
+ "anygen": ("anygen.com", "#111827"),
+ "clibrowser": ("github.com/allthingssecurity/clibrowser", "#24292F"),
+ "cloudanalyzer": ("github.com/rsasaki0109/CloudAnalyzer", "#2563EB"),
+ "cloudcompare": ("cloudcompare.org", "#4D83C3"),
+ "deployhq": ("deployhq.com", "#00A2D9"),
+ "exa": ("exa.ai", "#111827"),
+ "feishu": ("larksuite.com", "#00A5FF"),
+ "inkstitch": ("inkstitch.org", "#222222"),
+ "macrocli": ("github.com/HKUDS/CLI-Anything/tree/main/macrocli", "#24292F"),
+ "mubu": ("mubu.com", "#16A085"),
+ "nslogger": ("github.com/fpillet/NSLogger", "#24292F"),
+ "novita": ("novita.ai", "#7C3AED"),
+ "openscreen": ("openscreen.com", "#2563EB"),
+ "py4csr": ("github.com/yanmingyu92/py4csr", "#24292F"),
+ "quietshrink": ("github.com/achiya-automation/quietshrink", "#111827"),
+ "renderdoc": ("renderdoc.org", "#2C7DB8"),
+ "rms": ("rms.teltonika-networks.com", "#0054A6"),
+ "sbox": ("sbox.game", "#F59E0B"),
+ "seaclip": ("github.com/SeaClip-Lite/SeaClip", "#0284C7"),
+ "shotcut": ("shotcut.org", "#3B82F6"),
+ "slay-the-spire-ii": ("megacrit.com", "#B91C1C"),
+ "stata": ("stata.com", "#1F4E79"),
+ "unimol-tools": ("github.com/deepmodeling/Uni-Mol", "#4F46E5"),
+ "videocaptioner": ("github.com/WEIFENG2333/VideoCaptioner", "#2563EB"),
+ "wiremock": ("wiremock.org", "#FF6A00"),
+}
+
+_BRAND_ALIASES: dict[str, str] = {
+ "1password": "1password-cli",
+ "dify-workflow": "dify",
+ "feishu-lark": "feishu",
+ "lark-cli": "feishu",
+ "minimax-cli": "minimax",
+ "obsidian-cli": "obsidian",
+ "slay-the-spire-2": "slay-the-spire-ii",
+ "slay-the-spire-ii": "slay-the-spire-ii",
+ "unimol-tools": "unimol-tools",
+ "unimol": "unimol-tools",
+ "veo": "generate-veo-video",
+}
+
+_BRAND_TRAILING_WORDS = ("cli", "workflow", "workflows", "app", "apps", "tool", "tools")
+
+
+def _now() -> float:
+ return time.time()
+
+
+def _safe_skill_name(name: str) -> str:
+ clean = _SAFE_NAME_RE.sub("-", name.lower()).strip("-")
+ return f"cli-app-{clean or 'app'}"
+
+
+def _has_shell_meta(command: str) -> bool:
+ return any(char in command for char in _SHELL_META_CHARS)
+
+
+def _command_exists(command: str) -> bool:
+ try:
+ parts = shlex.split(command)
+ except ValueError:
+ return False
+ if not parts:
+ return False
+ return shutil.which(parts[0]) is not None
+
+
+def _is_pip_install_command(command: str) -> bool:
+ try:
+ tokens = shlex.split(command)
+ except ValueError:
+ return False
+ return (
+ len(tokens) >= 3
+ and tokens[:2] == ["pip", "install"]
+ ) or (
+ len(tokens) >= 5
+ and tokens[1:4] == ["-m", "pip", "install"]
+ and tokens[0] in {"python", "python3", sys.executable}
+ )
+
+
+def _pip_uninstall_args_from_command(command: str) -> list[str] | None:
+ if not command or _has_shell_meta(command):
+ return None
+ try:
+ tokens = shlex.split(command)
+ except ValueError:
+ return None
+ if tokens[:2] == ["pip", "uninstall"]:
+ args = tokens[2:]
+ elif (
+ len(tokens) >= 5
+ and tokens[1:4] == ["-m", "pip", "uninstall"]
+ and tokens[0] in {"python", "python3", sys.executable}
+ ):
+ args = tokens[4:]
+ else:
+ return None
+ packages = [arg for arg in args if arg not in {"-y", "--yes"}]
+ if not packages or any(arg.startswith("-") for arg in packages):
+ return None
+ return packages
+
+
+def _brand_key(value: str) -> str:
+ return _SAFE_NAME_RE.sub("-", value.lower()).replace("_", "-").strip("-")
+
+
+def _brand_candidates(app: dict[str, Any]) -> list[str]:
+ values = [
+ str(app.get("name") or ""),
+ str(app.get("display_name") or ""),
+ str(app.get("entry_point") or "").removeprefix("cli-anything-"),
+ ]
+ seen: set[str] = set()
+ candidates: list[str] = []
+ for value in values:
+ key = _brand_key(value)
+ while key and key not in seen:
+ seen.add(key)
+ candidates.append(key)
+ parts = key.split("-")
+ if len(parts) <= 1 or parts[-1] not in _BRAND_TRAILING_WORDS:
+ break
+ key = "-".join(parts[:-1])
+ return candidates
+
+
+def _brand_payload(app: dict[str, Any]) -> tuple[str | None, str | None]:
+ brand = None
+ domain_brand = None
+ for candidate in _brand_candidates(app):
+ key = _BRAND_ALIASES.get(candidate, candidate)
+ brand = _BRANDS.get(key)
+ if brand:
+ break
+ domain_brand = _BRAND_DOMAINS.get(key)
+ if domain_brand:
+ break
+ if not brand:
+ if not domain_brand:
+ return None, None
+ domain, color = domain_brand
+ return f"https://www.google.com/s2/favicons?domain={domain}&sz=64", color
+ slug, color = brand
+ return f"https://cdn.simpleicons.org/{slug}/{color.lstrip('#')}", color
+
+
+def _read_json(path: Path) -> dict[str, Any] | None:
+ try:
+ data = json.loads(path.read_text(encoding="utf-8"))
+ except (OSError, json.JSONDecodeError):
+ return None
+ return data if isinstance(data, dict) else None
+
+
+def _write_json(path: Path, data: dict[str, Any]) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ payload = json.dumps(data, indent=2, ensure_ascii=False)
+ tmp_path = path.with_name(f".{path.name}.{os.getpid()}.{int(_now() * 1_000_000)}.tmp")
+ try:
+ tmp_path.write_text(payload, encoding="utf-8")
+ tmp_path.replace(path)
+ finally:
+ if tmp_path.exists():
+ tmp_path.unlink()
+
+
+def _safe_skill_path(value: str) -> str | None:
+ if not value.startswith("skills/"):
+ return None
+ parts = value.split("/")
+ if any(part in {"", ".", ".."} for part in parts):
+ return None
+ return value if parts[-1] == "SKILL.md" else None
+
+
+def _skill_content_url(skill_md: str) -> str | None:
+ safe_path = _safe_skill_path(skill_md)
+ if safe_path:
+ return f"{CLI_ANYTHING_RAW_BASE}/{safe_path}"
+ parsed = urlparse(skill_md)
+ if parsed.scheme != "https" or parsed.netloc != "raw.githubusercontent.com":
+ return None
+ if not skill_md.startswith(CLI_ANYTHING_RAW_SKILLS_BASE):
+ return None
+ suffix = skill_md.removeprefix(f"{CLI_ANYTHING_RAW_BASE}/")
+ return skill_md if _safe_skill_path(suffix) else None
+
+
+def _truncate(text: str, limit: int = _MAX_TOOL_OUTPUT_CHARS) -> str:
+ if len(text) <= limit:
+ return text
+ omitted = len(text) - limit
+ return text[:limit] + f"\n\n... truncated {omitted} characters ..."
+
+
+class CliAppManager:
+ """Manage CLI-Anything registry entries and local install state."""
+
+ def __init__(
+ self,
+ *,
+ workspace: Path,
+ data_dir: Path | None = None,
+ runtime: CliAppsRuntimeConfig | None = None,
+ ) -> None:
+ self.workspace = Path(workspace).expanduser()
+ self.data_dir = Path(data_dir) if data_dir is not None else get_runtime_subdir("cli-apps")
+ self.runtime = runtime or CliAppsRuntimeConfig()
+
+ @property
+ def installed_path(self) -> Path:
+ return self.data_dir / "installed.json"
+
+ def _cache_path(self, source: str) -> Path:
+ return self.data_dir / f"{source}_registry_cache.json"
+
+ def _load_installed(self) -> dict[str, Any]:
+ data = _read_json(self.installed_path) or {}
+ apps = data.get("apps") if isinstance(data.get("apps"), dict) else data
+ return apps if isinstance(apps, dict) else {}
+
+ def _save_installed(self, installed: dict[str, Any]) -> None:
+ _write_json(self.installed_path, {"schema_version": 1, "apps": installed})
+
+ def installed_names(self) -> list[str]:
+ """Return registry names explicitly installed through CLI Apps."""
+ return sorted(str(name) for name in self._load_installed())
+
+ def _fetch_registry(
+ self,
+ url: str,
+ cache_path: Path,
+ *,
+ force_refresh: bool = False,
+ ) -> dict[str, Any]:
+ cached = _read_json(cache_path)
+ if (
+ not force_refresh
+ and cached
+ and _now() - float(cached.get("_cached_at", 0)) < self.runtime.catalog_ttl_seconds
+ ):
+ data = cached.get("data")
+ if isinstance(data, dict):
+ return data
+
+ try:
+ response = httpx.get(url, timeout=15.0, follow_redirects=True)
+ response.raise_for_status()
+ data = response.json()
+ if not isinstance(data, dict):
+ raise ValueError("registry response must be an object")
+ except Exception:
+ if cached and isinstance(cached.get("data"), dict):
+ return cached["data"]
+ raise
+
+ _write_json(cache_path, {"_cached_at": _now(), "data": data})
+ return data
+
+ def catalog(self, *, force_refresh: bool = False) -> tuple[list[dict[str, Any]], str | None]:
+ registries = [
+ (
+ "harness",
+ self._fetch_registry(
+ CLI_ANYTHING_REGISTRY_URL,
+ self._cache_path("harness"),
+ force_refresh=force_refresh,
+ ),
+ ),
+ (
+ "public",
+ self._fetch_registry(
+ CLI_ANYTHING_PUBLIC_REGISTRY_URL,
+ self._cache_path("public"),
+ force_refresh=force_refresh,
+ ),
+ ),
+ ]
+ apps_by_name: dict[str, dict[str, Any]] = {}
+ updated_values: list[str] = []
+ for source, registry in registries:
+ meta = registry.get("meta")
+ if isinstance(meta, dict) and isinstance(meta.get("updated"), str):
+ updated_values.append(meta["updated"])
+ for row in registry.get("clis", []):
+ if not isinstance(row, dict) or not row.get("name"):
+ continue
+ entry = dict(row)
+ entry["_source"] = source
+ key = str(entry["name"]).lower()
+ previous = apps_by_name.get(key)
+ if previous:
+ previous_source = str(previous.get("_source") or source)
+ merged_source = (
+ previous_source if previous_source == source else f"{previous_source}+{source}"
+ )
+ apps_by_name[key] = {**previous, **entry, "_source": merged_source}
+ else:
+ apps_by_name[key] = entry
+ return list(apps_by_name.values()), max(updated_values) if updated_values else None
+
+ def get_app(self, name: str, *, force_refresh: bool = False) -> dict[str, Any]:
+ wanted = name.lower()
+ for app in self.catalog(force_refresh=force_refresh)[0]:
+ if str(app.get("name", "")).lower() == wanted:
+ return app
+ raise CliAppError(f"CLI app '{name}' not found", status=404)
+
+ def mentioned_installed_apps(self, text: str) -> list[dict[str, str]]:
+ """Return installed CLI Apps referenced as ``@name`` in user text."""
+ if "@" not in text:
+ return []
+ installed = self._load_installed()
+ if not installed:
+ return []
+ installed_by_name = {
+ str(name).lower(): (str(name), data if isinstance(data, dict) else {})
+ for name, data in installed.items()
+ }
+ seen: set[str] = set()
+ mentions: list[dict[str, str]] = []
+ for match in _MENTION_RE.finditer(text):
+ wanted = str(match.group(2)).lower()
+ if wanted in seen or wanted not in installed_by_name:
+ continue
+ installed_name, data = installed_by_name[wanted]
+ seen.add(wanted)
+ entry_point = str(data.get("entry_point") or "")
+ mentions.append(
+ {
+ "name": installed_name,
+ "entry_point": entry_point,
+ "source": str(data.get("source") or ""),
+ "skill": f"skills/{_safe_skill_name(installed_name)}/SKILL.md",
+ "tool": "run_cli_app",
+ }
+ )
+ return mentions
+
+ def _strategy(self, app: dict[str, Any]) -> str:
+ package_manager = str(app.get("package_manager") or "").lower()
+ install_strategy = str(app.get("install_strategy") or "").lower()
+ if package_manager == "bundled" or install_strategy == "bundled":
+ return "bundled"
+ if package_manager in {"npm", "brew", "uv", "pip"}:
+ return package_manager
+ if app.get("npm_package"):
+ return "npm"
+ install_cmd = str(app.get("install_cmd") or "")
+ if _is_pip_install_command(install_cmd):
+ return "pip"
+ return "unsupported"
+
+ def _install_supported(self, app: dict[str, Any]) -> bool:
+ if self._strategy(app) == "unsupported":
+ return False
+ install_cmd = str(app.get("install_cmd") or "")
+ return not _has_shell_meta(install_cmd)
+
+ def _skill_path(self, name: str) -> Path:
+ return self.workspace / "skills" / _safe_skill_name(name) / "SKILL.md"
+
+ def _app_payload(
+ self,
+ app: dict[str, Any],
+ installed: dict[str, Any],
+ ) -> dict[str, Any]:
+ name = str(app["name"])
+ entry_point = str(app.get("entry_point") or "")
+ install_supported = self._install_supported(app)
+ is_installed = name in installed
+ available = bool(entry_point and shutil.which(entry_point))
+ if is_installed and available:
+ status = "installed"
+ elif is_installed:
+ status = "missing"
+ elif not install_supported:
+ status = "unsupported"
+ elif available:
+ status = "available"
+ else:
+ status = "not_installed"
+ logo_url, brand_color = _brand_payload(app)
+ return {
+ "name": name,
+ "display_name": app.get("display_name") or name,
+ "category": app.get("category") or "uncategorized",
+ "description": app.get("description") or "",
+ "requires": app.get("requires") or "",
+ "source": app.get("_source") or "harness",
+ "entry_point": entry_point,
+ "install_supported": install_supported,
+ "installed": is_installed,
+ "available": available,
+ "status": status,
+ "logo_url": logo_url,
+ "brand_color": brand_color,
+ "skill_installed": self._skill_path(name).is_file(),
+ }
+
+ def payload(self, *, force_refresh: bool = False) -> dict[str, Any]:
+ apps, updated = self.catalog(force_refresh=force_refresh)
+ installed = self._load_installed()
+ rows = [self._app_payload(app, installed) for app in apps]
+ rows.sort(key=lambda item: (str(item["category"]), str(item["display_name"]).lower()))
+ return {
+ "apps": rows,
+ "installed_count": sum(1 for item in rows if item["installed"]),
+ "catalog_updated_at": updated,
+ }
+
+ def _pip_package_from_install(self, app: dict[str, Any]) -> str | None:
+ install_cmd = str(app.get("install_cmd") or "")
+ try:
+ tokens = shlex.split(install_cmd)
+ except ValueError:
+ return None
+ if tokens[:2] == ["pip", "install"]:
+ args = tokens[2:]
+ elif len(tokens) >= 5 and tokens[1:4] == ["-m", "pip", "install"]:
+ args = tokens[4:]
+ else:
+ return None
+ args = [arg for arg in args if not arg.startswith("-")]
+ if len(args) != 1 or args[0].startswith("git+"):
+ return None
+ return args[0]
+
+ def _pip_install_argv(self, app: dict[str, Any], *, update: bool = False) -> list[str]:
+ install_cmd = str(app.get("install_cmd") or "")
+ if not _is_pip_install_command(install_cmd) or _has_shell_meta(install_cmd):
+ raise CliAppError("unsupported pip install command")
+ tokens = shlex.split(install_cmd)
+ args = tokens[2:] if tokens[:2] == ["pip", "install"] else tokens[4:]
+ prefix = [sys.executable, "-m", "pip", "install"]
+ if update:
+ prefix.extend(["--upgrade", "--force-reinstall"])
+ return prefix + args
+
+ def _pip_uninstall_argv(self, app: dict[str, Any]) -> list[str]:
+ uninstall_cmd = str(app.get("uninstall_cmd") or "")
+ packages = _pip_uninstall_args_from_command(uninstall_cmd)
+ if packages:
+ return [sys.executable, "-m", "pip", "uninstall", "-y", *packages]
+ package = str(app.get("pip_package") or "").strip() or self._pip_package_from_install(app)
+ if not package:
+ entry_point = str(app.get("entry_point") or "").strip()
+ package = entry_point if entry_point.startswith("cli-anything-") else f"cli-anything-{_brand_key(str(app['name']))}"
+ return [sys.executable, "-m", "pip", "uninstall", "-y", package]
+
+ def _npm_argv(self, app: dict[str, Any], action: str) -> list[str]:
+ npm = shutil.which("npm")
+ if not npm:
+ raise CliAppError("npm is not installed")
+ package = str(app.get("npm_package") or "")
+ if not package:
+ raise CliAppError("registry entry has no npm_package")
+ if action == "install":
+ return [npm, "install", "-g", package]
+ if action == "update":
+ return [npm, "install", "-g", package + "@latest"]
+ return [npm, "uninstall", "-g", package]
+
+ def _split_safe_command(self, app: dict[str, Any], key: str, expected: str) -> list[str]:
+ command = str(app.get(key) or "")
+ if not command:
+ raise CliAppError(f"no {key} is defined for {app['name']}")
+ if _has_shell_meta(command):
+ raise CliAppError("script-style install commands are disabled in this MVP")
+ try:
+ argv = shlex.split(command)
+ except ValueError as exc:
+ raise CliAppError(f"invalid command: {exc}") from exc
+ if not argv or argv[0] != expected:
+ raise CliAppError(f"unsupported {expected} command")
+ return argv
+
+ def _argv_for_action(self, app: dict[str, Any], action: str) -> list[str] | None:
+ strategy = self._strategy(app)
+ if strategy == "pip":
+ if action == "install":
+ return self._pip_install_argv(app)
+ if action == "update":
+ return self._pip_install_argv(app, update=True)
+ return self._pip_uninstall_argv(app)
+ if strategy == "npm":
+ return self._npm_argv(app, action)
+ if strategy == "brew":
+ key = {"install": "install_cmd", "update": "update_cmd", "uninstall": "uninstall_cmd"}[action]
+ return self._split_safe_command(app, key, "brew")
+ if strategy == "uv":
+ key = {"install": "install_cmd", "update": "update_cmd", "uninstall": "uninstall_cmd"}[action]
+ return self._split_safe_command(app, key, "uv")
+ if strategy == "bundled":
+ return None
+ raise CliAppError("this CLI app uses an unsupported install strategy")
+
+ def _run_argv(self, argv: list[str], *, timeout: int) -> subprocess.CompletedProcess[str]:
+ return subprocess.run(
+ argv,
+ capture_output=True,
+ text=True,
+ timeout=timeout,
+ )
+
+ def _installed_entry(self, app: dict[str, Any]) -> dict[str, Any]:
+ return {
+ "version": app.get("version") or "unknown",
+ "entry_point": app.get("entry_point") or "",
+ "source": app.get("_source") or "harness",
+ "strategy": self._strategy(app),
+ "installed_at": int(_now()),
+ }
+
+ def _fetch_skill_content(self, app: dict[str, Any]) -> str | None:
+ skill_md = str(app.get("skill_md") or "").strip()
+ if not skill_md:
+ return None
+ url = _skill_content_url(skill_md)
+ if not url:
+ return None
+ try:
+ response = httpx.get(url, timeout=15.0, follow_redirects=True)
+ response.raise_for_status()
+ text = response.text
+ except Exception:
+ return None
+ if "SKILL.md" not in url and not text.lstrip().startswith("---"):
+ return None
+ return text if len(text) < 250_000 else None
+
+ def _fallback_skill(self, app: dict[str, Any]) -> str:
+ name = str(app.get("name") or "unknown")
+ display = str(app.get("display_name") or name)
+ entry = str(app.get("entry_point") or f"cli-anything-{name}")
+ description = str(app.get("description") or f"Use {display} from nanobot.")
+ return f"""---
+name: {_safe_skill_name(name)}
+description: >-
+ {description}
+---
+
+# {display}
+
+Use this skill when the user asks nanobot to operate {display} through its installed CLI app.
+
+If the user attached `@{name}` in chat, treat that as the selected app for the current turn.
+
+## Commands
+
+```bash
+{entry} --help
+{entry} --json --help
+```
+
+Prefer machine-readable output when the CLI supports `--json`.
+"""
+
+ def _with_nanobot_skill_note(self, content: str, app: dict[str, Any]) -> str:
+ marker = ""
+ if marker in content:
+ return content
+ name = str(app.get("name") or "unknown")
+ note = f"""{marker}
+## Nanobot execution
+
+Use the `run_cli_app` tool with `name="{name}"` for command execution. Do not invoke this CLI through shell unless the user explicitly asks. Prefer this skill when Runtime Context mentions `@{name}` as a CLI App Attachment.
+"""
+ lines = content.splitlines(keepends=True)
+ if lines and lines[0].strip() == "---":
+ for index, line in enumerate(lines[1:], start=1):
+ if line.strip() == "---":
+ return "".join(lines[: index + 1]) + "\n" + note + "\n" + "".join(lines[index + 1 :])
+ return note + "\n" + content
+
+ def install_skill(self, app: dict[str, Any]) -> Path:
+ path = self._skill_path(str(app["name"]))
+ path.parent.mkdir(parents=True, exist_ok=True)
+ content = self._fetch_skill_content(app) or self._fallback_skill(app)
+ content = self._with_nanobot_skill_note(content, app)
+ path.write_text(content, encoding="utf-8")
+ return path
+
+ def remove_skill(self, name: str) -> None:
+ skill_dir = self._skill_path(name).parent
+ if skill_dir.is_dir():
+ shutil.rmtree(skill_dir)
+
+ def _record_installed(self, app: dict[str, Any]) -> None:
+ installed = self._load_installed()
+ installed[str(app["name"])] = self._installed_entry(app)
+ self._save_installed(installed)
+ self.install_skill(app)
+
+ def install(self, name: str) -> dict[str, Any]:
+ app = self.get_app(name)
+ if not self._install_supported(app):
+ raise CliAppError("this CLI app uses an unsupported install strategy")
+ strategy = self._strategy(app)
+ if strategy == "bundled":
+ detect_cmd = str(app.get("detect_cmd") or app.get("entry_point") or "")
+ if detect_cmd and _command_exists(detect_cmd):
+ self._record_installed(app)
+ return self.payload() | {"last_action": {"ok": True, "message": f"CLI for {app['display_name']} is available."}}
+ note = app.get("install_notes") or f"{app['display_name']} is bundled with its parent app."
+ raise CliAppError(str(note))
+ argv = self._argv_for_action(app, "install")
+ assert argv is not None
+ result = self._run_argv(argv, timeout=self.runtime.install_timeout)
+ if result.returncode != 0:
+ raise CliAppError(_truncate(result.stderr or result.stdout or "install failed"), status=500)
+ self._record_installed(app)
+ return self.payload() | {"last_action": {"ok": True, "message": f"Installed CLI for {app['display_name']}."}}
+
+ def update(self, name: str) -> dict[str, Any]:
+ app = self.get_app(name, force_refresh=True)
+ if str(app["name"]) not in self._load_installed():
+ raise CliAppError("CLI app is not installed")
+ if self._strategy(app) == "bundled":
+ self._record_installed(app)
+ return self.payload() | {"last_action": {"ok": True, "message": f"Checked {app['display_name']}."}}
+ argv = self._argv_for_action(app, "update")
+ assert argv is not None
+ result = self._run_argv(argv, timeout=self.runtime.install_timeout)
+ if result.returncode != 0:
+ raise CliAppError(_truncate(result.stderr or result.stdout or "update failed"), status=500)
+ self._record_installed(app)
+ return self.payload() | {"last_action": {"ok": True, "message": f"Updated CLI for {app['display_name']}."}}
+
+ def uninstall(self, name: str) -> dict[str, Any]:
+ app = self.get_app(name)
+ installed = self._load_installed()
+ if str(app["name"]) not in installed:
+ raise CliAppError("CLI app is not installed")
+ if self._strategy(app) != "bundled":
+ argv = self._argv_for_action(app, "uninstall")
+ assert argv is not None
+ result = self._run_argv(argv, timeout=self.runtime.install_timeout)
+ if result.returncode != 0:
+ raise CliAppError(_truncate(result.stderr or result.stdout or "uninstall failed"), status=500)
+ installed.pop(str(app["name"]), None)
+ self._save_installed(installed)
+ self.remove_skill(str(app["name"]))
+ return self.payload() | {"last_action": {"ok": True, "message": f"Uninstalled CLI for {app['display_name']}."}}
+
+ def test(self, name: str) -> dict[str, Any]:
+ app = self.get_app(name)
+ entry = str(app.get("entry_point") or "")
+ resolved = shutil.which(entry)
+ if not entry or not resolved:
+ raise CliAppError(f"{entry or name} is not available on PATH")
+ result = self._run_argv([resolved, "--help"], timeout=min(self.runtime.run_timeout, 30))
+ ok = result.returncode == 0
+ output = _truncate((result.stdout or result.stderr or "").strip(), 3000)
+ return self.payload() | {
+ "last_action": {
+ "ok": ok,
+ "message": f"{entry} --help exited {result.returncode}",
+ "output": output,
+ }
+ }
+
+ def _resolve_cwd(
+ self,
+ working_dir: str | None,
+ *,
+ restrict_to_workspace: bool,
+ ) -> Path:
+ cwd = Path(working_dir).expanduser() if working_dir else self.workspace
+ cwd = cwd.resolve(strict=False)
+ workspace = self.workspace.resolve(strict=False)
+ if restrict_to_workspace and cwd != workspace and not cwd.is_relative_to(workspace):
+ raise CliAppError("working_dir is outside the configured workspace")
+ return cwd
+
+ def run(
+ self,
+ name: str,
+ args: list[str] | None = None,
+ *,
+ json_output: bool = False,
+ working_dir: str | None = None,
+ timeout: int | None = None,
+ restrict_to_workspace: bool = False,
+ ) -> str:
+ app = self.get_app(name)
+ installed = self._load_installed()
+ if str(app["name"]) not in installed:
+ raise CliAppError(f"CLI app '{name}' is not installed")
+ cwd = self._resolve_cwd(working_dir, restrict_to_workspace=restrict_to_workspace)
+ entry = str(installed[str(app["name"])].get("entry_point") or app.get("entry_point") or "")
+ resolved = shutil.which(entry)
+ if not entry or not resolved:
+ raise CliAppError(f"{entry or name} is not available on PATH")
+ clean_args = [str(arg) for arg in (args or [])]
+ if json_output and "--json" not in clean_args:
+ clean_args = ["--json", *clean_args]
+ effective_timeout = max(1, min(timeout or self.runtime.run_timeout, 600))
+ try:
+ result = subprocess.run(
+ [resolved, *clean_args],
+ cwd=str(cwd),
+ capture_output=True,
+ text=True,
+ timeout=effective_timeout,
+ env=os.environ.copy(),
+ )
+ except subprocess.TimeoutExpired:
+ return f"CLI app '{name}' timed out after {effective_timeout}s"
+ output = [
+ f"CLI app '{name}' exited {result.returncode}.",
+ f"Command: {entry} {' '.join(shlex.quote(arg) for arg in clean_args)}".rstrip(),
+ ]
+ if result.stdout:
+ output.append("\nSTDOUT:\n" + result.stdout.rstrip())
+ if result.stderr:
+ output.append("\nSTDERR:\n" + result.stderr.rstrip())
+ return _truncate("\n".join(output))
diff --git a/nanobot/cli_apps/utils.py b/nanobot/cli_apps/utils.py
new file mode 100644
index 000000000..264cedc25
--- /dev/null
+++ b/nanobot/cli_apps/utils.py
@@ -0,0 +1,62 @@
+"""CLI Apps helpers shared by the agent loop and settings surfaces."""
+
+from __future__ import annotations
+
+from pathlib import Path
+from typing import Any, Mapping
+
+
+def session_extra(metadata: Mapping[str, Any] | None) -> dict[str, Any]:
+ """Return persisted session kwargs for CLI app attachments."""
+ cli_apps = metadata.get("cli_apps") if isinstance(metadata, Mapping) else None
+ return {"cli_apps": cli_apps} if isinstance(cli_apps, list) and cli_apps else {}
+
+
+def runtime_lines(message: Any, workspace: Path, *, skip: bool = False) -> list[str]:
+ """Return model-visible CLI app annotations for the current turn."""
+ if skip:
+ return []
+ text = message.content if isinstance(getattr(message, "content", None), str) else ""
+ metadata = message.metadata if isinstance(getattr(message, "metadata", None), Mapping) else None
+ return _cli_app_runtime_lines(text, metadata, workspace)
+
+
+def _cli_app_runtime_lines(
+ text: str,
+ metadata: Mapping[str, Any] | None,
+ workspace: Path,
+) -> list[str]:
+ structured = metadata.get("cli_apps") if isinstance(metadata, Mapping) else None
+ if isinstance(structured, list):
+ mentions = [
+ item for item in structured
+ if isinstance(item, Mapping) and isinstance(item.get("name"), str)
+ ]
+ if mentions:
+ return [
+ "CLI App Attachment: "
+ f"@{str(item['name']).strip().lower()} "
+ f"(installed; tool=run_cli_app; "
+ f"entry_point={str(item.get('entry_point') or 'unknown')}; "
+ f"skill=skills/cli-app-{str(item['name']).strip().lower()}/SKILL.md). "
+ "Read the skill when useful, then run this app with `run_cli_app`; do not bypass it with shell."
+ for item in mentions
+ if str(item.get("name") or "").strip()
+ ]
+ if "@" not in text:
+ return []
+ try:
+ from nanobot.cli_apps import CliAppManager
+
+ mentions = CliAppManager(workspace=workspace).mentioned_installed_apps(text)
+ except Exception:
+ return []
+ return [
+ "CLI App Mention: "
+ f"@{item['name']} "
+ f"(installed; tool={item['tool']}; "
+ f"entry_point={item['entry_point'] or 'unknown'}; "
+ f"skill={item['skill']}). "
+ "Read the skill when useful, then run this app with `run_cli_app`; do not bypass it with shell."
+ for item in mentions
+ ]
diff --git a/nanobot/config/schema.py b/nanobot/config/schema.py
index 2e094cc09..093e3a7f3 100644
--- a/nanobot/config/schema.py
+++ b/nanobot/config/schema.py
@@ -11,6 +11,7 @@ from pydantic_settings import BaseSettings
from nanobot.cron.types import CronSchedule
if TYPE_CHECKING:
+ from nanobot.agent.tools.cli_apps import CliAppsToolConfig
from nanobot.agent.tools.image_generation import ImageGenerationToolConfig
from nanobot.agent.tools.self import MyToolConfig
from nanobot.agent.tools.shell import ExecToolConfig
@@ -276,6 +277,7 @@ class ToolsConfig(Base):
web: WebToolsConfig = Field(default_factory=lambda: _lazy_default("nanobot.agent.tools.web", "WebToolsConfig"))
exec: ExecToolConfig = Field(default_factory=lambda: _lazy_default("nanobot.agent.tools.shell", "ExecToolConfig"))
+ cli_apps: CliAppsToolConfig = Field(default_factory=lambda: _lazy_default("nanobot.agent.tools.cli_apps", "CliAppsToolConfig"))
my: MyToolConfig = Field(default_factory=lambda: _lazy_default("nanobot.agent.tools.self", "MyToolConfig"))
image_generation: ImageGenerationToolConfig = Field(
default_factory=lambda: _lazy_default("nanobot.agent.tools.image_generation", "ImageGenerationToolConfig"),
@@ -462,6 +464,7 @@ def _resolve_tool_config_refs() -> None:
"""
import sys
+ from nanobot.agent.tools.cli_apps import CliAppsToolConfig
from nanobot.agent.tools.image_generation import ImageGenerationToolConfig
from nanobot.agent.tools.self import MyToolConfig
from nanobot.agent.tools.shell import ExecToolConfig
@@ -470,6 +473,7 @@ def _resolve_tool_config_refs() -> None:
# Re-export into this module's namespace
mod = sys.modules[__name__]
mod.ExecToolConfig = ExecToolConfig # type: ignore[attr-defined]
+ mod.CliAppsToolConfig = CliAppsToolConfig # type: ignore[attr-defined]
mod.WebToolsConfig = WebToolsConfig # type: ignore[attr-defined]
mod.WebSearchConfig = WebSearchConfig # type: ignore[attr-defined]
mod.WebFetchConfig = WebFetchConfig # type: ignore[attr-defined]
diff --git a/nanobot/session/manager.py b/nanobot/session/manager.py
index 269301104..846ec381f 100644
--- a/nanobot/session/manager.py
+++ b/nanobot/session/manager.py
@@ -165,6 +165,23 @@ class Session:
image_placeholder_text(p) for p in media if isinstance(p, str) and p
)
content = f"{content}\n{breadcrumbs}" if content else breadcrumbs
+ cli_apps = message.get("cli_apps")
+ if role == "user" and isinstance(cli_apps, list) and cli_apps and isinstance(content, str):
+ cli_lines: list[str] = []
+ for item in cli_apps[:8]:
+ if not isinstance(item, dict):
+ continue
+ name = str(item.get("name") or "").strip().lower()
+ if not name:
+ continue
+ entry = str(item.get("entry_point") or "unknown").strip() or "unknown"
+ cli_lines.append(
+ f"[CLI App Attachment: @{name}; tool=run_cli_app; entry_point={entry}; "
+ f"skill=skills/cli-app-{name}/SKILL.md]"
+ )
+ if cli_lines:
+ breadcrumbs = "\n".join(cli_lines)
+ content = f"{content}\n{breadcrumbs}" if content else breadcrumbs
if include_timestamps:
content = self._annotate_message_time(message, content)
if role == "assistant" and isinstance(content, str) and not content.strip():
diff --git a/nanobot/templates/agent/tool_contract.md b/nanobot/templates/agent/tool_contract.md
index edbba21c9..8d34ad861 100644
--- a/nanobot/templates/agent/tool_contract.md
+++ b/nanobot/templates/agent/tool_contract.md
@@ -41,6 +41,13 @@ documents the general tool contract and non-obvious usage patterns.
- Use `write_stdin` to poll, provide stdin, close stdin, wait for expected output with `wait_for`, or terminate an existing exec session.
- Use `list_exec_sessions` to recover active session IDs after context shifts.
+## CLI App Attachments
+
+- When Runtime Context lists a `CLI App Attachment` or `CLI App Mention`, treat the `@name` as an app capability the user intentionally attached to the current turn.
+- If the task may need app-specific behavior, read the listed skill first, then call `run_cli_app` with that `name`.
+- Do not run an attached CLI app through shell or generic process tools unless the user explicitly asks for that lower-level path.
+- If the app CLI is missing, lacks local desktop/app/API prerequisites, or cannot complete the requested action, explain that concrete blocker and what was attempted.
+
## Web and External Information
- Use web tools when the user asks for current information, a specific URL, or information likely to have changed.
diff --git a/nanobot/webui/cli_apps_api.py b/nanobot/webui/cli_apps_api.py
new file mode 100644
index 000000000..86e8e715e
--- /dev/null
+++ b/nanobot/webui/cli_apps_api.py
@@ -0,0 +1,93 @@
+"""CLI Apps helpers for the WebUI HTTP and message surfaces."""
+
+from __future__ import annotations
+
+import re
+from typing import Any
+
+from nanobot.cli_apps import CliAppError, CliAppManager, CliAppsRuntimeConfig
+from nanobot.config.loader import load_config
+
+QueryParams = dict[str, list[str]]
+
+_CLI_APP_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9_-]{0,63}$", re.IGNORECASE)
+_CLI_APP_ATTACHMENT_KEYS = (
+ "name",
+ "display_name",
+ "category",
+ "entry_point",
+ "logo_url",
+ "brand_color",
+)
+
+
+def _clip_ws_string(value: Any, limit: int = 240) -> str | None:
+ if not isinstance(value, str):
+ return None
+ text = value.strip()
+ if not text:
+ return None
+ return text[:limit]
+
+
+def normalize_cli_app_mentions(raw: Any) -> list[dict[str, str]]:
+ """Sanitize structured CLI app mentions sent by the WebUI."""
+ if not isinstance(raw, list):
+ return []
+ out: list[dict[str, str]] = []
+ seen: set[str] = set()
+ for item in raw[:8]:
+ if not isinstance(item, dict):
+ continue
+ name = _clip_ws_string(item.get("name"), 64)
+ if not name or _CLI_APP_NAME_RE.match(name) is None:
+ continue
+ key = name.lower()
+ if key in seen:
+ continue
+ seen.add(key)
+ row: dict[str, str] = {"name": key}
+ for field in _CLI_APP_ATTACHMENT_KEYS[1:]:
+ value = _clip_ws_string(item.get(field), 512 if field == "logo_url" else 160)
+ if value:
+ row[field] = value
+ out.append(row)
+ return out
+
+
+def _query_first(query: QueryParams, key: str) -> str | None:
+ values = query.get(key)
+ return values[0] if values else None
+
+
+def _manager() -> CliAppManager:
+ config = load_config()
+ cli_cfg = config.tools.cli_apps
+ return CliAppManager(
+ workspace=config.workspace_path,
+ runtime=CliAppsRuntimeConfig(
+ install_timeout=cli_cfg.install_timeout,
+ run_timeout=cli_cfg.run_timeout,
+ catalog_ttl_seconds=cli_cfg.catalog_ttl_seconds,
+ ),
+ )
+
+
+def cli_apps_payload() -> dict[str, Any]:
+ return _manager().payload()
+
+
+def cli_apps_action(action: str, query: QueryParams) -> dict[str, Any]:
+ name = (_query_first(query, "name") or "").strip()
+ if not name:
+ raise CliAppError("missing CLI app name")
+ manager = _manager()
+ if action == "install":
+ return manager.install(name)
+ if action == "update":
+ return manager.update(name)
+ if action == "uninstall":
+ return manager.uninstall(name)
+ if action == "test":
+ return manager.test(name)
+ raise CliAppError(f"unknown CLI app action '{action}'", status=404)
diff --git a/nanobot/webui/transcript.py b/nanobot/webui/transcript.py
index d7978d9f6..426930ff9 100644
--- a/nanobot/webui/transcript.py
+++ b/nanobot/webui/transcript.py
@@ -116,6 +116,55 @@ def tool_trace_lines_from_events(events: Any) -> list[str]:
return lines
+_PHASE_RANK = {"start": 1, "end": 2, "error": 3}
+
+
+def _normalize_tool_events(events: Any) -> list[dict[str, Any]]:
+ if not isinstance(events, list):
+ return []
+ out: list[dict[str, Any]] = []
+ for event in events:
+ if not event or not isinstance(event, dict):
+ continue
+ if event.get("phase") not in {"start", "end", "error"}:
+ continue
+ if not isinstance(event.get("name"), str):
+ fn = event.get("function")
+ if not (isinstance(fn, dict) and isinstance(fn.get("name"), str)):
+ continue
+ out.append(dict(event))
+ return out
+
+
+def _tool_event_key(event: dict[str, Any]) -> str:
+ call_id = event.get("call_id")
+ if isinstance(call_id, str) and call_id:
+ return f"call:{call_id}"
+ return _format_tool_call_trace(event) or json.dumps(event, sort_keys=True, ensure_ascii=False)
+
+
+def _merge_tool_events(previous: Any, incoming: list[dict[str, Any]]) -> list[dict[str, Any]]:
+ if not isinstance(previous, list) or not previous:
+ return incoming
+ if not incoming:
+ return [dict(event) for event in previous if isinstance(event, dict)]
+ merged = [dict(event) for event in previous if isinstance(event, dict)]
+ index_by_key = {_tool_event_key(event): idx for idx, event in enumerate(merged)}
+ for event in incoming:
+ key = _tool_event_key(event)
+ existing_index = index_by_key.get(key)
+ if existing_index is None:
+ index_by_key[key] = len(merged)
+ merged.append(event)
+ continue
+ existing = merged[existing_index]
+ incoming_rank = _PHASE_RANK.get(str(event.get("phase")), 0)
+ existing_rank = _PHASE_RANK.get(str(existing.get("phase")), 0)
+ if incoming_rank >= existing_rank:
+ merged[existing_index] = {**existing, **event}
+ return merged
+
+
def _merge_unique_tool_trace_lines(
previous_traces: list[str],
lines: list[str],
@@ -405,6 +454,9 @@ def replay_transcript_to_ui_messages(
row["media"] = media_att
if all(m.get("kind") == "image" for m in media_att):
row["images"] = [{"url": m.get("url"), "name": m.get("name")} for m in media_att]
+ cli_apps = rec.get("cli_apps")
+ if isinstance(cli_apps, list) and cli_apps:
+ row["cliApps"] = [dict(app) for app in cli_apps if isinstance(app, dict)]
messages.append(row)
continue
@@ -486,6 +538,7 @@ def replay_transcript_to_ui_messages(
close_reasoning(messages)
continue
if kind in ("tool_hint", "progress"):
+ structured_events = _normalize_tool_events(rec.get("tool_events"))
structured = tool_trace_lines_from_events(rec.get("tool_events"))
text = rec.get("text")
trace_lines = structured if structured else ([text] if isinstance(text, str) and text else [])
@@ -502,7 +555,7 @@ def replay_transcript_to_ui_messages(
prev_traces = list(last.get("traces") or [last.get("content")])
if structured:
merged_traces, added = _merge_unique_tool_trace_lines(prev_traces, structured)
- if not added:
+ if not added and not structured_events:
continue
else:
merged_traces = prev_traces + trace_lines
@@ -510,6 +563,9 @@ def replay_transcript_to_ui_messages(
**last,
"traces": merged_traces,
"content": merged_traces[-1],
+ "toolEvents": _merge_tool_events(last.get("toolEvents"), structured_events)
+ if structured_events
+ else last.get("toolEvents"),
"activitySegmentId": last.get("activitySegmentId") or segment,
}
messages[-1] = merged
@@ -521,6 +577,7 @@ def replay_transcript_to_ui_messages(
"kind": "trace",
"content": trace_lines[-1],
"traces": trace_lines,
+ **({"toolEvents": structured_events} if structured_events else {}),
"activitySegmentId": segment,
"createdAt": _ts_base + idx,
},
diff --git a/tests/agent/test_context_builder.py b/tests/agent/test_context_builder.py
index a36c0a30a..dffb93694 100644
--- a/tests/agent/test_context_builder.py
+++ b/tests/agent/test_context_builder.py
@@ -362,6 +362,21 @@ class TestBuildMessages:
assert "Other chat goal." not in str(without_goal[-1]["content"])
assert "Goal (active):" not in str(without_goal[-1]["content"])
+ def test_current_runtime_lines_are_injected(self, tmp_path):
+ builder = _builder(tmp_path)
+ messages = builder.build_messages(
+ [],
+ "please use @zoom tonight",
+ current_runtime_lines=[
+ "CLI App Attachment: @zoom (installed; tool=run_cli_app; entry_point=cli-anything-zoom).",
+ ],
+ )
+ user_msg = str(messages[-1]["content"])
+
+ assert "CLI App Attachment: @zoom" in user_msg
+ assert "tool=run_cli_app" in user_msg
+ assert "entry_point=cli-anything-zoom" in user_msg
+
def test_consecutive_same_role_merged(self, tmp_path):
builder = _builder(tmp_path)
history = [{"role": "user", "content": "previous user message"}]
diff --git a/tests/agent/test_session_manager_history.py b/tests/agent/test_session_manager_history.py
index ffc41583d..0a0909de7 100644
--- a/tests/agent/test_session_manager_history.py
+++ b/tests/agent/test_session_manager_history.py
@@ -359,6 +359,31 @@ def test_get_history_synthesizes_breadcrumb_for_image_only_turn():
assert history[0] == {"role": "user", "content": "[image: /m/pic.png]"}
+def test_get_history_synthesizes_cli_app_attachment_breadcrumb():
+ session = Session(key="test:cli-app")
+ session.messages.append(
+ {
+ "role": "user",
+ "content": "please use @drawio",
+ "cli_apps": [{
+ "name": "drawio",
+ "entry_point": "cli-anything-drawio",
+ }],
+ }
+ )
+
+ history = session.get_history(max_messages=500)
+
+ assert history == [{
+ "role": "user",
+ "content": (
+ "please use @drawio\n"
+ "[CLI App Attachment: @drawio; tool=run_cli_app; "
+ "entry_point=cli-anything-drawio; skill=skills/cli-app-drawio/SKILL.md]"
+ ),
+ }]
+
+
def test_get_history_ignores_media_kwarg_on_non_user_rows():
"""``media`` only ever appears on user entries in practice, but the
synthesizer must be defensive: assistants / tools with list content
diff --git a/tests/channels/test_websocket_envelope_media.py b/tests/channels/test_websocket_envelope_media.py
index 975408045..982525c32 100644
--- a/tests/channels/test_websocket_envelope_media.py
+++ b/tests/channels/test_websocket_envelope_media.py
@@ -105,6 +105,43 @@ async def test_message_without_media_backward_compatible() -> None:
assert call.kwargs["media"] is None
+@pytest.mark.asyncio
+async def test_message_forwards_normalized_cli_app_attachments() -> None:
+ channel = _make_channel()
+ mock_conn = AsyncMock()
+ envelope = {
+ "type": "message",
+ "chat_id": "abc123",
+ "content": "please use @drawio",
+ "webui": True,
+ "cli_apps": [
+ {
+ "name": "DrawIO",
+ "display_name": "Draw.io",
+ "category": "diagram",
+ "entry_point": "cli-anything-drawio",
+ "logo_url": "https://example.invalid/drawio.svg",
+ "brand_color": "#F08705",
+ },
+ {"name": "bad name", "entry_point": "nope"},
+ ],
+ }
+
+ await channel._dispatch_envelope(mock_conn, "client-1", envelope)
+
+ channel._handle_message.assert_awaited_once()
+ metadata = channel._handle_message.call_args.kwargs["metadata"]
+ assert metadata["webui"] is True
+ assert metadata["cli_apps"] == [{
+ "name": "drawio",
+ "display_name": "Draw.io",
+ "category": "diagram",
+ "entry_point": "cli-anything-drawio",
+ "logo_url": "https://example.invalid/drawio.svg",
+ "brand_color": "#F08705",
+ }]
+
+
@pytest.mark.asyncio
async def test_message_with_single_image_forwards_saved_path(tmp_path) -> None:
channel = _make_channel()
diff --git a/tests/channels/test_websocket_http_routes.py b/tests/channels/test_websocket_http_routes.py
index ddb45dfbf..502b50842 100644
--- a/tests/channels/test_websocket_http_routes.py
+++ b/tests/channels/test_websocket_http_routes.py
@@ -140,6 +140,75 @@ async def test_sessions_routes_require_bearer_token(
await server_task
+@pytest.mark.asyncio
+async def test_cli_apps_routes_require_token_and_return_payload(
+ bus: MagicMock,
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ monkeypatch.setattr(
+ "nanobot.channels.websocket.cli_apps_payload",
+ lambda: {
+ "apps": [
+ {
+ "name": "gimp",
+ "display_name": "GIMP",
+ "category": "image",
+ "description": "Image editing",
+ "requires": "Python",
+ "source": "harness",
+ "entry_point": "cli-anything-gimp",
+ "install_supported": True,
+ "installed": False,
+ "available": False,
+ "status": "not_installed",
+ "logo_url": None,
+ "brand_color": None,
+ "skill_installed": False,
+ }
+ ],
+ "installed_count": 0,
+ "catalog_updated_at": "2026-04-18",
+ },
+ )
+ monkeypatch.setattr(
+ "nanobot.channels.websocket.cli_apps_action",
+ lambda action, query: {
+ "apps": [],
+ "installed_count": 1,
+ "catalog_updated_at": "2026-04-18",
+ "last_action": {"ok": True, "message": f"{action}:{query['name'][0]}"},
+ },
+ )
+ channel = _ch(bus, session_manager=_seed_session(tmp_path), port=29912)
+ server_task = asyncio.create_task(channel.start())
+ await asyncio.sleep(0.3)
+ try:
+ deny = await _http_get("http://127.0.0.1:29912/api/settings/cli-apps")
+ assert deny.status_code == 401
+
+ boot = await _http_get("http://127.0.0.1:29912/webui/bootstrap")
+ token = boot.json()["token"]
+ auth = {"Authorization": f"Bearer {token}"}
+
+ catalog = await _http_get(
+ "http://127.0.0.1:29912/api/settings/cli-apps",
+ headers=auth,
+ )
+ assert catalog.status_code == 200
+ assert catalog.json()["apps"][0]["name"] == "gimp"
+
+ installed = await _http_get(
+ "http://127.0.0.1:29912/api/settings/cli-apps/install?name=gimp",
+ headers=auth,
+ )
+ assert installed.status_code == 200
+ assert installed.json()["last_action"]["message"] == "install:gimp"
+ finally:
+ await channel.stop()
+ await server_task
+
+
@pytest.mark.asyncio
async def test_sessions_list_only_returns_websocket_sessions_by_default(
bus: MagicMock, tmp_path: Path
diff --git a/tests/cli_apps/test_service.py b/tests/cli_apps/test_service.py
new file mode 100644
index 000000000..b4bfc04d0
--- /dev/null
+++ b/tests/cli_apps/test_service.py
@@ -0,0 +1,378 @@
+from __future__ import annotations
+
+import json
+import os
+import stat
+import subprocess
+import sys
+import time
+from pathlib import Path
+
+import pytest
+
+from nanobot.cli_apps.service import CliAppError, CliAppManager, CliAppsRuntimeConfig
+
+
+def _write_cache(path: Path, registry: dict) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(
+ json.dumps({"_cached_at": time.time(), "data": registry}),
+ encoding="utf-8",
+ )
+
+
+def _manager(tmp_path: Path) -> CliAppManager:
+ workspace = tmp_path / "workspace"
+ workspace.mkdir()
+ return CliAppManager(
+ workspace=workspace,
+ data_dir=tmp_path / "data",
+ runtime=CliAppsRuntimeConfig(catalog_ttl_seconds=3600, install_timeout=5, run_timeout=5),
+ )
+
+
+def _seed_catalog(manager: CliAppManager) -> None:
+ harness = {
+ "meta": {"updated": "2026-04-16"},
+ "clis": [
+ {
+ "name": "gimp",
+ "display_name": "GIMP",
+ "version": "1.0.0",
+ "description": "Image editing",
+ "category": "image",
+ "requires": "Python 3.10+",
+ "install_cmd": "pip install cli-anything-gimp",
+ "entry_point": "cli-anything-gimp",
+ "skill_md": "skills/cli-anything-gimp/SKILL.md",
+ }
+ ],
+ }
+ public = {
+ "meta": {"updated": "2026-04-18"},
+ "clis": [
+ {
+ "name": "gimp",
+ "display_name": "GIMP",
+ "description": "Public duplicate entry",
+ },
+ {
+ "name": "jimeng",
+ "display_name": "Jimeng",
+ "version": "latest",
+ "description": "Script install",
+ "category": "ai",
+ "install_strategy": "script",
+ "install_cmd": "curl -fsSL https://example.invalid/install.sh | bash",
+ "entry_point": "dreamina",
+ },
+ {
+ "name": "feishu",
+ "display_name": "Feishu/Lark CLI",
+ "version": "latest",
+ "description": "Official Lark CLI",
+ "category": "communication",
+ "package_manager": "npm",
+ "npm_package": "@larksuite/cli",
+ "install_cmd": "npm install -g @larksuite/cli",
+ "entry_point": "lark-cli",
+ },
+ {
+ "name": "dify-workflow",
+ "display_name": "Dify Workflow",
+ "version": "latest",
+ "description": "Run Dify workflows",
+ "category": "ai",
+ "install_cmd": "pip install cli-anything-dify-workflow",
+ "entry_point": "cli-anything-dify-workflow",
+ },
+ {
+ "name": "shopify",
+ "display_name": "Shopify CLI",
+ "version": "latest",
+ "description": "Shopify",
+ "category": "web",
+ "package_manager": "npm",
+ "npm_package": "@shopify/cli",
+ "install_cmd": "npm install -g @shopify/cli",
+ "entry_point": "shopify",
+ },
+ {
+ "name": "clibrowser",
+ "display_name": "clibrowser",
+ "version": "latest",
+ "description": "Cargo install",
+ "category": "web",
+ "install_cmd": "cargo install --git https://example.invalid/clibrowser.git",
+ "entry_point": "clibrowser",
+ },
+ {
+ "name": "suno",
+ "display_name": "Suno CLI",
+ "version": "latest",
+ "description": "python3 pip install",
+ "category": "music",
+ "package_manager": "pip",
+ "install_strategy": "command",
+ "install_cmd": "python3 -m pip install git+https://example.invalid/suno-cli.git",
+ "uninstall_cmd": "python3 -m pip uninstall -y suno-cli",
+ "entry_point": "suno",
+ },
+ ],
+ }
+ _write_cache(manager._cache_path("harness"), harness)
+ _write_cache(manager._cache_path("public"), public)
+
+
+def test_payload_merges_catalog_and_marks_unsupported_installs(tmp_path: Path) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+
+ payload = manager.payload()
+
+ assert payload["catalog_updated_at"] == "2026-04-18"
+ apps = {app["name"]: app for app in payload["apps"]}
+ assert set(apps) == {
+ "clibrowser",
+ "dify-workflow",
+ "feishu",
+ "gimp",
+ "jimeng",
+ "shopify",
+ "suno",
+ }
+ assert apps["gimp"]["install_supported"] is True
+ assert apps["gimp"]["source"] == "harness+public"
+ assert apps["gimp"]["description"] == "Public duplicate entry"
+ assert apps["clibrowser"]["install_supported"] is False
+ assert apps["jimeng"]["install_supported"] is False
+ assert apps["suno"]["install_supported"] is True
+ assert apps["gimp"]["logo_url"]
+ assert apps["dify-workflow"]["logo_url"] == "https://cdn.simpleicons.org/dify/155EEF"
+ assert apps["feishu"]["logo_url"] == (
+ "https://www.google.com/s2/favicons?domain=larksuite.com&sz=64"
+ )
+ assert apps["jimeng"]["logo_url"] == "https://cdn.simpleicons.org/bytedance/3C8CFF"
+ assert apps["clibrowser"]["logo_url"] == (
+ "https://www.google.com/s2/favicons?domain=github.com/allthingssecurity/clibrowser&sz=64"
+ )
+
+
+def test_install_dispatches_safe_pip_and_installs_skill(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+ calls: list[list[str]] = []
+
+ def fake_run(argv: list[str], *, timeout: int) -> subprocess.CompletedProcess[str]:
+ calls.append(argv)
+ return subprocess.CompletedProcess(argv, 0, stdout="ok", stderr="")
+
+ monkeypatch.setattr(manager, "_run_argv", fake_run)
+ monkeypatch.setattr(
+ manager,
+ "_fetch_skill_content",
+ lambda app: "---\nname: cli-anything-gimp\ndescription: GIMP\n---\n# GIMP\n",
+ )
+
+ payload = manager.install("gimp")
+
+ assert calls == [[sys.executable, "-m", "pip", "install", "cli-anything-gimp"]]
+ assert payload["last_action"]["ok"] is True
+ installed = json.loads(manager.installed_path.read_text(encoding="utf-8"))["apps"]
+ assert installed["gimp"]["entry_point"] == "cli-anything-gimp"
+ skill = manager.workspace / "skills" / "cli-app-gimp" / "SKILL.md"
+ assert skill.is_file()
+ assert 'run_cli_app` tool with `name="gimp"' in skill.read_text(encoding="utf-8")
+
+
+def test_installed_state_writes_atomically_without_temp_leftovers(tmp_path: Path) -> None:
+ manager = _manager(tmp_path)
+
+ manager._save_installed({"gimp": {"entry_point": "cli-anything-gimp"}})
+ manager._save_installed({"zoom": {"entry_point": "cli-anything-zoom"}})
+
+ installed = json.loads(manager.installed_path.read_text(encoding="utf-8"))["apps"]
+ assert set(installed) == {"zoom"}
+ assert not list(manager.installed_path.parent.glob(".installed.json.*.tmp"))
+
+
+def test_fetch_skill_content_rejects_untrusted_urls(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+
+ def fail_get(*args, **kwargs):
+ raise AssertionError("untrusted skill URL should not be fetched")
+
+ monkeypatch.setattr("nanobot.cli_apps.service.httpx.get", fail_get)
+
+ assert manager._fetch_skill_content({
+ "name": "evil",
+ "skill_md": "https://example.com/SKILL.md",
+ }) is None
+ assert manager._fetch_skill_content({
+ "name": "evil",
+ "skill_md": "skills/../evil/SKILL.md",
+ }) is None
+
+
+def test_fetch_skill_content_allows_cli_anything_raw_skill_url(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+ seen: list[str] = []
+
+ class Response:
+ text = "---\nname: cli-app-test\ndescription: Test\n---\n# Test\n"
+
+ @staticmethod
+ def raise_for_status() -> None:
+ return None
+
+ def fake_get(url: str, **kwargs):
+ seen.append(url)
+ return Response()
+
+ monkeypatch.setattr("nanobot.cli_apps.service.httpx.get", fake_get)
+
+ content = manager._fetch_skill_content({
+ "name": "gimp",
+ "skill_md": "https://raw.githubusercontent.com/HKUDS/CLI-Anything/main/skills/cli-anything-gimp/SKILL.md",
+ })
+
+ assert content and "# Test" in content
+ assert seen == [
+ "https://raw.githubusercontent.com/HKUDS/CLI-Anything/main/skills/cli-anything-gimp/SKILL.md"
+ ]
+
+
+def test_uninstall_removes_installed_state_and_generated_skill(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+ manager._save_installed({"gimp": {"entry_point": "cli-anything-gimp"}})
+ skill_dir = manager.workspace / "skills" / "cli-app-gimp"
+ skill_dir.mkdir(parents=True)
+ (skill_dir / "SKILL.md").write_text("# GIMP\n", encoding="utf-8")
+ monkeypatch.setattr(
+ manager,
+ "_run_argv",
+ lambda argv, *, timeout: subprocess.CompletedProcess(argv, 0, stdout="ok", stderr=""),
+ )
+
+ payload = manager.uninstall("gimp")
+
+ assert payload["last_action"]["ok"] is True
+ assert "gimp" not in json.loads(manager.installed_path.read_text(encoding="utf-8"))["apps"]
+ assert not skill_dir.exists()
+
+
+def test_uninstall_uses_safe_python_m_pip_uninstall_command(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+ manager._save_installed({"suno": {"entry_point": "suno"}})
+ calls: list[list[str]] = []
+
+ def fake_run(argv: list[str], *, timeout: int) -> subprocess.CompletedProcess[str]:
+ calls.append(argv)
+ return subprocess.CompletedProcess(argv, 0, stdout="ok", stderr="")
+
+ monkeypatch.setattr(manager, "_run_argv", fake_run)
+
+ payload = manager.uninstall("suno")
+
+ assert calls == [[sys.executable, "-m", "pip", "uninstall", "-y", "suno-cli"]]
+ assert payload["last_action"]["ok"] is True
+
+
+def test_mentioned_installed_apps_only_returns_installed_mentions(tmp_path: Path) -> None:
+ manager = _manager(tmp_path)
+ manager._save_installed(
+ {
+ "gimp": {"entry_point": "cli-anything-gimp", "source": "harness"},
+ "zoom": {"entry_point": "cli-anything-zoom", "source": "public"},
+ }
+ )
+
+ mentions = manager.mentioned_installed_apps("use @zoom and @krita, then @GIMP")
+
+ assert mentions == [
+ {
+ "name": "zoom",
+ "entry_point": "cli-anything-zoom",
+ "source": "public",
+ "skill": "skills/cli-app-zoom/SKILL.md",
+ "tool": "run_cli_app",
+ },
+ {
+ "name": "gimp",
+ "entry_point": "cli-anything-gimp",
+ "source": "harness",
+ "skill": "skills/cli-app-gimp/SKILL.md",
+ "tool": "run_cli_app",
+ },
+ ]
+
+
+def test_install_rejects_unknown_and_script_strategy(tmp_path: Path) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+
+ with pytest.raises(CliAppError, match="not found"):
+ manager.install("missing")
+
+ with pytest.raises(CliAppError, match="unsupported"):
+ manager.install("jimeng")
+
+
+def test_run_installed_cli_uses_argv_without_shell(
+ tmp_path: Path,
+ monkeypatch: pytest.MonkeyPatch,
+) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+ bin_dir = tmp_path / "bin"
+ bin_dir.mkdir()
+ cli = bin_dir / "cli-anything-gimp"
+ cli.write_text(
+ "#!/usr/bin/env python3\n"
+ "import sys\n"
+ "print('ARGS=' + repr(sys.argv[1:]))\n",
+ encoding="utf-8",
+ )
+ cli.chmod(cli.stat().st_mode | stat.S_IEXEC)
+ monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}")
+ manager._save_installed(
+ {
+ "gimp": {
+ "version": "1.0.0",
+ "entry_point": "cli-anything-gimp",
+ "source": "harness",
+ "strategy": "pip",
+ }
+ }
+ )
+
+ result = manager.run("gimp", ["project", "list"], json_output=True)
+
+ assert "CLI app 'gimp' exited 0" in result
+ assert "['--json', 'project', 'list']" in result
+
+
+def test_run_blocks_working_dir_outside_workspace(tmp_path: Path) -> None:
+ manager = _manager(tmp_path)
+ _seed_catalog(manager)
+ manager._save_installed({"gimp": {"entry_point": "cli-anything-gimp"}})
+
+ with pytest.raises(CliAppError, match="outside the configured workspace"):
+ manager.run("gimp", working_dir="/etc", restrict_to_workspace=True)
diff --git a/tests/cli_apps/test_tool.py b/tests/cli_apps/test_tool.py
new file mode 100644
index 000000000..b11d7e593
--- /dev/null
+++ b/tests/cli_apps/test_tool.py
@@ -0,0 +1,121 @@
+from __future__ import annotations
+
+import asyncio
+import json
+import os
+import stat
+import time
+from pathlib import Path
+
+from nanobot.agent.tools.cli_apps import CliAppsTool
+from nanobot.cli_apps.service import CliAppManager, CliAppsRuntimeConfig
+
+
+def _write_cache(path: Path, registry: dict) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ path.write_text(
+ json.dumps({"_cached_at": time.time(), "data": registry}),
+ encoding="utf-8",
+ )
+
+
+def test_run_cli_app_uses_installed_registry_app(
+ tmp_path: Path,
+ monkeypatch,
+) -> None:
+ workspace = tmp_path / "workspace"
+ workspace.mkdir()
+ data_dir = tmp_path / "data"
+ registry = {
+ "meta": {"updated": "2026-04-16"},
+ "clis": [
+ {
+ "name": "gimp",
+ "display_name": "GIMP",
+ "version": "1.0.0",
+ "description": "Image editing",
+ "category": "image",
+ "install_cmd": "pip install cli-anything-gimp",
+ "entry_point": "cli-anything-gimp",
+ }
+ ],
+ }
+ _write_cache(data_dir / "harness_registry_cache.json", registry)
+ _write_cache(data_dir / "public_registry_cache.json", {"meta": {}, "clis": []})
+ CliAppManager(workspace=workspace, data_dir=data_dir)._save_installed(
+ {"gimp": {"entry_point": "cli-anything-gimp"}}
+ )
+ bin_dir = tmp_path / "bin"
+ bin_dir.mkdir()
+ cli = bin_dir / "cli-anything-gimp"
+ cli.write_text(
+ "#!/usr/bin/env python3\n"
+ "import sys\n"
+ "print('tool:' + ' '.join(sys.argv[1:]))\n",
+ encoding="utf-8",
+ )
+ cli.chmod(cli.stat().st_mode | stat.S_IEXEC)
+ monkeypatch.setenv("PATH", f"{bin_dir}{os.pathsep}{os.environ.get('PATH', '')}")
+ monkeypatch.setattr("nanobot.cli_apps.service.get_runtime_subdir", lambda _name: data_dir)
+
+ tool = CliAppsTool(
+ workspace=workspace,
+ restrict_to_workspace=True,
+ runtime=CliAppsRuntimeConfig(run_timeout=5),
+ )
+ assert tool.name == "run_cli_app"
+
+ result = asyncio.run(
+ tool.execute(
+ name="gimp",
+ args=["project", "list"],
+ json=True,
+ working_dir=str(workspace),
+ )
+ )
+
+ assert "CLI app 'gimp' exited 0" in result
+ assert "tool:--json project list" in result
+
+
+def test_run_cli_app_rejects_uninstalled_app(tmp_path: Path, monkeypatch) -> None:
+ workspace = tmp_path / "workspace"
+ workspace.mkdir()
+ data_dir = tmp_path / "data"
+ registry = {
+ "meta": {"updated": "2026-04-16"},
+ "clis": [
+ {
+ "name": "gimp",
+ "display_name": "GIMP",
+ "version": "1.0.0",
+ "description": "Image editing",
+ "category": "image",
+ "install_cmd": "pip install cli-anything-gimp",
+ "entry_point": "cli-anything-gimp",
+ }
+ ],
+ }
+ _write_cache(data_dir / "harness_registry_cache.json", registry)
+ _write_cache(data_dir / "public_registry_cache.json", {"meta": {}, "clis": []})
+ monkeypatch.setattr("nanobot.cli_apps.service.get_runtime_subdir", lambda _name: data_dir)
+ tool = CliAppsTool(workspace=workspace, restrict_to_workspace=True)
+
+ result = asyncio.run(tool.execute(name="gimp"))
+
+ assert "not installed" in result
+
+
+def test_run_cli_app_description_names_only_settings_installed_apps(tmp_path: Path, monkeypatch) -> None:
+ workspace = tmp_path / "workspace"
+ workspace.mkdir()
+ data_dir = tmp_path / "data"
+ CliAppManager(workspace=workspace, data_dir=data_dir)._save_installed(
+ {"drawio": {"entry_point": "cli-anything-drawio"}}
+ )
+ monkeypatch.setattr("nanobot.cli_apps.service.get_runtime_subdir", lambda _name: data_dir)
+
+ tool = CliAppsTool(workspace=workspace)
+
+ assert "Settings CLI Apps: drawio" in tool.description
+ assert "ordinary system CLIs such as git, gh" in tool.description
diff --git a/tests/cli_apps/test_utils.py b/tests/cli_apps/test_utils.py
new file mode 100644
index 000000000..9be2116a6
--- /dev/null
+++ b/tests/cli_apps/test_utils.py
@@ -0,0 +1,64 @@
+"""Tests for CLI Apps loop helpers."""
+
+from types import SimpleNamespace
+
+from nanobot.cli_apps.service import CliAppManager
+from nanobot.cli_apps.utils import runtime_lines, session_extra
+
+
+def test_session_extra_returns_cli_apps_only_when_present() -> None:
+ cli_apps = [{"name": "zoom"}]
+ assert session_extra({"cli_apps": cli_apps}) == {"cli_apps": cli_apps}
+ assert session_extra({}) == {}
+ assert session_extra(None) == {}
+
+
+def test_cli_app_mentions_inject_runtime_metadata(tmp_path, monkeypatch):
+ data_dir = tmp_path / "data"
+ monkeypatch.setattr("nanobot.cli_apps.service.get_runtime_subdir", lambda _name: data_dir)
+ manager = CliAppManager(workspace=tmp_path)
+ manager._save_installed(
+ {
+ "zoom": {
+ "entry_point": "cli-anything-zoom",
+ "source": "harness",
+ },
+ "krita": {
+ "entry_point": "cli-anything-krita",
+ "source": "harness",
+ },
+ }
+ )
+
+ lines = runtime_lines(
+ SimpleNamespace(content="please use @zoom tonight; ignore @krita?", metadata={}),
+ tmp_path,
+ )
+
+ joined = "\n".join(lines)
+ assert "CLI App Mention: @zoom" in joined
+ assert "tool=run_cli_app" in joined
+ assert "entry_point=cli-anything-zoom" in joined
+ assert "skill=skills/cli-app-zoom/SKILL.md" in joined
+
+
+def test_structured_cli_app_attachment_injects_runtime_metadata(tmp_path):
+ lines = runtime_lines(
+ SimpleNamespace(
+ content="please use @zoom tonight",
+ metadata={
+ "cli_apps": [{
+ "name": "zoom",
+ "entry_point": "cli-anything-zoom",
+ "display_name": "Zoom",
+ }],
+ },
+ ),
+ tmp_path,
+ )
+
+ joined = "\n".join(lines)
+ assert "CLI App Attachment: @zoom" in joined
+ assert "tool=run_cli_app" in joined
+ assert "entry_point=cli-anything-zoom" in joined
+ assert "skill=skills/cli-app-zoom/SKILL.md" in joined
diff --git a/tests/tools/test_tool_loader.py b/tests/tools/test_tool_loader.py
index 62703883c..bfe35d910 100644
--- a/tests/tools/test_tool_loader.py
+++ b/tests/tools/test_tool_loader.py
@@ -91,6 +91,7 @@ def test_discover_finds_concrete_tools():
class_names = {cls.__name__ for cls in discovered}
assert "ApplyPatchTool" in class_names
assert "ExecTool" in class_names
+ assert "CliAppsTool" in class_names
assert "MessageTool" in class_names
assert "SpawnTool" in class_names
assert "WriteStdinTool" in class_names
@@ -366,6 +367,7 @@ def test_config_defaults():
assert config.tools.my.enable is True
assert config.tools.my.allow_set is False
assert config.tools.image_generation.enabled is False
+ assert config.tools.cli_apps.enable is True
assert config.tools.restrict_to_workspace is False
diff --git a/tests/utils/test_webui_transcript.py b/tests/utils/test_webui_transcript.py
index 4c2226f0b..6c855462c 100644
--- a/tests/utils/test_webui_transcript.py
+++ b/tests/utils/test_webui_transcript.py
@@ -143,6 +143,50 @@ def test_replay_tool_events_dedupes_finish_after_start() -> None:
'exec({"cmd": "ls"})',
'read_file({"path": "notes.md"})',
]
+ assert msgs[0]["toolEvents"][0]["phase"] == "end"
+ assert msgs[0]["toolEvents"][0]["call_id"] == "call-exec"
+
+
+def test_replay_tool_events_keeps_phase_update_when_trace_is_deduped() -> None:
+ args = {"name": "github", "args": ["repo", "view"], "json": "true"}
+ msgs = replay_transcript_to_ui_messages([
+ {
+ "event": "message",
+ "chat_id": "t-tool",
+ "text": "",
+ "kind": "tool_hint",
+ "tool_events": [
+ {
+ "phase": "start",
+ "call_id": "call-cli",
+ "name": "run_cli_app",
+ "arguments": args,
+ },
+ ],
+ },
+ {
+ "event": "message",
+ "chat_id": "t-tool",
+ "text": "",
+ "kind": "progress",
+ "tool_events": [
+ {
+ "phase": "error",
+ "call_id": "call-cli",
+ "name": "run_cli_app",
+ "arguments": args,
+ "error": "Error: CLI app 'github' not found",
+ },
+ ],
+ },
+ ])
+
+ assert len(msgs) == 1
+ assert msgs[0]["traces"] == [
+ 'run_cli_app({"name": "github", "args": ["repo", "view"], "json": "true"})',
+ ]
+ assert msgs[0]["toolEvents"][0]["phase"] == "error"
+ assert msgs[0]["toolEvents"][0]["error"] == "Error: CLI app 'github' not found"
def test_replay_file_edit_progress_merges_after_interleaved_activity(tmp_path, monkeypatch) -> None:
diff --git a/webui/src/components/CliAppMentionText.tsx b/webui/src/components/CliAppMentionText.tsx
new file mode 100644
index 000000000..d9adbd877
--- /dev/null
+++ b/webui/src/components/CliAppMentionText.tsx
@@ -0,0 +1,148 @@
+import { useState } from "react";
+
+import type { CliAppInfo } from "@/lib/types";
+import { cn } from "@/lib/utils";
+
+export type CliAppMentionSegment =
+ | { kind: "text"; text: string }
+ | { kind: "cli"; text: string; app: CliAppInfo };
+
+export function cliAppInitials(app: CliAppInfo): string {
+ const value = app.display_name || app.name;
+ return (
+ value
+ .split(/\s+/)
+ .filter(Boolean)
+ .slice(0, 2)
+ .map((part) => part[0]?.toUpperCase())
+ .join("") || app.name.slice(0, 2).toUpperCase()
+ );
+}
+
+export function splitCliAppMentionSegments(
+ value: string,
+ cliApps: CliAppInfo[],
+): CliAppMentionSegment[] {
+ if (!value || cliApps.length === 0) return value ? [{ kind: "text", text: value }] : [];
+ const appsByName = new Map(
+ cliApps
+ .filter((app) => app.installed)
+ .map((app) => [app.name.toLowerCase(), app]),
+ );
+ if (appsByName.size === 0) return [{ kind: "text", text: value }];
+
+ const segments: CliAppMentionSegment[] = [];
+ const mentionRe = /(^|[\s([{])@([a-z0-9_-]+)\b/gi;
+ let cursor = 0;
+ let match: RegExpExecArray | null;
+ while ((match = mentionRe.exec(value)) !== null) {
+ const prefix = match[1] ?? "";
+ const name = match[2] ?? "";
+ const app = appsByName.get(name.toLowerCase());
+ if (!app) continue;
+
+ const mentionStart = match.index + prefix.length;
+ const mentionEnd = mentionStart + name.length + 1;
+ if (mentionStart > cursor) {
+ segments.push({ kind: "text", text: value.slice(cursor, mentionStart) });
+ }
+ segments.push({ kind: "cli", text: value.slice(mentionStart, mentionEnd), app });
+ cursor = mentionEnd;
+ }
+ if (cursor < value.length) {
+ segments.push({ kind: "text", text: value.slice(cursor) });
+ }
+ return segments.length ? segments : [{ kind: "text", text: value }];
+}
+
+export function CliAppMentionText({
+ text,
+ cliApps,
+}: {
+ text: string;
+ cliApps: CliAppInfo[];
+}) {
+ const segments = splitCliAppMentionSegments(text, cliApps);
+ if (!segments.some((segment) => segment.kind === "cli")) return <>{text}>;
+ return (
+ <>
+ {segments.map((segment, index) => {
+ if (segment.kind === "text") {
+ return {segment.text};
+ }
+ return (
+ setFailed(true)}
+ />
+
+ ) : null}
+
+ {mentionName}
+
+ );
+}
+
+function alphaColor(color: string, percent: number): string {
+ if (/^#[0-9a-f]{6}$/i.test(color)) {
+ const alpha = Math.round((percent / 100) * 255)
+ .toString(16)
+ .padStart(2, "0");
+ return `${color}${alpha}`;
+ }
+ return `color-mix(in srgb, ${color} ${percent}%, transparent)`;
+}
diff --git a/webui/src/components/MessageBubble.tsx b/webui/src/components/MessageBubble.tsx
index 98ab0c941..9868ef31f 100644
--- a/webui/src/components/MessageBubble.tsx
+++ b/webui/src/components/MessageBubble.tsx
@@ -1,6 +1,7 @@
import {
useCallback,
useEffect,
+ useMemo,
useRef,
useState,
type ReactNode,
@@ -8,16 +9,18 @@ import {
import { Check, ChevronRight, Copy, FileIcon, ImageIcon, PlaySquare, Sparkles, Wrench } from "lucide-react";
import { useTranslation } from "react-i18next";
+import { CliAppMentionText } from "@/components/CliAppMentionText";
import { ImageLightbox } from "@/components/ImageLightbox";
import { MarkdownText, preloadMarkdownText } from "@/components/MarkdownText";
import { cn } from "@/lib/utils";
import { formatTurnLatency } from "@/lib/format";
-import type { UIImage, UIMediaAttachment, UIMessage } from "@/lib/types";
+import type { CliAppInfo, UICliAppAttachment, UIImage, UIMediaAttachment, UIMessage } from "@/lib/types";
interface MessageBubbleProps {
message: UIMessage;
/** When false, hide the assistant reply copy button (mid-turn text before more agent activity). Default true. */
showAssistantCopyAction?: boolean;
+ cliApps?: CliAppInfo[];
}
/**
@@ -32,11 +35,16 @@ interface MessageBubbleProps {
export function MessageBubble({
message,
showAssistantCopyAction = true,
+ cliApps = [],
}: MessageBubbleProps) {
const { t } = useTranslation();
const [copied, setCopied] = useState(false);
const copyResetRef = useRef
+ {tx("settings.cliApps.summary", "{{installed}} of {{total}} CLIs installed") + .replace("{{installed}}", String(payload?.installed_count ?? 0)) + .replace("{{total}}", String(apps.length))} +
++ {app.requires + ? `${tx("settings.cliApps.requires", "Requires")}: ${app.requires}` + : app.description || tx("settings.cliApps.noDescription", "No description available.")} +
++ {t("settings.legal.thirdPartyBrands", { + defaultValue: + "Product names, logos, and brands are property of their respective owners. Use is for identification only and does not imply endorsement.", + })} +
+ ); +} + function orderUnconfiguredProviders( providers: SettingsPayload["providers"], ): SettingsPayload["providers"] { @@ -2126,6 +2614,63 @@ function providerLabel( return providers.find((provider) => provider.name === value)?.label ?? value; } +interface ProviderBrand { + logoUrl: string; + color: string; + initials: string; +} + +function faviconUrl(domain: string): string { + return `https://www.google.com/s2/favicons?domain=${domain}&sz=64`; +} + +const PROVIDER_BRAND_ALIASES: Record