nanobot/nanobot/agent/tools/filesystem.py
Xubin Ren 1c2ea1aad2
feat(goal): /goal command & long-running tasks (long_task)
* feat(long-task): add LongTaskTool for multi-step agent tasks

Implements a meta-ReAct loop where long-running tasks are broken into
sequential subagent steps, each starting fresh with the original goal
and progress from the previous step. This prevents context drift when
agents work on complex, multi-step tasks.

- Extract build_tool_registry() from SubagentManager for reuse
- Add run_step() for synchronous subagent execution (no bus announcement)
- Add HandoffTool and CompleteTool as signal mechanisms via shared dict
- Add LongTaskTool orchestrator with simplified prompt (8 iterations/step)
- Register LongTaskTool in main agent loop
- Add _extract_handoff_from_messages fallback for robustness

* fix(long-task): add debug logging for step-level observability

* feat(long-task): major overhaul with structured handoffs, validation, and observability

- Structured HandoffState: HandoffTool now accepts files_created,
  files_modified, next_step_hint, and verification fields instead of
  a plain string. Progress is passed between steps as structured data.

- Completion validation round: After complete() is called, a dedicated
  validator step runs to verify the claim against the original goal.
  If validation fails, the task continues rather than returning
  a false completion.

- Dynamic prompt system: 3 Jinja2 templates (step_start, step_middle,
  step_final) selected based on step number. Final steps get tighter
  budget and stronger "wrap up" guidance.

- Automatic file change tracking: Extracts write_file/edit_file events
  from tool_events and injects them into the next step's context if
  the subagent forgot to report them explicitly.

- Budget tracking & adaptive strategy: Cumulative token usage is tracked
  across steps. Per-step tool budget drops from 8 to 4 in the last
  two steps to force handoff/completion.

- Crash retry with graceful degradation: A step that crashes is retried
  once. Persistent crashes terminate the task and return partial progress.

- Full observability hooks for future WebUI integration:
  - set_hooks() with on_step_start, on_step_complete, on_handoff,
    on_validation_started, on_validation_passed, on_validation_failed,
    on_task_complete, on_task_error, and catch-all on_event.
  - Readable state properties: current_step, total_steps, status,
    last_handoff, cumulative_usage, goal.
  - inject_correction() allows external code to send user corrections
    that are injected into the next step's prompt.

- run_step() accepts optional max_iterations for dynamic budget control.

All 27 long-task tests and 11 subagent tests pass.

* test(long-task): add boundary tests and fix race conditions

- Add 7 edge-case tests: validation crash resilience, hook exception safety, mid-run correction injection, FIFO correction ordering, explicit file changes overriding auto-detection, final budget for max_steps=1, and dynamic budget switching boundaries

- Fix assertion in test_long_task_completes_after_multiple_handoffs to match exact prompt format

- Remove asyncio timing hack from test_state_exposure

- Add asyncio.sleep(0) yield in test_inject_correction_during_execution to prevent race between signal injection and step continuation

- All 34 tests passing

* fix(long-task): address code review findings

- Declare _scopes = {"core"} explicitly to prevent recursive nesting in subagent scope
- Document fragile coupling in _extract_file_changes: path extraction depends on
  write_file/edit_file detail format; add debug log for unexpected formats
- Align final-template threshold (max_steps - 2) with budget switch threshold
- Eliminate hasattr(self, "_state") in _reset_state by initializing in __init__

* fix(long-task): honor final signal and file tracking

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long-task): improve prompt structure and agent contract

- Expand LongTaskTool.description to instruct parent agent on goal
  construction, return value semantics, and how to handle results.
- Expand CompleteTool.description to emphasize that the summary IS the
  final answer returned to the parent agent.
- Prefix validated return value with an explicit "final answer" directive
  to stop parent agent from re-running work.
- Redesign step_start.md: Step 1 is now explicitly for exploration,
  planning, and skeleton-building. complete() is discouraged.
- Remove bulky payload debug logging from _emit(); add targeted
  info/warning/error logs at key state transitions instead.
- Add signal_type to HandoffState for cleaner signal detection.

* test(long-task): expect wrapped completion message after validation

Align assertions with LongTaskTool final return shape on main.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): turn timing strip, latency, and session-switch restore

- Agent loop: publish goal_status run/idle for WebSocket turns; attach
  wall-clock latency_ms on turn_end and persisted assistant metadata.
- WebSocket channel: forward goal_status and latency fields to clients.
- NanobotClient: track goal_status started_at per chat without requiring
  onChat; useNanobotStream restores run strip when returning to a chat.
- Thread UI: composer/shell viewport hooks for run duration and latency;
  format helpers and i18n strings.
- MessageBubble: drop trailing StreamCursor (layout artifact vs block markdown).
- Builtin / tests: model command coverage, websocket and loop tests.

Covers multi-session UX and round-trip timing visibility for the WebUI.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix: keep message-tool file attachments after canonical history hydrate

- MessageTool records per-turn media paths delivered to the active chat.
- nanobot.utils.session_attachments stages out-of-media-root files and
  merges into the last assistant message before save (loop stays a thin call).
- WebUI MediaCell: use a signed URL as a real download link when present.

Fixes attachments flashing then vanishing on turn_end when paths lived
outside get_media_dir (e.g. workspace files).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): agent activity cluster, stable keys, LTR sheen labels

- Group reasoning and tool traces in AgentActivityCluster with i18n summaries
- Stabilize React list keys for activity clusters (first message id anchor)
- Replace background-clip shimmer with overlay sheen for streaming labels
- ThreadMessages/MessageList integration and locale strings

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): render assistant reasoning with Markdown + deferred stream

- Use MarkdownText for ReasoningBubble body (same GFM/KaTeX path as replies)
- Apply muted/italic prose tokens so thinking stays visually subordinate
- useDeferredValue while reasoningStreaming to ease parser work during deltas
- Preload markdown chunk when trace opens; add regression test with preloaded renderer

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): default-collapse agent activity cluster while Working

Outer fold no longer auto-expands during isTurnStreaming; user opens to see traces.
Header sheen and live summary unchanged.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(long_task): cumulative run history, file union, and prompt tuning

Inject cross-step summaries and merged file paths into middle/final step
templates so chains do not lose early context. Strip the last run-history
block when it duplicates Previous Progress to save tokens. Add optional
cumulative_prompt_max_chars and cumulative_step_body_max_chars parameters
with clamped defaults.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): session switch keeps in-flight thread and replays buffered WS

Save the prior chat message list to the per-chat cache in a layout effect
when chatId changes (before stale writes could corrupt another chat).
Skip one post-switch layout cache tick so we do not snapshot the wrong tab.

Buffer inbound events per chat_id when no onChat subscriber is registered
(e.g. user focused another session) and drain on resubscribe up to a cap,
so streaming deltas are not lost while off-tab.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): snap thread scroll to bottom on session open (no smooth glide)

Use scroll-behavior auto on the viewport, instant programmatic scroll when
following new messages and on scrollToBottomSignal. Keep smooth only for
the explicit scroll-to-bottom button.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): respect manual scroll-up after opening a session

Track when the user leaves the bottom with a ref and skip ResizeObserver
and deferred bottom snaps until they return or the conversation is reset.
Remove the time-based force-bottom window that overrode atBottom.

Multi-frame scrollToBottom honours the same guard unless force (scroll button).

Co-authored-by: Cursor <cursoragent@cursor.com>

* Publish long_task UI snapshots on outbound metadata

- Add OUTBOUND_META_AGENT_UI (_agent_ui) for channel-agnostic structured state
- LongTaskTool publishes {kind: long_task, data: snapshot} on the bus with _progress
- WebSocket send forwards metadata as agent_ui for WebUI clients
- Tests for bus payload, WS frame, and progress assertions
- Fix loop progress tests: ignore _goal_status in streaming final filter and
  avoid brittle outbound[-1] ordering after goal status idle messages

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat: WebUI long_task activity card and resilient history merge

Add optional ui_summary to the long_task tool for one-line UI labels. Stream
long_task agent_ui into a dedicated message row with timeline, markdown peek,
and a right sheet for details. Merge canonical history after turn_end while
re-inserting long_task rows before the final assistant reply. Collapse
duplicate task_start/step_start steps in the timeline and extend i18n.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor: align long_task with thread_goal and drop orchestrator UI

- Persist sustained objectives via session metadata (long_task / complete_goal); no subagent wiring or tool-driven agent_ui payloads.\n- Remove WebUI long-task activity UI, types, and translations; history merge preserves trace replay only, with legacy long_task rows normalized to traces.\n- Drop long_task prompt templates and get_long_task_run_dir; add webui thread disk helper for gateway persistence tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(agent): thread goal runtime context, tools, and skill

- Add thread_goal_state helper and mirror active objectives into Runtime Context
- Wire loop/context/memory/events as needed for goal metadata in turns
- Expand long_task / complete_goal semantics (pivot/cancel/honest recap)
- Add always-on thread-goal SKILL.md; align /goal command prompt
- Tests for context builder and thread goal state
- Remove unused webui ChatPane component

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(thread-goal): add websocket snapshot helper and publish goal updates from long_task

Introduce thread_goal_ws_blob for bounded JSON snapshots, attach snapshots to
websocket turn_end metadata in AgentLoop, and let long_task fan-out dedicated
thread_goal frames on the websocket channel after persisting session metadata.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): websocket thread_goal frames, turn_end replay, and session API scrub for subagent inject

Emit thread_goal events and optional thread_goal on turn_end; scrub persisted
subagent announce blobs on GET /api/sessions/.../messages and shorten session
list previews so WebUI does not surface full Task/Summarize scaffolding.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): merge ephemeral traces per user turn when reconciling canonical history

Preserve disk/live trace rows inside the matching user–assistant segment instead
of stacking every trace before the final assistant reply (fixes inflated tool
counts after refresh or session switch).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): show assistant reply copy only on the last slice before the next user turn

Avoid duplicate copy affordances on intermediate assistant bubbles that precede
more agent activity in the same turn (tools or further assistant text).

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(webui): thread_goal stream plumbing, composer goal strip, sky glow, and client-side subagent scrub projection

Track thread_goal and turn_goal snapshots in NanobotClient, hydrate React state
from thread_goal frames and turn_end, surface objective/elapsed in the composer,
add breathing sky halo CSS while goals are active, mirror server scrub logic on
history hydration and webui_thread snapshots, and extend tests/client mocks.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(channels): add Slack Socket Mode connect timeout with actionable timeout errors

Abort hung websockets.connect handshakes after a bounded wait, log REST-vs-WSS
guidance, surface RuntimeError to channel startup, and log successful WSS setup.

Co-authored-by: Cursor <cursoragent@cursor.com>

* webui: expand thread goal in composer bottom sheet

Add ChevronUp control on the run/goal strip that opens a bottom Sheet
with full ui_summary and objective. Inline preview logic in RunElapsedStrip,
add i18n strings across locales, and a composer unit test.

Co-authored-by: Cursor <cursoragent@cursor.com>

* fix(webui): widen dedupeToolCallsForUi input for session API typing

fetchSessionMessages types tool_calls as unknown; accept unknown so tsc
build passes when passing message.tool_calls through.

Co-authored-by: Cursor <cursoragent@cursor.com>

* refactor(agent): extract WebSocket turn run status to webui_turn_helpers

* refactor(skills): rename thread-goal to long-task and document idempotent goals

* feat(skills): rename sustained-goal skill to long-goal and tighten long_task guidance

* chore: remove unused subagent/context/router helpers

* feat(session): rename sustained goal to goal_state and align WS/WebUI

- Move helpers from agent/thread_goal_state to session/goal_state:
  GOAL_STATE_KEY, goal_state_runtime_lines, goal_state_ws_blob, parse_goal_state.
- Session metadata now uses "goal_state"; still read legacy "thread_goal";
  long_task writes drop the legacy key after save.
- WebSocket: event/field goal_state, _goal_state_sync; turn_end carries goal_state;
  accept legacy _thread_goal_sync/thread_goal inbound metadata for dispatch.
- WebUI: GoalStateWsPayload, goalState hook/client props, i18n keys goalState*.
- Runtime Context copy uses "Goal (active):" instead of "Thread goal".

* feat(agent): stream Anthropic thinking deltas and fix stream idle timeout

* refactor(webui): transcript jsonl as sole timeline source

* fix(agent): reject mismatched WS message chat_id and stream reasoning deltas

* feat(webui): hydrate sustained goal and run timer after websocket subscribe

* chore(webui,websocket): remove unused fetch helpers and legacy thread_goal WS paths

* Raise default max_tokens and context window in agent schema.

Align AgentDefaults and ModelPresetConfig with typical Claude-scale usage
(32k completion budget, 256k context window) and update migration tests.

Co-authored-by: Cursor <cursoragent@cursor.com>

* feat(gateway): bootstrap prefers in-memory model; clarify websocket naming

* fix(websocket): websocket _handle_message passes is_dm; refresh /status test expectations

---------

Co-authored-by: chengyongru <2755839590@qq.com>
Co-authored-by: chengyongru <chengyongru.ai@gmail.com>
Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-16 01:14:11 +08:00

919 lines
33 KiB
Python

"""File system tools: read, write, edit, list."""
import difflib
import mimetypes
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from nanobot.agent.tools.base import Tool, tool_parameters
from nanobot.agent.tools.file_state import FileStates, _hash_file, current_file_states
from nanobot.agent.tools.path_utils import resolve_workspace_path
from nanobot.agent.tools.schema import (
BooleanSchema,
IntegerSchema,
StringSchema,
tool_parameters_schema,
)
from nanobot.utils.helpers import build_image_content_blocks, detect_image_mime
class _FsTool(Tool):
"""Shared base for filesystem tools — common init and path resolution."""
def __init__(
self,
workspace: Path | None = None,
allowed_dir: Path | None = None,
extra_allowed_dirs: list[Path] | None = None,
file_states: FileStates | None = None,
):
self._workspace = workspace
self._allowed_dir = allowed_dir
self._extra_allowed_dirs = extra_allowed_dirs
# Explicit state is used by isolated runners like Dream/subagents.
# Main AgentLoop tools leave this unset and resolve state from the
# current async task, which keeps shared tool instances session-safe.
self._explicit_file_states = file_states
self._fallback_file_states = FileStates()
@classmethod
def create(cls, ctx: Any) -> Tool:
from nanobot.agent.skills import BUILTIN_SKILLS_DIR
restrict = (
ctx.config.restrict_to_workspace
or ctx.config.exec.sandbox
)
allowed_dir = Path(ctx.workspace) if restrict else None
extra_read = [BUILTIN_SKILLS_DIR] if allowed_dir else None
return cls(
workspace=Path(ctx.workspace),
allowed_dir=allowed_dir,
extra_allowed_dirs=extra_read,
file_states=ctx.file_state_store,
)
@property
def _file_states(self) -> FileStates:
if self._explicit_file_states is not None:
return self._explicit_file_states
return current_file_states(self._fallback_file_states)
def _resolve(self, path: str) -> Path:
return resolve_workspace_path(
path,
self._workspace,
self._allowed_dir,
self._extra_allowed_dirs,
)
# ---------------------------------------------------------------------------
# read_file
# ---------------------------------------------------------------------------
_BLOCKED_DEVICE_PATHS = frozenset({
"/dev/zero", "/dev/random", "/dev/urandom", "/dev/full",
"/dev/stdin", "/dev/stdout", "/dev/stderr",
"/dev/tty", "/dev/console",
"/dev/fd/0", "/dev/fd/1", "/dev/fd/2",
})
def _is_blocked_device(path: str | Path) -> bool:
"""Check if path is a blocked device that could hang or produce infinite output."""
import re
raw = str(path)
# Resolve symlinks to check the actual target
try:
resolved = str(Path(raw).resolve())
except (OSError, ValueError):
resolved = raw
if raw in _BLOCKED_DEVICE_PATHS or resolved in _BLOCKED_DEVICE_PATHS:
return True
if re.match(r"/proc/\d+/fd/[012]$", raw) or re.match(r"/proc/self/fd/[012]$", raw):
return True
if re.match(r"/proc/\d+/fd/[012]$", resolved) or re.match(r"/proc/self/fd/[012]$", resolved):
return True
# Check if resolved path starts with /dev/ (covers symlinks to devices)
if resolved.startswith("/dev/"):
return True
return False
def _parse_page_range(pages: str, total: int) -> tuple[int, int]:
"""Parse a page range like '2-5' into 0-based (start, end) inclusive."""
parts = pages.strip().split("-")
if len(parts) == 1:
p = int(parts[0])
return max(0, p - 1), min(p - 1, total - 1)
start = int(parts[0])
end = int(parts[1])
return max(0, start - 1), min(end - 1, total - 1)
@tool_parameters(
tool_parameters_schema(
path=StringSchema("The file path to read"),
offset=IntegerSchema(
1,
description="Line number to start reading from (1-indexed, default 1)",
minimum=1,
),
limit=IntegerSchema(
2000,
description="Maximum number of lines to read (default 2000)",
minimum=1,
),
pages=StringSchema("Page range for PDF files, e.g. '1-5' (default: all, max 20 pages)"),
required=["path"],
)
)
class ReadFileTool(_FsTool):
"""Read file contents with optional line-based pagination."""
_scopes = {"core", "subagent", "memory"}
_MAX_CHARS = 128_000
_DEFAULT_LIMIT = 2000
_MAX_PDF_PAGES = 20
@property
def name(self) -> str:
return "read_file"
@property
def description(self) -> str:
return (
"Read a file (text, image, or document). "
"Text output format: LINE_NUM|CONTENT. "
"Images return visual content for analysis. "
"Supports PDF, DOCX, XLSX, PPTX documents. "
"Use offset and limit for large text files. "
"Reads exceeding ~128K chars are truncated."
)
@property
def read_only(self) -> bool:
return True
async def execute(self, path: str | None = None, offset: int = 1, limit: int | None = None, pages: str | None = None, **kwargs: Any) -> Any:
try:
if not path:
return "Error reading file: Unknown path"
# Device path blacklist
if _is_blocked_device(path):
return f"Error: Reading {path} is blocked (device path that could hang or produce infinite output)."
fp = self._resolve(path)
if _is_blocked_device(fp):
return f"Error: Reading {fp} is blocked (device path that could hang or produce infinite output)."
if not fp.exists():
return f"Error: File not found: {path}"
if not fp.is_file():
return f"Error: Not a file: {path}"
# PDF support
if fp.suffix.lower() == ".pdf":
return self._read_pdf(fp, pages)
# Office document support
if fp.suffix.lower() in {".docx", ".xlsx", ".pptx"}:
return self._read_office_doc(fp)
raw = fp.read_bytes()
if not raw:
return f"(Empty file: {path})"
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
if mime and mime.startswith("image/"):
return build_image_content_blocks(raw, mime, str(fp), f"(Image file: {path})")
# Read dedup: same path + offset + limit + unchanged mtime → stub
# Always check for external modifications before dedup
entry = self._file_states.get(fp)
try:
current_mtime = os.path.getmtime(fp)
except OSError:
current_mtime = 0.0
if entry and entry.can_dedup and entry.offset == offset and entry.limit == limit:
if current_mtime != entry.mtime:
# File was modified externally - force full read and mark as not dedupable
entry.can_dedup = False
self._file_states.record_read(fp, offset=offset, limit=limit) # Update state with new mtime
# Continue to read full content (don't return dedup message)
else:
# File unchanged - return dedup message
# But only if content is actually unchanged (not just mtime)
current_hash = _hash_file(str(fp))
if current_hash == entry.content_hash:
return f"[File unchanged since last read: {path}]"
else:
# Content changed despite same mtime - force full read
entry.can_dedup = False
self._file_states.record_read(fp, offset=offset, limit=limit)
else:
# No previous state or marked as not dedupable - read full content
self._file_states.record_read(fp, offset=offset, limit=limit)
# Force full read by setting can_dedup to False for this read
if entry:
entry.can_dedup = False
# Read the file content after dedup check
raw = fp.read_bytes()
try:
text_content = raw.decode("utf-8")
except UnicodeDecodeError:
# Binary file - return error message
mime = detect_image_mime(raw) or mimetypes.guess_type(path)[0]
if mime and mime.startswith("image/"):
return build_image_content_blocks(raw, mime, str(fp), f"(Image file: {path})")
return f"Error: Cannot read binary file {path} (MIME: {mime or 'unknown'}). Only UTF-8 text and images are supported."
# Normalize CRLF -> LF before line-splitting. Primarily a Windows
# concern (git checkouts with autocrlf, editors saving CRLF) but
# applied on all platforms so downstream StrReplace/Grep behavior
# is consistent regardless of where the file was written.
text_content = text_content.replace("\r\n", "\n")
all_lines = text_content.splitlines()
total = len(all_lines)
if offset < 1:
offset = 1
if offset > total:
return f"Error: offset {offset} is beyond end of file ({total} lines)"
start = offset - 1
end = min(start + (limit or self._DEFAULT_LIMIT), total)
numbered = [f"{start + i + 1}| {line}" for i, line in enumerate(all_lines[start:end])]
result = "\n".join(numbered)
if len(result) > self._MAX_CHARS:
trimmed, chars = [], 0
for line in numbered:
chars += len(line) + 1
if chars > self._MAX_CHARS:
break
trimmed.append(line)
end = start + len(trimmed)
result = "\n".join(trimmed)
if end < total:
result += f"\n\n(Showing lines {offset}-{end} of {total}. Use offset={end + 1} to continue.)"
else:
result += f"\n\n(End of file — {total} lines total)"
self._file_states.record_read(fp, offset=offset, limit=limit)
return result
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error reading file: {e}"
def _read_pdf(self, fp: Path, pages: str | None) -> str:
try:
import fitz # pymupdf
except ImportError:
return "Error: PDF reading requires pymupdf. Install with: pip install pymupdf"
try:
doc = fitz.open(str(fp))
except Exception as e:
return f"Error reading PDF: {e}"
total_pages = len(doc)
if pages:
try:
start, end = _parse_page_range(pages, total_pages)
except (ValueError, IndexError):
doc.close()
return f"Error: Invalid page range '{pages}'. Use format like '1-5'."
if start > end or start >= total_pages:
doc.close()
return f"Error: Page range '{pages}' is out of bounds (document has {total_pages} pages)."
else:
start = 0
end = min(total_pages - 1, self._MAX_PDF_PAGES - 1)
if end - start + 1 > self._MAX_PDF_PAGES:
end = start + self._MAX_PDF_PAGES - 1
parts: list[str] = []
for i in range(start, end + 1):
page = doc[i]
text = page.get_text().strip()
if text:
parts.append(f"--- Page {i + 1} ---\n{text}")
doc.close()
if not parts:
return f"(PDF has no extractable text: {fp})"
result = "\n\n".join(parts)
if end < total_pages - 1:
result += f"\n\n(Showing pages {start + 1}-{end + 1} of {total_pages}. Use pages='{end + 2}-{min(end + 1 + self._MAX_PDF_PAGES, total_pages)}' to continue.)"
if len(result) > self._MAX_CHARS:
result = result[:self._MAX_CHARS] + "\n\n(PDF text truncated at ~128K chars)"
return result
def _read_office_doc(self, fp: Path) -> str:
from nanobot.utils.document import extract_text
result = extract_text(fp)
if result is None:
return f"Error: Unsupported file format: {fp.suffix}"
if result.startswith("[error:"):
return f"Error reading {fp.suffix.upper()} file: {result}"
if not result:
return f"({fp.suffix.upper().lstrip('.')} has no extractable text: {fp})"
if len(result) > self._MAX_CHARS:
result = result[:self._MAX_CHARS] + "\n\n(Document text truncated at ~128K chars)"
return result
# ---------------------------------------------------------------------------
# write_file
# ---------------------------------------------------------------------------
@tool_parameters(
tool_parameters_schema(
path=StringSchema("The file path to write to"),
content=StringSchema("The content to write"),
required=["path", "content"],
)
)
class WriteFileTool(_FsTool):
"""Write content to a file."""
_scopes = {"core", "subagent", "memory"}
@property
def name(self) -> str:
return "write_file"
@property
def description(self) -> str:
return (
"Write content to a file. Overwrites if the file already exists; "
"creates parent directories as needed. "
"For partial edits, prefer edit_file instead."
)
async def execute(self, path: str | None = None, content: str | None = None, **kwargs: Any) -> str:
try:
if not path:
raise ValueError("Unknown path")
if content is None:
raise ValueError("Unknown content")
fp = self._resolve(path)
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(content, encoding="utf-8")
self._file_states.record_write(fp)
return f"Successfully wrote {len(content)} characters to {fp}"
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error writing file: {e}"
# ---------------------------------------------------------------------------
# edit_file
# ---------------------------------------------------------------------------
_QUOTE_TABLE = str.maketrans({
"\u2018": "'", "\u2019": "'", # curly single → straight
"\u201c": '"', "\u201d": '"', # curly double → straight
"'": "'", '"': '"', # identity (kept for completeness)
})
def _normalize_quotes(s: str) -> str:
return s.translate(_QUOTE_TABLE)
def _curly_double_quotes(text: str) -> str:
parts: list[str] = []
opening = True
for ch in text:
if ch == '"':
parts.append("\u201c" if opening else "\u201d")
opening = not opening
else:
parts.append(ch)
return "".join(parts)
def _curly_single_quotes(text: str) -> str:
parts: list[str] = []
opening = True
for i, ch in enumerate(text):
if ch != "'":
parts.append(ch)
continue
prev_ch = text[i - 1] if i > 0 else ""
next_ch = text[i + 1] if i + 1 < len(text) else ""
if prev_ch.isalnum() and next_ch.isalnum():
parts.append("\u2019")
continue
parts.append("\u2018" if opening else "\u2019")
opening = not opening
return "".join(parts)
def _preserve_quote_style(old_text: str, actual_text: str, new_text: str) -> str:
"""Preserve curly quote style when a quote-normalized fallback matched."""
if _normalize_quotes(old_text.strip()) != _normalize_quotes(actual_text.strip()) or old_text == actual_text:
return new_text
styled = new_text
if any(ch in actual_text for ch in ("\u201c", "\u201d")) and '"' in styled:
styled = _curly_double_quotes(styled)
if any(ch in actual_text for ch in ("\u2018", "\u2019")) and "'" in styled:
styled = _curly_single_quotes(styled)
return styled
def _leading_ws(line: str) -> str:
return line[: len(line) - len(line.lstrip(" \t"))]
def _reindent_like_match(old_text: str, actual_text: str, new_text: str) -> str:
"""Preserve the outer indentation from the actual matched block."""
old_lines = old_text.split("\n")
actual_lines = actual_text.split("\n")
if len(old_lines) != len(actual_lines):
return new_text
comparable = [
(old_line, actual_line)
for old_line, actual_line in zip(old_lines, actual_lines)
if old_line.strip() and actual_line.strip()
]
if not comparable or any(
_normalize_quotes(old_line.strip()) != _normalize_quotes(actual_line.strip())
for old_line, actual_line in comparable
):
return new_text
old_ws = _leading_ws(comparable[0][0])
actual_ws = _leading_ws(comparable[0][1])
if actual_ws == old_ws:
return new_text
if old_ws:
if not actual_ws.startswith(old_ws):
return new_text
delta = actual_ws[len(old_ws):]
else:
delta = actual_ws
if not delta:
return new_text
return "\n".join((delta + line) if line else line for line in new_text.split("\n"))
@dataclass(slots=True)
class _MatchSpan:
start: int
end: int
text: str
line: int
def _find_exact_matches(content: str, old_text: str) -> list[_MatchSpan]:
matches: list[_MatchSpan] = []
start = 0
while True:
idx = content.find(old_text, start)
if idx == -1:
break
matches.append(
_MatchSpan(
start=idx,
end=idx + len(old_text),
text=content[idx : idx + len(old_text)],
line=content.count("\n", 0, idx) + 1,
)
)
start = idx + max(1, len(old_text))
return matches
def _find_trim_matches(content: str, old_text: str, *, normalize_quotes: bool = False) -> list[_MatchSpan]:
old_lines = old_text.splitlines()
if not old_lines:
return []
content_lines = content.splitlines()
content_lines_keepends = content.splitlines(keepends=True)
if len(content_lines) < len(old_lines):
return []
offsets: list[int] = []
pos = 0
for line in content_lines_keepends:
offsets.append(pos)
pos += len(line)
offsets.append(pos)
if normalize_quotes:
stripped_old = [_normalize_quotes(line.strip()) for line in old_lines]
else:
stripped_old = [line.strip() for line in old_lines]
matches: list[_MatchSpan] = []
window_size = len(stripped_old)
for i in range(len(content_lines) - window_size + 1):
window = content_lines[i : i + window_size]
if normalize_quotes:
comparable = [_normalize_quotes(line.strip()) for line in window]
else:
comparable = [line.strip() for line in window]
if comparable != stripped_old:
continue
start = offsets[i]
end = offsets[i + window_size]
if content_lines_keepends[i + window_size - 1].endswith("\n"):
end -= 1
matches.append(
_MatchSpan(
start=start,
end=end,
text=content[start:end],
line=i + 1,
)
)
return matches
def _find_quote_matches(content: str, old_text: str) -> list[_MatchSpan]:
norm_content = _normalize_quotes(content)
norm_old = _normalize_quotes(old_text)
matches: list[_MatchSpan] = []
start = 0
while True:
idx = norm_content.find(norm_old, start)
if idx == -1:
break
matches.append(
_MatchSpan(
start=idx,
end=idx + len(old_text),
text=content[idx : idx + len(old_text)],
line=content.count("\n", 0, idx) + 1,
)
)
start = idx + max(1, len(norm_old))
return matches
def _find_matches(content: str, old_text: str) -> list[_MatchSpan]:
"""Locate all matches using progressively looser strategies."""
for matcher in (
lambda: _find_exact_matches(content, old_text),
lambda: _find_trim_matches(content, old_text),
lambda: _find_trim_matches(content, old_text, normalize_quotes=True),
lambda: _find_quote_matches(content, old_text),
):
matches = matcher()
if matches:
return matches
return []
def _collapse_internal_whitespace(text: str) -> str:
return "\n".join(" ".join(line.split()) for line in text.splitlines())
def _diagnose_near_match(old_text: str, actual_text: str) -> list[str]:
"""Return actionable hints describing why text was close but not exact."""
hints: list[str] = []
if old_text.lower() == actual_text.lower() and old_text != actual_text:
hints.append("letter case differs")
if _collapse_internal_whitespace(old_text) == _collapse_internal_whitespace(actual_text) and old_text != actual_text:
hints.append("whitespace differs")
if old_text.rstrip("\n") == actual_text.rstrip("\n") and old_text != actual_text:
hints.append("trailing newline differs")
if _normalize_quotes(old_text) == _normalize_quotes(actual_text) and old_text != actual_text:
hints.append("quote style differs")
return hints
def _best_window(old_text: str, content: str) -> tuple[float, int, list[str], list[str]]:
"""Find the closest line-window match and return ratio/start/snippet/hints."""
lines = content.splitlines(keepends=True)
old_lines = old_text.splitlines(keepends=True)
window = max(1, len(old_lines))
best_ratio, best_start = -1.0, 0
best_window_lines: list[str] = []
for i in range(max(1, len(lines) - window + 1)):
current = lines[i : i + window]
ratio = difflib.SequenceMatcher(None, old_lines, current).ratio()
if ratio > best_ratio:
best_ratio, best_start = ratio, i
best_window_lines = current
actual_text = "".join(best_window_lines).replace("\r\n", "\n").rstrip("\n")
hints = _diagnose_near_match(old_text.replace("\r\n", "\n").rstrip("\n"), actual_text)
return best_ratio, best_start, best_window_lines, hints
def _find_match(content: str, old_text: str) -> tuple[str | None, int]:
"""Locate old_text in content with a multi-level fallback chain:
1. Exact substring match
2. Line-trimmed sliding window (handles indentation differences)
3. Smart quote normalization (curly ↔ straight quotes)
Both inputs should use LF line endings (caller normalises CRLF).
Returns (matched_fragment, count) or (None, 0).
"""
matches = _find_matches(content, old_text)
if not matches:
return None, 0
return matches[0].text, len(matches)
@tool_parameters(
tool_parameters_schema(
path=StringSchema("The file path to edit"),
old_text=StringSchema("The text to find and replace"),
new_text=StringSchema("The text to replace with"),
replace_all=BooleanSchema(description="Replace all occurrences (default false)"),
required=["path", "old_text", "new_text"],
)
)
class EditFileTool(_FsTool):
"""Edit a file by replacing text with fallback matching."""
_scopes = {"core", "subagent", "memory"}
_MAX_EDIT_FILE_SIZE = 1024 * 1024 * 1024 # 1 GiB
_MARKDOWN_EXTS = frozenset({".md", ".mdx", ".markdown"})
@property
def name(self) -> str:
return "edit_file"
@property
def description(self) -> str:
return (
"Edit a file by replacing old_text with new_text. "
"Tolerates minor whitespace/indentation differences and curly/straight quote mismatches. "
"If old_text matches multiple times, you must provide more context "
"or set replace_all=true. Shows a diff of the closest match on failure."
)
@staticmethod
def _strip_trailing_ws(text: str) -> str:
"""Strip trailing whitespace from each line."""
return "\n".join(line.rstrip() for line in text.split("\n"))
async def execute(
self, path: str | None = None, old_text: str | None = None,
new_text: str | None = None,
replace_all: bool = False, **kwargs: Any,
) -> str:
try:
if not path:
raise ValueError("Unknown path")
if old_text is None:
raise ValueError("Unknown old_text")
if new_text is None:
raise ValueError("Unknown new_text")
# .ipynb detection
if path.endswith(".ipynb"):
return "Error: This is a Jupyter notebook. Use the notebook_edit tool instead of edit_file."
fp = self._resolve(path)
# Create-file semantics: old_text='' + file doesn't exist → create
if not fp.exists():
if old_text == "":
fp.parent.mkdir(parents=True, exist_ok=True)
fp.write_text(new_text, encoding="utf-8")
self._file_states.record_write(fp)
return f"Successfully created {fp}"
return self._file_not_found_msg(path, fp)
# File size protection
try:
fsize = fp.stat().st_size
except OSError:
fsize = 0
if fsize > self._MAX_EDIT_FILE_SIZE:
return f"Error: File too large to edit ({fsize / (1024**3):.1f} GiB). Maximum is 1 GiB."
# Create-file: old_text='' but file exists and not empty → reject
if old_text == "":
raw = fp.read_bytes()
content = raw.decode("utf-8")
if content.strip():
return f"Error: Cannot create file — {path} already exists and is not empty."
fp.write_text(new_text, encoding="utf-8")
self._file_states.record_write(fp)
return f"Successfully edited {fp}"
# Read-before-edit check
warning = self._file_states.check_read(fp)
raw = fp.read_bytes()
uses_crlf = b"\r\n" in raw
content = raw.decode("utf-8").replace("\r\n", "\n")
norm_old = old_text.replace("\r\n", "\n")
matches = _find_matches(content, norm_old)
if not matches:
return self._not_found_msg(old_text, content, path)
count = len(matches)
if count > 1 and not replace_all:
line_numbers = [match.line for match in matches]
preview = ", ".join(f"line {n}" for n in line_numbers[:3])
if len(line_numbers) > 3:
preview += ", ..."
location_hint = f" at {preview}" if preview else ""
return (
f"Warning: old_text appears {count} times{location_hint}. "
"Provide more context to make it unique, or set replace_all=true."
)
norm_new = new_text.replace("\r\n", "\n")
# Trailing whitespace stripping (skip markdown to preserve double-space line breaks)
if fp.suffix.lower() not in self._MARKDOWN_EXTS:
norm_new = self._strip_trailing_ws(norm_new)
selected = matches if replace_all else matches[:1]
new_content = content
for match in reversed(selected):
replacement = _preserve_quote_style(norm_old, match.text, norm_new)
replacement = _reindent_like_match(norm_old, match.text, replacement)
# Delete-line cleanup: when deleting text (new_text=''), consume trailing
# newline to avoid leaving a blank line
end = match.end
if replacement == "" and not match.text.endswith("\n") and content[end:end + 1] == "\n":
end += 1
new_content = new_content[: match.start] + replacement + new_content[end:]
if uses_crlf:
new_content = new_content.replace("\n", "\r\n")
fp.write_bytes(new_content.encode("utf-8"))
self._file_states.record_write(fp)
msg = f"Successfully edited {fp}"
if warning:
msg = f"{warning}\n{msg}"
return msg
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error editing file: {e}"
def _file_not_found_msg(self, path: str, fp: Path) -> str:
"""Build an error message with 'Did you mean ...?' suggestions."""
parent = fp.parent
suggestions: list[str] = []
if parent.is_dir():
siblings = [f.name for f in parent.iterdir() if f.is_file()]
close = difflib.get_close_matches(fp.name, siblings, n=3, cutoff=0.6)
suggestions = [str(parent / c) for c in close]
parts = [f"Error: File not found: {path}"]
if suggestions:
parts.append("Did you mean: " + ", ".join(suggestions) + "?")
return "\n".join(parts)
@staticmethod
def _not_found_msg(old_text: str, content: str, path: str) -> str:
best_ratio, best_start, best_window_lines, hints = _best_window(old_text, content)
if best_ratio > 0.5:
diff = "\n".join(difflib.unified_diff(
old_text.splitlines(keepends=True),
best_window_lines,
fromfile="old_text (provided)",
tofile=f"{path} (actual, line {best_start + 1})",
lineterm="",
))
hint_text = ""
if hints:
hint_text = "\nPossible cause: " + ", ".join(hints) + "."
return (
f"Error: old_text not found in {path}."
f"{hint_text}\nBest match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}"
)
if hints:
return (
f"Error: old_text not found in {path}. "
f"Possible cause: {', '.join(hints)}. "
"Copy the exact text from read_file and try again."
)
return f"Error: old_text not found in {path}. No similar text found. Verify the file content."
# ---------------------------------------------------------------------------
# list_dir
# ---------------------------------------------------------------------------
@tool_parameters(
tool_parameters_schema(
path=StringSchema("The directory path to list"),
recursive=BooleanSchema(description="Recursively list all files (default false)"),
max_entries=IntegerSchema(
200,
description="Maximum entries to return (default 200)",
minimum=1,
),
required=["path"],
)
)
class ListDirTool(_FsTool):
"""List directory contents with optional recursion."""
_scopes = {"core", "subagent"}
_DEFAULT_MAX = 200
_IGNORE_DIRS = {
".git", "node_modules", "__pycache__", ".venv", "venv",
"dist", "build", ".tox", ".mypy_cache", ".pytest_cache",
".ruff_cache", ".coverage", "htmlcov",
}
@property
def name(self) -> str:
return "list_dir"
@property
def description(self) -> str:
return (
"List the contents of a directory. "
"Set recursive=true to explore nested structure. "
"Common noise directories (.git, node_modules, __pycache__, etc.) are auto-ignored."
)
@property
def read_only(self) -> bool:
return True
async def execute(
self, path: str | None = None, recursive: bool = False,
max_entries: int | None = None, **kwargs: Any,
) -> str:
try:
if path is None:
raise ValueError("Unknown path")
dp = self._resolve(path)
if not dp.exists():
return f"Error: Directory not found: {path}"
if not dp.is_dir():
return f"Error: Not a directory: {path}"
cap = max_entries or self._DEFAULT_MAX
items: list[str] = []
total = 0
if recursive:
for item in sorted(dp.rglob("*")):
if any(p in self._IGNORE_DIRS for p in item.parts):
continue
total += 1
if len(items) < cap:
rel = item.relative_to(dp)
items.append(f"{rel}/" if item.is_dir() else str(rel))
else:
for item in sorted(dp.iterdir()):
if item.name in self._IGNORE_DIRS:
continue
total += 1
if len(items) < cap:
pfx = "📁 " if item.is_dir() else "📄 "
items.append(f"{pfx}{item.name}")
if not items and total == 0:
return f"Directory {path} is empty"
result = "\n".join(items)
if total > cap:
result += f"\n\n(truncated, showing first {cap} of {total} entries)"
return result
except PermissionError as e:
return f"Error: {e}"
except Exception as e:
return f"Error listing directory: {e}"