refactor(apply_patch): remove deprecated patch mode, keep edits-only

Drop the legacy unified-diff patch parameter and all related parsing/
generation logic (_parse_patch, _generate_patch, _apply_hunks, etc.).
The tool now accepts only the structured `edits` array, eliminating the
intermediate diff-string round-trip.

Also update file_edit_events tracking and tests to work exclusively
with edits.

Benchmark (zhipu glm-5.1, edits mode): 15/15 cases passed.
This commit is contained in:
chengyongru 2026-05-22 16:24:31 +08:00 committed by Xubin Ren
parent effc1efd92
commit 3d9f50a0cc
4 changed files with 459 additions and 640 deletions

View File

@ -1,4 +1,4 @@
"""Structured patch editing tool for coding workflows."""
"""Apply file edits by providing structured edit instructions."""
from __future__ import annotations
@ -6,29 +6,17 @@ import difflib
import re
from dataclasses import dataclass
from pathlib import Path
from typing import Any, Literal
from typing import Any
from nanobot.agent.tools.base import tool_parameters
from nanobot.agent.tools.filesystem import _FsTool
from nanobot.agent.tools.schema import BooleanSchema, StringSchema, tool_parameters_schema
PatchKind = Literal["add", "delete", "update"]
@dataclass(slots=True)
class _Hunk:
header: str | None
lines: list[tuple[str, str]]
@dataclass(slots=True)
class _PatchOp:
kind: PatchKind
path: str
new_path: str | None = None
add_lines: list[str] | None = None
hunks: list[_Hunk] | None = None
from nanobot.agent.tools.schema import (
ArraySchema,
BooleanSchema,
ObjectSchema,
StringSchema,
tool_parameters_schema,
)
@dataclass(slots=True)
@ -37,22 +25,13 @@ class _PatchSummary:
path: str
added: int = 0
deleted: int = 0
new_path: str | None = None
class _PatchError(ValueError):
pass
_ABSOLUTE_WINDOWS_RE = re.compile(r"^[A-Za-z]:[\\/]")
def _is_file_header(line: str) -> bool:
return (
line.startswith("*** Add File: ")
or line.startswith("*** Delete File: ")
or line.startswith("*** Update File: ")
)
_ABSOLUTE_WINDOWS_RE = re.compile(r"^[A-Za-z]:[\/]")
def _validate_relative_path(path: str) -> str:
@ -63,7 +42,7 @@ def _validate_relative_path(path: str) -> str:
raise _PatchError(f"patch path contains a null byte: {path!r}")
if normalized.startswith(("~", "/", "\\")) or _ABSOLUTE_WINDOWS_RE.match(normalized):
raise _PatchError(f"patch path must be relative: {path}")
if any(part == ".." for part in re.split(r"[\\/]+", normalized)):
if any(part == ".." for part in re.split(r"[\/]+", normalized)):
raise _PatchError(f"patch path must not contain '..': {path}")
return normalized
@ -97,198 +76,43 @@ def _line_diff_stats(before: str, after: str) -> tuple[int, int]:
def _format_summary(summary: _PatchSummary) -> str:
path = (
f"{summary.path} -> {summary.new_path}"
if summary.new_path
else summary.path
)
stats = ""
if summary.added or summary.deleted:
stats = f" (+{summary.added}/-{summary.deleted})"
return f"- {summary.action} {path}{stats}"
def _parse_patch(patch: str) -> list[_PatchOp]:
lines = patch.replace("\r\n", "\n").replace("\r", "\n").split("\n")
if lines and lines[-1] == "":
lines.pop()
if not lines or lines[0] != "*** Begin Patch":
raise _PatchError("patch must start with '*** Begin Patch'")
if len(lines) < 2 or lines[-1] != "*** End Patch":
raise _PatchError("patch must end with '*** End Patch'")
ops: list[_PatchOp] = []
i = 1
end = len(lines) - 1
while i < end:
line = lines[i]
if line.startswith("*** Add File: "):
path = _validate_relative_path(line.removeprefix("*** Add File: "))
i += 1
add_lines: list[str] = []
while i < end and not _is_file_header(lines[i]):
if not lines[i].startswith("+"):
raise _PatchError(f"Add File lines must start with '+': {lines[i]!r}")
add_lines.append(lines[i][1:])
i += 1
ops.append(_PatchOp(kind="add", path=path, add_lines=add_lines))
continue
if line.startswith("*** Delete File: "):
path = _validate_relative_path(line.removeprefix("*** Delete File: "))
ops.append(_PatchOp(kind="delete", path=path))
i += 1
continue
if line.startswith("*** Update File: "):
path = _validate_relative_path(line.removeprefix("*** Update File: "))
i += 1
new_path: str | None = None
if i < end and lines[i].startswith("*** Move to: "):
new_path = _validate_relative_path(lines[i].removeprefix("*** Move to: "))
i += 1
hunks: list[_Hunk] = []
while i < end and not _is_file_header(lines[i]):
if not lines[i].startswith("@@"):
raise _PatchError(f"Update File sections require '@@' hunks: {lines[i]!r}")
header = lines[i][2:].strip() or None
i += 1
hunk_lines: list[tuple[str, str]] = []
while i < end and not lines[i].startswith("@@") and not _is_file_header(lines[i]):
if lines[i] == "*** End of File":
i += 1
break
if lines[i] == r"\ No newline at end of file":
i += 1
continue
if not lines[i] or lines[i][0] not in {" ", "+", "-"}:
raise _PatchError(f"Hunk lines must start with ' ', '+', or '-': {lines[i]!r}")
hunk_lines.append((lines[i][0], lines[i][1:]))
i += 1
if not hunk_lines:
raise _PatchError(f"Update File hunk is empty: {path}")
hunks.append(_Hunk(header=header, lines=hunk_lines))
if not hunks and new_path is None:
raise _PatchError(f"Update File requires at least one hunk or Move to: {path}")
ops.append(_PatchOp(kind="update", path=path, new_path=new_path, hunks=hunks))
continue
raise _PatchError(f"unknown patch header: {line!r}")
if not ops:
raise _PatchError("patch contains no file operations")
return ops
def _find_with_eof_fallback(content: str, needle: str, start: int) -> tuple[int, int]:
pos = content.find(needle, start)
if pos >= 0:
return pos, len(needle)
if needle.endswith("\n"):
trimmed = needle[:-1]
pos = content.find(trimmed, start)
if pos >= 0 and pos + len(trimmed) == len(content):
return pos, len(trimmed)
return -1, 0
def _line_offset(content: str, line_number: int) -> int:
if line_number <= 1:
return 0
offset = 0
for current, line in enumerate(content.splitlines(keepends=True), start=1):
if current >= line_number:
return offset
offset += len(line)
return len(content)
def _line_hint(header: str | None) -> int | None:
if not header:
return None
match = re.search(r"-(\d+)(?:,\d+)?", header)
return int(match.group(1)) if match else None
def _hunk_mismatch(path: str, old_text: str, content: str, header: str | None) -> str:
lines = content.splitlines(keepends=True)
old_lines = old_text.splitlines(keepends=True)
window = max(1, len(old_lines))
best_ratio, best_start = -1.0, 0
best_lines: list[str] = []
for i in range(max(1, len(lines) - window + 1)):
current = lines[i : i + window]
ratio = difflib.SequenceMatcher(None, "".join(old_lines), "".join(current)).ratio()
if ratio > best_ratio:
best_ratio, best_start, best_lines = ratio, i, current
label = f" after header {header!r}" if header else ""
if best_ratio <= 0:
return f"hunk does not match {path}{label}"
diff = "\n".join(difflib.unified_diff(
old_lines,
best_lines,
fromfile="patch hunk",
tofile=f"{path} (actual, line {best_start + 1})",
lineterm="",
))
return (
f"hunk does not match {path}{label}. "
f"Best match ({best_ratio:.0%} similar) at line {best_start + 1}:\n{diff}"
)
def _apply_hunks(path: str, content: str, hunks: list[_Hunk]) -> str:
cursor = 0
for hunk in hunks:
old_lines = [text for marker, text in hunk.lines if marker in {" ", "-"}]
new_lines = [text for marker, text in hunk.lines if marker in {" ", "+"}]
old_text = _lines_to_text(old_lines)
new_text = _lines_to_text(new_lines)
search_start = cursor
line_hint = None
if hunk.header:
line_hint = _line_hint(hunk.header)
if line_hint is not None:
search_start = _line_offset(content, line_hint)
else:
header_pos = content.find(hunk.header, cursor)
if header_pos >= 0:
search_start = header_pos
if old_text:
pos, match_len = _find_with_eof_fallback(content, old_text, search_start)
if pos < 0 and search_start != 0 and line_hint is None:
pos, match_len = _find_with_eof_fallback(content, old_text, 0)
if pos < 0:
raise _PatchError(_hunk_mismatch(path, old_text, content, hunk.header))
else:
pos = search_start
match_len = 0
content = content[:pos] + new_text + content[pos + match_len:]
cursor = pos + len(new_text)
return content
return f"- {summary.action} {summary.path}{stats}"
@tool_parameters(
tool_parameters_schema(
patch=StringSchema(
"Full patch text. Use *** Begin Patch / *** End Patch and file sections "
"for Add File, Update File, Delete File, and optional Move to.",
min_length=1,
edits=ArraySchema(
items=ObjectSchema(
path=StringSchema("Relative path to the file to edit."),
action=StringSchema(
"Operation type: replace (find and replace text), add (append new content or create file), delete (remove text).",
enum=["replace", "add", "delete"],
),
old_text=StringSchema(
"Exact text to search for in the file. Required for replace and delete.",
nullable=True,
),
new_text=StringSchema(
"Text to replace with or append. Required for replace and add.",
nullable=True,
),
required=["path", "action"],
),
description="List of edits to apply. Each edit specifies a file and the change to make.",
min_items=1,
max_items=20,
),
dry_run=BooleanSchema(
description="Validate and summarize the patch without writing files.",
default=False,
),
required=["patch"],
)
)
class ApplyPatchTool(_FsTool):
"""Apply a structured multi-file patch."""
"""Apply file edits by providing structured edit instructions."""
_scopes = {"core", "subagent"}
@property
@ -298,102 +122,190 @@ class ApplyPatchTool(_FsTool):
@property
def description(self) -> str:
return (
"Default tool for code edits. Apply a structured patch with "
"*** Begin Patch and *** End Patch. Supports Add File, Update File, "
"Delete File, and Move to across one or more files. Use this for "
"multi-file changes, structural edits, generated code, or any edit "
"where a reviewable patch is clearer than an exact replacement. "
"Paths must be relative. Set dry_run=true to validate and preview "
"the change summary without writing files. Use edit_file only for "
"small exact replacements copied from read_file."
"Default tool for code edits. Supports multi-file changes in a single call. "
"Provide a list of structured edits, each specifying a file path, action (replace/add/delete), and the text to change. "
"Paths must be relative. Set dry_run=true to validate and preview without writing files. "
"Use edit_file only for small exact replacements on a single file."
)
async def execute(self, patch: str, dry_run: bool = False, **kwargs: Any) -> str:
async def execute(
self,
edits: list[dict] | None = None,
dry_run: bool = False,
**kwargs: Any,
) -> str:
try:
ops = _parse_patch(patch)
if not edits:
raise _PatchError("must provide edits")
writes: dict[Path, str] = {}
deletes: set[Path] = set()
summaries: list[_PatchSummary] = []
for op in ops:
source = self._resolve(op.path)
if op.kind == "add":
if source.exists() or source in writes:
raise _PatchError(f"file to add already exists: {op.path}")
new_content = _lines_to_text(op.add_lines or [])
writes[source] = new_content
deletes.discard(source)
summaries.append(_PatchSummary(
action="add",
path=op.path,
added=_text_line_count(new_content),
))
continue
for edit in edits:
path = _validate_relative_path(edit["path"])
action = edit["action"]
source = self._resolve(path)
if op.kind == "delete":
pending_content = writes.get(source)
if pending_content is None and not source.exists():
raise _PatchError(f"file to delete does not exist: {op.path}")
if pending_content is None and not source.is_file():
raise _PatchError(f"path to delete is not a file: {op.path}")
deleted_lines = 0
if pending_content is not None:
deleted_lines = _text_line_count(pending_content)
else:
raw = source.read_bytes()
try:
deleted_lines = _text_line_count(raw.decode("utf-8"))
except UnicodeDecodeError:
deleted_lines = 0
deletes.add(source)
writes.pop(source, None)
summaries.append(_PatchSummary(
action="delete",
path=op.path,
deleted=deleted_lines,
))
continue
if action == "add":
new_text = edit.get("new_text")
if new_text is None:
raise _PatchError(f"new_text required for add: {path}")
pending_content = writes.get(source)
if pending_content is None and not source.exists():
raise _PatchError(f"file to update does not exist: {op.path}")
if pending_content is None and not source.is_file():
raise _PatchError(f"path to update is not a file: {op.path}")
if pending_content is not None:
content = pending_content
else:
pending = writes.get(source)
if pending is not None:
content = pending
exists = True
elif source.exists():
raw = source.read_bytes()
try:
content = raw.decode("utf-8")
except UnicodeDecodeError as exc:
raise _PatchError(f"file to update is not UTF-8 text: {op.path}") from exc
uses_crlf = "\r\n" in content
content = content.replace("\r\n", "\n")
new_content = _apply_hunks(op.path, content, op.hunks or [])
added, deleted = _line_diff_stats(content, new_content)
if uses_crlf:
new_content = new_content.replace("\n", "\r\n")
except UnicodeDecodeError:
raise _PatchError(f"file is not UTF-8 text: {path}")
exists = True
else:
content = ""
exists = False
target = self._resolve(op.new_path) if op.new_path else source
if op.new_path and (target.exists() or target in writes) and target != source:
raise _PatchError(f"move target already exists: {op.new_path}")
writes[target] = new_content
deletes.discard(target)
if target != source:
if exists:
uses_crlf = "\r\n" in content
new_norm = content.replace("\r\n", "\n") + new_text.replace("\r\n", "\n")
if new_norm and not new_norm.endswith("\n"):
new_norm += "\n"
if uses_crlf:
new_norm = new_norm.replace("\n", "\r\n")
writes[source] = new_norm
deletes.discard(source)
added, deleted = _line_diff_stats(content, new_norm)
action_name = "update"
else:
new_norm = _lines_to_text(new_text.splitlines())
writes[source] = new_norm
deletes.discard(source)
added = _text_line_count(new_norm)
deleted = 0
action_name = "add"
summaries.append(
_PatchSummary(
action=action_name, path=path, added=added, deleted=deleted
)
)
elif action == "replace":
old_text = edit.get("old_text") or ""
if not old_text:
raise _PatchError(f"old_text required for replace: {path}")
new_text = edit.get("new_text")
if new_text is None:
raise _PatchError(f"new_text required for replace: {path}")
pending = writes.get(source)
if pending is not None:
content = pending
elif source.exists():
raw = source.read_bytes()
try:
content = raw.decode("utf-8")
except UnicodeDecodeError:
raise _PatchError(f"file is not UTF-8 text: {path}")
else:
raise _PatchError(f"file to update does not exist: {path}")
if pending is None and not source.is_file():
raise _PatchError(f"path to update is not a file: {path}")
uses_crlf = "\r\n" in content
norm_content = content.replace("\r\n", "\n")
norm_old = old_text.replace("\r\n", "\n")
pos = norm_content.find(norm_old)
if pos < 0:
raise _PatchError(f"old_text not found in {path}")
if norm_content.find(norm_old, pos + 1) >= 0:
raise _PatchError(f"old_text appears multiple times in {path}")
new_norm = (
norm_content[:pos]
+ new_text.replace("\r\n", "\n")
+ norm_content[pos + len(norm_old) :]
)
if new_norm and not new_norm.endswith("\n"):
new_norm += "\n"
if uses_crlf:
new_norm = new_norm.replace("\n", "\r\n")
writes[source] = new_norm
deletes.discard(source)
added, deleted = _line_diff_stats(content, new_norm)
summaries.append(
_PatchSummary(
action="update", path=path, added=added, deleted=deleted
)
)
elif action == "delete":
old_text = edit.get("old_text") or ""
if not old_text:
raise _PatchError(f"old_text required for delete: {path}")
pending = writes.get(source)
if pending is not None:
content = pending
elif source.exists():
raw = source.read_bytes()
try:
content = raw.decode("utf-8")
except UnicodeDecodeError:
raise _PatchError(f"file is not UTF-8 text: {path}")
else:
raise _PatchError(f"file to update does not exist: {path}")
if pending is None and not source.is_file():
raise _PatchError(f"path to update is not a file: {path}")
uses_crlf = "\r\n" in content
norm_content = content.replace("\r\n", "\n")
norm_old = old_text.replace("\r\n", "\n")
pos = norm_content.find(norm_old)
if pos < 0:
raise _PatchError(f"old_text not found in {path}")
if norm_content.find(norm_old, pos + 1) >= 0:
raise _PatchError(f"old_text appears multiple times in {path}")
if norm_old.strip() == norm_content.strip():
deletes.add(source)
writes.pop(source, None)
summaries.append(_PatchSummary(
action="move" if op.new_path else "update",
path=op.path,
new_path=op.new_path,
added=added,
deleted=deleted,
))
added, deleted = 0, _text_line_count(content)
summaries.append(
_PatchSummary(
action="delete", path=path, added=added, deleted=deleted
)
)
else:
new_norm = (
norm_content[:pos] + norm_content[pos + len(norm_old) :]
)
if new_norm and not new_norm.endswith("\n"):
new_norm += "\n"
if uses_crlf:
new_norm = new_norm.replace("\n", "\r\n")
writes[source] = new_norm
deletes.discard(source)
added, deleted = _line_diff_stats(content, new_norm)
summaries.append(
_PatchSummary(
action="update", path=path, added=added, deleted=deleted
)
)
else:
raise _PatchError(f"unknown action: {action}")
if dry_run:
return (
"Patch dry-run succeeded:\n"
+ "\n".join(_format_summary(summary) for summary in summaries)
return "Patch dry-run succeeded:\n" + "\n".join(
_format_summary(summary) for summary in summaries
)
backups: dict[Path, bytes | None] = {}
@ -419,9 +331,8 @@ class ApplyPatchTool(_FsTool):
for path in set(writes) | deletes:
self._file_states.record_write(path)
return (
"Patch applied:\n"
+ "\n".join(_format_summary(summary) for summary in summaries)
return "Patch applied:\n" + "\n".join(
_format_summary(summary) for summary in summaries
)
except PermissionError as exc:
return f"Error: {exc}"

View File

@ -215,25 +215,23 @@ def _resolve_apply_patch_paths(
) -> list[Path]:
if not isinstance(params, dict):
return []
patch = params.get("patch")
if not isinstance(patch, str) or not patch.strip():
edits = params.get("edits")
if not isinstance(edits, list) or not edits:
return []
if params.get("dry_run") is True:
return []
try:
from nanobot.agent.tools.apply_patch import _parse_patch
ops = _parse_patch(patch)
except Exception:
return []
resolved: list[Path] = []
for op in ops:
for raw_path in (op.path, op.new_path):
if not raw_path:
seen: set[Path] = set()
for edit in edits:
if not isinstance(edit, dict):
continue
raw_path = edit.get("path")
if not isinstance(raw_path, str) or not raw_path.strip():
continue
path = _resolve_raw_file_edit_path(tool, workspace, raw_path)
if path is not None:
if path is not None and path not in seen:
seen.add(path)
resolved.append(path)
return resolved
@ -438,16 +436,34 @@ class StreamingFileEditTracker:
async def _update_apply_patch(self, state: _StreamingFileEditState) -> None:
if _json_bool_true(state.arguments, "dry_run"):
return
patch = _extract_json_string_prefix(state.arguments, "patch")
if not patch:
return
tool = self._tools.get("apply_patch") if hasattr(self._tools, "get") else None
events: list[dict[str, Any]] = []
now = time.monotonic()
for raw_path, added, deleted, delete_file in _streaming_apply_patch_stats(patch):
path_matches = list(re.finditer(r'"path"\s*:\s*"([^"]+)"', state.arguments))
if not path_matches:
return
for i, m in enumerate(path_matches):
raw_path = m.group(1)
path = _resolve_raw_file_edit_path(tool, self._workspace, raw_path)
if path is None:
continue
segment_start = m.start()
segment_end = path_matches[i + 1].start() if i + 1 < len(path_matches) else len(state.arguments)
segment = state.arguments[segment_start:segment_end]
action_match = re.search(r'"action"\s*:\s*"(replace|add|delete)"', segment)
action = action_match.group(1) if action_match else "replace"
old_text = _extract_json_string_prefix(segment, "old_text") or ""
new_text = _extract_json_string_prefix(segment, "new_text") or ""
added = _text_line_count(new_text) if action in ("replace", "add") else 0
deleted = _text_line_count(old_text) if action in ("replace", "delete") else 0
delete_file = action == "delete"
file_state = state.patch_files.get(raw_path)
if file_state is None:
tracker = FileEditTracker(
@ -779,9 +795,10 @@ class _StreamingFileEditState:
arguments = getattr(tool_call, "arguments", None)
if not isinstance(arguments, dict):
return False
patch = arguments.get("patch")
streamed_patch = _extract_complete_json_string(self.arguments, "patch")
return isinstance(patch, str) and streamed_patch == patch
edits = arguments.get("edits")
if not isinstance(edits, list):
return False
return '"edits"' in self.arguments
arguments = getattr(tool_call, "arguments", None)
if not isinstance(arguments, dict):
return False
@ -849,65 +866,6 @@ def _extract_json_string_prefix(source: str, key: str) -> str | None:
return "".join(out)
def _streaming_apply_patch_stats(patch: str) -> list[tuple[str, int, int, bool]]:
stats: dict[str, list[Any]] = {}
order: list[str] = []
current: str | None = None
def ensure(path: str, *, delete_file: bool = False) -> list[Any]:
if path not in stats:
stats[path] = [0, 0, False]
order.append(path)
if delete_file:
stats[path][2] = True
return stats[path]
lines = patch.splitlines()
tail = ""
if patch and not patch.endswith(("\n", "\r")) and lines:
tail = lines.pop()
for line in lines:
if line.startswith("*** Add File: "):
current = line[len("*** Add File: "):].strip()
if current:
ensure(current)
continue
if line.startswith("*** Update File: "):
current = line[len("*** Update File: "):].strip()
if current:
ensure(current)
continue
if line.startswith("*** Delete File: "):
current = line[len("*** Delete File: "):].strip()
if current:
ensure(current, delete_file=True)
continue
if line.startswith("*** Move to: "):
moved = line[len("*** Move to: "):].strip()
if moved:
current = moved
ensure(current)
continue
if line.startswith("*** "):
current = None
continue
if not current:
continue
if line.startswith("+") and not line.startswith("+++"):
ensure(current)[0] += 1
elif line.startswith("-") and not line.startswith("---"):
ensure(current)[1] += 1
if current and tail:
if tail.startswith("+") and not tail.startswith("+++"):
ensure(current)[0] += 1
elif tail.startswith("-") and not tail.startswith("---"):
ensure(current)[1] += 1
return [(path, int(stats[path][0]), int(stats[path][1]), bool(stats[path][2])) for path in order]
def _extract_complete_json_string(source: str, key: str) -> str | None:
match = re.search(rf'"{re.escape(key)}"\s*:\s*"', source)
if match is None:

View File

@ -5,283 +5,249 @@ import asyncio
from nanobot.agent.tools.apply_patch import ApplyPatchTool
def test_apply_patch_adds_file(tmp_path):
def test_apply_patch_edits_replace(tmp_path):
target = tmp_path / "calc.py"
target.write_text("def add(a, b):\n return a + b\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Add File: hello.txt
+Hello
+world
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "calc.py",
"action": "replace",
"old_text": " return a + b",
"new_text": " return a - b",
}
]
)
)
assert "Patch applied" in result
assert (tmp_path / "hello.txt").read_text() == "Hello\nworld\n"
assert "update calc.py" in result
assert target.read_text() == "def add(a, b):\n return a - b\n"
def test_apply_patch_updates_multiple_hunks(tmp_path):
target = tmp_path / "multi.txt"
target.write_text("line1\nline2\nline3\nline4\n")
def test_apply_patch_edits_add_new_file(tmp_path):
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: multi.txt
@@
-line2
+changed2
@@
-line4
+changed4
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "config.py",
"action": "add",
"new_text": "DEBUG = True",
}
]
)
)
assert "update multi.txt" in result
assert "(+2/-2)" in result
assert target.read_text() == "line1\nchanged2\nline3\nchanged4\n"
assert "add config.py" in result
assert (tmp_path / "config.py").read_text() == "DEBUG = True\n"
def test_apply_patch_dry_run_validates_without_writing(tmp_path):
target = tmp_path / "dry.txt"
target.write_text("before\n")
def test_apply_patch_edits_add_to_existing_file(tmp_path):
target = tmp_path / "log.py"
target.write_text("import logging\n\nlogger = logging.getLogger(__name__)\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: dry.txt
@@
-before
+after
*** Add File: added.txt
+new
*** End Patch
""",
dry_run=True,
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "log.py",
"action": "add",
"new_text": "def debug(msg):\n logger.debug(msg)",
}
]
)
)
assert "Patch dry-run succeeded" in result
assert "- update dry.txt (+1/-1)" in result
assert "- add added.txt (+1/-0)" in result
assert target.read_text() == "before\n"
assert not (tmp_path / "added.txt").exists()
assert "update log.py" in result
assert (
target.read_text()
== "import logging\n\nlogger = logging.getLogger(__name__)\ndef debug(msg):\n logger.debug(msg)\n"
)
def test_apply_patch_applies_repeated_update_sections_sequentially(tmp_path):
target = tmp_path / "repeat.txt"
target.write_text("one\ntwo\nthree\n")
def test_apply_patch_edits_delete(tmp_path):
target = tmp_path / "utils.py"
target.write_text("def unused():\n pass\ndef used():\n return 1\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: repeat.txt
@@
-one
+ONE
*** Update File: repeat.txt
@@
-three
+THREE
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "utils.py",
"action": "delete",
"old_text": "def unused():\n pass\n",
}
]
)
)
assert result.count("update repeat.txt") == 2
assert target.read_text() == "ONE\ntwo\nTHREE\n"
assert "update utils.py" in result
assert target.read_text() == "def used():\n return 1\n"
def test_apply_patch_ignores_standard_no_newline_marker(tmp_path):
target = tmp_path / "plain.txt"
target.write_text("before")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: plain.txt
@@ -1,1 +1,1 @@
-before
\\ No newline at end of file
+after
\\ No newline at end of file
*** End Patch
"""
))
assert "update plain.txt" in result
assert target.read_text() == "after\n"
def test_apply_patch_rejects_empty_hunk(tmp_path):
target = tmp_path / "plain.txt"
target.write_text("before\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: plain.txt
@@
*** End Patch
"""
))
assert "hunk is empty" in result
assert target.read_text() == "before\n"
def test_apply_patch_uses_unified_diff_line_hint(tmp_path):
target = tmp_path / "repeated.txt"
target.write_text("target\nmiddle\ntarget\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: repeated.txt
@@ -3,1 +3,1 @@
-target
+changed
*** End Patch
"""
))
assert "update repeated.txt" in result
assert target.read_text() == "target\nmiddle\nchanged\n"
def test_apply_patch_line_hint_does_not_fallback_to_earlier_match(tmp_path):
target = tmp_path / "repeated.txt"
target.write_text("target\nmiddle\nother\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: repeated.txt
@@ -3,1 +3,1 @@
-target
+changed
*** End Patch
"""
))
assert "hunk does not match repeated.txt" in result
assert target.read_text() == "target\nmiddle\nother\n"
def test_apply_patch_mismatch_reports_best_match(tmp_path):
target = tmp_path / "near.txt"
target.write_text("alpha\nbeta\ngamma\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: near.txt
@@ -2,1 +2,1 @@
-betx
+delta
*** End Patch
"""
))
assert "hunk does not match near.txt" in result
assert "Best match" in result
assert "line 2" in result
assert target.read_text() == "alpha\nbeta\ngamma\n"
def test_apply_patch_moves_and_updates_file(tmp_path):
source = tmp_path / "old" / "name.txt"
source.parent.mkdir()
source.write_text("old content\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: old/name.txt
*** Move to: renamed/dir/name.txt
@@
-old content
+new content
*** End Patch
"""
))
assert "move old/name.txt -> renamed/dir/name.txt" in result
assert not source.exists()
assert (tmp_path / "renamed" / "dir" / "name.txt").read_text() == "new content\n"
def test_apply_patch_deletes_file(tmp_path):
def test_apply_patch_edits_delete_entire_file(tmp_path):
target = tmp_path / "obsolete.txt"
target.write_text("remove me\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Delete File: obsolete.txt
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "obsolete.txt",
"action": "delete",
"old_text": "remove me\n",
}
]
)
)
assert "delete obsolete.txt" in result
assert not target.exists()
def test_apply_patch_rejects_absolute_and_parent_paths(tmp_path):
def test_apply_patch_edits_batch_multiple_files(tmp_path):
a = tmp_path / "a.py"
a.write_text("X = 1\n")
b = tmp_path / "b.py"
b.write_text("from a import X\nprint(X)\n")
tool = ApplyPatchTool(workspace=tmp_path)
absolute = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Add File: /tmp/owned.txt
+nope
*** End Patch
"""
))
parent = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Add File: ../owned.txt
+nope
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "a.py",
"action": "replace",
"old_text": "X = 1",
"new_text": "Y = 1",
},
{
"path": "b.py",
"action": "replace",
"old_text": "from a import X",
"new_text": "from a import Y",
},
]
)
)
assert "update a.py" in result
assert "update b.py" in result
assert a.read_text() == "Y = 1\n"
assert b.read_text() == "from a import Y\nprint(X)\n"
def test_apply_patch_edits_rejects_ambiguous_old_text(tmp_path):
target = tmp_path / "repeated.txt"
target.write_text("target\nmiddle\ntarget\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(
tool.execute(
edits=[
{
"path": "repeated.txt",
"action": "replace",
"old_text": "target",
"new_text": "changed",
}
]
)
)
assert "old_text appears multiple times" in result
assert target.read_text() == "target\nmiddle\ntarget\n"
def test_apply_patch_edits_dry_run_validates_without_writing(tmp_path):
target = tmp_path / "dry.txt"
target.write_text("before\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(
tool.execute(
edits=[
{
"path": "dry.txt",
"action": "replace",
"old_text": "before",
"new_text": "after",
},
{
"path": "added.txt",
"action": "add",
"new_text": "new",
},
],
dry_run=True,
)
)
assert "Patch dry-run succeeded" in result
assert target.read_text() == "before\n"
assert not (tmp_path / "added.txt").exists()
def test_apply_patch_edits_rejects_absolute_and_parent_paths(tmp_path):
tool = ApplyPatchTool(workspace=tmp_path)
absolute = asyncio.run(
tool.execute(
edits=[
{
"path": "/tmp/owned.txt",
"action": "add",
"new_text": "nope",
}
]
)
)
parent = asyncio.run(
tool.execute(
edits=[
{
"path": "../owned.txt",
"action": "add",
"new_text": "nope",
}
]
)
)
assert "must be relative" in absolute
assert "must not contain '..'" in parent
assert not (tmp_path.parent / "owned.txt").exists()
def test_apply_patch_does_not_overwrite_existing_file_with_add(tmp_path):
target = tmp_path / "existing.txt"
target.write_text("keep me\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Add File: existing.txt
+replace me
*** End Patch
"""
))
assert "file to add already exists" in result
assert target.read_text() == "keep me\n"
def test_apply_patch_rolls_back_when_late_operation_fails(tmp_path):
def test_apply_patch_edits_rolls_back_when_late_operation_fails(tmp_path):
first = tmp_path / "first.txt"
first.write_text("before\n")
tool = ApplyPatchTool(workspace=tmp_path)
result = asyncio.run(tool.execute(
patch="""*** Begin Patch
*** Update File: first.txt
@@
-before
+after
*** Delete File: missing.txt
*** End Patch
"""
))
result = asyncio.run(
tool.execute(
edits=[
{
"path": "first.txt",
"action": "replace",
"old_text": "before",
"new_text": "after",
},
{
"path": "missing.txt",
"action": "delete",
"old_text": "remove me",
},
]
)
)
assert "file to delete does not exist" in result
assert "file to update does not exist: missing.txt" in result
assert first.read_text() == "before\n"

View File

@ -89,23 +89,18 @@ def test_apply_patch_prepares_trackers_for_each_touched_file(tmp_path: Path) ->
delete_me = tmp_path / "src" / "delete_me.py"
delete_me.write_text("gone\n", encoding="utf-8")
patch = """*** Begin Patch
*** Add File: src/new.py
+fresh
*** Update File: src/existing.py
@@
-old
+new
keep
*** Delete File: src/delete_me.py
*** End Patch"""
edits = [
{"path": "src/new.py", "action": "add", "new_text": "fresh"},
{"path": "src/existing.py", "action": "replace", "old_text": "old", "new_text": "new"},
{"path": "src/delete_me.py", "action": "delete", "old_text": "gone\n"},
]
trackers = prepare_file_edit_trackers(
call_id="call-patch",
tool_name="apply_patch",
tool=None,
workspace=tmp_path,
params={"patch": patch},
params={"edits": edits},
)
assert [tracker.display_path for tracker in trackers] == [
@ -118,7 +113,7 @@ def test_apply_patch_prepares_trackers_for_each_touched_file(tmp_path: Path) ->
existing.write_text("new\nkeep\n", encoding="utf-8")
delete_me.unlink()
events = [build_file_edit_end_event(tracker, {"patch": patch}) for tracker in trackers]
events = [build_file_edit_end_event(tracker, {"edits": edits}) for tracker in trackers]
by_path = {event["path"]: event for event in events}
assert (by_path["src/new.py"]["added"], by_path["src/new.py"]["deleted"]) == (1, 0)
assert (by_path["src/existing.py"]["added"], by_path["src/existing.py"]["deleted"]) == (1, 1)
@ -135,12 +130,9 @@ def test_apply_patch_dry_run_does_not_prepare_file_edit_trackers(tmp_path: Path)
workspace=tmp_path,
params={
"dry_run": True,
"patch": """*** Begin Patch
*** Update File: file.txt
@@
-old
+new
*** End Patch""",
"edits": [
{"path": "file.txt", "action": "replace", "old_text": "old", "new_text": "new"}
],
},
)
@ -221,14 +213,8 @@ def test_streaming_apply_patch_tracker_emits_live_counts_per_file(tmp_path: Path
"call_id": "call-patch",
"name": "apply_patch",
"arguments_delta": (
'{"patch":"*** Begin Patch\\n'
'*** Update File: src/existing.py\\n'
'@@\\n'
'-old\\n'
'+new\\n'
' keep\\n'
'*** Add File: src/new.py\\n'
'+fresh\\n'
'{"edits":[{"path":"src/existing.py","action":"replace","old_text":"old","new_text":"new"}'
',{"path":"src/new.py","action":"add","new_text":"fresh"}]}'
),
})
@ -255,9 +241,7 @@ def test_streaming_apply_patch_tracker_skips_dry_run(tmp_path: Path) -> None:
"call_id": "call-patch",
"name": "apply_patch",
"arguments_delta": (
'{"dry_run":true,"patch":"*** Begin Patch\\n'
'*** Add File: dry.md\\n'
'+preview\\n'
'{"dry_run":true,"edits":[{"path":"dry.md","action":"add","new_text":"preview"}]}'
),
})