nanobot/tests/agent/tools/test_self_tool.py
chengyongru c3f54088a6 fix(self_tool): address code review issues — dead code, None ambiguity, size limit, watchdog tests
- Remove duplicate BLOCKED check in _modify (dead code)
- Use hasattr() instead of None check to distinguish missing vs None-valued attributes
- Add 64-key cap on _runtime_vars to prevent unbounded memory growth
- Refactor watchdog tests to call actual _watchdog_check() instead of inline logic
2026-03-30 16:56:31 +08:00

550 lines
21 KiB
Python

"""Tests for SelfTool — agent runtime self-modification."""
from __future__ import annotations
from pathlib import Path
from unittest.mock import MagicMock
import pytest
from nanobot.agent.loop import AgentLoop
from nanobot.agent.tools.self import SelfTool
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _make_mock_loop(**overrides):
"""Build a lightweight mock AgentLoop with the attributes SelfTool reads."""
loop = MagicMock()
loop.model = "anthropic/claude-sonnet-4-20250514"
loop.max_iterations = 40
loop.context_window_tokens = 65_536
loop.context_budget_tokens = 500
loop.workspace = Path("/tmp/workspace")
loop.restrict_to_workspace = False
loop._start_time = 1000.0
loop.web_proxy = None
loop.web_search_config = MagicMock()
loop.exec_config = MagicMock()
loop.input_limits = MagicMock()
loop.channels_config = MagicMock()
loop._last_usage = {"prompt_tokens": 100, "completion_tokens": 50}
loop._runtime_vars = {}
loop._unregistered_tools = {}
loop._config_defaults = {
"max_iterations": 40,
"context_window_tokens": 65_536,
"context_budget_tokens": 500,
"model": "anthropic/claude-sonnet-4-20250514",
}
loop._critical_tool_backup = {}
# Tools registry mock
loop.tools = MagicMock()
loop.tools.tool_names = ["read_file", "write_file", "exec", "web_search", "self"]
loop.tools.has.side_effect = lambda n: n in loop.tools.tool_names
loop.tools.get.return_value = None
# Attach the real _watchdog_check method to the mock so tests exercise actual code
loop._watchdog_check = AgentLoop._watchdog_check.__get__(loop)
for k, v in overrides.items():
setattr(loop, k, v)
return loop
def _make_tool(loop=None):
if loop is None:
loop = _make_mock_loop()
return SelfTool(loop=loop)
# ---------------------------------------------------------------------------
# inspect
# ---------------------------------------------------------------------------
class TestInspect:
@pytest.mark.asyncio
async def test_inspect_returns_current_state(self):
tool = _make_tool()
result = await tool.execute(action="inspect")
assert "max_iterations: 40" in result
assert "context_window_tokens: 65536" in result
assert "tools:" in result
@pytest.mark.asyncio
async def test_inspect_single_key(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="max_iterations")
assert "max_iterations: 40" in result
@pytest.mark.asyncio
async def test_inspect_blocked_returns_error(self):
tool = _make_tool()
result = await tool.execute(action="inspect", key="bus")
assert "not accessible" in result
@pytest.mark.asyncio
async def test_inspect_runtime_vars(self):
loop = _make_mock_loop()
loop._runtime_vars = {"task": "review"}
tool = _make_tool(loop)
result = await tool.execute(action="inspect", key="_runtime_vars")
assert "task" in result
assert "review" in result
@pytest.mark.asyncio
async def test_inspect_none_attribute_shows_value(self):
"""Attributes that are legitimately None should show their value, not 'not found'."""
tool = _make_tool()
# web_proxy is initialized as None in the mock
result = await tool.execute(action="inspect", key="web_proxy")
assert "web_proxy: None" in result
assert "not found" not in result
@pytest.mark.asyncio
async def test_inspect_unknown_key(self):
"""Use a real object (not MagicMock) so hasattr returns False for missing attrs."""
class MinimalLoop:
model = "test-model"
max_iterations = 40
context_window_tokens = 65536
context_budget_tokens = 500
_runtime_vars = {}
_unregistered_tools = {}
_config_defaults = {}
_critical_tool_backup = {}
loop = MinimalLoop()
tool = SelfTool(loop=loop)
result = await tool.execute(action="inspect", key="nonexistent_key")
assert "not found" in result
# ---------------------------------------------------------------------------
# modify — restricted
# ---------------------------------------------------------------------------
class TestModifyRestricted:
@pytest.mark.asyncio
async def test_modify_restricted_valid(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=80)
assert "Set max_iterations = 80" in result
assert tool._loop.max_iterations == 80
@pytest.mark.asyncio
async def test_modify_restricted_out_of_range(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=0)
assert "Error" in result
assert tool._loop.max_iterations == 40 # unchanged
@pytest.mark.asyncio
async def test_modify_restricted_max_exceeded(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=999)
assert "Error" in result
assert tool._loop.max_iterations == 40 # unchanged
@pytest.mark.asyncio
async def test_modify_restricted_wrong_type(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value="not_an_int")
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_restricted_bool_rejected_as_int(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=True)
assert "Error" in result
assert "bool" in result
@pytest.mark.asyncio
async def test_modify_restricted_string_int_coerced(self):
"""LLM may send numeric values as strings; coercion should handle it."""
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value="80")
assert "Set max_iterations = 80" in result
assert tool._loop.max_iterations == 80
assert isinstance(tool._loop.max_iterations, int)
@pytest.mark.asyncio
async def test_modify_context_window_valid(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="context_window_tokens", value=131072)
assert "Set context_window_tokens = 131072" in result
assert tool._loop.context_window_tokens == 131072
# ---------------------------------------------------------------------------
# modify — blocked & readonly
# ---------------------------------------------------------------------------
class TestModifyBlockedReadonly:
@pytest.mark.asyncio
async def test_modify_blocked_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="bus", value="hacked")
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_tools_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="tools", value={})
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_subagents_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="subagents", value=None)
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_context_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="context", value=None)
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_readonly_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="workspace", value="/tmp/evil")
assert "read-only" in result
@pytest.mark.asyncio
async def test_modify_exec_config_readonly(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="exec_config", value={})
assert "read-only" in result
# ---------------------------------------------------------------------------
# modify — free tier
# ---------------------------------------------------------------------------
class TestModifyFree:
@pytest.mark.asyncio
async def test_modify_free_key_stored(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="my_var", value="hello")
assert "Set _runtime_vars.my_var = 'hello'" in result
assert tool._loop._runtime_vars["my_var"] == "hello"
@pytest.mark.asyncio
async def test_modify_free_numeric_value(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="count", value=42)
assert tool._loop._runtime_vars["count"] == 42
@pytest.mark.asyncio
async def test_modify_rejects_callable(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value=lambda: None)
assert "callable" in result
assert "evil" not in tool._loop._runtime_vars
@pytest.mark.asyncio
async def test_modify_rejects_complex_objects(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="obj", value=Path("/tmp"))
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_allows_list(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="items", value=[1, 2, 3])
assert tool._loop._runtime_vars["items"] == [1, 2, 3]
@pytest.mark.asyncio
async def test_modify_allows_dict(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="data", value={"a": 1})
assert tool._loop._runtime_vars["data"] == {"a": 1}
@pytest.mark.asyncio
async def test_modify_free_rejects_when_max_keys_reached(self):
loop = _make_mock_loop()
loop._runtime_vars = {f"key_{i}": i for i in range(64)}
tool = _make_tool(loop)
result = await tool.execute(action="modify", key="overflow", value="data")
assert "full" in result
assert "overflow" not in loop._runtime_vars
@pytest.mark.asyncio
async def test_modify_free_allows_update_existing_key_at_max(self):
loop = _make_mock_loop()
loop._runtime_vars = {f"key_{i}": i for i in range(64)}
tool = _make_tool(loop)
result = await tool.execute(action="modify", key="key_0", value="updated")
assert "Error" not in result
assert loop._runtime_vars["key_0"] == "updated"
# ---------------------------------------------------------------------------
# unregister_tool / register_tool
# ---------------------------------------------------------------------------
class TestToolManagement:
@pytest.mark.asyncio
async def test_unregister_tool_success(self):
loop = _make_mock_loop()
tool = _make_tool(loop)
result = await tool.execute(action="unregister_tool", name="web_search")
assert "Unregistered" in result
assert "web_search" in loop._unregistered_tools
loop.tools.unregister.assert_called_once_with("web_search")
@pytest.mark.asyncio
async def test_unregister_self_rejected(self):
tool = _make_tool()
result = await tool.execute(action="unregister_tool", name="self")
assert "lockout" in result
@pytest.mark.asyncio
async def test_unregister_nonexistent_tool(self):
tool = _make_tool()
result = await tool.execute(action="unregister_tool", name="nonexistent")
assert "not currently registered" in result
@pytest.mark.asyncio
async def test_register_tool_restores(self):
loop = _make_mock_loop()
mock_tool = MagicMock()
loop._unregistered_tools = {"web_search": mock_tool}
tool = _make_tool(loop)
result = await tool.execute(action="register_tool", name="web_search")
assert "Re-registered" in result
loop.tools.register.assert_called_once_with(mock_tool)
assert "web_search" not in loop._unregistered_tools
@pytest.mark.asyncio
async def test_register_unknown_tool_rejected(self):
tool = _make_tool()
result = await tool.execute(action="register_tool", name="web_search")
assert "was not previously unregistered" in result
@pytest.mark.asyncio
async def test_register_requires_name(self):
tool = _make_tool()
result = await tool.execute(action="register_tool")
assert "Error" in result
# ---------------------------------------------------------------------------
# reset
# ---------------------------------------------------------------------------
class TestReset:
@pytest.mark.asyncio
async def test_reset_restores_default(self):
tool = _make_tool()
# Modify first
await tool.execute(action="modify", key="max_iterations", value=80)
assert tool._loop.max_iterations == 80
# Reset
result = await tool.execute(action="reset", key="max_iterations")
assert "Reset max_iterations = 40" in result
assert tool._loop.max_iterations == 40
@pytest.mark.asyncio
async def test_reset_blocked_rejected(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="bus")
assert "protected" in result
@pytest.mark.asyncio
async def test_reset_readonly_rejected(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="workspace")
assert "read-only" in result
@pytest.mark.asyncio
async def test_reset_deletes_runtime_var(self):
tool = _make_tool()
await tool.execute(action="modify", key="temp", value="data")
result = await tool.execute(action="reset", key="temp")
assert "Deleted" in result
assert "temp" not in tool._loop._runtime_vars
@pytest.mark.asyncio
async def test_reset_unknown_key(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="nonexistent")
assert "not a known property" in result
# ---------------------------------------------------------------------------
# Edge cases from code review
# ---------------------------------------------------------------------------
class TestEdgeCases:
@pytest.mark.asyncio
async def test_inspect_dunder_blocked(self):
tool = _make_tool()
for attr in ("__class__", "__dict__", "__bases__", "__subclasses__", "__mro__"):
result = await tool.execute(action="inspect", key=attr)
assert "not accessible" in result
@pytest.mark.asyncio
async def test_modify_dunder_blocked(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="__class__", value="evil")
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_internal_attributes_blocked(self):
tool = _make_tool()
for attr in ("_config_defaults", "_runtime_vars", "_unregistered_tools", "_critical_tool_backup"):
result = await tool.execute(action="modify", key=attr, value={})
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_free_nested_dict_with_object_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value={"nested": object()})
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_free_nested_list_with_callable_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value=[1, 2, lambda: None])
assert "Error" in result
@pytest.mark.asyncio
async def test_modify_free_deep_nesting_rejected(self):
tool = _make_tool()
# Create a deeply nested dict (>10 levels)
deep = {"level": 0}
current = deep
for i in range(1, 15):
current["child"] = {"level": i}
current = current["child"]
result = await tool.execute(action="modify", key="deep", value=deep)
assert "nesting too deep" in result
@pytest.mark.asyncio
async def test_modify_free_dict_with_non_str_key_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="evil", value={42: "value"})
assert "key must be str" in result
@pytest.mark.asyncio
async def test_modify_free_valid_nested_structure(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="data", value={"a": [1, 2, {"b": True}]})
assert "Error" not in result
assert tool._loop._runtime_vars["data"] == {"a": [1, 2, {"b": True}]}
@pytest.mark.asyncio
async def test_whitespace_key_rejected(self):
tool = _make_tool()
result = await tool.execute(action="modify", key=" ", value="test")
assert "cannot be empty or whitespace" in result
@pytest.mark.asyncio
async def test_whitespace_name_rejected(self):
tool = _make_tool()
result = await tool.execute(action="unregister_tool", name=" ")
assert "cannot be empty or whitespace" in result
@pytest.mark.asyncio
async def test_modify_none_value_for_restricted_int(self):
tool = _make_tool()
result = await tool.execute(action="modify", key="max_iterations", value=None)
assert "Error" in result
assert "must be int" in result
@pytest.mark.asyncio
async def test_inspect_all_truncates_large_runtime_vars(self):
loop = _make_mock_loop()
# Create a large runtime vars dict
loop._runtime_vars = {f"key_{i}": f"value_{i}" * 100 for i in range(100)}
tool = _make_tool(loop)
result = await tool.execute(action="inspect")
# The output should be truncated
assert "truncated" in result
@pytest.mark.asyncio
async def test_reset_internal_attribute_returns_error(self):
tool = _make_tool()
result = await tool.execute(action="reset", key="_config_defaults")
assert "protected" in result
@pytest.mark.asyncio
async def test_modify_denied_attrs_non_dunder_blocked(self):
"""Non-dunder entries in _DENIED_ATTRS (e.g. func_globals) must be blocked."""
tool = _make_tool()
for attr in ("func_globals", "func_code"):
result = await tool.execute(action="modify", key=attr, value="evil")
assert "protected" in result, f"{attr} should be blocked"
@pytest.mark.asyncio
async def test_modify_free_value_too_large_rejected(self):
"""Values exceeding _MAX_VALUE_ELEMENTS should be rejected."""
tool = _make_tool()
big_list = list(range(2000))
result = await tool.execute(action="modify", key="big", value=big_list)
assert "too large" in result
assert "big" not in tool._loop._runtime_vars
@pytest.mark.asyncio
async def test_reset_with_none_default_succeeds(self):
"""Reset should work even if the config default is legitimately None."""
loop = _make_mock_loop()
loop._config_defaults["max_iterations"] = None
loop.max_iterations = 80
tool = _make_tool(loop)
result = await tool.execute(action="reset", key="max_iterations")
assert "Reset max_iterations = None" in result
# ---------------------------------------------------------------------------
# watchdog (tested via AgentLoop method, using a real loop-like object)
# ---------------------------------------------------------------------------
class TestWatchdog:
def test_watchdog_corrects_invalid_iterations(self):
loop = _make_mock_loop()
loop.max_iterations = 0
loop._watchdog_check()
assert loop.max_iterations == 40
def test_watchdog_corrects_invalid_context_window(self):
loop = _make_mock_loop()
loop.context_window_tokens = 100
loop._watchdog_check()
assert loop.context_window_tokens == 65_536
def test_watchdog_restores_critical_tools(self):
loop = _make_mock_loop()
backup = MagicMock()
loop._critical_tool_backup = {"self": backup}
loop.tools.has.return_value = False
loop.tools.tool_names = []
loop._watchdog_check()
loop.tools.register.assert_called()
# Verify it was called with a copy, not the original
called_arg = loop.tools.register.call_args[0][0]
assert called_arg is not backup # deep copy
def test_watchdog_does_not_reset_valid_state(self):
loop = _make_mock_loop()
loop.max_iterations = 50
loop.context_window_tokens = 131072
original_max = loop.max_iterations
original_ctx = loop.context_window_tokens
loop._watchdog_check()
assert loop.max_iterations == original_max
assert loop.context_window_tokens == original_ctx