mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-19 16:12:30 +00:00
fix(agent): align LLM wall timeout with sustained goals for main + subagents
Centralize runner_wall_llm_timeout_s in session goal_state metadata helpers so spawned subagents inherit the same policy as AgentLoop without coupling to long_task. Pass optional resolver into SubagentManager and add tests. Co-authored-by: Cursor <cursoragent@cursor.com>
This commit is contained in:
parent
cf09a8d691
commit
e804f2fddb
@ -35,7 +35,7 @@ from nanobot.providers.factory import ProviderSnapshot
|
|||||||
from nanobot.session.goal_state import (
|
from nanobot.session.goal_state import (
|
||||||
goal_state_runtime_lines,
|
goal_state_runtime_lines,
|
||||||
goal_state_ws_blob,
|
goal_state_ws_blob,
|
||||||
sustained_goal_active,
|
runner_wall_llm_timeout_s,
|
||||||
)
|
)
|
||||||
from nanobot.session.manager import Session, SessionManager
|
from nanobot.session.manager import Session, SessionManager
|
||||||
from nanobot.utils.artifacts import generated_image_paths_from_messages
|
from nanobot.utils.artifacts import generated_image_paths_from_messages
|
||||||
@ -253,6 +253,7 @@ class AgentLoop:
|
|||||||
restrict_to_workspace=restrict_to_workspace,
|
restrict_to_workspace=restrict_to_workspace,
|
||||||
disabled_skills=disabled_skills,
|
disabled_skills=disabled_skills,
|
||||||
max_iterations=self.max_iterations,
|
max_iterations=self.max_iterations,
|
||||||
|
llm_wall_timeout_for_session=lambda sk: runner_wall_llm_timeout_s(self.sessions, sk),
|
||||||
)
|
)
|
||||||
self._unified_session = unified_session
|
self._unified_session = unified_session
|
||||||
self._max_messages = max_messages if max_messages > 0 else 120
|
self._max_messages = max_messages if max_messages > 0 else 120
|
||||||
@ -795,10 +796,10 @@ class AgentLoop:
|
|||||||
injection_callback=_drain_pending,
|
injection_callback=_drain_pending,
|
||||||
# Sustained goals may legitimately exceed NANOBOT_LLM_TIMEOUT_S; idle stall
|
# Sustained goals may legitimately exceed NANOBOT_LLM_TIMEOUT_S; idle stall
|
||||||
# is still capped by NANOBOT_STREAM_IDLE_TIMEOUT_S in streaming providers.
|
# is still capped by NANOBOT_STREAM_IDLE_TIMEOUT_S in streaming providers.
|
||||||
llm_timeout_s=(
|
llm_timeout_s=runner_wall_llm_timeout_s(
|
||||||
0.0
|
self.sessions,
|
||||||
if session is not None and sustained_goal_active(session.metadata)
|
session.key if session is not None else session_key,
|
||||||
else None
|
metadata=(session.metadata if session is not None else None),
|
||||||
),
|
),
|
||||||
))
|
))
|
||||||
finally:
|
finally:
|
||||||
|
|||||||
@ -6,7 +6,7 @@ import time
|
|||||||
import uuid
|
import uuid
|
||||||
from dataclasses import dataclass, field
|
from dataclasses import dataclass, field
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from typing import Any
|
from typing import Any, Callable
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
@ -79,6 +79,7 @@ class SubagentManager:
|
|||||||
restrict_to_workspace: bool = False,
|
restrict_to_workspace: bool = False,
|
||||||
disabled_skills: list[str] | None = None,
|
disabled_skills: list[str] | None = None,
|
||||||
max_iterations: int | None = None,
|
max_iterations: int | None = None,
|
||||||
|
llm_wall_timeout_for_session: Callable[[str | None], float | None] | None = None,
|
||||||
):
|
):
|
||||||
defaults = AgentDefaults()
|
defaults = AgentDefaults()
|
||||||
self.provider = provider
|
self.provider = provider
|
||||||
@ -96,6 +97,7 @@ class SubagentManager:
|
|||||||
)
|
)
|
||||||
self.max_concurrent_subagents = defaults.max_concurrent_subagents
|
self.max_concurrent_subagents = defaults.max_concurrent_subagents
|
||||||
self.runner = AgentRunner(provider)
|
self.runner = AgentRunner(provider)
|
||||||
|
self._llm_wall_timeout_for_session = llm_wall_timeout_for_session
|
||||||
self._running_tasks: dict[str, asyncio.Task[None]] = {}
|
self._running_tasks: dict[str, asyncio.Task[None]] = {}
|
||||||
self._task_statuses: dict[str, SubagentStatus] = {}
|
self._task_statuses: dict[str, SubagentStatus] = {}
|
||||||
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
|
self._session_tasks: dict[str, set[str]] = {} # session_key -> {task_id, ...}
|
||||||
@ -196,6 +198,12 @@ class SubagentManager:
|
|||||||
{"role": "user", "content": task},
|
{"role": "user", "content": task},
|
||||||
]
|
]
|
||||||
|
|
||||||
|
sess_key = origin.get("session_key")
|
||||||
|
llm_timeout = (
|
||||||
|
self._llm_wall_timeout_for_session(sess_key)
|
||||||
|
if self._llm_wall_timeout_for_session
|
||||||
|
else None
|
||||||
|
)
|
||||||
result = await self.runner.run(AgentRunSpec(
|
result = await self.runner.run(AgentRunSpec(
|
||||||
initial_messages=messages,
|
initial_messages=messages,
|
||||||
tools=tools,
|
tools=tools,
|
||||||
@ -207,6 +215,8 @@ class SubagentManager:
|
|||||||
error_message=None,
|
error_message=None,
|
||||||
fail_on_tool_error=True,
|
fail_on_tool_error=True,
|
||||||
checkpoint_callback=_on_checkpoint,
|
checkpoint_callback=_on_checkpoint,
|
||||||
|
session_key=sess_key,
|
||||||
|
llm_timeout_s=llm_timeout,
|
||||||
))
|
))
|
||||||
status.phase = "done"
|
status.phase = "done"
|
||||||
status.stop_reason = result.stop_reason
|
status.stop_reason = result.stop_reason
|
||||||
|
|||||||
@ -1,8 +1,8 @@
|
|||||||
"""Session metadata helpers for sustained goals (e.g. ``long_task`` / ``complete_goal``).
|
"""Session metadata helpers for sustained goals (e.g. ``long_task`` / ``complete_goal``).
|
||||||
|
|
||||||
Tools set ``metadata[GOAL_STATE_KEY]``. Reads accept the legacy session key ``thread_goal``
|
Tools set ``metadata[GOAL_STATE_KEY]``. Reads accept the legacy session key ``thread_goal``
|
||||||
for older sessions. The agent uses ``goal_state_runtime_lines`` and
|
for older sessions. Callers use ``goal_state_runtime_lines``, ``goal_state_ws_blob``, and
|
||||||
``goal_state_ws_blob`` without importing tool implementations.
|
``runner_wall_llm_timeout_s`` without importing tool implementations.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
@ -10,6 +10,8 @@ from __future__ import annotations
|
|||||||
import json
|
import json
|
||||||
from typing import Any, Mapping, MutableMapping
|
from typing import Any, Mapping, MutableMapping
|
||||||
|
|
||||||
|
from nanobot.session.manager import SessionManager
|
||||||
|
|
||||||
GOAL_STATE_KEY = "goal_state"
|
GOAL_STATE_KEY = "goal_state"
|
||||||
# Older builds stored the same JSON blob under this key.
|
# Older builds stored the same JSON blob under this key.
|
||||||
_LEGACY_GOAL_STATE_SESSION_KEY = "thread_goal"
|
_LEGACY_GOAL_STATE_SESSION_KEY = "thread_goal"
|
||||||
@ -89,3 +91,21 @@ def goal_state_ws_blob(metadata: Mapping[str, Any] | None) -> dict[str, Any]:
|
|||||||
blob["objective"] = objective
|
blob["objective"] = objective
|
||||||
return blob
|
return blob
|
||||||
return {"active": False}
|
return {"active": False}
|
||||||
|
|
||||||
|
|
||||||
|
def runner_wall_llm_timeout_s(
|
||||||
|
sessions: SessionManager,
|
||||||
|
session_key: str | None,
|
||||||
|
*,
|
||||||
|
metadata: Mapping[str, Any] | None = None,
|
||||||
|
) -> float | None:
|
||||||
|
"""Wall-clock cap for :class:`~nanobot.agent.runner.AgentRunner` when streaming an LLM.
|
||||||
|
|
||||||
|
Returns ``0.0`` to disable ``asyncio.wait_for`` around the request when a sustained goal is
|
||||||
|
active; ``None`` means use ``NANOBOT_LLM_TIMEOUT_S``. Pass in-memory ``metadata`` when the
|
||||||
|
caller already holds :attr:`~nanobot.session.manager.Session.metadata` for this turn.
|
||||||
|
"""
|
||||||
|
meta: Mapping[str, Any] | None = metadata
|
||||||
|
if meta is None and session_key:
|
||||||
|
meta = sessions.get_or_create(session_key).metadata
|
||||||
|
return 0.0 if sustained_goal_active(meta) else None
|
||||||
|
|||||||
46
tests/agent/test_loop_goal_wall_timeout.py
Normal file
46
tests/agent/test_loop_goal_wall_timeout.py
Normal file
@ -0,0 +1,46 @@
|
|||||||
|
"""Subagent forwards loop-provided LLM wall-timeout resolver into AgentRunSpec."""
|
||||||
|
|
||||||
|
from pathlib import Path
|
||||||
|
from unittest.mock import AsyncMock, MagicMock
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from nanobot.agent.runner import AgentRunResult
|
||||||
|
from nanobot.agent.subagent import SubagentManager, SubagentStatus
|
||||||
|
from nanobot.bus.queue import MessageBus
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_subagent_forwards_resolver_to_agent_run_spec(tmp_path: Path) -> None:
|
||||||
|
provider = MagicMock()
|
||||||
|
provider.get_default_model.return_value = "m"
|
||||||
|
mgr = SubagentManager(
|
||||||
|
provider=provider,
|
||||||
|
workspace=tmp_path,
|
||||||
|
bus=MessageBus(),
|
||||||
|
max_tool_result_chars=64,
|
||||||
|
llm_wall_timeout_for_session=lambda sk: 0.0 if sk == "cli:direct" else None,
|
||||||
|
)
|
||||||
|
|
||||||
|
mgr.runner.run = AsyncMock(
|
||||||
|
return_value=AgentRunResult(final_content="ok", messages=[], stop_reason="completed")
|
||||||
|
)
|
||||||
|
mgr._announce_result = AsyncMock()
|
||||||
|
|
||||||
|
status = SubagentStatus(
|
||||||
|
task_id="t1",
|
||||||
|
label="lbl",
|
||||||
|
task_description="task",
|
||||||
|
started_at=0.0,
|
||||||
|
)
|
||||||
|
await mgr._run_subagent(
|
||||||
|
"t1",
|
||||||
|
"task",
|
||||||
|
"lbl",
|
||||||
|
{"channel": "cli", "chat_id": "direct", "session_key": "cli:direct"},
|
||||||
|
status,
|
||||||
|
)
|
||||||
|
mgr.runner.run.assert_called_once()
|
||||||
|
spec = mgr.runner.run.call_args[0][0]
|
||||||
|
assert spec.session_key == "cli:direct"
|
||||||
|
assert spec.llm_timeout_s == 0.0
|
||||||
@ -8,8 +8,10 @@ from nanobot.session.goal_state import (
|
|||||||
goal_state_runtime_lines,
|
goal_state_runtime_lines,
|
||||||
goal_state_ws_blob,
|
goal_state_ws_blob,
|
||||||
parse_goal_state,
|
parse_goal_state,
|
||||||
|
runner_wall_llm_timeout_s,
|
||||||
sustained_goal_active,
|
sustained_goal_active,
|
||||||
)
|
)
|
||||||
|
from nanobot.session.manager import SessionManager
|
||||||
|
|
||||||
|
|
||||||
def test_runtime_lines_empty_when_no_metadata():
|
def test_runtime_lines_empty_when_no_metadata():
|
||||||
@ -105,3 +107,25 @@ def test_sustained_goal_active_true_when_active():
|
|||||||
def test_sustained_goal_active_respects_legacy_thread_goal_key():
|
def test_sustained_goal_active_respects_legacy_thread_goal_key():
|
||||||
meta = {"thread_goal": {"status": "active", "objective": "Legacy."}}
|
meta = {"thread_goal": {"status": "active", "objective": "Legacy."}}
|
||||||
assert sustained_goal_active(meta) is True
|
assert sustained_goal_active(meta) is True
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_wall_llm_timeout_uses_metadata_override(tmp_path):
|
||||||
|
sm = SessionManager(tmp_path)
|
||||||
|
assert (
|
||||||
|
runner_wall_llm_timeout_s(
|
||||||
|
sm,
|
||||||
|
"cli:test",
|
||||||
|
metadata={GOAL_STATE_KEY: {"status": "active", "objective": "x"}},
|
||||||
|
)
|
||||||
|
== 0.0
|
||||||
|
)
|
||||||
|
assert runner_wall_llm_timeout_s(sm, "cli:test", metadata={}) is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_runner_wall_llm_timeout_reads_session_when_metadata_missing(tmp_path):
|
||||||
|
sm = SessionManager(tmp_path)
|
||||||
|
sess = sm.get_or_create("c:d")
|
||||||
|
sess.metadata = {GOAL_STATE_KEY: {"status": "active", "objective": "z"}}
|
||||||
|
assert runner_wall_llm_timeout_s(sm, "c:d") == 0.0
|
||||||
|
sess.metadata = {}
|
||||||
|
assert runner_wall_llm_timeout_s(sm, "c:d") is None
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user