fix(agent): deliver LLM errors to streaming channels and avoid polluting session context

When the LLM returns an error (e.g. 429 quota exceeded, stream timeout),
streaming channels silently drop the error message because `_streamed=True`
is set in metadata even though no content was actually streamed.

This change:
- Skips setting `_streamed` when `stop_reason` is "error", so error messages
  go through the normal `channel.send()` path and reach the user.
- Stops appending error content to session history, preventing error
  messages from polluting subsequent conversation context.
- Exposes `stop_reason` from `_run_agent_loop` so callers can perform the
  check above.
This commit is contained in:
yanghan-cyber 2026-04-09 15:32:03 +08:00 committed by Xubin Ren
parent ba8bce0f45
commit 10f6c875a5
5 changed files with 12 additions and 13 deletions

View File

@ -308,7 +308,7 @@ class AgentLoop:
channel: str = "cli",
chat_id: str = "direct",
message_id: str | None = None,
) -> tuple[str | None, list[str], list[dict]]:
) -> tuple[str | None, list[str], list[dict], str]:
"""Run the agent iteration loop.
*on_stream*: called with each content delta during streaming.
@ -358,7 +358,7 @@ class AgentLoop:
logger.warning("Max iterations ({}) reached", self.max_iterations)
elif result.stop_reason == "error":
logger.error("LLM returned error: {}", (result.final_content or "")[:200])
return result.final_content, result.tools_used, result.messages
return result.final_content, result.tools_used, result.messages, result.stop_reason
async def run(self) -> None:
"""Run the agent loop, dispatching messages as tasks to stay responsive to /stop."""
@ -505,7 +505,7 @@ class AgentLoop:
current_message=msg.content, channel=channel, chat_id=chat_id,
current_role=current_role,
)
final_content, _, all_msgs = await self._run_agent_loop(
final_content, _, all_msgs, _ = await self._run_agent_loop(
messages, session=session, channel=channel, chat_id=chat_id,
message_id=msg.metadata.get("message_id"),
)
@ -553,7 +553,7 @@ class AgentLoop:
channel=msg.channel, chat_id=msg.chat_id, content=content, metadata=meta,
))
final_content, _, all_msgs = await self._run_agent_loop(
final_content, _, all_msgs, stop_reason = await self._run_agent_loop(
initial_messages,
on_progress=on_progress or _bus_progress,
on_stream=on_stream,
@ -578,7 +578,7 @@ class AgentLoop:
logger.info("Response to {}:{}: {}", msg.channel, msg.sender_id, preview)
meta = dict(msg.metadata or {})
if on_stream is not None:
if on_stream is not None and stop_reason != "error":
meta["_streamed"] = True
return OutboundMessage(
channel=msg.channel, chat_id=msg.chat_id, content=final_content,

View File

@ -257,7 +257,6 @@ class AgentRunner:
final_content = clean or spec.error_message or _DEFAULT_ERROR_MESSAGE
stop_reason = "error"
error = final_content
self._append_final_message(messages, final_content)
context.final_content = final_content
context.error = error
context.stop_reason = stop_reason

View File

@ -307,7 +307,7 @@ async def test_agent_loop_extra_hook_receives_calls(tmp_path):
)
loop.tools.get_definitions = MagicMock(return_value=[])
content, tools_used, messages = await loop._run_agent_loop(
content, tools_used, messages, _ = await loop._run_agent_loop(
[{"role": "user", "content": "hi"}]
)
@ -331,7 +331,7 @@ async def test_agent_loop_extra_hook_error_isolation(tmp_path):
)
loop.tools.get_definitions = MagicMock(return_value=[])
content, _, _ = await loop._run_agent_loop(
content, _, _, _ = await loop._run_agent_loop(
[{"role": "user", "content": "hi"}]
)
@ -373,7 +373,7 @@ async def test_agent_loop_no_hooks_backward_compat(tmp_path):
loop.tools.execute = AsyncMock(return_value="ok")
loop.max_iterations = 2
content, tools_used, _ = await loop._run_agent_loop([])
content, tools_used, _, _ = await loop._run_agent_loop([])
assert content == (
"I reached the maximum number of tool call iterations (2) "
"without completing the task. You can try breaking the task into smaller steps."

View File

@ -798,7 +798,7 @@ async def test_loop_max_iterations_message_stays_stable(tmp_path):
loop.tools.execute = AsyncMock(return_value="ok")
loop.max_iterations = 2
final_content, _, _ = await loop._run_agent_loop([])
final_content, _, _, _ = await loop._run_agent_loop([])
assert final_content == (
"I reached the maximum number of tool call iterations (2) "
@ -825,7 +825,7 @@ async def test_loop_stream_filter_handles_think_only_prefix_without_crashing(tmp
async def on_stream_end(*, resuming: bool = False) -> None:
endings.append(resuming)
final_content, _, _ = await loop._run_agent_loop(
final_content, _, _, _ = await loop._run_agent_loop(
[],
on_stream=on_stream,
on_stream_end=on_stream_end,
@ -849,7 +849,7 @@ async def test_loop_retries_think_only_final_response(tmp_path):
loop.provider.chat_with_retry = chat_with_retry
final_content, _, _ = await loop._run_agent_loop([])
final_content, _, _, _ = await loop._run_agent_loop([])
assert final_content == "Recovered answer"
assert call_count["n"] == 2

View File

@ -107,7 +107,7 @@ class TestMessageToolSuppressLogic:
async def on_progress(content: str, *, tool_hint: bool = False) -> None:
progress.append((content, tool_hint))
final_content, _, _ = await loop._run_agent_loop([], on_progress=on_progress)
final_content, _, _, _ = await loop._run_agent_loop([], on_progress=on_progress)
assert final_content == "Done"
assert progress == [