From be2e0172d1a9360418d49253d82b7af55d7d38f6 Mon Sep 17 00:00:00 2001
From: Xubin Ren <52506698+Re-bin@users.noreply.github.com>
Date: Mon, 1 Jun 2026 00:56:39 +0800
Subject: [PATCH] fix(agent): extend sustained goal iteration budget
---
nanobot/agent/loop.py | 61 ++++-
nanobot/session/goal_state.py | 23 +-
nanobot/session/turn_continuation.py | 240 ++++++++++++++++++
nanobot/session/webui_turns.py | 23 +-
tests/agent/test_loop_runner_integration.py | 25 +-
tests/agent/test_loop_save_turn.py | 224 ++++++++++++++++
tests/session/test_turn_continuation.py | 127 +++++++++
tests/utils/test_webui_turn_helpers.py | 13 +
.../src/components/thread/ThreadComposer.tsx | 11 +-
webui/src/globals.css | 56 ++++
webui/src/tests/thread-composer.test.tsx | 1 +
11 files changed, 783 insertions(+), 21 deletions(-)
create mode 100644 nanobot/session/turn_continuation.py
create mode 100644 tests/session/test_turn_continuation.py
diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py
index 159423800..9d3f5771f 100644
--- a/nanobot/agent/loop.py
+++ b/nanobot/agent/loop.py
@@ -45,6 +45,7 @@ from nanobot.session.goal_state import (
sustained_goal_active,
)
from nanobot.session.manager import Session, SessionManager
+from nanobot.session import turn_continuation
from nanobot.session.webui_turns import (
WebuiTurnCoordinator,
build_bus_progress_callback,
@@ -112,6 +113,7 @@ class TurnContext:
save_skip: int = 0
outbound: OutboundMessage | None = None
+ suppress_response: bool = False
on_progress: Callable[..., Awaitable[None]] | None = None
on_stream: Callable[[str], Awaitable[None]] | None = None
@@ -121,6 +123,7 @@ class TurnContext:
pending_queue: asyncio.Queue | None = None
pending_summary: str | None = None
turn_wall_started_at: float = field(default_factory=time.time)
+ visible_run_started_at: float | None = None
turn_latency_ms: int | None = None
trace: list[StateTraceEntry] = field(default_factory=list)
@@ -565,6 +568,8 @@ class AgentLoop:
Returns True if the message was persisted.
"""
+ if not turn_continuation.should_persist_user_message(msg.metadata):
+ return False
media_paths = [p for p in (msg.media or []) if isinstance(p, str) and p]
has_text = isinstance(msg.content, str) and msg.content.strip()
if has_text or media_paths:
@@ -771,6 +776,7 @@ class AgentLoop:
+ "\n\nPlease continue working toward the objective using your tools, "
"or call complete_goal if the work is truly finished."
) if _goal_lines else SUSTAINED_GOAL_CONTINUE_PROMPT
+ session_metadata = session.metadata if session is not None else None
try:
result = await self.runner.run(AgentRunSpec(
initial_messages=initial_messages,
@@ -796,7 +802,8 @@ class AgentLoop:
llm_timeout_s=runner_wall_llm_timeout_s(
self.sessions,
session.key if session is not None else session_key,
- metadata=(session.metadata if session is not None else None),
+ metadata=session_metadata,
+ message_metadata=metadata,
),
goal_active_predicate=lambda: sustained_goal_active(session.metadata) if session is not None else False,
goal_continue_message=_goal_continue,
@@ -808,9 +815,15 @@ class AgentLoop:
self._last_usage = result.usage
if result.stop_reason == "max_iterations":
logger.warning("Max iterations ({}) reached", self.max_iterations)
+ should_stream = turn_continuation.should_stream_budget_response(
+ stop_reason=result.stop_reason,
+ pending_queue_available=pending_queue is not None and session is not None,
+ session_metadata=session_metadata,
+ message_metadata=metadata,
+ )
# Push final content through stream so streaming channels (e.g. Feishu)
# update the card instead of leaving it empty.
- if on_stream and on_stream_end:
+ if on_stream and on_stream_end and should_stream:
await on_stream(result.final_content or "")
await on_stream_end(resuming=False)
elif result.stop_reason == "error":
@@ -953,7 +966,8 @@ class AgentLoop:
channel=msg.channel, chat_id=msg.chat_id,
content="", metadata=msg.metadata or {},
))
- if msg.channel == "websocket":
+ continuing = turn_continuation.internal_continuation_pending(msg.metadata)
+ if msg.channel == "websocket" and not continuing:
turn_lat = self._pending_turn_latency_ms.pop(session_key, None)
await self._webui_turns.handle_turn_end(
msg,
@@ -1017,9 +1031,10 @@ class AgentLoop:
"Re-published {} leftover message(s) to bus for session {}",
leftover, session_key,
)
- await self._webui_turns.publish_run_status(msg, "idle")
- self._pending_turn_latency_ms.pop(session_key, None)
- self._webui_turns.discard(session_key)
+ if not turn_continuation.internal_continuation_pending(msg.metadata):
+ await self._webui_turns.publish_run_status(msg, "idle")
+ self._pending_turn_latency_ms.pop(session_key, None)
+ self._webui_turns.discard(session_key)
finally:
if pending is None:
await self._webui_turns.publish_run_status(msg, "idle")
@@ -1167,12 +1182,17 @@ class AgentLoop:
)
key = session_key or msg.session_key
+ t0 = time.time()
ctx = TurnContext(
msg=msg,
session=None,
session_key=key,
state=TurnState.RESTORE,
turn_id=f"{key}:{time.time_ns()}",
+ turn_wall_started_at=t0,
+ visible_run_started_at=turn_continuation.internal_continuation_run_started_at(
+ msg.metadata,
+ ),
on_progress=on_progress,
on_stream=on_stream,
on_stream_end=on_stream_end,
@@ -1378,7 +1398,13 @@ class AgentLoop:
return "ok"
async def _state_run(self, ctx: TurnContext) -> str:
- await self._webui_turns.publish_run_status(ctx.msg, "running")
+ if ctx.visible_run_started_at is None:
+ ctx.visible_run_started_at = time.time()
+ await self._webui_turns.publish_run_status(
+ ctx.msg,
+ "running",
+ started_at=ctx.visible_run_started_at,
+ )
result = await self._run_agent_loop(
ctx.initial_messages,
on_progress=ctx.on_progress,
@@ -1399,15 +1425,25 @@ class AgentLoop:
ctx.all_messages = all_msgs
ctx.stop_reason = stop_reason
ctx.had_injections = had_injections
+ await turn_continuation.maybe_continue_turn(ctx)
return "ok"
async def _state_save(self, ctx: TurnContext) -> str:
- if ctx.final_content is None or not ctx.final_content.strip():
+ turn_continuation.prepare_save_boundary(ctx)
+
+ if (
+ (ctx.final_content is None or not ctx.final_content.strip())
+ and not ctx.suppress_response
+ ):
ctx.final_content = EMPTY_FINAL_RESPONSE_MESSAGE
- ctx.save_skip = 1 + len(ctx.history) + (1 if ctx.user_persisted_early else 0)
-
- ctx.turn_latency_ms = max(0, int((time.time() - ctx.turn_wall_started_at) * 1000))
+ latency_started_at = (
+ ctx.visible_run_started_at
+ if turn_continuation.internal_continuation_inbound(ctx.msg.metadata)
+ and ctx.visible_run_started_at is not None
+ else ctx.turn_wall_started_at
+ )
+ ctx.turn_latency_ms = max(0, int((time.time() - latency_started_at) * 1000))
self._save_turn(
ctx.session, ctx.all_messages, ctx.save_skip,
turn_latency_ms=ctx.turn_latency_ms,
@@ -1427,6 +1463,9 @@ class AgentLoop:
return "ok"
async def _state_respond(self, ctx: TurnContext) -> str:
+ if ctx.suppress_response:
+ ctx.outbound = None
+ return "ok"
ctx.outbound = self._assemble_outbound(
ctx.msg,
ctx.final_content,
diff --git a/nanobot/session/goal_state.py b/nanobot/session/goal_state.py
index a5e382f25..2ef21bd14 100644
--- a/nanobot/session/goal_state.py
+++ b/nanobot/session/goal_state.py
@@ -43,6 +43,19 @@ def sustained_goal_active(metadata: Mapping[str, Any] | None) -> bool:
return isinstance(goal, dict) and goal.get("status") == "active"
+def sustained_goal_turn(
+ metadata: Mapping[str, Any] | None,
+ *,
+ message_metadata: Mapping[str, Any] | None = None,
+) -> bool:
+ """True when this turn should use sustained-goal runtime limits."""
+ if sustained_goal_active(metadata):
+ return True
+ if not message_metadata:
+ return False
+ return str(message_metadata.get("original_command") or "").strip() == "/goal"
+
+
def parse_goal_state(blob: Any) -> dict[str, Any] | None:
if blob is None:
return None
@@ -98,14 +111,16 @@ def runner_wall_llm_timeout_s(
session_key: str | None,
*,
metadata: Mapping[str, Any] | None = None,
+ message_metadata: Mapping[str, Any] | None = None,
) -> float | None:
"""Wall-clock cap for :class:`~nanobot.agent.runner.AgentRunner` when streaming an LLM.
- Returns ``0.0`` to disable ``asyncio.wait_for`` around the request when a sustained goal is
- active; ``None`` means use ``NANOBOT_LLM_TIMEOUT_S``. Pass in-memory ``metadata`` when the
- caller already holds :attr:`~nanobot.session.manager.Session.metadata` for this turn.
+ Returns ``0.0`` to disable ``asyncio.wait_for`` around the request when this is a
+ sustained-goal turn; ``None`` means use ``NANOBOT_LLM_TIMEOUT_S``. Pass in-memory
+ ``metadata`` when the caller already holds :attr:`~nanobot.session.manager.Session.metadata`
+ for this turn.
"""
meta: Mapping[str, Any] | None = metadata
if meta is None and session_key:
meta = sessions.get_or_create(session_key).metadata
- return 0.0 if sustained_goal_active(meta) else None
+ return 0.0 if sustained_goal_turn(meta, message_metadata=message_metadata) else None
diff --git a/nanobot/session/turn_continuation.py b/nanobot/session/turn_continuation.py
new file mode 100644
index 000000000..28c77bf64
--- /dev/null
+++ b/nanobot/session/turn_continuation.py
@@ -0,0 +1,240 @@
+"""Internal turn continuation helpers.
+
+This module keeps budget-boundary continuation policy out of ``AgentLoop``.
+The loop calls a small set of helpers; those helpers decide whether an internal
+continuation is allowed and, when it is, queue the next turn directly.
+"""
+
+from __future__ import annotations
+
+import dataclasses
+from typing import Any, Mapping, MutableMapping
+
+from loguru import logger
+
+from nanobot.session.goal_state import (
+ goal_state_runtime_lines,
+ sustained_goal_active,
+ sustained_goal_turn,
+)
+
+INTERNAL_CONTINUATION_META = "_internal_continuation"
+INTERNAL_CONTINUATION_KIND_META = "_internal_continuation_kind"
+INTERNAL_CONTINUATION_PENDING_META = "_internal_continuation_pending"
+INTERNAL_CONTINUATION_RUN_STARTED_AT_META = "_internal_continuation_run_started_at"
+
+_GOAL_CONTINUATION_KIND = "sustained_goal"
+_GOAL_CONTINUATION_SENDER = "system:continuation"
+_GOAL_CONTINUATION_ROUNDS_KEY = "_sustained_goal_continuation_rounds"
+_MAX_GOAL_CONTINUATION_ROUNDS = 12
+_STRIPPED_INBOUND_META_KEYS = {
+ "_stream_id",
+ "_stream_delta",
+ "_stream_end",
+ "_resuming",
+ INTERNAL_CONTINUATION_PENDING_META,
+}
+
+
+def internal_continuation_inbound(metadata: Mapping[str, Any] | None) -> bool:
+ """True for an inbound message created by an internal continuation policy."""
+ return bool(metadata and metadata.get(INTERNAL_CONTINUATION_META) is True)
+
+
+def internal_continuation_pending(metadata: Mapping[str, Any] | None) -> bool:
+ """True when the current turn scheduled an invisible continuation slice."""
+ return bool(metadata and metadata.get(INTERNAL_CONTINUATION_PENDING_META) is True)
+
+
+def internal_continuation_run_started_at(metadata: Mapping[str, Any] | None) -> float | None:
+ """Return the user-visible run start propagated across continuation slices."""
+ if not metadata:
+ return None
+ value = metadata.get(INTERNAL_CONTINUATION_RUN_STARTED_AT_META)
+ if not isinstance(value, int | float):
+ return None
+ started_at = float(value)
+ return started_at if started_at > 0 else None
+
+
+def should_persist_user_message(metadata: Mapping[str, Any] | None) -> bool:
+ """Return whether this inbound message should be persisted as user input."""
+ return not internal_continuation_inbound(metadata)
+
+
+def should_stream_budget_response(
+ *,
+ stop_reason: str,
+ pending_queue_available: bool,
+ session_metadata: Mapping[str, Any] | None,
+ message_metadata: Mapping[str, Any] | None = None,
+) -> bool:
+ """Return whether the budget-boundary response should be sent to the user."""
+ return not _continuation_available(
+ stop_reason=stop_reason,
+ pending_queue_available=pending_queue_available,
+ session_metadata=session_metadata,
+ message_metadata=message_metadata,
+ )
+
+
+async def maybe_continue_turn(ctx: Any) -> bool:
+ """Queue an internal continuation for *ctx* when policy allows it."""
+ if ctx.session is None or ctx.pending_queue is None:
+ return False
+ if not _continuation_available(
+ stop_reason=ctx.stop_reason,
+ pending_queue_available=True,
+ session_metadata=ctx.session.metadata,
+ message_metadata=ctx.msg.metadata,
+ ):
+ return False
+
+ metadata = _internal_continuation_metadata(
+ ctx.msg.metadata,
+ run_started_at=getattr(ctx, "visible_run_started_at", None),
+ )
+ content = _goal_continuation_prompt(ctx.session.metadata)
+ messages = _strip_terminal_assistant(ctx.all_messages, ctx.final_content)
+ _increment_goal_continuation_round(ctx.session.metadata)
+
+ logger.info("Turn budget reached; scheduling internal continuation")
+ ctx.msg.metadata[INTERNAL_CONTINUATION_PENDING_META] = True
+ ctx.final_content = ""
+ ctx.all_messages = messages
+ ctx.suppress_response = True
+ await ctx.pending_queue.put(
+ dataclasses.replace(
+ ctx.msg,
+ sender_id=_GOAL_CONTINUATION_SENDER,
+ content=content,
+ media=[],
+ metadata=metadata,
+ session_key_override=ctx.session_key,
+ )
+ )
+ return True
+
+
+def prepare_save_boundary(ctx: Any) -> None:
+ """Prepare continuation bookkeeping and the history append boundary."""
+ if ctx.session is not None:
+ clear_internal_continuation_state(ctx.session.metadata)
+
+ ctx.save_skip = _save_skip_for_turn(
+ message_metadata=ctx.msg.metadata,
+ initial_message_count=len(ctx.initial_messages),
+ history_count=len(ctx.history),
+ user_persisted_early=ctx.user_persisted_early,
+ )
+
+
+def _continuation_available(
+ *,
+ stop_reason: str,
+ pending_queue_available: bool,
+ session_metadata: Mapping[str, Any] | None,
+ message_metadata: Mapping[str, Any] | None = None,
+) -> bool:
+ if stop_reason != "max_iterations" or not pending_queue_available:
+ return False
+ return _goal_continuation_available(
+ session_metadata,
+ message_metadata=message_metadata,
+ )
+
+
+def clear_internal_continuation_state(metadata: MutableMapping[str, Any]) -> None:
+ """Reset policy bookkeeping once its owning runtime mode is inactive."""
+ if not sustained_goal_active(metadata):
+ metadata.pop(_GOAL_CONTINUATION_ROUNDS_KEY, None)
+
+
+def _save_skip_for_turn(
+ *,
+ message_metadata: Mapping[str, Any] | None,
+ initial_message_count: int,
+ history_count: int,
+ user_persisted_early: bool,
+) -> int:
+ """Return the persisted-message append boundary for this turn."""
+ if internal_continuation_inbound(message_metadata):
+ return initial_message_count
+ return 1 + history_count + (1 if user_persisted_early else 0)
+
+
+def _goal_continuation_available(
+ session_metadata: Mapping[str, Any] | None,
+ *,
+ message_metadata: Mapping[str, Any] | None = None,
+ max_rounds: int = _MAX_GOAL_CONTINUATION_ROUNDS,
+) -> bool:
+ if not sustained_goal_turn(session_metadata, message_metadata=message_metadata):
+ return False
+ if not sustained_goal_active(session_metadata):
+ return False
+ try:
+ rounds = int((session_metadata or {}).get(_GOAL_CONTINUATION_ROUNDS_KEY) or 0)
+ except (TypeError, ValueError):
+ rounds = 0
+ return rounds < max(0, max_rounds)
+
+
+def _increment_goal_continuation_round(session_metadata: MutableMapping[str, Any]) -> None:
+ try:
+ rounds = int(session_metadata.get(_GOAL_CONTINUATION_ROUNDS_KEY) or 0)
+ except (TypeError, ValueError):
+ rounds = 0
+ session_metadata[_GOAL_CONTINUATION_ROUNDS_KEY] = rounds + 1
+
+
+def _internal_continuation_metadata(
+ message_metadata: Mapping[str, Any] | None,
+ *,
+ run_started_at: float | None = None,
+) -> dict[str, Any]:
+ metadata = dict(message_metadata or {})
+ metadata[INTERNAL_CONTINUATION_META] = True
+ metadata[INTERNAL_CONTINUATION_KIND_META] = _GOAL_CONTINUATION_KIND
+ if run_started_at is not None:
+ metadata[INTERNAL_CONTINUATION_RUN_STARTED_AT_META] = float(run_started_at)
+ for key in _STRIPPED_INBOUND_META_KEYS:
+ metadata.pop(key, None)
+ return metadata
+
+
+def _goal_continuation_prompt(metadata: Mapping[str, Any] | None) -> str:
+ lines = goal_state_runtime_lines(metadata)
+ if lines:
+ goal = "\n".join(lines)
+ return (
+ "Continue the active sustained goal after the previous turn reached "
+ "its tool-call budget.\n\n"
+ f"{goal}\n\n"
+ "Continue from the saved context. Do not mention the continuation "
+ "boundary to the user. Use tools as needed, and call complete_goal "
+ "when the objective is truly finished."
+ )
+ return (
+ "Continue the active sustained goal after the previous turn reached "
+ "its tool-call budget. Continue from the saved context. Do not mention "
+ "the continuation boundary to the user. Use tools as needed, and call "
+ "complete_goal when the objective is truly finished."
+ )
+
+
+def _strip_terminal_assistant(
+ messages: list[dict[str, Any]],
+ final_content: str | None,
+) -> list[dict[str, Any]]:
+ """Drop the synthetic max-iteration assistant message before saving history."""
+ if not messages:
+ return messages
+ last = messages[-1]
+ if last.get("role") != "assistant":
+ return messages
+ if final_content is None or last.get("content") != final_content:
+ return messages
+ if last.get("tool_calls"):
+ return messages
+ return messages[:-1]
diff --git a/nanobot/session/webui_turns.py b/nanobot/session/webui_turns.py
index 864ef6a35..47614f0b1 100644
--- a/nanobot/session/webui_turns.py
+++ b/nanobot/session/webui_turns.py
@@ -178,7 +178,13 @@ def websocket_turn_wall_started_at(chat_id: str) -> float | None:
return _WEBSOCKET_TURN_WALL_STARTED_AT.get(chat_id)
-async def publish_turn_run_status(bus: MessageBus, msg: InboundMessage, status: str) -> None:
+async def publish_turn_run_status(
+ bus: MessageBus,
+ msg: InboundMessage,
+ status: str,
+ *,
+ started_at: float | None = None,
+) -> None:
"""Notify WebSocket clients while a user turn is executing (timing strip)."""
if msg.channel != "websocket":
return
@@ -189,7 +195,10 @@ async def publish_turn_run_status(bus: MessageBus, msg: InboundMessage, status:
"goal_status": status,
}
if status == "running":
- t0 = time.time()
+ if isinstance(started_at, int | float) and started_at > 0:
+ t0 = float(started_at)
+ else:
+ t0 = time.time()
meta["started_at"] = t0
_WEBSOCKET_TURN_WALL_STARTED_AT[cid] = t0
else:
@@ -300,8 +309,14 @@ class WebuiTurnCoordinator:
def discard(self, session_key: str) -> None:
self._title_contexts.pop(session_key, None)
- async def publish_run_status(self, msg: InboundMessage, status: str) -> None:
- await publish_turn_run_status(self.bus, msg, status)
+ async def publish_run_status(
+ self,
+ msg: InboundMessage,
+ status: str,
+ *,
+ started_at: float | None = None,
+ ) -> None:
+ await publish_turn_run_status(self.bus, msg, status, started_at=started_at)
async def handle_turn_end(
self,
diff --git a/tests/agent/test_loop_runner_integration.py b/tests/agent/test_loop_runner_integration.py
index 3cfe07f41..5f9c356ce 100644
--- a/tests/agent/test_loop_runner_integration.py
+++ b/tests/agent/test_loop_runner_integration.py
@@ -2,7 +2,6 @@
from __future__ import annotations
-import asyncio
import time
from unittest.mock import AsyncMock, MagicMock, patch
@@ -48,6 +47,30 @@ async def test_loop_max_iterations_message_stays_stable(tmp_path):
)
+@pytest.mark.asyncio
+async def test_loop_goal_turn_uses_standard_iteration_budget(tmp_path):
+ loop = _make_loop(tmp_path)
+ loop.provider.chat_with_retry = AsyncMock(return_value=LLMResponse(
+ content="working",
+ tool_calls=[ToolCallRequest(id="call_1", name="list_dir", arguments={})],
+ ))
+ loop.tools.get_definitions = MagicMock(return_value=[])
+ loop.tools.execute = AsyncMock(return_value="ok")
+ loop.max_iterations = 2
+
+ final_content, _, _, stop_reason, _ = await loop._run_agent_loop(
+ [],
+ metadata={"original_command": "/goal"},
+ )
+
+ assert stop_reason == "max_iterations"
+ assert loop.provider.chat_with_retry.await_count == 2
+ assert final_content == (
+ "I reached the maximum number of tool call iterations (2) "
+ "without completing the task. You can try breaking the task into smaller steps."
+ )
+
+
@pytest.mark.asyncio
async def test_loop_stream_filter_handles_think_only_prefix_without_crashing(tmp_path):
loop = _make_loop(tmp_path)
diff --git a/tests/agent/test_loop_save_turn.py b/tests/agent/test_loop_save_turn.py
index 290cda0a6..f9847df8b 100644
--- a/tests/agent/test_loop_save_turn.py
+++ b/tests/agent/test_loop_save_turn.py
@@ -11,6 +11,10 @@ from nanobot.bus.queue import MessageBus
from nanobot.providers.base import LLMResponse
from nanobot.session.goal_state import GOAL_STATE_KEY
from nanobot.session.manager import Session, SessionManager
+from nanobot.session.turn_continuation import (
+ INTERNAL_CONTINUATION_META,
+ INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
+)
from nanobot.session.webui_turns import (
TITLE_GENERATION_MAX_TOKENS,
TITLE_GENERATION_REASONING_EFFORT,
@@ -560,6 +564,226 @@ async def test_process_message_does_not_duplicate_early_persisted_user_message(t
assert AgentLoop._PENDING_USER_TURN_KEY not in session.metadata
+@pytest.mark.asyncio
+async def test_internal_continuation_queues_turn_without_fake_user_history(
+ tmp_path: Path,
+) -> None:
+ loop = _make_full_loop(tmp_path)
+ loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign]
+ session = loop.sessions.get_or_create("feishu:c-auto")
+ session.metadata[GOAL_STATE_KEY] = {
+ "status": "active",
+ "objective": "Finish the long goal.",
+ }
+ loop.sessions.save(session)
+
+ calls: list[dict] = []
+
+ async def fake_run_agent_loop(initial_messages, *, metadata=None, **_kwargs):
+ calls.append({"initial_messages": initial_messages, "metadata": metadata})
+ if len(calls) == 1:
+ return (
+ "paused",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "paused"}],
+ "max_iterations",
+ False,
+ )
+ return (
+ "done",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "done"}],
+ "completed",
+ False,
+ )
+
+ loop._run_agent_loop = fake_run_agent_loop # type: ignore[method-assign]
+ pending: asyncio.Queue[InboundMessage] = asyncio.Queue()
+
+ first = await loop._process_message(
+ InboundMessage(
+ channel="feishu",
+ sender_id="u1",
+ chat_id="c-auto",
+ content="start the goal",
+ ),
+ pending_queue=pending,
+ )
+
+ assert first is None
+ queued = pending.get_nowait()
+ assert queued.sender_id == "system:continuation"
+ assert queued.metadata[INTERNAL_CONTINUATION_META] is True
+ assert "Finish the long goal." in queued.content
+
+ session = loop.sessions.get_or_create("feishu:c-auto")
+ assert [
+ {k: v for k, v in m.items() if k in {"role", "content"}}
+ for m in session.messages
+ ] == [{"role": "user", "content": "start the goal"}]
+
+ second = await loop._process_message(queued, pending_queue=asyncio.Queue())
+
+ assert second is not None
+ assert second.content == "done"
+ session = loop.sessions.get_or_create("feishu:c-auto")
+ assert [
+ {k: v for k, v in m.items() if k in {"role", "content"}}
+ for m in session.messages
+ ] == [
+ {"role": "user", "content": "start the goal"},
+ {"role": "assistant", "content": "done"},
+ ]
+
+
+@pytest.mark.asyncio
+async def test_internal_continuation_preserves_streaming_route_metadata(
+ tmp_path: Path,
+) -> None:
+ loop = _make_full_loop(tmp_path)
+ loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign]
+ session = loop.sessions.get_or_create("feishu:c-stream")
+ session.metadata[GOAL_STATE_KEY] = {
+ "status": "active",
+ "objective": "Finish the streamed long goal.",
+ }
+ loop.sessions.save(session)
+
+ calls = 0
+
+ async def fake_run_agent_loop(initial_messages, *, on_stream=None, on_stream_end=None, **_kwargs):
+ nonlocal calls
+ calls += 1
+ if calls == 1:
+ return (
+ "paused",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "paused"}],
+ "max_iterations",
+ False,
+ )
+ assert on_stream is not None
+ assert on_stream_end is not None
+ await on_stream("done")
+ await on_stream_end(resuming=False)
+ return (
+ "done",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "done"}],
+ "completed",
+ False,
+ )
+
+ loop._run_agent_loop = fake_run_agent_loop # type: ignore[method-assign]
+
+ await loop._dispatch(InboundMessage(
+ channel="feishu",
+ sender_id="u1",
+ chat_id="c-stream",
+ content="start the goal",
+ metadata={
+ "_wants_stream": True,
+ "message_id": "om_001",
+ "origin_message_id": "root_001",
+ "_stream_id": "old-stream",
+ },
+ ))
+
+ assert loop.bus.outbound_size == 0
+ queued = await asyncio.wait_for(loop.bus.consume_inbound(), timeout=0.5)
+ assert queued.metadata[INTERNAL_CONTINUATION_META] is True
+ assert queued.metadata["_wants_stream"] is True
+ assert queued.metadata["message_id"] == "om_001"
+ assert queued.metadata["origin_message_id"] == "root_001"
+ assert "_stream_id" not in queued.metadata
+
+ await loop._dispatch(queued)
+
+ outbound = []
+ while loop.bus.outbound_size:
+ outbound.append(await loop.bus.consume_outbound())
+ deltas = [m for m in outbound if m.metadata.get("_stream_delta")]
+ ends = [m for m in outbound if m.metadata.get("_stream_end")]
+ streamed_markers = [m for m in outbound if m.metadata.get("_streamed")]
+
+ assert [m.content for m in deltas] == ["done"]
+ assert len(ends) == 1
+ assert ends[0].metadata["_resuming"] is False
+ assert ends[0].metadata["message_id"] == "om_001"
+ assert ends[0].metadata["origin_message_id"] == "root_001"
+ assert isinstance(ends[0].metadata.get("_stream_id"), str)
+ assert streamed_markers and streamed_markers[-1].content == "done"
+
+
+@pytest.mark.asyncio
+async def test_websocket_internal_continuation_keeps_single_visible_run(
+ tmp_path: Path,
+) -> None:
+ loop = _make_full_loop(tmp_path)
+ loop.consolidator.maybe_consolidate_by_tokens = AsyncMock(return_value=False) # type: ignore[method-assign]
+ session = loop.sessions.get_or_create("websocket:c-auto")
+ session.metadata[GOAL_STATE_KEY] = {
+ "status": "active",
+ "objective": "Finish the long goal.",
+ }
+ loop.sessions.save(session)
+
+ calls = 0
+
+ async def fake_run_agent_loop(initial_messages, **_kwargs):
+ nonlocal calls
+ calls += 1
+ if calls == 1:
+ return (
+ "paused",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "paused"}],
+ "max_iterations",
+ False,
+ )
+ return (
+ "done",
+ [],
+ [*initial_messages, {"role": "assistant", "content": "done"}],
+ "completed",
+ False,
+ )
+
+ loop._run_agent_loop = fake_run_agent_loop # type: ignore[method-assign]
+
+ await loop._dispatch(InboundMessage(
+ channel="websocket",
+ sender_id="u1",
+ chat_id="c-auto",
+ content="start the goal",
+ metadata={"webui": True},
+ ))
+
+ first_outbound = []
+ while loop.bus.outbound_size:
+ first_outbound.append(await loop.bus.consume_outbound())
+ first_statuses = [m.metadata for m in first_outbound if m.metadata.get("_goal_status")]
+ assert [m["goal_status"] for m in first_statuses] == ["running"]
+ assert not [m for m in first_outbound if m.metadata.get("_turn_end")]
+ started_at = first_statuses[0]["started_at"]
+
+ queued = await asyncio.wait_for(loop.bus.consume_inbound(), timeout=0.5)
+ assert queued.metadata[INTERNAL_CONTINUATION_META] is True
+ assert queued.metadata[INTERNAL_CONTINUATION_RUN_STARTED_AT_META] == started_at
+
+ await loop._dispatch(queued)
+
+ second_outbound = []
+ while loop.bus.outbound_size:
+ second_outbound.append(await loop.bus.consume_outbound())
+ second_statuses = [m.metadata for m in second_outbound if m.metadata.get("_goal_status")]
+ assert [m["goal_status"] for m in second_statuses] == ["running", "idle"]
+ assert second_statuses[0]["started_at"] == started_at
+ turn_end = [m for m in second_outbound if m.metadata.get("_turn_end")]
+ assert len(turn_end) == 1
+ assert isinstance(turn_end[0].metadata.get("latency_ms"), int)
+
+
@pytest.mark.asyncio
async def test_process_message_uses_context_chat_id_for_runtime_prompt(tmp_path: Path) -> None:
loop = _make_full_loop(tmp_path)
diff --git a/tests/session/test_turn_continuation.py b/tests/session/test_turn_continuation.py
new file mode 100644
index 000000000..c6d58e5dc
--- /dev/null
+++ b/tests/session/test_turn_continuation.py
@@ -0,0 +1,127 @@
+"""Tests for internal turn continuation policy."""
+
+from __future__ import annotations
+
+import asyncio
+from types import SimpleNamespace
+
+import pytest
+
+from nanobot.bus.events import InboundMessage
+from nanobot.session.goal_state import GOAL_STATE_KEY
+from nanobot.session.turn_continuation import (
+ INTERNAL_CONTINUATION_KIND_META,
+ INTERNAL_CONTINUATION_META,
+ INTERNAL_CONTINUATION_PENDING_META,
+ INTERNAL_CONTINUATION_RUN_STARTED_AT_META,
+ internal_continuation_pending,
+ internal_continuation_run_started_at,
+ maybe_continue_turn,
+ should_stream_budget_response,
+)
+
+
+@pytest.mark.asyncio
+async def test_maybe_continue_turn_queues_internal_message():
+ meta = {
+ GOAL_STATE_KEY: {
+ "status": "active",
+ "objective": "Finish the migration.",
+ "ui_summary": "migration",
+ },
+ }
+ messages = [
+ {"role": "system", "content": "system"},
+ {"role": "user", "content": "start"},
+ {"role": "assistant", "content": "paused"},
+ ]
+ pending: asyncio.Queue[InboundMessage] = asyncio.Queue()
+ ctx = SimpleNamespace(
+ session=SimpleNamespace(metadata=meta),
+ msg=InboundMessage(
+ channel="feishu",
+ sender_id="u1",
+ chat_id="c1",
+ content="start",
+ metadata={
+ "message_id": "msg-1",
+ "origin_message_id": "msg-0",
+ "_wants_stream": True,
+ "_stream_id": "stream-1",
+ "_stream_delta": True,
+ "_stream_end": True,
+ "_resuming": True,
+ "webui": True,
+ },
+ ),
+ session_key="feishu:c1",
+ pending_queue=pending,
+ stop_reason="max_iterations",
+ final_content="paused",
+ all_messages=messages,
+ suppress_response=False,
+ visible_run_started_at=1234.5,
+ )
+
+ assert await maybe_continue_turn(ctx) is True
+
+ queued = pending.get_nowait()
+ assert queued.sender_id == "system:continuation"
+ assert queued.metadata[INTERNAL_CONTINUATION_META] is True
+ assert queued.metadata[INTERNAL_CONTINUATION_KIND_META] == "sustained_goal"
+ assert queued.metadata[INTERNAL_CONTINUATION_RUN_STARTED_AT_META] == 1234.5
+ assert internal_continuation_run_started_at(queued.metadata) == 1234.5
+ assert internal_continuation_pending(ctx.msg.metadata)
+ assert queued.metadata["webui"] is True
+ assert queued.metadata["message_id"] == "msg-1"
+ assert queued.metadata["origin_message_id"] == "msg-0"
+ assert queued.metadata["_wants_stream"] is True
+ assert "_stream_id" not in queued.metadata
+ assert "_stream_delta" not in queued.metadata
+ assert "_stream_end" not in queued.metadata
+ assert "_resuming" not in queued.metadata
+ assert "Finish the migration." in queued.content
+ assert ctx.all_messages == messages[:-1]
+ assert ctx.final_content == ""
+ assert ctx.suppress_response is True
+ assert ctx.msg.metadata[INTERNAL_CONTINUATION_PENDING_META] is True
+ assert meta["_sustained_goal_continuation_rounds"] == 1
+
+
+@pytest.mark.asyncio
+async def test_internal_continuation_respects_round_limit():
+ meta = {
+ GOAL_STATE_KEY: {"status": "active", "objective": "x"},
+ "_sustained_goal_continuation_rounds": 12,
+ }
+ ctx = SimpleNamespace(
+ session=SimpleNamespace(metadata=meta),
+ msg=InboundMessage(channel="feishu", sender_id="u1", chat_id="c1", content="start"),
+ session_key="feishu:c1",
+ pending_queue=asyncio.Queue(),
+ stop_reason="max_iterations",
+ final_content="paused",
+ all_messages=[],
+ )
+
+ assert should_stream_budget_response(
+ stop_reason="max_iterations",
+ pending_queue_available=True,
+ session_metadata=meta,
+ )
+ assert await maybe_continue_turn(ctx) is False
+
+
+def test_internal_continuation_requires_budget_boundary_and_queue():
+ meta = {GOAL_STATE_KEY: {"status": "active", "objective": "x"}}
+
+ assert should_stream_budget_response(
+ stop_reason="completed",
+ pending_queue_available=True,
+ session_metadata=meta,
+ )
+ assert should_stream_budget_response(
+ stop_reason="max_iterations",
+ pending_queue_available=False,
+ session_metadata=meta,
+ )
diff --git a/tests/utils/test_webui_turn_helpers.py b/tests/utils/test_webui_turn_helpers.py
index be01c5f45..cb8cbf488 100644
--- a/tests/utils/test_webui_turn_helpers.py
+++ b/tests/utils/test_webui_turn_helpers.py
@@ -31,6 +31,19 @@ async def test_publish_turn_run_status_running_records_wall_clock() -> None:
assert call.metadata.get("started_at") == t0
+@pytest.mark.asyncio
+async def test_publish_turn_run_status_reuses_explicit_wall_clock() -> None:
+ bus = MagicMock()
+ bus.publish_outbound = AsyncMock()
+ msg = InboundMessage(channel="websocket", sender_id="u", chat_id="chat-a", content="hi")
+
+ await wth.publish_turn_run_status(bus, msg, "running", started_at=1234.5)
+
+ assert wth.websocket_turn_wall_started_at("chat-a") == 1234.5
+ call = bus.publish_outbound.await_args[0][0]
+ assert call.metadata.get("started_at") == 1234.5
+
+
@pytest.mark.asyncio
async def test_publish_turn_run_status_idle_clears_wall_clock() -> None:
bus = MagicMock()
diff --git a/webui/src/components/thread/ThreadComposer.tsx b/webui/src/components/thread/ThreadComposer.tsx
index 32becaec5..44029c036 100644
--- a/webui/src/components/thread/ThreadComposer.tsx
+++ b/webui/src/components/thread/ThreadComposer.tsx
@@ -393,6 +393,15 @@ function mcpPresetMentionPayload(preset: McpPresetInfo): OutboundMcpPresetMentio
};
}
+function RunPulseIcon() {
+ return (
+
+
+
+
+ );
+}
+
function RunElapsedStrip({
startedAt,
goalState,
@@ -586,7 +595,7 @@ function RunElapsedStrip({
aria-label={ariaLabel}
>
{displayShowTimer ? (
-
+
) : (
)}
diff --git a/webui/src/globals.css b/webui/src/globals.css
index 0afb9c334..e6a2af90f 100644
--- a/webui/src/globals.css
+++ b/webui/src/globals.css
@@ -301,6 +301,50 @@
.composer-status-strip[data-state="exit"] {
animation: composer-status-strip-exit 180ms ease-in both;
}
+ @keyframes run-pulse-dot {
+ 0%,
+ 100% {
+ transform: scale(0.9);
+ opacity: 0.76;
+ }
+ 50% {
+ transform: scale(1.08);
+ opacity: 1;
+ }
+ }
+ @keyframes run-pulse-ring {
+ 0% {
+ transform: scale(0.42);
+ opacity: 0.34;
+ }
+ 100% {
+ transform: scale(1.28);
+ opacity: 0;
+ }
+ }
+ .run-pulse-icon {
+ color: hsl(204 82% 46%);
+ }
+ .run-pulse-icon__ring,
+ .run-pulse-icon__dot {
+ display: block;
+ border-radius: 999px;
+ pointer-events: none;
+ }
+ .run-pulse-icon__ring {
+ position: absolute;
+ height: 12px;
+ width: 12px;
+ background: hsl(204 82% 46% / 0.22);
+ animation: run-pulse-ring 1.55s ease-out infinite;
+ }
+ .run-pulse-icon__dot {
+ height: 6px;
+ width: 6px;
+ background: currentColor;
+ box-shadow: 0 0 0 1px hsl(204 82% 46% / 0.14);
+ animation: run-pulse-dot 1.55s ease-in-out infinite;
+ }
@keyframes queued-prompt-row-enter {
0% {
opacity: 0;
@@ -321,6 +365,18 @@
.composer-status-strip[data-state] {
animation: none;
}
+ .run-pulse-icon,
+ .run-pulse-icon * {
+ animation: none;
+ }
+ .run-pulse-icon__ring {
+ opacity: 0.18;
+ transform: scale(1);
+ }
+ .run-pulse-icon__dot {
+ opacity: 1;
+ transform: scale(1);
+ }
.queued-prompt-row {
animation: none;
}
diff --git a/webui/src/tests/thread-composer.test.tsx b/webui/src/tests/thread-composer.test.tsx
index 45f70387e..26cd5ce13 100644
--- a/webui/src/tests/thread-composer.test.tsx
+++ b/webui/src/tests/thread-composer.test.tsx
@@ -386,6 +386,7 @@ describe("ThreadComposer", () => {
expect(status).toHaveTextContent(/2:05/);
expect(status.parentElement).toHaveClass("composer-status-strip");
expect(status.parentElement).toHaveAttribute("data-state", "enter");
+ expect(status.querySelector(".run-pulse-icon")).not.toBeNull();
vi.useRealTimers();
});