diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index 2e5b04091..4a68a19fc 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -33,6 +33,7 @@ from nanobot.config.schema import AgentDefaults from nanobot.providers.base import LLMProvider from nanobot.session.manager import Session, SessionManager from nanobot.utils.helpers import image_placeholder_text, truncate_text +from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE if TYPE_CHECKING: from nanobot.config.schema import ChannelsConfig, ExecToolConfig, WebSearchConfig @@ -588,8 +589,8 @@ class AgentLoop: message_id=msg.metadata.get("message_id"), ) - if final_content is None: - final_content = "I've completed processing but have no response to give." + if final_content is None or not final_content.strip(): + final_content = EMPTY_FINAL_RESPONSE_MESSAGE self._save_turn(session, all_msgs, 1 + len(history)) self._clear_runtime_checkpoint(session) diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 90b286c0a..a8676a8e0 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -20,6 +20,13 @@ from nanobot.utils.helpers import ( maybe_persist_tool_result, truncate_text, ) +from nanobot.utils.runtime import ( + EMPTY_FINAL_RESPONSE_MESSAGE, + build_finalization_retry_message, + ensure_nonempty_tool_result, + is_blank_text, + repeated_external_lookup_error, +) _DEFAULT_MAX_ITERATIONS_MESSAGE = ( "I reached the maximum number of tool call iterations ({max_iterations}) " @@ -77,10 +84,11 @@ class AgentRunner: messages = list(spec.initial_messages) final_content: str | None = None tools_used: list[str] = [] - usage: dict[str, int] = {} + usage: dict[str, int] = {"prompt_tokens": 0, "completion_tokens": 0} error: str | None = None stop_reason = "completed" tool_events: list[dict[str, str]] = [] + external_lookup_counts: dict[str, int] = {} for iteration in range(spec.max_iterations): try: @@ -96,41 +104,12 @@ class AgentRunner: messages_for_model = messages context = AgentHookContext(iteration=iteration, messages=messages) await hook.before_iteration(context) - kwargs: dict[str, Any] = { - "messages": messages_for_model, - "tools": spec.tools.get_definitions(), - "model": spec.model, - "retry_mode": spec.provider_retry_mode, - "on_retry_wait": spec.progress_callback, - } - if spec.temperature is not None: - kwargs["temperature"] = spec.temperature - if spec.max_tokens is not None: - kwargs["max_tokens"] = spec.max_tokens - if spec.reasoning_effort is not None: - kwargs["reasoning_effort"] = spec.reasoning_effort - - if hook.wants_streaming(): - async def _stream(delta: str) -> None: - await hook.on_stream(context, delta) - - response = await self.provider.chat_stream_with_retry( - **kwargs, - on_content_delta=_stream, - ) - else: - response = await self.provider.chat_with_retry(**kwargs) - - raw_usage = response.usage or {} + response = await self._request_model(spec, messages_for_model, hook, context) + raw_usage = self._usage_dict(response.usage) context.response = response - context.usage = raw_usage + context.usage = dict(raw_usage) context.tool_calls = list(response.tool_calls) - # Accumulate standard fields into result usage. - usage["prompt_tokens"] = usage.get("prompt_tokens", 0) + int(raw_usage.get("prompt_tokens", 0) or 0) - usage["completion_tokens"] = usage.get("completion_tokens", 0) + int(raw_usage.get("completion_tokens", 0) or 0) - cached = raw_usage.get("cached_tokens") - if cached: - usage["cached_tokens"] = usage.get("cached_tokens", 0) + int(cached) + self._accumulate_usage(usage, raw_usage) if response.has_tool_calls: if hook.wants_streaming(): @@ -158,13 +137,20 @@ class AgentRunner: await hook.before_execute_tools(context) - results, new_events, fatal_error = await self._execute_tools(spec, response.tool_calls) + results, new_events, fatal_error = await self._execute_tools( + spec, + response.tool_calls, + external_lookup_counts, + ) tool_events.extend(new_events) context.tool_results = list(results) context.tool_events = list(new_events) if fatal_error is not None: error = f"Error: {type(fatal_error).__name__}: {fatal_error}" + final_content = error stop_reason = "tool_error" + self._append_final_message(messages, final_content) + context.final_content = final_content context.error = error context.stop_reason = stop_reason await hook.after_iteration(context) @@ -178,6 +164,7 @@ class AgentRunner: "content": self._normalize_tool_result( spec, tool_call.id, + tool_call.name, result, ), } @@ -197,10 +184,27 @@ class AgentRunner: await hook.after_iteration(context) continue + clean = hook.finalize_content(context, response.content) + if response.finish_reason != "error" and is_blank_text(clean): + logger.warning( + "Empty final response on turn {} for {}; retrying with explicit finalization prompt", + iteration, + spec.session_key or "default", + ) + if hook.wants_streaming(): + await hook.on_stream_end(context, resuming=False) + response = await self._request_finalization_retry(spec, messages_for_model) + retry_usage = self._usage_dict(response.usage) + self._accumulate_usage(usage, retry_usage) + raw_usage = self._merge_usage(raw_usage, retry_usage) + context.response = response + context.usage = dict(raw_usage) + context.tool_calls = list(response.tool_calls) + clean = hook.finalize_content(context, response.content) + if hook.wants_streaming(): await hook.on_stream_end(context, resuming=False) - clean = hook.finalize_content(context, response.content) if response.finish_reason == "error": final_content = clean or spec.error_message or _DEFAULT_ERROR_MESSAGE stop_reason = "error" @@ -211,6 +215,16 @@ class AgentRunner: context.stop_reason = stop_reason await hook.after_iteration(context) break + if is_blank_text(clean): + final_content = EMPTY_FINAL_RESPONSE_MESSAGE + stop_reason = "empty_final_response" + error = final_content + self._append_final_message(messages, final_content) + context.final_content = final_content + context.error = error + context.stop_reason = stop_reason + await hook.after_iteration(context) + break messages.append(build_assistant_message( clean, @@ -249,22 +263,101 @@ class AgentRunner: tool_events=tool_events, ) + def _build_request_kwargs( + self, + spec: AgentRunSpec, + messages: list[dict[str, Any]], + *, + tools: list[dict[str, Any]] | None, + ) -> dict[str, Any]: + kwargs: dict[str, Any] = { + "messages": messages, + "tools": tools, + "model": spec.model, + "retry_mode": spec.provider_retry_mode, + "on_retry_wait": spec.progress_callback, + } + if spec.temperature is not None: + kwargs["temperature"] = spec.temperature + if spec.max_tokens is not None: + kwargs["max_tokens"] = spec.max_tokens + if spec.reasoning_effort is not None: + kwargs["reasoning_effort"] = spec.reasoning_effort + return kwargs + + async def _request_model( + self, + spec: AgentRunSpec, + messages: list[dict[str, Any]], + hook: AgentHook, + context: AgentHookContext, + ): + kwargs = self._build_request_kwargs( + spec, + messages, + tools=spec.tools.get_definitions(), + ) + if hook.wants_streaming(): + async def _stream(delta: str) -> None: + await hook.on_stream(context, delta) + + return await self.provider.chat_stream_with_retry( + **kwargs, + on_content_delta=_stream, + ) + return await self.provider.chat_with_retry(**kwargs) + + async def _request_finalization_retry( + self, + spec: AgentRunSpec, + messages: list[dict[str, Any]], + ): + retry_messages = list(messages) + retry_messages.append(build_finalization_retry_message()) + kwargs = self._build_request_kwargs(spec, retry_messages, tools=None) + return await self.provider.chat_with_retry(**kwargs) + + @staticmethod + def _usage_dict(usage: dict[str, Any] | None) -> dict[str, int]: + if not usage: + return {} + result: dict[str, int] = {} + for key, value in usage.items(): + try: + result[key] = int(value or 0) + except (TypeError, ValueError): + continue + return result + + @staticmethod + def _accumulate_usage(target: dict[str, int], addition: dict[str, int]) -> None: + for key, value in addition.items(): + target[key] = target.get(key, 0) + value + + @staticmethod + def _merge_usage(left: dict[str, int], right: dict[str, int]) -> dict[str, int]: + merged = dict(left) + for key, value in right.items(): + merged[key] = merged.get(key, 0) + value + return merged + async def _execute_tools( self, spec: AgentRunSpec, tool_calls: list[ToolCallRequest], + external_lookup_counts: dict[str, int], ) -> tuple[list[Any], list[dict[str, str]], BaseException | None]: batches = self._partition_tool_batches(spec, tool_calls) tool_results: list[tuple[Any, dict[str, str], BaseException | None]] = [] for batch in batches: if spec.concurrent_tools and len(batch) > 1: tool_results.extend(await asyncio.gather(*( - self._run_tool(spec, tool_call) + self._run_tool(spec, tool_call, external_lookup_counts) for tool_call in batch ))) else: for tool_call in batch: - tool_results.append(await self._run_tool(spec, tool_call)) + tool_results.append(await self._run_tool(spec, tool_call, external_lookup_counts)) results: list[Any] = [] events: list[dict[str, str]] = [] @@ -280,8 +373,23 @@ class AgentRunner: self, spec: AgentRunSpec, tool_call: ToolCallRequest, + external_lookup_counts: dict[str, int], ) -> tuple[Any, dict[str, str], BaseException | None]: _HINT = "\n\n[Analyze the error above and try a different approach.]" + lookup_error = repeated_external_lookup_error( + tool_call.name, + tool_call.arguments, + external_lookup_counts, + ) + if lookup_error: + event = { + "name": tool_call.name, + "status": "error", + "detail": "repeated external lookup blocked", + } + if spec.fail_on_tool_error: + return lookup_error + _HINT, event, RuntimeError(lookup_error) + return lookup_error + _HINT, event, None prepare_call = getattr(spec.tools, "prepare_call", None) tool, params, prep_error = None, tool_call.arguments, None if callable(prepare_call): @@ -361,8 +469,10 @@ class AgentRunner: self, spec: AgentRunSpec, tool_call_id: str, + tool_name: str, result: Any, ) -> Any: + result = ensure_nonempty_tool_result(tool_name, result) try: content = maybe_persist_tool_result( spec.workspace, @@ -395,6 +505,7 @@ class AgentRunner: normalized = self._normalize_tool_result( spec, str(message.get("tool_call_id") or f"tool_{idx}"), + str(message.get("name") or "tool"), message.get("content"), ) if normalized != message.get("content"): diff --git a/nanobot/api/server.py b/nanobot/api/server.py index 9494b6e31..2bfeddd05 100644 --- a/nanobot/api/server.py +++ b/nanobot/api/server.py @@ -14,6 +14,8 @@ from typing import Any from aiohttp import web from loguru import logger +from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE + API_SESSION_KEY = "api:default" API_CHAT_ID = "default" @@ -98,7 +100,7 @@ async def handle_chat_completions(request: web.Request) -> web.Response: logger.info("API request session_key={} content={}", session_key, user_content[:80]) - _FALLBACK = "I've completed processing but have no response to give." + _FALLBACK = EMPTY_FINAL_RESPONSE_MESSAGE try: async with session_lock: diff --git a/nanobot/utils/helpers.py b/nanobot/utils/helpers.py index fa3e423b8..9e0a69d5e 100644 --- a/nanobot/utils/helpers.py +++ b/nanobot/utils/helpers.py @@ -120,7 +120,7 @@ def find_legal_message_start(messages: list[dict[str, Any]]) -> int: return start -def _stringify_text_blocks(content: list[dict[str, Any]]) -> str | None: +def stringify_text_blocks(content: list[dict[str, Any]]) -> str | None: parts: list[str] = [] for block in content: if not isinstance(block, dict): @@ -201,7 +201,7 @@ def maybe_persist_tool_result( if isinstance(content, str): text_payload = content elif isinstance(content, list): - text_payload = _stringify_text_blocks(content) + text_payload = stringify_text_blocks(content) if text_payload is None: return content suffix = "json" diff --git a/nanobot/utils/runtime.py b/nanobot/utils/runtime.py new file mode 100644 index 000000000..7164629c5 --- /dev/null +++ b/nanobot/utils/runtime.py @@ -0,0 +1,88 @@ +"""Runtime-specific helper functions and constants.""" + +from __future__ import annotations + +from typing import Any + +from loguru import logger + +from nanobot.utils.helpers import stringify_text_blocks + +_MAX_REPEAT_EXTERNAL_LOOKUPS = 2 + +EMPTY_FINAL_RESPONSE_MESSAGE = ( + "I completed the tool steps but couldn't produce a final answer. " + "Please try again or narrow the task." +) + +FINALIZATION_RETRY_PROMPT = ( + "You have already finished the tool work. Do not call any more tools. " + "Using only the conversation and tool results above, provide the final answer for the user now." +) + + +def empty_tool_result_message(tool_name: str) -> str: + """Short prompt-safe marker for tools that completed without visible output.""" + return f"({tool_name} completed with no output)" + + +def ensure_nonempty_tool_result(tool_name: str, content: Any) -> Any: + """Replace semantically empty tool results with a short marker string.""" + if content is None: + return empty_tool_result_message(tool_name) + if isinstance(content, str) and not content.strip(): + return empty_tool_result_message(tool_name) + if isinstance(content, list): + if not content: + return empty_tool_result_message(tool_name) + text_payload = stringify_text_blocks(content) + if text_payload is not None and not text_payload.strip(): + return empty_tool_result_message(tool_name) + return content + + +def is_blank_text(content: str | None) -> bool: + """True when *content* is missing or only whitespace.""" + return content is None or not content.strip() + + +def build_finalization_retry_message() -> dict[str, str]: + """A short no-tools-allowed prompt for final answer recovery.""" + return {"role": "user", "content": FINALIZATION_RETRY_PROMPT} + + +def external_lookup_signature(tool_name: str, arguments: dict[str, Any]) -> str | None: + """Stable signature for repeated external lookups we want to throttle.""" + if tool_name == "web_fetch": + url = str(arguments.get("url") or "").strip() + if url: + return f"web_fetch:{url.lower()}" + if tool_name == "web_search": + query = str(arguments.get("query") or arguments.get("search_term") or "").strip() + if query: + return f"web_search:{query.lower()}" + return None + + +def repeated_external_lookup_error( + tool_name: str, + arguments: dict[str, Any], + seen_counts: dict[str, int], +) -> str | None: + """Block repeated external lookups after a small retry budget.""" + signature = external_lookup_signature(tool_name, arguments) + if signature is None: + return None + count = seen_counts.get(signature, 0) + 1 + seen_counts[signature] = count + if count <= _MAX_REPEAT_EXTERNAL_LOOKUPS: + return None + logger.warning( + "Blocking repeated external lookup {} on attempt {}", + signature[:160], + count, + ) + return ( + "Error: repeated external lookup blocked. " + "Use the results you already have to answer, or try a meaningfully different source." + ) diff --git a/tests/agent/test_runner.py b/tests/agent/test_runner.py index 9009480e3..dcdd15031 100644 --- a/tests/agent/test_runner.py +++ b/tests/agent/test_runner.py @@ -385,6 +385,44 @@ def test_persist_tool_result_logs_cleanup_failures(monkeypatch, tmp_path): assert warnings and "Failed to clean stale tool result buckets" in warnings[0] +@pytest.mark.asyncio +async def test_runner_replaces_empty_tool_result_with_marker(): + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + provider = MagicMock() + captured_second_call: list[dict] = [] + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return LLMResponse( + content="working", + tool_calls=[ToolCallRequest(id="call_1", name="noop", arguments={})], + usage={}, + ) + captured_second_call[:] = messages + return LLMResponse(content="done", tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(return_value="") + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "do task"}], + tools=tools, + model="test-model", + max_iterations=2, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + assert result.final_content == "done" + tool_message = next(msg for msg in captured_second_call if msg.get("role") == "tool") + assert tool_message["content"] == "(noop completed with no output)" + + @pytest.mark.asyncio async def test_runner_uses_raw_messages_when_context_governance_fails(): from nanobot.agent.runner import AgentRunSpec, AgentRunner @@ -418,6 +456,75 @@ async def test_runner_uses_raw_messages_when_context_governance_fails(): assert captured_messages == initial_messages +@pytest.mark.asyncio +async def test_runner_retries_empty_final_response_with_summary_prompt(): + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + provider = MagicMock() + calls: list[dict] = [] + + async def chat_with_retry(*, messages, tools=None, **kwargs): + calls.append({"messages": messages, "tools": tools}) + if len(calls) == 1: + return LLMResponse( + content=None, + tool_calls=[], + usage={"prompt_tokens": 10, "completion_tokens": 1}, + ) + return LLMResponse( + content="final answer", + tool_calls=[], + usage={"prompt_tokens": 3, "completion_tokens": 7}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "do task"}], + tools=tools, + model="test-model", + max_iterations=1, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + assert result.final_content == "final answer" + assert len(calls) == 2 + assert calls[1]["tools"] is None + assert "Do not call any more tools" in calls[1]["messages"][-1]["content"] + assert result.usage["prompt_tokens"] == 13 + assert result.usage["completion_tokens"] == 8 + + +@pytest.mark.asyncio +async def test_runner_uses_specific_message_after_empty_finalization_retry(): + from nanobot.agent.runner import AgentRunSpec, AgentRunner + from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE + + provider = MagicMock() + + async def chat_with_retry(*, messages, **kwargs): + return LLMResponse(content=None, tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "do task"}], + tools=tools, + model="test-model", + max_iterations=1, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + assert result.final_content == EMPTY_FINAL_RESPONSE_MESSAGE + assert result.stop_reason == "empty_final_response" + + def test_snip_history_drops_orphaned_tool_results_from_trimmed_slice(monkeypatch): from nanobot.agent.runner import AgentRunSpec, AgentRunner @@ -564,6 +671,7 @@ async def test_runner_batches_read_only_tools_before_exclusive_work(): ToolCallRequest(id="ro2", name="read_b", arguments={}), ToolCallRequest(id="rw1", name="write_a", arguments={}), ], + {}, ) assert shared_events[0:2] == ["start:read_a", "start:read_b"] @@ -573,6 +681,48 @@ async def test_runner_batches_read_only_tools_before_exclusive_work(): assert shared_events[-2:] == ["start:write_a", "end:write_a"] +@pytest.mark.asyncio +async def test_runner_blocks_repeated_external_fetches(): + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + provider = MagicMock() + captured_final_call: list[dict] = [] + call_count = {"n": 0} + + async def chat_with_retry(*, messages, **kwargs): + call_count["n"] += 1 + if call_count["n"] <= 3: + return LLMResponse( + content="working", + tool_calls=[ToolCallRequest(id=f"call_{call_count['n']}", name="web_fetch", arguments={"url": "https://example.com"})], + usage={}, + ) + captured_final_call[:] = messages + return LLMResponse(content="done", tool_calls=[], usage={}) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(return_value="page content") + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "research task"}], + tools=tools, + model="test-model", + max_iterations=4, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + )) + + assert result.final_content == "done" + assert tools.execute.await_count == 2 + blocked_tool_message = [ + msg for msg in captured_final_call + if msg.get("role") == "tool" and msg.get("tool_call_id") == "call_3" + ][0] + assert "repeated external lookup blocked" in blocked_tool_message["content"] + + @pytest.mark.asyncio async def test_loop_max_iterations_message_stays_stable(tmp_path): loop = _make_loop(tmp_path) @@ -622,6 +772,57 @@ async def test_loop_stream_filter_handles_think_only_prefix_without_crashing(tmp assert endings == [False] +@pytest.mark.asyncio +async def test_loop_retries_think_only_final_response(tmp_path): + loop = _make_loop(tmp_path) + call_count = {"n": 0} + + async def chat_with_retry(**kwargs): + call_count["n"] += 1 + if call_count["n"] == 1: + return LLMResponse(content="hidden", tool_calls=[], usage={}) + return LLMResponse(content="Recovered answer", tool_calls=[], usage={}) + + loop.provider.chat_with_retry = chat_with_retry + + final_content, _, _ = await loop._run_agent_loop([]) + + assert final_content == "Recovered answer" + assert call_count["n"] == 2 + + +@pytest.mark.asyncio +async def test_runner_tool_error_sets_final_content(): + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + provider = MagicMock() + + async def chat_with_retry(*, messages, **kwargs): + return LLMResponse( + content="working", + tool_calls=[ToolCallRequest(id="call_1", name="read_file", arguments={"path": "x"})], + usage={}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + tools.execute = AsyncMock(side_effect=RuntimeError("boom")) + + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "do task"}], + tools=tools, + model="test-model", + max_iterations=1, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + fail_on_tool_error=True, + )) + + assert result.final_content == "Error: RuntimeError: boom" + assert result.stop_reason == "tool_error" + + @pytest.mark.asyncio async def test_subagent_max_iterations_announces_existing_fallback(tmp_path, monkeypatch): from nanobot.agent.subagent import SubagentManager diff --git a/tests/test_openai_api.py b/tests/test_openai_api.py index 42fec33ed..2d4ae8580 100644 --- a/tests/test_openai_api.py +++ b/tests/test_openai_api.py @@ -347,6 +347,8 @@ async def test_empty_response_retry_then_success(aiohttp_client) -> None: @pytest.mark.skipif(not HAS_AIOHTTP, reason="aiohttp not installed") @pytest.mark.asyncio async def test_empty_response_falls_back(aiohttp_client) -> None: + from nanobot.utils.runtime import EMPTY_FINAL_RESPONSE_MESSAGE + call_count = 0 async def always_empty(content, session_key="", channel="", chat_id=""): @@ -367,5 +369,5 @@ async def test_empty_response_falls_back(aiohttp_client) -> None: ) assert resp.status == 200 body = await resp.json() - assert body["choices"][0]["message"]["content"] == "I've completed processing but have no response to give." + assert body["choices"][0]["message"]["content"] == EMPTY_FINAL_RESPONSE_MESSAGE assert call_count == 2