From 8ca99600775e0150f1d6004feafe6a0587329add Mon Sep 17 00:00:00 2001 From: chengyongru Date: Tue, 7 Apr 2026 11:32:28 +0800 Subject: [PATCH] feat(agent): rewrite _tool_hint with registry, path abbreviation, and call folding --- nanobot/agent/loop.py | 101 ++++++++++++++++++++++- tests/agent/test_tool_hint.py | 149 ++++++++++++++++++++++++++++++++++ 2 files changed, 246 insertions(+), 4 deletions(-) create mode 100644 tests/agent/test_tool_hint.py diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 9e5ca0bc9..e94999a57 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -325,14 +325,107 @@ class AgentLoop: @staticmethod def _tool_hint(tool_calls: list) -> str: - """Format tool calls as concise hint, e.g. 'web_search("query")'.""" - def _fmt(tc): + """Format tool calls as concise hints with smart abbreviation.""" + if not tool_calls: + return "" + + from nanobot.utils.path import abbreviate_path + + def _group_consecutive(calls): + """Group consecutive calls to the same tool: [(name, count, first), ...].""" + groups = [] + for tc in calls: + if groups and groups[-1][0] == tc.name: + groups[-1] = (groups[-1][0], groups[-1][1] + 1, groups[-1][2]) + else: + groups.append((tc.name, 1, tc)) + return groups + + def _extract_arg(tc, key_args): + """Extract the first available value from preferred key names.""" + args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {} + if not isinstance(args, dict): + return None + for key in key_args: + val = args.get(key) + if isinstance(val, str) and val: + return val + # Fallback: first string value + for val in args.values(): + if isinstance(val, str) and val: + return val + return None + + def _fmt_known(tc, fmt): + """Format a registered tool using its template.""" + val = _extract_arg(tc, fmt[0]) + if val is None: + return tc.name + if fmt[2]: # is_path + val = abbreviate_path(val) + elif fmt[3]: # is_command + val = val[:40] + "\u2026" if len(val) > 40 else val + template = fmt[1] + if '"{}"' in template: + return template.format(val) + return template.format(val) + + def _fmt_mcp(tc): + """Format MCP tool as server::tool.""" + name = tc.name + # mcp___ or mcp__ + if "__" in name: + parts = name.split("__", 1) + server = parts[0].removeprefix("mcp_") + tool = parts[1] + else: + rest = name.removeprefix("mcp_") + parts = rest.split("_", 1) + server = parts[0] if parts else rest + tool = parts[1] if len(parts) > 1 else "" + if not tool: + return name + args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {} + val = next((v for v in args.values() if isinstance(v, str) and v), None) + if val is None: + return f"{server}::{tool}" + return f'{server}::{tool}("{abbreviate_path(val, 40)}")' + + def _fmt_fallback(tc): + """Original formatting logic for unregistered tools.""" args = (tc.arguments[0] if isinstance(tc.arguments, list) else tc.arguments) or {} val = next(iter(args.values()), None) if isinstance(args, dict) else None if not isinstance(val, str): return tc.name - return f'{tc.name}("{val[:40]}…")' if len(val) > 40 else f'{tc.name}("{val}")' - return ", ".join(_fmt(tc) for tc in tool_calls) + return f'{tc.name}("{abbreviate_path(val, 40)}")' if len(val) > 40 else f'{tc.name}("{val}")' + + # Registry: tool_name -> (key_args, template, is_path, is_command) + FORMATS: dict[str, tuple[list[str], str, bool, bool]] = { + "read_file": (["path", "file_path"], "read {}", True, False), + "write_file": (["path", "file_path"], "write {}", True, False), + "edit": (["file_path", "path"], "edit {}", True, False), + "glob": (["pattern"], 'glob "{}"', False, False), + "grep": (["pattern"], 'grep "{}"', False, False), + "exec": (["command"], "$ {}", False, True), + "web_search": (["query"], 'search "{}"', False, False), + "web_fetch": (["url"], "fetch {}", True, False), + } + + hints = [] + for name, count, example_tc in _group_consecutive(tool_calls): + fmt = FORMATS.get(name) + if fmt: + hint = _fmt_known(example_tc, fmt) + elif name.startswith("mcp_"): + hint = _fmt_mcp(example_tc) + else: + hint = _fmt_fallback(example_tc) + + if count > 1: + hint = f"{hint} \u00d7 {count}" + hints.append(hint) + + return ", ".join(hints) async def _run_agent_loop( self, diff --git a/tests/agent/test_tool_hint.py b/tests/agent/test_tool_hint.py new file mode 100644 index 000000000..f0c324083 --- /dev/null +++ b/tests/agent/test_tool_hint.py @@ -0,0 +1,149 @@ +"""Tests for AgentLoop._tool_hint() formatting.""" + +from nanobot.agent.loop import AgentLoop +from nanobot.providers.base import ToolCallRequest + + +def _tc(name: str, args: dict) -> ToolCallRequest: + return ToolCallRequest(id="c1", name=name, arguments=args) + + +class TestToolHintKnownTools: + """Test registered tool types produce correct formatted output.""" + + def test_read_file_short_path(self): + result = AgentLoop._tool_hint([_tc("read_file", {"path": "foo.txt"})]) + assert result == 'read foo.txt' + + def test_read_file_long_path(self): + result = AgentLoop._tool_hint([_tc("read_file", {"path": "/home/user/.local/share/uv/tools/nanobot/agent/loop.py"})]) + assert "loop.py" in result + assert "read " in result + + def test_write_file_shows_path_not_content(self): + result = AgentLoop._tool_hint([_tc("write_file", {"path": "docs/api.md", "content": "# API Reference\n\nLong content..."})]) + assert result == "write docs/api.md" + + def test_edit_shows_path(self): + result = AgentLoop._tool_hint([_tc("edit", {"file_path": "src/main.py", "old_string": "x", "new_string": "y"})]) + assert "main.py" in result + assert "edit " in result + + def test_glob_shows_pattern(self): + result = AgentLoop._tool_hint([_tc("glob", {"pattern": "**/*.py", "path": "src"})]) + assert result == 'glob "**/*.py"' + + def test_grep_shows_pattern(self): + result = AgentLoop._tool_hint([_tc("grep", {"pattern": "TODO|FIXME", "path": "src"})]) + assert result == 'grep "TODO|FIXME"' + + def test_exec_shows_command(self): + result = AgentLoop._tool_hint([_tc("exec", {"command": "npm install typescript"})]) + assert result == "$ npm install typescript" + + def test_exec_truncates_long_command(self): + cmd = "cd /very/long/path && cat file && echo done && sleep 1 && ls -la" + result = AgentLoop._tool_hint([_tc("exec", {"command": cmd})]) + assert result.startswith("$ ") + assert len(result) <= 50 # reasonable limit + + def test_web_search(self): + result = AgentLoop._tool_hint([_tc("web_search", {"query": "Claude 4 vs GPT-4"})]) + assert result == 'search "Claude 4 vs GPT-4"' + + def test_web_fetch(self): + result = AgentLoop._tool_hint([_tc("web_fetch", {"url": "https://example.com/page"})]) + assert result == "fetch https://example.com/page" + + +class TestToolHintMCP: + """Test MCP tools are abbreviated to server::tool format.""" + + def test_mcp_standard_format(self): + result = AgentLoop._tool_hint([_tc("mcp_4_5v_mcp__analyze_image", {"imageSource": "https://img.jpg", "prompt": "describe"})]) + assert "4_5v" in result + assert "analyze_image" in result + + def test_mcp_simple_name(self): + result = AgentLoop._tool_hint([_tc("mcp_github__create_issue", {"title": "Bug fix"})]) + assert "github" in result + assert "create_issue" in result + + +class TestToolHintFallback: + """Test unknown tools fall back to original behavior.""" + + def test_unknown_tool_with_string_arg(self): + result = AgentLoop._tool_hint([_tc("custom_tool", {"data": "hello world"})]) + assert result == 'custom_tool("hello world")' + + def test_unknown_tool_with_long_arg_truncates(self): + long_val = "a" * 60 + result = AgentLoop._tool_hint([_tc("custom_tool", {"data": long_val})]) + assert len(result) < 80 + assert "\u2026" in result + + def test_unknown_tool_no_string_arg(self): + result = AgentLoop._tool_hint([_tc("custom_tool", {"count": 42})]) + assert result == "custom_tool" + + def test_empty_tool_calls(self): + result = AgentLoop._tool_hint([]) + assert result == "" + + +class TestToolHintFolding: + """Test consecutive same-tool calls are folded.""" + + def test_single_call_no_fold(self): + calls = [_tc("grep", {"pattern": "*.py"})] + result = AgentLoop._tool_hint(calls) + assert "\u00d7" not in result + + def test_two_consecutive_same_folded(self): + calls = [ + _tc("grep", {"pattern": "*.py"}), + _tc("grep", {"pattern": "*.ts"}), + ] + result = AgentLoop._tool_hint(calls) + assert "\u00d7 2" in result + + def test_three_consecutive_same_folded(self): + calls = [ + _tc("read_file", {"path": "a.py"}), + _tc("read_file", {"path": "b.py"}), + _tc("read_file", {"path": "c.py"}), + ] + result = AgentLoop._tool_hint(calls) + assert "\u00d7 3" in result + + def test_different_tools_not_folded(self): + calls = [ + _tc("grep", {"pattern": "TODO"}), + _tc("read_file", {"path": "a.py"}), + ] + result = AgentLoop._tool_hint(calls) + assert "\u00d7" not in result + + def test_interleaved_same_tools_not_folded(self): + calls = [ + _tc("grep", {"pattern": "a"}), + _tc("read_file", {"path": "f.py"}), + _tc("grep", {"pattern": "b"}), + ] + result = AgentLoop._tool_hint(calls) + assert "\u00d7" not in result + + +class TestToolHintMultipleCalls: + """Test multiple different tool calls are comma-separated.""" + + def test_two_different_tools(self): + calls = [ + _tc("grep", {"pattern": "TODO"}), + _tc("read_file", {"path": "main.py"}), + ] + result = AgentLoop._tool_hint(calls) + assert 'grep "TODO"' in result + assert "read main.py" in result + assert ", " in result