mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-01 15:25:56 +00:00
fix: 错误消息流转路径:1. 当 LLM 服务出现临时性错误(如网络波动、超时、429 限流等)时,base.py 中的 _run_with_retry 方法会启动重试机制。2. 在重试等待期间,_sleep_with_heartbeat 方法会周期性调用 on_retry_wait 回调函数,发送类似 'Model request failed, retry in 1s (attempt 1)' 的心跳消息。3. 之前 on_retry_wait 参数被错误地绑定到 _bus_progress,导致这些内部诊断消息被当作普通进度消息发送到飞书客户端。4. manager.py 的消息分发器没有过滤这类重试心跳消息。修复方案:1. loop.py - 新增重试等待回调:(a) 新增独立的 _on_retry_wait 回调函数,为重试消息添加 _retry_wait: True 元数据标识;(b) 在 AgentRunSpec 中传入 retry_wait_callback 参数。2. runner.py - 支持重试回调参数:(a) 在 AgentRunSpec 数据类中新增 retry_wait_callback 字段;(b) 在 _build_request_kwargs 中将 on_retry_wait 参数从 progress_callback 改为 retry_wait_callback。3. manager.py - 过滤重试心跳消息:在 _dispatch_outbound 方法中新增过滤逻辑,丢弃所有带 _retry_wait 标识的消息,确保重试心跳不会发送到任何客户端。
This commit is contained in:
parent
c8d834a504
commit
9c19de67bf
@ -351,6 +351,7 @@ class AgentLoop:
|
||||
on_progress: Callable[..., Awaitable[None]] | None = None,
|
||||
on_stream: Callable[[str], Awaitable[None]] | None = None,
|
||||
on_stream_end: Callable[..., Awaitable[None]] | None = None,
|
||||
on_retry_wait: Callable[[str], Awaitable[None]] | None = None,
|
||||
*,
|
||||
session: Session | None = None,
|
||||
channel: str = "cli",
|
||||
@ -428,6 +429,7 @@ class AgentLoop:
|
||||
context_block_limit=self.context_block_limit,
|
||||
provider_retry_mode=self.provider_retry_mode,
|
||||
progress_callback=on_progress,
|
||||
retry_wait_callback=on_retry_wait,
|
||||
checkpoint_callback=_checkpoint,
|
||||
injection_callback=_drain_pending,
|
||||
))
|
||||
@ -738,6 +740,18 @@ class AgentLoop:
|
||||
)
|
||||
)
|
||||
|
||||
async def _on_retry_wait(content: str) -> None:
    """Publish a retry-wait heartbeat on the bus, tagged ``_retry_wait``.

    The outbound message targets the same channel and chat as the inbound
    ``msg`` and carries a copy of its metadata with ``_retry_wait`` set to
    ``True``, so consumers can distinguish it from ordinary progress output.

    Args:
        content: Heartbeat text to publish.
    """
    # Build a fresh dict so the inbound message's metadata is not mutated.
    tagged_metadata = dict(msg.metadata or {}, _retry_wait=True)
    heartbeat = OutboundMessage(
        channel=msg.channel,
        chat_id=msg.chat_id,
        content=content,
        metadata=tagged_metadata,
    )
    await self.bus.publish_outbound(heartbeat)
|
||||
|
||||
# Persist the triggering user message immediately, before running the
|
||||
# agent loop. If the process is killed mid-turn (OOM, SIGKILL, self-
|
||||
# restart, etc.), the existing runtime_checkpoint preserves the
|
||||
@ -756,6 +770,7 @@ class AgentLoop:
|
||||
on_progress=on_progress or _bus_progress,
|
||||
on_stream=on_stream,
|
||||
on_stream_end=on_stream_end,
|
||||
on_retry_wait=_on_retry_wait,
|
||||
session=session,
|
||||
channel=msg.channel,
|
||||
chat_id=msg.chat_id,
|
||||
|
||||
@ -71,6 +71,7 @@ class AgentRunSpec:
|
||||
context_block_limit: int | None = None
|
||||
provider_retry_mode: str = "standard"
|
||||
progress_callback: Any | None = None
|
||||
retry_wait_callback: Any | None = None
|
||||
checkpoint_callback: Any | None = None
|
||||
injection_callback: Any | None = None
|
||||
|
||||
@ -552,7 +553,7 @@ class AgentRunner:
|
||||
"tools": tools,
|
||||
"model": spec.model,
|
||||
"retry_mode": spec.provider_retry_mode,
|
||||
"on_retry_wait": spec.progress_callback,
|
||||
"on_retry_wait": spec.retry_wait_callback,
|
||||
}
|
||||
if spec.temperature is not None:
|
||||
kwargs["temperature"] = spec.temperature
|
||||
|
||||
@ -189,6 +189,9 @@ class ChannelManager:
|
||||
if not msg.metadata.get("_tool_hint") and not self.config.channels.send_progress:
|
||||
continue
|
||||
|
||||
if msg.metadata.get("_retry_wait"):
|
||||
continue
|
||||
|
||||
# Coalesce consecutive _stream_delta messages for the same (channel, chat_id)
|
||||
# to reduce API calls and improve streaming latency
|
||||
if msg.metadata.get("_stream_delta") and not msg.metadata.get("_stream_end"):
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user