nanobot/tests/agent/test_runner_governance.py
chengyongru 99cc6ee808 test(agent): expand coverage and refactor test structure
- Add 42 tests for ContextBuilder (context.py: 0→42 tests)
- Add 37 tests for SubagentManager lifecycle (subagent.py: 2→37 tests)
- Add 42 unit tests for AutoCompact in isolation
- Split monolithic test_runner.py (3313 lines) into 9 focused files:
  test_runner_core, test_runner_hooks, test_runner_errors,
  test_runner_safety, test_runner_persistence, test_runner_governance,
  test_runner_tool_execution, test_runner_injections,
  test_loop_runner_integration
- Add 3 config passthrough tests (temperature/max_tokens/reasoning_effort)
- Fix fragile patch.object(__init__) in test_stop_preserves_context
- Create shared conftest.py with make_provider/make_loop factories

Total: 934 tests passing, 0 regressions
2026-05-13 12:49:17 +08:00

644 lines
24 KiB
Python

"""Tests for AgentRunner context governance: backfill, orphan cleanup, microcompact, snip_history."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.config.schema import AgentDefaults
from nanobot.providers.base import LLMResponse, ToolCallRequest
# Default ``max_tool_result_chars`` taken from a freshly built ``AgentDefaults``;
# every ``AgentRunSpec`` constructed in this module is given this value.
_MAX_TOOL_RESULT_CHARS = AgentDefaults().max_tool_result_chars
def _make_loop(tmp_path):
    """Build an ``AgentLoop`` rooted at *tmp_path* with its collaborators mocked.

    ``ContextBuilder``, ``SessionManager`` and ``SubagentManager`` are patched
    in ``nanobot.agent.loop`` only while the loop is being constructed. The
    provider is a ``MagicMock`` whose default model is ``"test-model"``.
    """
    from nanobot.agent.loop import AgentLoop
    from nanobot.bus.queue import MessageBus

    fake_provider = MagicMock()
    fake_provider.get_default_model.return_value = "test-model"
    message_bus = MessageBus()

    with patch("nanobot.agent.loop.ContextBuilder"):
        with patch("nanobot.agent.loop.SessionManager"):
            with patch("nanobot.agent.loop.SubagentManager") as subagent_cls:
                # The patched manager's cancel_by_session is an async stub returning 0.
                subagent_cls.return_value.cancel_by_session = AsyncMock(return_value=0)
                return AgentLoop(bus=message_bus, provider=fake_provider, workspace=tmp_path)
@pytest.mark.asyncio
async def test_runner_uses_raw_messages_when_context_governance_fails():
    """If context governance raises, the runner sends the untouched messages.

    ``_snip_history`` is replaced by a mock that raises ``RuntimeError``. The
    run must still finish with the provider's answer, and the provider must
    have received exactly ``initial_messages``.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    provider = MagicMock()
    captured_messages: list[dict] = []

    async def chat_with_retry(*, messages, **kwargs):
        # Record what the runner actually handed to the model.
        captured_messages[:] = messages
        return LLMResponse(content="done", tool_calls=[], usage={})

    provider.chat_with_retry = chat_with_retry
    tools = MagicMock()
    tools.get_definitions.return_value = []

    initial_messages = [
        {"role": "system", "content": "system"},
        {"role": "user", "content": "hello"},
    ]

    runner = AgentRunner(provider)
    # Force the governance step to blow up.
    runner._snip_history = MagicMock(side_effect=RuntimeError("boom"))  # type: ignore[method-assign]

    result = await runner.run(AgentRunSpec(
        initial_messages=initial_messages,
        tools=tools,
        model="test-model",
        max_iterations=1,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    ))

    assert result.final_content == "done"
    assert captured_messages == initial_messages
def test_snip_history_drops_orphaned_tool_results_from_trimmed_slice(monkeypatch):
    """Snipped history must start with system and then a user message.

    Token estimators are monkeypatched so ``_snip_history`` sees a prompt
    estimate of 500 tokens and fixed per-message sizes keyed by content.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    provider = MagicMock()
    tools = MagicMock()
    tools.get_definitions.return_value = []
    runner = AgentRunner(provider)

    messages = [
        {"role": "system", "content": "system"},
        {"role": "user", "content": "old user"},
        {
            "role": "assistant",
            "content": "tool call",
            "tool_calls": [{"id": "call_1", "type": "function", "function": {"name": "ls", "arguments": "{}"}}],
        },
        {"role": "tool", "tool_call_id": "call_1", "content": "tool output"},
        {"role": "assistant", "content": "after tool"},
    ]
    spec = AgentRunSpec(
        initial_messages=messages,
        tools=tools,
        model="test-model",
        max_iterations=1,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
        context_window_tokens=2000,
        context_block_limit=100,
    )

    monkeypatch.setattr("nanobot.agent.runner.estimate_prompt_tokens_chain", lambda *_args, **_kwargs: (500, None))
    token_sizes = {
        "old user": 120,
        "tool call": 120,
        "tool output": 40,
        "after tool": 40,
        "system": 0,
    }
    monkeypatch.setattr(
        "nanobot.agent.runner.estimate_message_tokens",
        lambda msg: token_sizes.get(str(msg.get("content")), 40),
    )

    trimmed = runner._snip_history(spec, messages)

    # The user message must be recovered so the sequence stays valid for
    # providers that require system → user (e.g. GLM error 1214).
    assert trimmed, "trimmed should not be empty"
    assert trimmed[0]["role"] == "system"
    non_system = [m for m in trimmed if m["role"] != "system"]
    # Guard first so an empty result fails with a readable message, not IndexError.
    assert non_system, "trimmed should contain at least one non-system message"
    assert non_system[0]["role"] == "user", f"Expected user after system, got {non_system[0]['role']}"
@pytest.mark.asyncio
async def test_backfill_missing_tool_results_inserts_error():
    """Orphaned tool_use (no matching tool_result) should get a synthetic error.

    The assistant issues ``call_a`` and ``call_b`` but only ``call_a`` has a
    result; the backfill must add one tool message for ``call_b`` carrying
    ``_BACKFILL_CONTENT`` and the tool's name.
    """
    from nanobot.agent.runner import AgentRunner, _BACKFILL_CONTENT

    messages = [
        {"role": "user", "content": "hi"},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [
                {"id": "call_a", "type": "function", "function": {"name": "exec", "arguments": "{}"}},
                {"id": "call_b", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
            ],
        },
        {"role": "tool", "tool_call_id": "call_a", "name": "exec", "content": "ok"},
    ]

    result = AgentRunner._backfill_missing_tool_results(messages)

    tool_msgs = [m for m in result if m.get("role") == "tool"]
    assert len(tool_msgs) == 2
    backfilled = [m for m in tool_msgs if m.get("tool_call_id") == "call_b"]
    assert len(backfilled) == 1
    assert backfilled[0]["content"] == _BACKFILL_CONTENT
    assert backfilled[0]["name"] == "read_file"
def test_drop_orphan_tool_results_removes_unmatched_tool_messages():
    """A tool message with no matching assistant tool call is removed; the rest stays."""
    from nanobot.agent.runner import AgentRunner

    def conversation(*extra_tool_results):
        # Build fresh dicts on every call so the input and the expectation
        # never share message objects.
        return [
            {"role": "system", "content": "system"},
            {"role": "user", "content": "old user"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {"id": "call_ok", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
                ],
            },
            {"role": "tool", "tool_call_id": "call_ok", "name": "read_file", "content": "ok"},
            *extra_tool_results,
            {"role": "assistant", "content": "after tool"},
        ]

    stale_result = {"role": "tool", "tool_call_id": "call_orphan", "name": "exec", "content": "stale"}

    cleaned = AgentRunner._drop_orphan_tool_results(conversation(stale_result))

    assert cleaned == conversation()
@pytest.mark.asyncio
async def test_backfill_noop_when_complete():
    """A chain whose every tool call already has a result is returned untouched."""
    from nanobot.agent.runner import AgentRunner

    tool_call = {"id": "call_x", "type": "function", "function": {"name": "exec", "arguments": "{}"}}
    complete_chain = [
        {"role": "user", "content": "hi"},
        {"role": "assistant", "content": "", "tool_calls": [tool_call]},
        {"role": "tool", "tool_call_id": "call_x", "name": "exec", "content": "done"},
        {"role": "assistant", "content": "all good"},
    ]

    backfilled = AgentRunner._backfill_missing_tool_results(complete_chain)

    # Identity, not equality: the very same list object must come back.
    assert backfilled is complete_chain
@pytest.mark.asyncio
async def test_runner_drops_orphan_tool_results_before_model_request():
    """An orphan tool result is withheld from the model but kept in ``result.messages``."""
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    sent_to_model: list[dict] = []

    async def chat_with_retry(*, messages, **kwargs):
        # Remember the request payload for the assertions below.
        sent_to_model[:] = messages
        return LLMResponse(content="done", tool_calls=[], usage={})

    provider = MagicMock()
    provider.chat_with_retry = chat_with_retry
    tools = MagicMock()
    tools.get_definitions.return_value = []

    history = [
        {"role": "system", "content": "system"},
        {"role": "user", "content": "old user"},
        {"role": "tool", "tool_call_id": "call_orphan", "name": "exec", "content": "stale"},
        {"role": "assistant", "content": "after orphan"},
        {"role": "user", "content": "new prompt"},
    ]
    spec = AgentRunSpec(
        initial_messages=history,
        tools=tools,
        model="test-model",
        max_iterations=1,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    )

    result = await AgentRunner(provider).run(spec)

    orphan_reached_model = any(
        sent.get("role") == "tool" and sent.get("tool_call_id") == "call_orphan"
        for sent in sent_to_model
    )
    assert not orphan_reached_model
    # The returned transcript still carries the orphan at its original index.
    assert result.messages[2]["tool_call_id"] == "call_orphan"
    assert result.final_content == "done"
@pytest.mark.asyncio
async def test_backfill_repairs_model_context_without_shifting_save_turn_boundary(tmp_path):
    """Historical backfill should not duplicate old tail messages on persist.

    The stored session holds an assistant tool call that never got a result.
    The model request must include exactly one synthetic result for it, while
    the persisted session only gains the new user/assistant turn.
    """
    from nanobot.agent.loop import AgentLoop
    from nanobot.agent.runner import _BACKFILL_CONTENT
    from nanobot.bus.events import InboundMessage
    from nanobot.bus.queue import MessageBus

    canned_reply = LLMResponse(content="new answer", tool_calls=[], usage={})
    provider = MagicMock()
    provider.get_default_model.return_value = "test-model"
    provider.chat_with_retry = AsyncMock(return_value=canned_reply)
    provider.chat_stream_with_retry = AsyncMock(return_value=canned_reply)

    loop = AgentLoop(
        bus=MessageBus(),
        provider=provider,
        workspace=tmp_path,
        model="test-model",
    )
    loop.tools.get_definitions = MagicMock(return_value=[])
    loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False)  # type: ignore[method-assign]

    def dangling_call():
        # Fresh copy per use so stored history and expectation share nothing.
        return {
            "id": "call_missing",
            "type": "function",
            "function": {"name": "read_file", "arguments": "{}"},
        }

    session = loop.sessions.get_or_create("cli:test")
    session.messages = [
        {"role": "user", "content": "old user", "timestamp": "2026-01-01T00:00:00"},
        {
            "role": "assistant",
            "content": "",
            "tool_calls": [dangling_call()],
            "timestamp": "2026-01-01T00:00:01",
        },
        {"role": "assistant", "content": "old tail", "timestamp": "2026-01-01T00:00:02"},
    ]
    loop.sessions.save(session)

    inbound = InboundMessage(channel="cli", sender_id="user", chat_id="test", content="new prompt")
    result = await loop._process_message(inbound)

    assert result is not None
    assert result.content == "new answer"

    # What the model saw: one synthetic result for the dangling call.
    request_messages = provider.chat_with_retry.await_args.kwargs["messages"]
    synthetic = []
    for sent in request_messages:
        if sent.get("role") == "tool" and sent.get("tool_call_id") == "call_missing":
            synthetic.append(sent)
    assert len(synthetic) == 1
    assert synthetic[0]["content"] == _BACKFILL_CONTENT

    # What got persisted: compare only the conversational fields.
    compared_keys = {"role", "content", "tool_call_id", "name", "tool_calls"}
    session_after = loop.sessions.get_or_create("cli:test")
    persisted = [
        {key: value for key, value in stored.items() if key in compared_keys}
        for stored in session_after.messages
    ]
    assert persisted == [
        {"role": "user", "content": "old user"},
        {"role": "assistant", "content": "", "tool_calls": [dangling_call()]},
        {"role": "assistant", "content": "old tail"},
        {"role": "user", "content": "new prompt"},
        {"role": "assistant", "content": "new answer"},
    ]
@pytest.mark.asyncio
async def test_runner_backfill_only_mutates_model_context_not_returned_messages():
    """Runner should repair orphaned tool calls for the model without rewriting result.messages."""
    from nanobot.agent.runner import AgentRunSpec, AgentRunner, _BACKFILL_CONTENT

    sent_to_model: list[dict] = []

    async def chat_with_retry(*, messages, **kwargs):
        # Remember the request payload for the assertions below.
        sent_to_model[:] = messages
        return LLMResponse(content="done", tool_calls=[], usage={})

    provider = MagicMock()
    provider.chat_with_retry = chat_with_retry
    tools = MagicMock()
    tools.get_definitions.return_value = []

    def history():
        # Fresh dicts per call: one copy is the input, another the expectation.
        return [
            {"role": "system", "content": "system"},
            {"role": "user", "content": "old user"},
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [
                    {
                        "id": "call_missing",
                        "type": "function",
                        "function": {"name": "read_file", "arguments": "{}"},
                    }
                ],
            },
            {"role": "assistant", "content": "old tail"},
            {"role": "user", "content": "new prompt"},
        ]

    spec = AgentRunSpec(
        initial_messages=history(),
        tools=tools,
        model="test-model",
        max_iterations=3,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
    )
    result = await AgentRunner(provider).run(spec)

    # The model request carries exactly one synthetic result for the dangling call.
    synthetic = []
    for sent in sent_to_model:
        if sent.get("role") == "tool" and sent.get("tool_call_id") == "call_missing":
            synthetic.append(sent)
    assert len(synthetic) == 1
    assert synthetic[0]["content"] == _BACKFILL_CONTENT

    # The returned transcript is the original history plus the final answer only.
    compared_keys = {"role", "content", "tool_call_id", "name", "tool_calls"}
    returned = [
        {key: value for key, value in item.items() if key in compared_keys}
        for item in result.messages
    ]
    assert returned == history() + [{"role": "assistant", "content": "done"}]
# ---------------------------------------------------------------------------
# Microcompact (stale tool result compaction)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_microcompact_replaces_old_tool_results():
    """Tool results beyond _MICROCOMPACT_KEEP_RECENT should be summarized."""
    from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT

    n_calls = _MICROCOMPACT_KEEP_RECENT + 5
    big_payload = "x" * 600

    history: list[dict] = [{"role": "system", "content": "sys"}]
    for idx in range(n_calls):
        call_id = f"c{idx}"
        history.extend([
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [{"id": call_id, "type": "function", "function": {"name": "read_file", "arguments": "{}"}}],
            },
            {
                "role": "tool", "tool_call_id": call_id, "name": "read_file",
                "content": big_payload,
            },
        ])

    compacted_history = AgentRunner._microcompact(history)

    tool_results = [m for m in compacted_history if m.get("role") == "tool"]
    n_summarized = sum(1 for m in tool_results if "omitted from context" in str(m.get("content", "")))
    n_intact = sum(1 for m in tool_results if m.get("content") == big_payload)

    # Everything older than the keep-recent window is summarized; the rest is verbatim.
    assert n_summarized == n_calls - _MICROCOMPACT_KEEP_RECENT
    assert n_intact == _MICROCOMPACT_KEEP_RECENT
@pytest.mark.asyncio
async def test_microcompact_preserves_short_results():
    """Short tool results (< _MICROCOMPACT_MIN_CHARS) should not be replaced."""
    from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT

    history: list[dict] = []
    for idx in range(_MICROCOMPACT_KEEP_RECENT + 5):
        call_id = f"c{idx}"
        history += [
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [{"id": call_id, "type": "function", "function": {"name": "exec", "arguments": "{}"}}],
            },
            {
                "role": "tool", "tool_call_id": call_id, "name": "exec",
                "content": "short",
            },
        ]

    outcome = AgentRunner._microcompact(history)

    # Same object back: nothing was long enough to compact, so no copy is made.
    assert outcome is history
@pytest.mark.asyncio
async def test_microcompact_skips_non_compactable_tools():
    """Non-compactable tools (e.g. 'message') should never be replaced."""
    from nanobot.agent.runner import AgentRunner, _MICROCOMPACT_KEEP_RECENT

    big_payload = "y" * 1000
    history: list[dict] = []
    for idx in range(_MICROCOMPACT_KEEP_RECENT + 5):
        call_id = f"c{idx}"
        history += [
            {
                "role": "assistant",
                "content": "",
                "tool_calls": [{"id": call_id, "type": "function", "function": {"name": "message", "arguments": "{}"}}],
            },
            {
                "role": "tool", "tool_call_id": call_id, "name": "message",
                "content": big_payload,
            },
        ]

    outcome = AgentRunner._microcompact(history)

    # Same object back: every result belongs to the 'message' tool.
    assert outcome is history
def test_governance_repairs_orphans_after_snip():
    """After _snip_history clips an assistant+tool_calls, the second
    _drop_orphan_tool_results pass must clean up the resulting orphans.

    The snip itself is simulated: the test feeds an already-snipped list to
    ``_drop_orphan_tool_results`` and checks the orphan is gone.
    """
    from nanobot.agent.runner import AgentRunner

    # Simulated post-snip state. The full conversation would have been
    #   system, user "old msg", assistant(tool_calls=[tc_old]),
    #   tool(tc_old), assistant "old answer", user "new msg";
    # the snip kept only the tail, dropping the assistant that issued tc_old
    # but keeping its tool result, which is now an orphan.
    snipped = [
        {"role": "system", "content": "system"},
        {"role": "tool", "tool_call_id": "tc_old", "name": "search",
         "content": "old result"},
        {"role": "assistant", "content": "old answer"},
        {"role": "user", "content": "new msg"},
    ]

    cleaned = AgentRunner._drop_orphan_tool_results(snipped)

    # The orphan tool result should be removed.
    assert not any(
        m.get("role") == "tool" and m.get("tool_call_id") == "tc_old"
        for m in cleaned
    )
def test_governance_fallback_still_repairs_orphans():
    """When full governance fails, the fallback must still run
    _drop_orphan_tool_results and _backfill_missing_tool_results.

    Both helpers are applied in that order to a history containing a tool
    result that no assistant message asked for.
    """
    from nanobot.agent.runner import AgentRunner

    history = [
        {"role": "user", "content": "hello"},
        # No assistant tool_call references this id, so it is an orphan.
        {"role": "tool", "tool_call_id": "orphan_tc", "name": "read",
         "content": "stale"},
        {"role": "assistant", "content": "hi"},
    ]

    repaired = AgentRunner._backfill_missing_tool_results(
        AgentRunner._drop_orphan_tool_results(history)
    )

    leftovers = [m for m in repaired if m.get("tool_call_id") == "orphan_tc"]
    assert not leftovers
def test_snip_history_preserves_user_message_after_truncation(monkeypatch):
    """The first non-system message after snipping must be a user message.

    When truncation leaves the only user message outside the kept window,
    ``_snip_history`` has to recover the nearest one; otherwise providers such
    as GLM reject the system→assistant sequence (error 1214).

    Scenario taken from the bug report:
    - A normal exchange: user asks, assistant calls a tool, the tool returns,
      assistant replies.
    - An injection adds a phantom user message that triggers more tool calls.
    - ``_snip_history`` kicks in and keeps only recent assistant/tool pairs.
    - The injected user message sits in the truncated prefix and gets lost.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    def exec_call(call_id):
        # Assistant turn with no text that issues a single ``exec`` tool call.
        return {
            "role": "assistant",
            "content": None,
            "tool_calls": [{"id": call_id, "type": "function", "function": {"name": "exec", "arguments": "{}"}}],
        }

    tools = MagicMock()
    tools.get_definitions.return_value = []
    runner = AgentRunner(MagicMock())

    messages = [
        {"role": "system", "content": "system"},
        {"role": "assistant", "content": "previous reply"},
        {"role": "user", "content": ".nanobot的同目录"},
        exec_call("tc_1"),
        {"role": "tool", "tool_call_id": "tc_1", "content": "tool output 1"},
        exec_call("tc_2"),
        {"role": "tool", "tool_call_id": "tc_2", "content": "tool output 2"},
    ]
    spec = AgentRunSpec(
        initial_messages=messages,
        tools=tools,
        model="test-model",
        max_iterations=1,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
        context_window_tokens=2000,
        context_block_limit=100,
    )

    # Report an over-budget prompt so that _snip_history activates.
    def fake_prompt_estimate(*_a, **_kw):
        return (500, None)

    # Per-message sizes keyed by content; anything unlisted costs 100.
    token_sizes = {
        "system": 0,
        "previous reply": 200,
        ".nanobot的同目录": 80,
        "tool output 1": 80,
        "tool output 2": 80,
    }

    def fake_message_estimate(msg):
        return token_sizes.get(str(msg.get("content")), 100)

    monkeypatch.setattr("nanobot.agent.runner.estimate_prompt_tokens_chain", fake_prompt_estimate)
    monkeypatch.setattr("nanobot.agent.runner.estimate_message_tokens", fake_message_estimate)

    trimmed = runner._snip_history(spec, messages)

    # The first non-system message MUST be user (not assistant).
    non_system = [m for m in trimmed if m.get("role") != "system"]
    assert non_system, "trimmed should contain at least one non-system message"
    assert non_system[0]["role"] == "user", (
        f"First non-system message must be 'user', got '{non_system[0]['role']}'. "
        f"Roles: {[m['role'] for m in trimmed]}"
    )
def test_snip_history_no_user_at_all_falls_back_gracefully(monkeypatch):
    """Edge case: a history with no user message must not break ``_snip_history``.

    The call has to return a list that still contains the system message, and
    ``LLMProvider._enforce_role_alternation`` must be able to turn that list
    into one whose first non-system message is ``user`` or ``tool``.
    """
    from nanobot.agent.runner import AgentRunSpec, AgentRunner

    tools = MagicMock()
    tools.get_definitions.return_value = []
    runner = AgentRunner(MagicMock())

    messages = [
        {"role": "system", "content": "system"},
        {"role": "assistant", "content": "reply"},
        {"role": "tool", "tool_call_id": "tc_1", "content": "result"},
        {"role": "assistant", "content": "reply 2"},
        {"role": "tool", "tool_call_id": "tc_2", "content": "result 2"},
    ]
    spec = AgentRunSpec(
        initial_messages=messages,
        tools=tools,
        model="test-model",
        max_iterations=1,
        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
        context_window_tokens=2000,
        context_block_limit=100,
    )

    def fake_prompt_estimate(*_a, **_kw):
        return (500, None)

    def fake_message_estimate(msg):
        # Every message costs the same flat amount.
        return 100

    monkeypatch.setattr("nanobot.agent.runner.estimate_prompt_tokens_chain", fake_prompt_estimate)
    monkeypatch.setattr("nanobot.agent.runner.estimate_message_tokens", fake_message_estimate)

    trimmed = runner._snip_history(spec, messages)

    # Should not crash. The result should still be a valid list.
    assert isinstance(trimmed, list)
    # Must have at least system.
    roles_kept = [m.get("role") for m in trimmed]
    assert "system" in roles_kept

    # The _enforce_role_alternation safety net must be able to fix whatever
    # _snip_history returns here — verify it produces a valid sequence.
    from nanobot.providers.base import LLMProvider

    fixed = LLMProvider._enforce_role_alternation(trimmed)
    non_system = [m for m in fixed if m["role"] != "system"]
    if not non_system:
        return
    assert non_system[0]["role"] in ("user", "tool"), (
        f"Safety net should ensure first non-system is user/tool, got {non_system[0]['role']}"
    )