refactor: extract runtime response guards into utils runtime module

Xubin Ren 2026-04-02 13:54:40 +00:00
parent 714a4c7bb6
commit e4b335ce81
7 changed files with 449 additions and 44 deletions
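In short: the shared empty-response fallback (EMPTY_FINAL_RESPONSE_MESSAGE), the blank-text check, the empty-tool-result marker, the finalization retry prompt, and the repeated-external-lookup throttle all move into nanobot/utils/runtime.py, replacing ad-hoc copies in the agent loop, the runner, and the API server. A minimal sketch of the guard behavior, using only the helpers added below (the surrounding variables are illustrative, not code from this commit):

from nanobot.utils.runtime import (
    EMPTY_FINAL_RESPONSE_MESSAGE,
    ensure_nonempty_tool_result,
    is_blank_text,
)

# Empty tool output becomes a prompt-safe marker instead of an empty string.
assert ensure_nonempty_tool_result("noop", "") == "(noop completed with no output)"

# A missing or whitespace-only final answer falls back to one shared message.
final_content = "   "
if is_blank_text(final_content):
    final_content = EMPTY_FINAL_RESPONSE_MESSAGE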

View File

@@ -33,6 +33,7 @@ from nanobot.config.schema import AgentDefaults
from nanobot.providers.base import LLMProvider
from nanobot.session.manager import Session, SessionManager
from nanobot.utils.helpers import image_placeholder_text, truncate_text
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
if TYPE_CHECKING:
from nanobot.config.schema import ChannelsConfig, ExecToolConfig, WebSearchConfig
@@ -588,8 +589,8 @@ class AgentLoop:
message_id=msg.metadata.get("message_id"),
)
if final_content is None:
final_content = "I've completed processing but have no response to give."
if final_content is None or not final_content.strip():
final_content = EMPTY_FINAL_RESPONSE_MESSAGE
self._save_turn(session, all_msgs, 1 + len(history))
self._clear_runtime_checkpoint(session)

View File

@@ -20,6 +20,13 @@ from nanobot.utils.helpers import (
maybe_persist_tool_result,
truncate_text,
)
from nanobot.utils.runtime import (
EMPTY_FINAL_RESPONSE_MESSAGE,
build_finalization_retry_message,
ensure_nonempty_tool_result,
is_blank_text,
repeated_external_lookup_error,
)
_DEFAULT_MAX_ITERATIONS_MESSAGE = (
"I reached the maximum number of tool call iterations ({max_iterations}) "
@@ -77,10 +84,11 @@ class AgentRunner:
messages = list(spec.initial_messages)
final_content: str | None = None
tools_used: list[str] = []
usage: dict[str, int] = {}
usage: dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0}
error: str | None = None
stop_reason = "completed"
tool_events: list[dict[str, str]] = []
external_lookup_counts: dict[str, int] = {}
for iteration in range(spec.max_iterations):
try:
@@ -96,41 +104,12 @@ class AgentRunner:
messages_for_model = messages
context = AgentHookContext(iteration=iteration, messages=messages)
await hook.before_iteration(context)
kwargs: dict[str, Any] = {
"messages": messages_for_model,
"tools": spec.tools.get_definitions(),
"model": spec.model,
"retry_mode": spec.provider_retry_mode,
"on_retry_wait": spec.progress_callback,
}
if spec.temperature is not None:
kwargs["temperature"] = spec.temperature
if spec.max_tokens is not None:
kwargs["max_tokens"] = spec.max_tokens
if spec.reasoning_effort is not None:
kwargs["reasoning_effort"] = spec.reasoning_effort
if hook.wants_streaming():
async def _stream(delta: str) -> None:
await hook.on_stream(context, delta)
response = await self.provider.chat_stream_with_retry(
**kwargs,
on_content_delta=_stream,
)
else:
response = await self.provider.chat_with_retry(**kwargs)
raw_usage = response.usage or {}
response = await self._request_model(spec, messages_for_model, hook, context)
raw_usage = self._usage_dict(response.usage)
context.response = response
context.usage = raw_usage
context.usage = dict(raw_usage)
context.tool_calls = list(response.tool_calls)
# Accumulate standard fields into result usage.
usage["prompt_tokens"] = usage.get("prompt_tokens", 0) + int(raw_usage.get("prompt_tokens", 0) or 0)
usage["completion_tokens"] = usage.get("completion_tokens", 0) + int(raw_usage.get("completion_tokens", 0) or 0)
cached = raw_usage.get("cached_tokens")
if cached:
usage["cached_tokens"] = usage.get("cached_tokens", 0) + int(cached)
self._accumulate_usage(usage, raw_usage)
if response.has_tool_calls:
if hook.wants_streaming():
@@ -158,13 +137,20 @@ class AgentRunner:
await hook.before_execute_tools(context)
results, new_events, fatal_error = await self._execute_tools(spec, response.tool_calls)
results, new_events, fatal_error = await self._execute_tools(
spec,
response.tool_calls,
external_lookup_counts,
)
tool_events.extend(new_events)
context.tool_results = list(results)
context.tool_events = list(new_events)
if fatal_error is not None:
error = f"Error: {type(fatal_error).__name__}: {fatal_error}"
final_content = error
stop_reason = "tool_error"
self._append_final_message(messages, final_content)
context.final_content = final_content
context.error = error
context.stop_reason = stop_reason
await hook.after_iteration(context)
@@ -178,6 +164,7 @@ class AgentRunner:
"content": self._normalize_tool_result(
spec,
tool_call.id,
tool_call.name,
result,
),
}
@@ -197,10 +184,27 @@ class AgentRunner:
await hook.after_iteration(context)
continue
clean = hook.finalize_content(context, response.content)
if response.finish_reason != "error" and is_blank_text(clean):
logger.warning(
"Empty final response on turn {} for {}; retrying with explicit finalization prompt",
iteration,
spec.session_key or "default",
)
if hook.wants_streaming():
await hook.on_stream_end(context, resuming=False)
response = await self._request_finalization_retry(spec, messages_for_model)
retry_usage = self._usage_dict(response.usage)
self._accumulate_usage(usage, retry_usage)
raw_usage = self._merge_usage(raw_usage, retry_usage)
context.response = response
context.usage = dict(raw_usage)
context.tool_calls = list(response.tool_calls)
clean = hook.finalize_content(context, response.content)
if hook.wants_streaming():
await hook.on_stream_end(context, resuming=False)
clean = hook.finalize_content(context, response.content)
if response.finish_reason == "error":
final_content = clean or spec.error_message or _DEFAULT_ERROR_MESSAGE
stop_reason = "error"
@@ -211,6 +215,16 @@ class AgentRunner:
context.stop_reason = stop_reason
await hook.after_iteration(context)
break
if is_blank_text(clean):
final_content = EMPTY_FINAL_RESPONSE_MESSAGE
stop_reason = "empty_final_response"
error = final_content
self._append_final_message(messages, final_content)
context.final_content = final_content
context.error = error
context.stop_reason = stop_reason
await hook.after_iteration(context)
break
messages.append(build_assistant_message(
clean,
@@ -249,22 +263,101 @@ class AgentRunner:
tool_events=tool_events,
)
def _build_request_kwargs(
self,
spec: AgentRunSpec,
messages: list[dict[str, Any]],
*,
tools: list[dict[str, Any]] | None,
) -> dict[str, Any]:
kwargs: dict[str, Any] = {
"messages": messages,
"tools": tools,
"model": spec.model,
"retry_mode": spec.provider_retry_mode,
"on_retry_wait": spec.progress_callback,
}
if spec.temperature is not None:
kwargs["temperature"] = spec.temperature
if spec.max_tokens is not None:
kwargs["max_tokens"] = spec.max_tokens
if spec.reasoning_effort is not None:
kwargs["reasoning_effort"] = spec.reasoning_effort
return kwargs
async def _request_model(
self,
spec: AgentRunSpec,
messages: list[dict[str, Any]],
hook: AgentHook,
context: AgentHookContext,
):
kwargs = self._build_request_kwargs(
spec,
messages,
tools=spec.tools.get_definitions(),
)
if hook.wants_streaming():
async def _stream(delta: str) -> None:
await hook.on_stream(context, delta)
return await self.provider.chat_stream_with_retry(
**kwargs,
on_content_delta=_stream,
)
return await self.provider.chat_with_retry(**kwargs)
async def _request_finalization_retry(
self,
spec: AgentRunSpec,
messages: list[dict[str, Any]],
):
retry_messages = list(messages)
retry_messages.append(build_finalization_retry_message())
kwargs = self._build_request_kwargs(spec, retry_messages, tools=None)
return await self.provider.chat_with_retry(**kwargs)
@staticmethod
def _usage_dict(usage: dict[str, Any] | None) -> dict[str, int]:
if not usage:
return {}
result: dict[str, int] = {}
for key, value in usage.items():
try:
result[key] = int(value or 0)
except (TypeError, ValueError):
continue
return result
@staticmethod
def _accumulate_usage(target: dict[str, int], addition: dict[str, int]) -> None:
for key, value in addition.items():
target[key] = target.get(key, 0) + value
@staticmethod
def _merge_usage(left: dict[str, int], right: dict[str, int]) -> dict[str, int]:
merged = dict(left)
for key, value in right.items():
merged[key] = merged.get(key, 0) + value
return merged
async def _execute_tools(
self,
spec: AgentRunSpec,
tool_calls: list[ToolCallRequest],
external_lookup_counts: dict[str, int],
) -> tuple[list[Any], list[dict[str, str]], BaseException | None]:
batches = self._partition_tool_batches(spec, tool_calls)
tool_results: list[tuple[Any, dict[str, str], BaseException | None]] = []
for batch in batches:
if spec.concurrent_tools and len(batch) > 1:
tool_results.extend(await asyncio.gather(*(
self._run_tool(spec, tool_call)
self._run_tool(spec, tool_call, external_lookup_counts)
for tool_call in batch
)))
else:
for tool_call in batch:
tool_results.append(await self._run_tool(spec, tool_call))
tool_results.append(await self._run_tool(spec, tool_call, external_lookup_counts))
results: list[Any] = []
events: list[dict[str, str]] = []
@@ -280,8 +373,23 @@ class AgentRunner:
self,
spec: AgentRunSpec,
tool_call: ToolCallRequest,
external_lookup_counts: dict[str, int],
) -> tuple[Any, dict[str, str], BaseException | None]:
_HINT = "\n\n[Analyze the error above and try a different approach.]"
lookup_error = repeated_external_lookup_error(
tool_call.name,
tool_call.arguments,
external_lookup_counts,
)
if lookup_error:
event = {
"name": tool_call.name,
"status": "error",
"detail": "repeated external lookup blocked",
}
if spec.fail_on_tool_error:
return lookup_error + _HINT, event, RuntimeError(lookup_error)
return lookup_error + _HINT, event, None
prepare_call = getattr(spec.tools, "prepare_call", None)
tool, params, prep_error = None, tool_call.arguments, None
if callable(prepare_call):
@@ -361,8 +469,10 @@ class AgentRunner:
self,
spec: AgentRunSpec,
tool_call_id: str,
tool_name: str,
result: Any,
) -> Any:
result = ensure_nonempty_tool_result(tool_name, result)
try:
content = maybe_persist_tool_result(
spec.workspace,
@@ -395,6 +505,7 @@ class AgentRunner:
normalized = self._normalize_tool_result(
spec,
str(message.get("tool_call_id") or f"tool_{idx}"),
str(message.get("name") or "tool"),
message.get("content"),
)
if normalized != message.get("content"):
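For reference, a hand-rolled sketch of the runner's new usage bookkeeping: when the finalization retry fires, both the blank first response and the no-tools retry count toward the run total. The numbers mirror test_runner_retries_empty_final_response_with_summary_prompt in the tests further down; the loop inlines what _usage_dict and _accumulate_usage do rather than calling the runner itself:

first_usage = {"prompt_tokens": 10, "completion_tokens": 1}   # blank response
retry_usage = {"prompt_tokens": 3, "completion_tokens": 7}    # no-tools retry

usage = {"prompt_tokens": 0, "completion_tokens": 0}  # runner's starting total
for raw in (first_usage, retry_usage):
    for key, value in raw.items():
        usage[key] = usage.get(key, 0) + int(value or 0)

assert usage == {"prompt_tokens": 13, "completion_tokens": 8}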

View File

@@ -14,6 +14,8 @@ from typing import Any
from aiohttp import web
from loguru import logger
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
API_SESSION_KEY = "api:default"
API_CHAT_ID = "default"
@@ -98,7 +100,7 @@ async def handle_chat_completions(request: web.Request) -> web.Response:
logger.info("API request session_key={} content={}", session_key, user_content[:80])
_FALLBACK = "I've completed processing but have no response to give."
_FALLBACK = EMPTY_FINAL_RESPONSE_MESSAGE
try:
async with session_lock:

View File

@@ -120,7 +120,7 @@ def find_legal_message_start(messages: list[dict[str, Any]]) -> int:
return start
def _stringify_text_blocks(content: list[dict[str, Any]]) -> str | None:
def stringify_text_blocks(content: list[dict[str, Any]]) -> str | None:
parts: list[str] = []
for block in content:
if not isinstance(block, dict):
@@ -201,7 +201,7 @@ def maybe_persist_tool_result(
if isinstance(content, str):
text_payload = content
elif isinstance(content, list):
text_payload = _stringify_text_blocks(content)
text_payload = stringify_text_blocks(content)
if text_payload is None:
return content
suffix = "json"

88 nanobot/utils/runtime.py Normal file
View File

@@ -0,0 +1,88 @@
"""Runtime-specific helper functions and constants."""
from __future__ import annotations
from typing import Any
from loguru import logger
from nanobot.utils.helpers import stringify_text_blocks
_MAX_REPEAT_EXTERNAL_LOOKUPS = 2
EMPTY_FINAL_RESPONSE_MESSAGE = (
"I completed the tool steps but couldn't produce a final answer. "
"Please try again or narrow the task."
)
FINALIZATION_RETRY_PROMPT = (
"You have already finished the tool work. Do not call any more tools. "
"Using only the conversation and tool results above, provide the final answer for the user now."
)
def empty_tool_result_message(tool_name: str) -> str:
"""Short prompt-safe marker for tools that completed without visible output."""
return f"({tool_name} completed with no output)"
def ensure_nonempty_tool_result(tool_name: str, content: Any) -> Any:
"""Replace semantically empty tool results with a short marker string."""
if content is None:
return empty_tool_result_message(tool_name)
if isinstance(content, str) and not content.strip():
return empty_tool_result_message(tool_name)
if isinstance(content, list):
if not content:
return empty_tool_result_message(tool_name)
text_payload = stringify_text_blocks(content)
if text_payload is not None and not text_payload.strip():
return empty_tool_result_message(tool_name)
return content
def is_blank_text(content: str | None) -> bool:
"""True when *content* is missing or only whitespace."""
return content is None or not content.strip()
def build_finalization_retry_message() -> dict[str, str]:
"""A short no-tools-allowed prompt for final answer recovery."""
return {"role": "user", "content": FINALIZATION_RETRY_PROMPT}
def external_lookup_signature(tool_name: str, arguments: dict[str, Any]) -> str | None:
"""Stable signature for repeated external lookups we want to throttle."""
if tool_name == "web_fetch":
url = str(arguments.get("url") or "").strip()
if url:
return f"web_fetch:{url.lower()}"
if tool_name == "web_search":
query = str(arguments.get("query") or arguments.get("search_term") or "").strip()
if query:
return f"web_search:{query.lower()}"
return None
def repeated_external_lookup_error(
tool_name: str,
arguments: dict[str, Any],
seen_counts: dict[str, int],
) -> str | None:
"""Block repeated external lookups after a small retry budget."""
signature = external_lookup_signature(tool_name, arguments)
if signature is None:
return None
count = seen_counts.get(signature, 0) + 1
seen_counts[signature] = count
if count <= _MAX_REPEAT_EXTERNAL_LOOKUPS:
return None
logger.warning(
"Blocking repeated external lookup {} on attempt {}",
signature[:160],
count,
)
return (
"Error: repeated external lookup blocked. "
"Use the results you already have to answer, or try a meaningfully different source."
)
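A quick sketch of the throttle in use; the counts dict is the per-run state the runner threads through _execute_tools:

counts: dict[str, int] = {}
args = {"url": "https://example.com"}

# The first two identical fetches pass (_MAX_REPEAT_EXTERNAL_LOOKUPS = 2)...
assert repeated_external_lookup_error("web_fetch", args, counts) is None
assert repeated_external_lookup_error("web_fetch", args, counts) is None

# ...and the third returns the error string, so the runner blocks the call.
blocked = repeated_external_lookup_error("web_fetch", args, counts)
assert blocked is not None
assert blocked.startswith("Error: repeated external lookup blocked")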

View File

@@ -385,6 +385,44 @@ def test_persist_tool_result_logs_cleanup_failures(monkeypatch, tmp_path):
assert warnings and "Failed to clean stale tool result buckets" in warnings[0]
@pytest.mark.asyncio
async def test_runner_replaces_empty_tool_result_with_marker():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
provider = MagicMock()
captured_second_call: list[dict] = []
call_count = {"n": 0}
async def chat_with_retry(*, messages, **kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return LLMResponse(
content="working",
tool_calls=[ToolCallRequest(id="call_1", name="noop", arguments={})],
usage={},
)
captured_second_call[:] = messages
return LLMResponse(content="done", tool_calls=[], usage={})
provider.chat_with_retry = chat_with_retry
tools = MagicMock()
tools.get_definitions.return_value = []
tools.execute = AsyncMock(return_value="")
runner = AgentRunner(provider)
result = await runner.run(AgentRunSpec(
initial_messages=[{"role": "user", "content": "do task"}],
tools=tools,
model="test-model",
max_iterations=2,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
))
assert result.final_content == "done"
tool_message = next(msg for msg in captured_second_call if msg.get("role") == "tool")
assert tool_message["content"] == "(noop completed with no output)"
@pytest.mark.asyncio
async def test_runner_uses_raw_messages_when_context_governance_fails():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
@@ -418,6 +456,75 @@ async def test_runner_uses_raw_messages_when_context_governance_fails():
assert captured_messages == initial_messages
@pytest.mark.asyncio
async def test_runner_retries_empty_final_response_with_summary_prompt():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
provider = MagicMock()
calls: list[dict] = []
async def chat_with_retry(*, messages, tools=None, **kwargs):
calls.append({"messages": messages, "tools": tools})
if len(calls) == 1:
return LLMResponse(
content=None,
tool_calls=[],
usage={"prompt_tokens": 10, "completion_tokens": 1},
)
return LLMResponse(
content="final answer",
tool_calls=[],
usage={"prompt_tokens": 3, "completion_tokens": 7},
)
provider.chat_with_retry = chat_with_retry
tools = MagicMock()
tools.get_definitions.return_value = []
runner = AgentRunner(provider)
result = await runner.run(AgentRunSpec(
initial_messages=[{"role": "user", "content": "do task"}],
tools=tools,
model="test-model",
max_iterations=1,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
))
assert result.final_content == "final answer"
assert len(calls) == 2
assert calls[1]["tools"] is None
assert "Do not call any more tools" in calls[1]["messages"][-1]["content"]
assert result.usage["prompt_tokens"] == 13
assert result.usage["completion_tokens"] == 8
@pytest.mark.asyncio
async def test_runner_uses_specific_message_after_empty_finalization_retry():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
provider = MagicMock()
async def chat_with_retry(*, messages, **kwargs):
return LLMResponse(content=None, tool_calls=[], usage={})
provider.chat_with_retry = chat_with_retry
tools = MagicMock()
tools.get_definitions.return_value = []
runner = AgentRunner(provider)
result = await runner.run(AgentRunSpec(
initial_messages=[{"role": "user", "content": "do task"}],
tools=tools,
model="test-model",
max_iterations=1,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
))
assert result.final_content == EMPTY_FINAL_RESPONSE_MESSAGE
assert result.stop_reason == "empty_final_response"
def test_snip_history_drops_orphaned_tool_results_from_trimmed_slice(monkeypatch):
from nanobot.agent.runner import AgentRunSpec, AgentRunner
@@ -564,6 +671,7 @@ async def test_runner_batches_read_only_tools_before_exclusive_work():
ToolCallRequest(id="ro2", name="read_b", arguments={}),
ToolCallRequest(id="rw1", name="write_a", arguments={}),
],
{},
)
assert shared_events[0:2] == ["start:read_a", "start:read_b"]
@@ -573,6 +681,48 @@
assert shared_events[-2:] == ["start:write_a", "end:write_a"]
@pytest.mark.asyncio
async def test_runner_blocks_repeated_external_fetches():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
provider = MagicMock()
captured_final_call: list[dict] = []
call_count = {"n": 0}
async def chat_with_retry(*, messages, **kwargs):
call_count["n"] += 1
if call_count["n"] <= 3:
return LLMResponse(
content="working",
tool_calls=[ToolCallRequest(id=f"call_{call_count['n']}", name="web_fetch", arguments={"url": "https://example.com"})],
usage={},
)
captured_final_call[:] = messages
return LLMResponse(content="done", tool_calls=[], usage={})
provider.chat_with_retry = chat_with_retry
tools = MagicMock()
tools.get_definitions.return_value = []
tools.execute = AsyncMock(return_value="page content")
runner = AgentRunner(provider)
result = await runner.run(AgentRunSpec(
initial_messages=[{"role": "user", "content": "research task"}],
tools=tools,
model="test-model",
max_iterations=4,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
))
assert result.final_content == "done"
assert tools.execute.await_count == 2
blocked_tool_message = [
msg for msg in captured_final_call
if msg.get("role") == "tool" and msg.get("tool_call_id") == "call_3"
][0]
assert "repeated external lookup blocked" in blocked_tool_message["content"]
@pytest.mark.asyncio
async def test_loop_max_iterations_message_stays_stable(tmp_path):
loop = _make_loop(tmp_path)
@@ -622,6 +772,57 @@ async def test_loop_stream_filter_handles_think_only_prefix_without_crashing(tmp_path):
assert endings == [False]
@pytest.mark.asyncio
async def test_loop_retries_think_only_final_response(tmp_path):
loop = _make_loop(tmp_path)
call_count = {"n": 0}
async def chat_with_retry(**kwargs):
call_count["n"] += 1
if call_count["n"] == 1:
return LLMResponse(content="<think>hidden</think>", tool_calls=[], usage={})
return LLMResponse(content="Recovered answer", tool_calls=[], usage={})
loop.provider.chat_with_retry = chat_with_retry
final_content, _, _ = await loop._run_agent_loop([])
assert final_content == "Recovered answer"
assert call_count["n"] == 2
@pytest.mark.asyncio
async def test_runner_tool_error_sets_final_content():
from nanobot.agent.runner import AgentRunSpec, AgentRunner
provider = MagicMock()
async def chat_with_retry(*, messages, **kwargs):
return LLMResponse(
content="working",
tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "x"})],
usage={},
)
provider.chat_with_retry = chat_with_retry
tools = MagicMock()
tools.get_definitions.return_value = []
tools.execute = AsyncMock(side_effect=RuntimeError("boom"))
runner = AgentRunner(provider)
result = await runner.run(AgentRunSpec(
initial_messages=[{"role": "user", "content": "do task"}],
tools=tools,
model="test-model",
max_iterations=1,
max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
fail_on_tool_error=True,
))
assert result.final_content == "Error: RuntimeError: boom"
assert result.stop_reason == "tool_error"
@pytest.mark.asyncio
async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch):
from nanobot.agent.subagent import SubagentManager

View File

@@ -347,6 +347,8 @@ async def test_empty_response_retry_then_success(aiohttp_client) -> None:
@pytest.mark.skipif(not HAS_AIOHTTP, reason="aiohttp not installed")
@pytest.mark.asyncio
async def test_empty_response_falls_back(aiohttp_client) -> None:
from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE
call_count = 0
async def always_empty(content, session_key="", channel="", chat_id=""):
@@ -367,5 +369,5 @@ async def test_empty_response_falls_back(aiohttp_client) -> None:
)
assert resp.status == 200
body = await resp.json()
assert body["choices"][0]["message"]["content"] == "I've completed processing but have no response to give."
assert body["choices"][0]["message"]["content"] == EMPTY_FINAL_RESPONSE_MESSAGE
assert call_count == 2