"""Shell execution tool.""" import asyncio import os import re import shutil import sys from pathlib import Path from typing import Any from loguru import logger from nanobot.agent.tools.base import Tool, tool_parameters from nanobot.agent.tools.sandbox import wrap_command from nanobot.agent.tools.schema import IntegerSchema, StringSchema, tool_parameters_schema from nanobot.config.paths import get_media_dir _IS_WINDOWS = sys.platform == "win32" @tool_parameters( tool_parameters_schema( command=StringSchema("The shell command to execute"), working_dir=StringSchema("Optional working directory for the command"), timeout=IntegerSchema( 60, description=( "Timeout in seconds. Increase for long-running commands " "like compilation or installation (default 60, max 600)." ), minimum=1, maximum=600, ), required=["command"], ) ) class ExecTool(Tool): """Tool to execute shell commands.""" def __init__( self, timeout: int = 60, working_dir: str | None = None, deny_patterns: list[str] | None = None, allow_patterns: list[str] | None = None, restrict_to_workspace: bool = False, sandbox: str = "", path_append: str = "", ): self.timeout = timeout self.working_dir = working_dir self.sandbox = sandbox self.deny_patterns = deny_patterns or [ r"\brm\s+-[rf]{1,2}\b", # rm -r, rm -rf, rm -fr r"\bdel\s+/[fq]\b", # del /f, del /q r"\brmdir\s+/s\b", # rmdir /s r"(?:^|[;&|]\s*)format\b", # format (as standalone command only) r"\b(mkfs|diskpart)\b", # disk operations r"\bdd\s+if=", # dd r">\s*/dev/sd", # write to disk r"\b(shutdown|reboot|poweroff)\b", # system power r":\(\)\s*\{.*\};\s*:", # fork bomb ] self.allow_patterns = allow_patterns or [] self.restrict_to_workspace = restrict_to_workspace self.path_append = path_append @property def name(self) -> str: return "exec" _MAX_TIMEOUT = 600 _MAX_OUTPUT = 10_000 @property def description(self) -> str: return "Execute a shell command and return its output. Use with caution." @property def exclusive(self) -> bool: return True async def execute( self, command: str, working_dir: str | None = None, timeout: int | None = None, **kwargs: Any, ) -> str: cwd = working_dir or self.working_dir or os.getcwd() guard_error = self._guard_command(command, cwd) if guard_error: return guard_error if self.sandbox: if _IS_WINDOWS: logger.warning( "Sandbox '{}' is not supported on Windows; running unsandboxed", self.sandbox, ) else: workspace = self.working_dir or cwd command = wrap_command(self.sandbox, command, workspace, cwd) cwd = str(Path(workspace).resolve()) effective_timeout = min(timeout or self.timeout, self._MAX_TIMEOUT) env = self._build_env() if self.path_append: if _IS_WINDOWS: env["PATH"] = env.get("PATH", "") + ";" + self.path_append else: command = f'export PATH="$PATH:{self.path_append}"; {command}' try: process = await self._spawn(command, cwd, env) try: stdout, stderr = await asyncio.wait_for( process.communicate(), timeout=effective_timeout, ) except asyncio.TimeoutError: await self._kill_process(process) return f"Error: Command timed out after {effective_timeout} seconds" except asyncio.CancelledError: await self._kill_process(process) raise output_parts = [] if stdout: output_parts.append(stdout.decode("utf-8", errors="replace")) if stderr: stderr_text = stderr.decode("utf-8", errors="replace") if stderr_text.strip(): output_parts.append(f"STDERR:\n{stderr_text}") output_parts.append(f"\nExit code: {process.returncode}") result = "\n".join(output_parts) if output_parts else "(no output)" max_len = self._MAX_OUTPUT if len(result) > max_len: half = max_len // 2 result = ( result[:half] + f"\n\n... ({len(result) - max_len:,} chars truncated) ...\n\n" + result[-half:] ) return result except Exception as e: return f"Error executing command: {str(e)}" @staticmethod async def _spawn( command: str, cwd: str, env: dict[str, str], ) -> asyncio.subprocess.Process: """Launch *command* in a platform-appropriate shell.""" if _IS_WINDOWS: comspec = env.get("COMSPEC", os.environ.get("COMSPEC", "cmd.exe")) return await asyncio.create_subprocess_exec( comspec, "/c", command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env, ) bash = shutil.which("bash") or "/bin/bash" return await asyncio.create_subprocess_exec( bash, "-l", "-c", command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, cwd=cwd, env=env, ) @staticmethod async def _kill_process(process: asyncio.subprocess.Process) -> None: """Kill a subprocess and reap it to prevent zombies.""" process.kill() try: await asyncio.wait_for(process.wait(), timeout=5.0) except asyncio.TimeoutError: pass finally: if not _IS_WINDOWS: try: os.waitpid(process.pid, os.WNOHANG) except (ProcessLookupError, ChildProcessError) as e: logger.debug("Process already reaped or not found: {}", e) def _build_env(self) -> dict[str, str]: """Build a minimal environment for subprocess execution. On Unix, only HOME/LANG/TERM are passed; ``bash -l`` sources the user's profile which sets PATH and other essentials. On Windows, ``cmd.exe`` has no login-profile mechanism, so a curated set of system variables (including PATH) is forwarded. API keys and other secrets are still excluded. """ if _IS_WINDOWS: sr = os.environ.get("SYSTEMROOT", r"C:\Windows") return { "SYSTEMROOT": sr, "COMSPEC": os.environ.get("COMSPEC", f"{sr}\\system32\\cmd.exe"), "USERPROFILE": os.environ.get("USERPROFILE", ""), "HOMEDRIVE": os.environ.get("HOMEDRIVE", "C:"), "HOMEPATH": os.environ.get("HOMEPATH", "\\"), "TEMP": os.environ.get("TEMP", f"{sr}\\Temp"), "TMP": os.environ.get("TMP", f"{sr}\\Temp"), "PATHEXT": os.environ.get("PATHEXT", ".COM;.EXE;.BAT;.CMD"), "PATH": os.environ.get("PATH", f"{sr}\\system32;{sr}"), } home = os.environ.get("HOME", "/tmp") return { "HOME": home, "LANG": os.environ.get("LANG", "C.UTF-8"), "TERM": os.environ.get("TERM", "dumb"), } def _guard_command(self, command: str, cwd: str) -> str | None: """Best-effort safety guard for potentially destructive commands.""" cmd = command.strip() lower = cmd.lower() for pattern in self.deny_patterns: if re.search(pattern, lower): return "Error: Command blocked by safety guard (dangerous pattern detected)" if self.allow_patterns: if not any(re.search(p, lower) for p in self.allow_patterns): return "Error: Command blocked by safety guard (not in allowlist)" from nanobot.security.network import contains_internal_url if contains_internal_url(cmd): return "Error: Command blocked by safety guard (internal/private URL detected)" if self.restrict_to_workspace: if "..\\" in cmd or "../" in cmd: return "Error: Command blocked by safety guard (path traversal detected)" cwd_path = Path(cwd).resolve() for raw in self._extract_absolute_paths(cmd): try: expanded = os.path.expandvars(raw.strip()) p = Path(expanded).expanduser().resolve() except Exception: continue media_path = get_media_dir().resolve() if (p.is_absolute() and cwd_path not in p.parents and p != cwd_path and media_path not in p.parents and p != media_path ): return "Error: Command blocked by safety guard (path outside working dir)" return None @staticmethod def _extract_absolute_paths(command: str) -> list[str]: # Windows: match drive-root paths like `C:\` as well as `C:\path\to\file` # NOTE: `*` is required so `C:\` (nothing after the slash) is still extracted. win_paths = re.findall(r"[A-Za-z]:\\[^\s\"'|><;]*", command) posix_paths = re.findall(r"(?:^|[\s|>'\"])(/[^\s\"'>;|<]+)", command) # POSIX: /absolute only home_paths = re.findall(r"(?:^|[\s|>'\"])(~[^\s\"'>;|<]*)", command) # POSIX/Windows home shortcut: ~ return win_paths + posix_paths + home_paths