feat(reasoning): stream reasoning content as a first-class channel

Reasoning now flows as its own stream — symmetric to the answer's
``delta`` / ``stream_end`` pair — instead of being shipped as one
oversized progress message. This lets WebUI render a live "Thinking…"
bubble that updates in place, then auto-collapses when the stream
closes. Other channels remain plugin no-ops by default.

## Protocol

New metadata: ``_reasoning_delta`` (chunk) and ``_reasoning_end``
(close marker). ChannelManager routes both to the dedicated plugin
hooks below; the legacy one-shot ``_reasoning`` is kept for back-compat
and BaseChannel expands it into a single delta + end pair so plugins
only ever implement the streaming primitives.
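
The routing order described above can be sketched as a small pure function. This is an illustrative sketch, not the actual ``ChannelManager`` code: ``reasoning_route`` is a hypothetical helper, and only the metadata keys and hook names come from this commit. It assumes the close marker is checked first, then the chunk marker, then the legacy one-shot flag.

```python
from __future__ import annotations

from typing import Any, Mapping


def reasoning_route(metadata: Mapping[str, Any]) -> str | None:
    """Return the plugin hook name a message would be routed to.

    Hypothetical helper for illustration; the precedence (end, then
    delta, then legacy one-shot) is an assumption based on the dispatch
    described in this commit.
    """
    if metadata.get("_reasoning_end"):
        return "send_reasoning_end"
    if metadata.get("_reasoning_delta"):
        return "send_reasoning_delta"
    if metadata.get("_reasoning"):
        # Legacy one-shot: the base class expands it into delta + end.
        return "send_reasoning"
    return None  # not a reasoning message


routes = [
    reasoning_route({"_progress": True, "_reasoning_delta": True}),
    reasoning_route({"_progress": True, "_reasoning_end": True}),
    reasoning_route({"_progress": True, "_reasoning": True}),
    reasoning_route({"_progress": True}),
]
```

Checking the close marker first means a message that carries both flags is treated as a close rather than as one more chunk.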

WebSocket emits two new events:

- ``reasoning_delta`` (event, chat_id, text, optional stream_id)
- ``reasoning_end`` (event, chat_id, optional stream_id)
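
A client can fold these two events into a single accumulated string plus a streaming flag, similar in spirit to ``UIMessage.reasoning`` and ``reasoningStreaming`` in the WebUI. The sketch below is a minimal assumption-laden example: ``ReasoningView`` and ``apply_frame`` are hypothetical names, and only the frame fields (``event``, ``chat_id``, ``text``, ``stream_id``) come from this commit.

```python
import json
from dataclasses import dataclass


@dataclass
class ReasoningView:
    """Hypothetical client-side state for one reasoning bubble."""

    text: str = ""
    streaming: bool = False


def apply_frame(view: ReasoningView, raw: str) -> ReasoningView:
    """Update the view from one JSON text frame; unrelated events are ignored."""
    frame = json.loads(raw)
    event = frame.get("event")
    if event == "reasoning_delta":
        view.text += frame.get("text", "")
        view.streaming = True
    elif event == "reasoning_end":
        view.streaming = False
    return view


view = ReasoningView()
for raw_frame in (
    '{"event": "reasoning_delta", "chat_id": "c1", "text": "Let me ", "stream_id": "r1"}',
    '{"event": "reasoning_delta", "chat_id": "c1", "text": "decompose", "stream_id": "r1"}',
    '{"event": "reasoning_end", "chat_id": "c1", "stream_id": "r1"}',
):
    apply_frame(view, raw_frame)
```

A client that handles several concurrent streams would likely keep one view per ``stream_id`` instead of a single shared one.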

## BaseChannel surface

- ``send_reasoning_delta(chat_id, delta, metadata)`` — no-op default
- ``send_reasoning_end(chat_id, metadata)`` — no-op default
- ``send_reasoning(msg)`` — back-compat wrapper, base impl forwards
  to the streaming primitives

A channel adds reasoning support by overriding the two streaming
primitives. Telegram / Slack / Discord / Feishu / WeChat / Matrix keep
the base no-ops until their bubble UIs are adapted; on those channels
reasoning is dropped silently at dispatch and never leaks as a stray
text message.
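
As a rough illustration of what overriding the two primitives could look like, the sketch below buffers chunks per stream and renders them once on close. ``BufferingChannel`` is a hypothetical stand-in, not a real ``BaseChannel`` subclass, and the ``rendered`` list only simulates a UI update; the method signatures and the ``_stream_id`` metadata key follow this commit.

```python
from __future__ import annotations

import asyncio
from typing import Any


class BufferingChannel:
    """Hypothetical stand-in for a BaseChannel subclass; not the real class."""

    def __init__(self) -> None:
        # Buffers are keyed by (chat_id, stream_id) so that two reasoning
        # segments in the same chat do not interleave.
        self._buffers: dict[tuple[str, Any], list[str]] = {}
        self.rendered: list[tuple[str, str]] = []

    async def send_reasoning_delta(
        self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
    ) -> None:
        if not delta:
            return  # ignore empty chunks
        key = (chat_id, (metadata or {}).get("_stream_id"))
        self._buffers.setdefault(key, []).append(delta)

    async def send_reasoning_end(
        self, chat_id: str, metadata: dict[str, Any] | None = None
    ) -> None:
        key = (chat_id, (metadata or {}).get("_stream_id"))
        chunks = self._buffers.pop(key, None)
        if chunks:
            # A real channel would finalize its native UI element here.
            self.rendered.append((chat_id, "".join(chunks)))


async def demo() -> list[tuple[str, str]]:
    channel = BufferingChannel()
    meta = {"_stream_id": "r1"}
    await channel.send_reasoning_delta("c1", "Let me ", meta)
    await channel.send_reasoning_delta("c1", "decompose", meta)
    await channel.send_reasoning_end("c1", meta)
    return channel.rendered
```

Calling ``asyncio.run(demo())`` drives one full segment. An end marker with no buffered chunks is treated as a no-op, so a stray close does not produce an empty bubble.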

## AgentHook

Adds ``emit_reasoning_end`` to the hook lifecycle. ``_LoopHook`` tracks
whether a reasoning segment is open and closes it on:

- the first answer delta arriving (so the UI locks the bubble before
  the answer renders below),
- ``on_stream_end``,
- one-shot ``reasoning_content`` / ``thinking_blocks`` after a single
  non-streaming response.
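
The open/close bookkeeping can be pictured as a tiny state machine. The sketch below is a synchronous simplification under stated assumptions: ``ReasoningSegmentTracker`` and its method names are hypothetical, and the recorded event tuples stand in for the real progress callbacks. It shows the property the list above relies on: the close marker is emitted at most once per segment and always before the first answer chunk.

```python
class ReasoningSegmentTracker:
    """Hypothetical, synchronous model of the open/close tracking."""

    def __init__(self) -> None:
        self.open = False
        self.events: list[tuple[str, str]] = []

    def on_reasoning(self, chunk: str) -> None:
        if chunk:
            self.open = True
            self.events.append(("reasoning_delta", chunk))

    def close(self) -> None:
        # Emit the end marker only if a segment is actually open.
        if self.open:
            self.open = False
            self.events.append(("reasoning_end", ""))

    def on_answer(self, chunk: str) -> None:
        if chunk:
            self.close()  # lock the bubble before the answer renders
            self.events.append(("delta", chunk))

    def on_stream_end(self) -> None:
        self.close()  # covers streams that end without any answer text
        self.events.append(("stream_end", ""))


tracker = ReasoningSegmentTracker()
tracker.on_reasoning("thinking ")
tracker.on_reasoning("more")
tracker.on_answer("The ")
tracker.on_answer("answer.")
tracker.on_stream_end()
```

Because ``close`` is idempotent, calling it from both the first answer chunk and the stream end does not produce duplicate end markers.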

## WebUI

- ``UIMessage.reasoning`` is now a single accumulated string with a
  companion ``reasoningStreaming`` flag.
- ``useNanobotStream`` consumes ``reasoning_delta`` / ``reasoning_end``;
  legacy ``kind: "reasoning"`` is auto-translated to a delta + end.
- New ``ReasoningBubble``: shows a shimmer header and stays expanded
  while streaming, collapses to a clickable "Thinking" pill once closed,
  and respects ``prefers-reduced-motion``.
- Answer deltas adopt the reasoning placeholder so the bubble and the
  answer share one assistant row.

## Tests

- ``tests/channels/test_channel_manager_reasoning.py`` — manager routes
  delta + end, drops on channel opt-out, expands one-shot back-compat.
- ``tests/channels/test_websocket_channel.py`` — new ``reasoning_delta``
  / ``reasoning_end`` frames, empty-chunk safety, no-subscriber safety,
  back-compat expansion.
- ``tests/agent/test_runner_reasoning.py`` — runner closes the segment
  on streaming answer start and after one-shot reasoning.
- WebUI tests for ``useNanobotStream`` and ``message-bubble`` cover the
  new protocol and the shimmer styling.

## Docs

``docs/configuration.md`` and ``docs/websocket.md`` document the new
events and the plugin contract.

Co-authored-by: Cursor <cursoragent@cursor.com>
Xubin Ren 2026-05-13 07:13:43 +00:00
parent a6b059d379
commit 458b4ba235
19 changed files with 649 additions and 221 deletions

@@ -743,7 +743,7 @@ Global settings that apply to all channels. Configure under the `channels` secti
 |---------|---------|-------------|
 | `sendProgress` | `true` | Stream agent's text progress to the channel |
 | `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) |
-| `showReasoning` | `true` | Allow channels to surface model reasoning/thinking content (DeepSeek-R1 `reasoning_content`, Anthropic `thinking_blocks`, inline `<think>` tags). The setting is a plugin opt-in: even when `true`, a channel only renders reasoning if it overrides `send_reasoning()`. Currently surfaced on CLI and WebSocket/WebUI; other channels (Telegram, Slack, Discord, ...) keep it as a silent no-op until their bubble UI is adapted. Independent of `sendProgress`. |
+| `showReasoning` | `true` | Allow channels to surface model reasoning/thinking content (DeepSeek-R1 `reasoning_content`, Anthropic `thinking_blocks`, inline `<think>` tags). Reasoning flows as a dedicated stream with `_reasoning_delta` / `_reasoning_end` markers — channels override `send_reasoning_delta` / `send_reasoning_end` to render in-place updates. Even with `true`, channels without those overrides stay no-op silently. Currently surfaced on CLI and WebSocket/WebUI (italic shimmer header, auto-collapses after the stream ends); Telegram / Slack / Discord / Feishu / WeChat / Matrix keep the base no-op until their bubble UI is adapted. Independent of `sendProgress`. |
 | `sendMaxRetries` | `3` | Max delivery attempts per outbound message, including the initial send (0-10 configured, minimum 1 actual attempt) |
 | `transcriptionProvider` | `"groq"` | Voice transcription backend: `"groq"` (free tier, default) or `"openai"`. API key is auto-resolved from the matching provider config. |
 | `transcriptionLanguage` | `null` | Optional ISO-639-1 language hint for audio transcription, e.g. `"en"`, `"ko"`, `"ja"`. |

@@ -128,6 +128,29 @@ All frames are JSON text. Each message has an `event` field.
 }
 ```

+**`reasoning_delta`** — incremental model reasoning / thinking chunk for the active assistant turn. Mirrors `delta` but targets the reasoning bubble above the answer rather than the answer body:
+
+```json
+{
+  "event": "reasoning_delta",
+  "chat_id": "uuid-v4",
+  "text": "Let me decompose ",
+  "stream_id": "r1"
+}
+```
+
+**`reasoning_end`** — close marker for the active reasoning stream. WebUI uses this to lock the in-place bubble and switch from the shimmer header to a static collapsed state:
+
+```json
+{
+  "event": "reasoning_end",
+  "chat_id": "uuid-v4",
+  "stream_id": "r1"
+}
+```
+
+Reasoning frames only flow when the channel's `showReasoning` is `true` (default) and the model returns reasoning content (DeepSeek-R1 / Kimi / MiMo / OpenAI reasoning models, Anthropic extended thinking, or inline `<think>` / `<thought>` tags). Models without reasoning produce zero `reasoning_delta` frames.
+
 **`runtime_model_updated`** — broadcast when the gateway runtime model changes, for example after `/model <preset>`:

 ```json

@@ -52,6 +52,14 @@ class AgentHook:
     async def emit_reasoning(self, reasoning_content: str | None) -> None:
         pass

+    async def emit_reasoning_end(self) -> None:
+        """Mark the end of an in-flight reasoning stream.
+
+        Hooks that buffer ``emit_reasoning`` chunks (for in-place UI updates)
+        flush and freeze the rendered group here. One-shot hooks ignore.
+        """
+        pass
+
     async def after_iteration(self, context: AgentHookContext) -> None:
         pass
@@ -102,6 +110,9 @@ class CompositeHook(AgentHook):
     async def emit_reasoning(self, reasoning_content: str | None) -> None:
         await self._for_each_hook_safe("emit_reasoning", reasoning_content)

+    async def emit_reasoning_end(self) -> None:
+        await self._for_each_hook_safe("emit_reasoning_end")
+
     async def after_iteration(self, context: AgentHookContext) -> None:
         await self._for_each_hook_safe("after_iteration", context)

@@ -87,6 +87,7 @@ class _LoopHook(AgentHook):
         self._session_key = session_key
         self._stream_buf = ""
         self._think_extractor = IncrementalThinkExtractor()
+        self._reasoning_open = False

     def wants_streaming(self) -> bool:
         return self._on_stream is not None
@@ -102,10 +103,15 @@ class _LoopHook(AgentHook):
         if await self._think_extractor.feed(self._stream_buf, self.emit_reasoning):
             context.streamed_reasoning = True

-        if incremental and self._on_stream:
-            await self._on_stream(incremental)
+        if incremental:
+            # Answer text has started — close any open reasoning segment so
+            # the UI can lock the bubble before the answer renders below it.
+            await self.emit_reasoning_end()
+            if self._on_stream:
+                await self._on_stream(incremental)

     async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None:
+        await self.emit_reasoning_end()
         if self._on_stream_end:
             await self._on_stream_end(resuming=resuming)
         self._stream_buf = ""
@@ -147,16 +153,27 @@ class _LoopHook(AgentHook):
         )

     async def emit_reasoning(self, reasoning_content: str | None) -> None:
-        """Publish reasoning content; channel plugins decide whether to render.
+        """Publish a reasoning chunk; channel plugins decide whether to render.

-        The loop is intentionally not the gate: ``ChannelsConfig.show_reasoning``
-        is a default that ``ChannelManager`` and ``BaseChannel.send_reasoning``
-        consult per channel. A channel without a low-emphasis UI primitive
-        keeps the base no-op and the content drops at the dispatch boundary.
+        Each call is one delta in a streaming session. ``emit_reasoning_end``
+        closes the segment. The loop is intentionally not the gate:
+        ``ChannelsConfig.show_reasoning`` is a default that ``ChannelManager``
+        and ``BaseChannel.send_reasoning_delta`` consult per channel; a
+        channel without a low-emphasis UI primitive keeps the base no-op
+        and the content drops at the dispatch boundary.
         """
         if self._on_progress and reasoning_content:
+            self._reasoning_open = True
             await self._on_progress(reasoning_content, reasoning=True)

+    async def emit_reasoning_end(self) -> None:
+        """Close the current reasoning stream segment, if any was open."""
+        if self._reasoning_open and self._on_progress:
+            self._reasoning_open = False
+            await self._on_progress("", reasoning_end=True)
+        else:
+            self._reasoning_open = False
+
     async def after_iteration(self, context: AgentHookContext) -> None:
         if (
             self._on_progress
@@ -665,12 +682,15 @@ class AgentLoop:
             tool_hint: bool = False,
             tool_events: list[dict[str, Any]] | None = None,
             reasoning: bool = False,
+            reasoning_end: bool = False,
         ) -> None:
             meta = dict(msg.metadata or {})
             meta["_progress"] = True
             meta["_tool_hint"] = tool_hint
             if reasoning:
-                meta["_reasoning"] = True
+                meta["_reasoning_delta"] = True
+            if reasoning_end:
+                meta["_reasoning_end"] = True
             if tool_events:
                 meta["_tool_events"] = tool_events
             await self.bus.publish_outbound(

@@ -291,6 +291,7 @@ class AgentRunner:
                     response.content = cleaned_content
                 if reasoning_text and not context.streamed_reasoning:
                     await hook.emit_reasoning(reasoning_text)
+                    await hook.emit_reasoning_end()
                     context.streamed_reasoning = True

                 if response.should_execute_tools:
@@ -617,6 +618,8 @@ class AgentRunner:
             and getattr(self.provider, "supports_progress_deltas", False) is True
         )

+        progress_state: dict[str, bool] | None = None
+
         if wants_streaming:
             async def _stream(delta: str) -> None:
                 if delta:
@@ -630,6 +633,7 @@ class AgentRunner:
         elif wants_progress_streaming:
             stream_buf = ""
             think_extractor = IncrementalThinkExtractor()
+            progress_state = {"reasoning_open": False}

             async def _stream_progress(delta: str) -> None:
                 nonlocal stream_buf
@@ -642,8 +646,12 @@ class AgentRunner:
                 if await think_extractor.feed(stream_buf, hook.emit_reasoning):
                     context.streamed_reasoning = True
+                    progress_state["reasoning_open"] = True

                 if incremental:
+                    if progress_state["reasoning_open"]:
+                        await hook.emit_reasoning_end()
+                        progress_state["reasoning_open"] = False
                     context.streamed_content = True
                     await spec.progress_callback(incremental)
@@ -654,16 +662,20 @@ class AgentRunner:
         else:
             coro = self.provider.chat_with_retry(**kwargs)

-        if timeout_s is None:
-            return await coro
         try:
-            return await asyncio.wait_for(coro, timeout=timeout_s)
+            response = (
+                await coro if timeout_s is None
+                else await asyncio.wait_for(coro, timeout=timeout_s)
+            )
         except asyncio.TimeoutError:
             return LLMResponse(
                 content=f"Error calling LLM: timed out after {timeout_s:g}s",
                 finish_reason="error",
                 error_kind="timeout",
             )
+        if progress_state and progress_state.get("reasoning_open"):
+            await hook.emit_reasoning_end()
+        return response

     async def _request_finalization_retry(
         self,

@@ -121,18 +121,53 @@ class BaseChannel(ABC):
         """
         pass

-    async def send_reasoning(self, msg: OutboundMessage) -> None:
-        """Surface model reasoning/thinking content.
+    async def send_reasoning_delta(
+        self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
+    ) -> None:
+        """Stream a chunk of model reasoning/thinking content.

         Default is no-op. Channels with a native low-emphasis primitive
         (Slack context block, Telegram expandable blockquote, Discord
         subtext, WebUI italic bubble, ...) override to render reasoning
-        as a subordinate trace. Channels without a suitable affordance
-        keep this no-op: silently dropping is better than leaking raw
-        model thoughts as regular conversational messages.
+        as a subordinate trace that updates in place as the model thinks.
+
+        Streaming contract mirrors :meth:`send_delta`: ``_reasoning_delta``
+        is a chunk, ``_reasoning_end`` ends the current reasoning segment,
+        and stateful implementations should key buffers by ``_stream_id``
+        rather than only by ``chat_id``.
         """
         return

+    async def send_reasoning_end(
+        self, chat_id: str, metadata: dict[str, Any] | None = None
+    ) -> None:
+        """Mark the end of a reasoning stream segment.
+
+        Default is no-op. Channels that buffer ``send_reasoning_delta``
+        chunks for in-place updates use this signal to flush and freeze
+        the rendered group; one-shot channels can ignore it entirely.
+        """
+        return
+
+    async def send_reasoning(self, msg: OutboundMessage) -> None:
+        """Deliver a complete reasoning block.
+
+        Default implementation reuses the streaming pair so plugins only
+        need to override the delta/end methods. Equivalent to one delta
+        with the full content followed immediately by an end marker, which
+        keeps a single rendering path for both streamed and one-shot
+        reasoning (e.g. DeepSeek-R1's final-response ``reasoning_content``).
+        """
+        if not msg.content:
+            return
+        meta = dict(msg.metadata or {})
+        meta.setdefault("_reasoning_delta", True)
+        await self.send_reasoning_delta(msg.chat_id, msg.content, meta)
+        end_meta = dict(meta)
+        end_meta.pop("_reasoning_delta", None)
+        end_meta["_reasoning_end"] = True
+        await self.send_reasoning_end(msg.chat_id, end_meta)
+
     @property
     def supports_streaming(self) -> bool:
         """True when config enables streaming AND this subclass implements send_delta."""

@@ -283,13 +283,18 @@ class ChannelManager:
                     timeout=1.0
                 )

-                if msg.metadata.get("_reasoning"):
-                    # Reasoning rides its own plugin channel: only delivered when
-                    # the destination channel both opts in (``show_reasoning``)
-                    # and overrides ``send_reasoning``. Channels without a
-                    # low-emphasis UI primitive keep the base no-op and the
-                    # content silently drops here rather than leak as a
-                    # conversational reply.
+                if (
+                    msg.metadata.get("_reasoning_delta")
+                    or msg.metadata.get("_reasoning_end")
+                    or msg.metadata.get("_reasoning")
+                ):
+                    # Reasoning rides its own plugin channel: only delivered
+                    # when the destination channel opts in via ``show_reasoning``
+                    # and overrides the streaming primitives. Channels without
+                    # a low-emphasis UI affordance keep the base no-op and the
+                    # content silently drops here. ``_reasoning`` (one-shot)
+                    # is accepted for backward compatibility with hooks that
+                    # haven't migrated to delta/end yet.
                     channel = self.channels.get(msg.channel)
                     if channel is not None and channel.show_reasoning:
                         await self._send_with_retry(channel, msg)
@@ -345,7 +350,14 @@ class ChannelManager:
     @staticmethod
     async def _send_once(channel: BaseChannel, msg: OutboundMessage) -> None:
         """Send one outbound message without retry policy."""
-        if msg.metadata.get("_reasoning"):
+        if msg.metadata.get("_reasoning_end"):
+            await channel.send_reasoning_end(msg.chat_id, msg.metadata)
+        elif msg.metadata.get("_reasoning_delta"):
+            await channel.send_reasoning_delta(msg.chat_id, msg.content, msg.metadata)
+        elif msg.metadata.get("_reasoning"):
+            # Back-compat: one-shot reasoning. BaseChannel translates this
+            # to a single delta + end pair so plugins only implement the
+            # streaming primitives.
             await channel.send_reasoning(msg)
         elif msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"):
             await channel.send_delta(msg.chat_id, msg.content, msg.metadata)

@@ -1487,30 +1487,54 @@ class WebSocketChannel(BaseChannel):
         for connection in conns:
             await self._safe_send_to(connection, raw, label=" ")

-    async def send_reasoning(self, msg: OutboundMessage) -> None:
-        """Stream model reasoning as a subordinate trace frame.
-
-        Renders as ``kind=reasoning`` alongside the existing ``tool_hint`` /
-        ``progress`` frames; the WebUI mounts these on the active assistant
-        bubble rather than as a conversational reply.
+    async def send_reasoning_delta(
+        self,
+        chat_id: str,
+        delta: str,
+        metadata: dict[str, Any] | None = None,
+    ) -> None:
+        """Push one chunk of model reasoning. Mirrors ``send_delta`` shape so
+        WebUI receives a stream that opens, updates in place, and closes,
+        rendered above the active assistant bubble with a shimmer header
+        until the matching ``reasoning_end`` arrives.
         """
-        conns = list(self._subs.get(msg.chat_id, ()))
-        if not conns:
+        conns = list(self._subs.get(chat_id, ()))
+        if not conns or not delta:
             return
-        if not msg.content:
-            return
-        payload: dict[str, Any] = {
-            "event": "message",
-            "chat_id": msg.chat_id,
-            "text": msg.content,
-            "kind": "reasoning",
+        meta = metadata or {}
+        body: dict[str, Any] = {
+            "event": "reasoning_delta",
+            "chat_id": chat_id,
+            "text": delta,
         }
-        if msg.reply_to:
-            payload["reply_to"] = msg.reply_to
-        raw = json.dumps(payload, ensure_ascii=False)
+        stream_id = meta.get("_stream_id")
+        if stream_id is not None:
+            body["stream_id"] = stream_id
+        raw = json.dumps(body, ensure_ascii=False)
         for connection in conns:
             await self._safe_send_to(connection, raw, label=" reasoning ")

+    async def send_reasoning_end(
+        self,
+        chat_id: str,
+        metadata: dict[str, Any] | None = None,
+    ) -> None:
+        """Close the current reasoning stream segment for in-place renderers."""
+        conns = list(self._subs.get(chat_id, ()))
+        if not conns:
+            return
+        meta = metadata or {}
+        body: dict[str, Any] = {
+            "event": "reasoning_end",
+            "chat_id": chat_id,
+        }
+        stream_id = meta.get("_stream_id")
+        if stream_id is not None:
+            body["stream_id"] = stream_id
+        raw = json.dumps(body, ensure_ascii=False)
+        for connection in conns:
+            await self._safe_send_to(connection, raw, label=" reasoning_end ")
+
     async def send_delta(
         self,
         chat_id: str,

@@ -24,11 +24,15 @@ class _RecordingHook(AgentHook):
     def __init__(self) -> None:
         super().__init__()
         self.emitted: list[str] = []
+        self.end_calls = 0

     async def emit_reasoning(self, reasoning_content: str | None) -> None:
         if reasoning_content:
             self.emitted.append(reasoning_content)

+    async def emit_reasoning_end(self) -> None:
+        self.end_calls += 1
+

 @pytest.mark.asyncio
 async def test_runner_preserves_reasoning_fields_in_assistant_history():
@@ -277,3 +281,41 @@ async def test_runner_does_not_double_emit_when_inline_think_already_streamed():
     assert result.final_content == "The answer."
     assert hook.emitted == ["working..."]
+    assert hook.end_calls >= 1, "reasoning stream must be closed once the answer starts"
+
+
+@pytest.mark.asyncio
+async def test_runner_closes_reasoning_stream_after_one_shot_response():
+    """A non-streaming response carrying ``reasoning_content`` must emit
+    both a reasoning delta and an end marker so channels can finalize the
+    in-place bubble."""
+    from nanobot.agent.runner import AgentRunSpec, AgentRunner
+
+    provider = MagicMock()
+
+    async def chat_with_retry(**kwargs):
+        return LLMResponse(
+            content="answer",
+            reasoning_content="hidden thought",
+            tool_calls=[],
+            usage={"prompt_tokens": 5, "completion_tokens": 3},
+        )
+
+    provider.chat_with_retry = chat_with_retry
+    tools = MagicMock()
+    tools.get_definitions.return_value = []
+
+    hook = _RecordingHook()
+    runner = AgentRunner(provider)
+    result = await runner.run(AgentRunSpec(
+        initial_messages=[{"role": "user", "content": "q"}],
+        tools=tools,
+        model="test-model",
+        max_iterations=3,
+        max_tool_result_chars=_MAX_TOOL_RESULT_CHARS,
+        hook=hook,
+    ))
+
+    assert result.final_content == "answer"
+    assert hook.emitted == ["hidden thought"]
+    assert hook.end_calls == 1

@@ -1,14 +1,22 @@
 """Tests for ChannelManager routing of model reasoning content.

-Reasoning is delivered as a separate plugin action (``send_reasoning``)
-rather than a metadata flag on a regular outbound. The manager routes
-``_reasoning`` messages only to channels that opt in via
-``channel.show_reasoning``; channels without a low-emphasis UI primitive
-keep the base no-op and the content silently drops at dispatch.
+Reasoning is delivered through plugin streaming primitives
+(``send_reasoning_delta`` / ``send_reasoning_end``) so each channel
+controls in-place rendering mirroring the existing answer ``send_delta``
+/ ``stream_end`` pair. The manager forwards reasoning frames only to
+channels that opt in via ``channel.show_reasoning``; plugins without a
+low-emphasis UI primitive keep the base no-op and the content silently
+drops at dispatch.
+
+One-shot ``_reasoning`` frames are accepted for back-compat with hooks
+that haven't migrated yet — ``BaseChannel.send_reasoning`` expands them
+to a single delta + end pair so plugins only implement the streaming
+primitives.
 """

 from __future__ import annotations

+import asyncio
 from unittest.mock import AsyncMock

 import pytest
@@ -27,7 +35,8 @@ class _MockChannel(BaseChannel):
     def __init__(self, config, bus):
         super().__init__(config, bus)
         self._send_mock = AsyncMock()
-        self._send_reasoning_mock = AsyncMock()
+        self._delta_mock = AsyncMock()
+        self._end_mock = AsyncMock()

     async def start(self):  # pragma: no cover - not exercised
         pass
@@ -38,8 +47,11 @@ class _MockChannel(BaseChannel):
     async def send(self, msg):
         return await self._send_mock(msg)

-    async def send_reasoning(self, msg):
-        return await self._send_reasoning_mock(msg)
+    async def send_reasoning_delta(self, chat_id, delta, metadata=None):
+        return await self._delta_mock(chat_id, delta, metadata)
+
+    async def send_reasoning_end(self, chat_id, metadata=None):
+        return await self._end_mock(chat_id, metadata)


 @pytest.fixture
@@ -50,17 +62,52 @@ def manager() -> ChannelManager:
 @pytest.mark.asyncio
-async def test_reasoning_routes_to_send_reasoning_not_send(manager):
+async def test_reasoning_delta_routes_to_send_reasoning_delta(manager):
     channel = manager.channels["mock"]
     msg = OutboundMessage(
         channel="mock",
         chat_id="c1",
-        content="step-by-step thinking",
+        content="step-by-step",
+        metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"},
+    )
+    await manager._send_once(channel, msg)
+
+    channel._delta_mock.assert_awaited_once()
+    args = channel._delta_mock.await_args.args
+    assert args[0] == "c1"
+    assert args[1] == "step-by-step"
+    channel._send_mock.assert_not_awaited()
+    channel._end_mock.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_reasoning_end_routes_to_send_reasoning_end(manager):
+    channel = manager.channels["mock"]
+    msg = OutboundMessage(
+        channel="mock",
+        chat_id="c1",
+        content="",
+        metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"},
+    )
+    await manager._send_once(channel, msg)
+
+    channel._end_mock.assert_awaited_once()
+    channel._delta_mock.assert_not_awaited()
+
+
+@pytest.mark.asyncio
+async def test_legacy_one_shot_reasoning_expands_to_delta_plus_end(manager):
+    """`_reasoning` (no delta/end pair) falls back through `send_reasoning`
+    which the base class expands to a single delta + end. Hooks that haven't
+    migrated still surface in WebUI as a complete stream segment."""
+    channel = manager.channels["mock"]
+    msg = OutboundMessage(
+        channel="mock",
+        chat_id="c1",
+        content="one-shot reasoning",
         metadata={"_progress": True, "_reasoning": True},
     )
     await manager._send_once(channel, msg)

-    channel._send_reasoning_mock.assert_awaited_once_with(msg)
-    channel._send_mock.assert_not_awaited()
+    channel._delta_mock.assert_awaited_once()
+    channel._end_mock.assert_awaited_once()


 @pytest.mark.asyncio
@ -71,14 +118,14 @@ async def test_dispatch_drops_reasoning_when_channel_opts_out(manager):
channel="mock", channel="mock",
chat_id="c1", chat_id="c1",
content="hidden thinking", content="hidden thinking",
metadata={"_progress": True, "_reasoning": True}, metadata={"_progress": True, "_reasoning_delta": True},
) )
await manager.bus.publish_outbound(msg) await manager.bus.publish_outbound(msg)
pumped = await _pump_one(manager) await _pump_one(manager)
assert pumped is True channel._delta_mock.assert_not_awaited()
channel._send_reasoning_mock.assert_not_awaited() channel._end_mock.assert_not_awaited()
channel._send_mock.assert_not_awaited() channel._send_mock.assert_not_awaited()
@ -86,20 +133,24 @@ async def test_dispatch_drops_reasoning_when_channel_opts_out(manager):
async def test_dispatch_delivers_reasoning_when_channel_opts_in(manager): async def test_dispatch_delivers_reasoning_when_channel_opts_in(manager):
channel = manager.channels["mock"] channel = manager.channels["mock"]
channel.show_reasoning = True channel.show_reasoning = True
msg = OutboundMessage( for chunk in ("first ", "second"):
await manager.bus.publish_outbound(OutboundMessage(
channel="mock",
chat_id="c1",
content=chunk,
metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"},
))
await manager.bus.publish_outbound(OutboundMessage(
channel="mock", channel="mock",
chat_id="c1", chat_id="c1",
content="visible thinking", content="",
metadata={"_progress": True, "_reasoning": True}, metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"},
) ))
await manager.bus.publish_outbound(msg)
pumped = await _pump_one(manager) await _pump_one(manager)
assert pumped is True assert channel._delta_mock.await_count == 2
channel._send_reasoning_mock.assert_awaited_once() channel._end_mock.assert_awaited_once()
delivered = channel._send_reasoning_mock.await_args.args[0]
assert delivered.content == "visible thinking"
@pytest.mark.asyncio @pytest.mark.asyncio
@ -108,21 +159,19 @@ async def test_dispatch_silently_drops_reasoning_for_unknown_channel(manager):
channel="ghost", channel="ghost",
chat_id="c1", chat_id="c1",
content="nobody home", content="nobody home",
metadata={"_progress": True, "_reasoning": True}, metadata={"_progress": True, "_reasoning_delta": True},
) )
await manager.bus.publish_outbound(msg) await manager.bus.publish_outbound(msg)
pumped = await _pump_one(manager) await _pump_one(manager)
assert pumped is True manager.channels["mock"]._delta_mock.assert_not_awaited()
# Mock channel must not receive anything destined for a different channel.
manager.channels["mock"]._send_reasoning_mock.assert_not_awaited()
manager.channels["mock"]._send_mock.assert_not_awaited() manager.channels["mock"]._send_mock.assert_not_awaited()
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_base_channel_send_reasoning_is_noop_safe(): async def test_base_channel_reasoning_primitives_are_noop_safe():
"""Plugins that don't override `send_reasoning` must not blow up.""" """Plugins that don't override the streaming primitives must not blow up."""
class _Plain(BaseChannel): class _Plain(BaseChannel):
name = "plain" name = "plain"
@ -138,7 +187,9 @@ async def test_base_channel_send_reasoning_is_noop_safe():
pass pass
channel = _Plain({}, MessageBus()) channel = _Plain({}, MessageBus())
# No exception, returns None. assert await channel.send_reasoning_delta("c", "x") is None
assert await channel.send_reasoning_end("c") is None
# And the one-shot wrapper translates without raising.
assert await channel.send_reasoning( assert await channel.send_reasoning(
OutboundMessage(channel="plain", chat_id="c", content="x", metadata={}) OutboundMessage(channel="plain", chat_id="c", content="x", metadata={})
) is None ) is None
@@ -151,26 +202,21 @@ async def test_reasoning_routing_does_not_consult_send_progress(manager):
channel = manager.channels["mock"] channel = manager.channels["mock"]
channel.send_progress = False channel.send_progress = False
channel.show_reasoning = True channel.show_reasoning = True
msg = OutboundMessage( await manager.bus.publish_outbound(OutboundMessage(
channel="mock", channel="mock",
chat_id="c1", chat_id="c1",
content="still surfaces", content="still surfaces",
metadata={"_progress": True, "_reasoning": True}, metadata={"_progress": True, "_reasoning_delta": True},
) ))
await manager.bus.publish_outbound(msg)
pumped = await _pump_one(manager) await _pump_one(manager)
assert pumped is True channel._delta_mock.assert_awaited_once()
channel._send_reasoning_mock.assert_awaited_once()
async def _pump_one(manager: ChannelManager) -> bool: async def _pump_one(manager: ChannelManager) -> None:
"""Drive the dispatcher for exactly one message, then cancel.""" """Drive the dispatcher until the outbound queue drains, then cancel."""
import asyncio
task = asyncio.create_task(manager._dispatch_outbound()) task = asyncio.create_task(manager._dispatch_outbound())
# Yield control until the queue drains.
for _ in range(50): for _ in range(50):
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
if manager.bus.outbound.qsize() == 0: if manager.bus.outbound.qsize() == 0:
@@ -180,4 +226,3 @@ async def _pump_one(manager: ChannelManager) -> bool:
await task await task
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
return True
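The routing behaviour these tests exercise can be sketched as follows. This is a minimal, self-contained illustration and not the project's ``ChannelManager``: ``route_reasoning``, ``RecordingChannel``, and the trimmed ``OutboundMessage`` dataclass are hypothetical stand-ins. Only the metadata keys (``_reasoning_delta``, ``_reasoning_end``, ``_reasoning``) and the hook names come from the commit message. The ``show_reasoning`` and ``send_progress`` flags are deliberately not modelled.

```python
import asyncio
from dataclasses import dataclass, field
from typing import Any


@dataclass
class OutboundMessage:
    # Hypothetical stand-in carrying only the fields this sketch needs.
    channel: str
    chat_id: str
    content: str
    metadata: dict[str, Any] = field(default_factory=dict)


class RecordingChannel:
    """Hypothetical channel that records which reasoning hook was called."""

    def __init__(self) -> None:
        self.calls: list[tuple[str, str, str]] = []

    async def send_reasoning_delta(self, chat_id, delta, metadata=None):
        self.calls.append(("delta", chat_id, delta))

    async def send_reasoning_end(self, chat_id, metadata=None):
        self.calls.append(("end", chat_id, ""))

    async def send_reasoning(self, msg):
        # Legacy one-shot path: expand into a single delta + end pair.
        await self.send_reasoning_delta(msg.chat_id, msg.content, msg.metadata)
        await self.send_reasoning_end(msg.chat_id, msg.metadata)


async def route_reasoning(channels, msg) -> bool:
    """Return True when msg was a reasoning frame (delivered or dropped)."""
    meta = msg.metadata
    keys = ("_reasoning_delta", "_reasoning_end", "_reasoning")
    if not any(meta.get(k) for k in keys):
        return False  # not a reasoning frame; the caller handles it normally
    channel = channels.get(msg.channel)
    if channel is None:
        return True  # unknown channel: drop silently, never fall back to send()
    if meta.get("_reasoning_delta"):
        await channel.send_reasoning_delta(msg.chat_id, msg.content, meta)
    elif meta.get("_reasoning_end"):
        await channel.send_reasoning_end(msg.chat_id, meta)
    else:
        await channel.send_reasoning(msg)
    return True
```

In this sketch a frame addressed to an unregistered channel is consumed without touching any other channel, which is the property ``test_dispatch_silently_drops_reasoning_for_unknown_channel`` checks against the real dispatcher.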


@@ -359,30 +359,44 @@ async def test_send_delta_emits_delta_and_stream_end() -> None:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_reasoning_emits_reasoning_kind_frame() -> None: async def test_send_reasoning_delta_emits_streaming_frame() -> None:
bus = MagicMock() bus = MagicMock()
channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
mock_ws = AsyncMock() mock_ws = AsyncMock()
channel._attach(mock_ws, "chat-1") channel._attach(mock_ws, "chat-1")
await channel.send_reasoning(OutboundMessage( await channel.send_reasoning_delta(
channel="websocket", "chat-1",
chat_id="chat-1", "step-by-step thinking",
content="step-by-step thinking", {"_reasoning_delta": True, "_stream_id": "r1"},
metadata={"_progress": True, "_reasoning": True}, )
))
mock_ws.send.assert_awaited_once() mock_ws.send.assert_awaited_once()
payload = json.loads(mock_ws.send.await_args.args[0]) payload = json.loads(mock_ws.send.await_args.args[0])
assert payload["event"] == "message" assert payload["event"] == "reasoning_delta"
assert payload["chat_id"] == "chat-1" assert payload["chat_id"] == "chat-1"
assert payload["text"] == "step-by-step thinking" assert payload["text"] == "step-by-step thinking"
assert payload["kind"] == "reasoning" assert payload["stream_id"] == "r1"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_send_reasoning_drops_empty_content() -> None: async def test_send_reasoning_end_emits_close_frame() -> None:
"""Empty reasoning emits nothing — keeps the frontend bubble clean.""" bus = MagicMock()
channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
mock_ws = AsyncMock()
channel._attach(mock_ws, "chat-1")
await channel.send_reasoning_end("chat-1", {"_reasoning_end": True, "_stream_id": "r1"})
payload = json.loads(mock_ws.send.await_args.args[0])
assert payload == {"event": "reasoning_end", "chat_id": "chat-1", "stream_id": "r1"}
@pytest.mark.asyncio
async def test_send_reasoning_one_shot_expands_to_delta_plus_end() -> None:
"""``send_reasoning`` is back-compat for hooks that haven't migrated:
the base implementation must produce one delta and one end so the
WebUI sees the same shape either way."""
bus = MagicMock() bus = MagicMock()
channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
mock_ws = AsyncMock() mock_ws = AsyncMock()
@@ -391,10 +405,27 @@ async def test_send_reasoning_drops_empty_content() -> None:
await channel.send_reasoning(OutboundMessage( await channel.send_reasoning(OutboundMessage(
channel="websocket", channel="websocket",
chat_id="chat-1", chat_id="chat-1",
content="", content="thinking",
metadata={"_reasoning": True}, metadata={"_reasoning": True},
)) ))
assert mock_ws.send.await_count == 2
first = json.loads(mock_ws.send.call_args_list[0][0][0])
second = json.loads(mock_ws.send.call_args_list[1][0][0])
assert first["event"] == "reasoning_delta"
assert first["text"] == "thinking"
assert second["event"] == "reasoning_end"
@pytest.mark.asyncio
async def test_send_reasoning_delta_drops_empty_chunks() -> None:
bus = MagicMock()
channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
mock_ws = AsyncMock()
channel._attach(mock_ws, "chat-1")
await channel.send_reasoning_delta("chat-1", "", {"_reasoning_delta": True})
mock_ws.send.assert_not_awaited() mock_ws.send.assert_not_awaited()
@@ -403,12 +434,8 @@ async def test_send_reasoning_without_subscribers_is_noop() -> None:
bus = MagicMock() bus = MagicMock()
channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus)
await channel.send_reasoning(OutboundMessage( await channel.send_reasoning_delta("unattached", "thinking", None)
channel="websocket", await channel.send_reasoning_end("unattached", None)
chat_id="unattached",
content="thinking",
metadata={"_reasoning": True},
))
# No subscribers, no exception, no send. # No subscribers, no exception, no send.
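The frame shapes asserted above can be illustrated with a small stand-alone sketch. It assumes a hypothetical ``SketchWebSocketChannel`` and ``FakeSocket``, neither of which is the project's ``WebSocketChannel``. The event names, the ``text`` / ``stream_id`` fields, and the ``_stream_id`` metadata key are taken from the tests and commit message; everything else is illustrative.

```python
import asyncio
import json


class FakeSocket:
    """Hypothetical stand-in for a WebSocket connection; records sent frames."""

    def __init__(self) -> None:
        self.sent: list[str] = []

    async def send(self, raw: str) -> None:
        self.sent.append(raw)


class SketchWebSocketChannel:
    """Illustrative channel implementing the two streaming primitives."""

    def __init__(self) -> None:
        self._subs: dict[str, list[FakeSocket]] = {}

    def attach(self, ws: FakeSocket, chat_id: str) -> None:
        self._subs.setdefault(chat_id, []).append(ws)

    async def _emit(self, chat_id: str, payload: dict) -> None:
        raw = json.dumps(payload)
        # No subscribers means the loop body never runs: a silent no-op.
        for ws in self._subs.get(chat_id, []):
            await ws.send(raw)

    async def send_reasoning_delta(self, chat_id, delta, metadata=None):
        if not delta:
            return  # empty chunks are dropped so the bubble stays clean
        payload = {"event": "reasoning_delta", "chat_id": chat_id, "text": delta}
        stream_id = (metadata or {}).get("_stream_id")
        if stream_id is not None:
            payload["stream_id"] = stream_id
        await self._emit(chat_id, payload)

    async def send_reasoning_end(self, chat_id, metadata=None):
        payload = {"event": "reasoning_end", "chat_id": chat_id}
        stream_id = (metadata or {}).get("_stream_id")
        if stream_id is not None:
            payload["stream_id"] = stream_id
        await self._emit(chat_id, payload)


async def demo() -> list[dict]:
    ws = FakeSocket()
    channel = SketchWebSocketChannel()
    channel.attach(ws, "chat-1")
    await channel.send_reasoning_delta("chat-1", "", None)  # dropped
    await channel.send_reasoning_delta("chat-1", "thinking", {"_stream_id": "r1"})
    await channel.send_reasoning_end("chat-1", {"_stream_id": "r1"})
    await channel.send_reasoning_delta("unattached", "x", None)  # no subscribers
    return [json.loads(frame) for frame in ws.sent]
```

Running ``asyncio.run(demo())`` in this sketch yields exactly two frames, one ``reasoning_delta`` and one ``reasoning_end``, both tagged with ``stream_id`` ``"r1"``.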


@@ -1,4 +1,4 @@
import { useCallback, useEffect, useMemo, useRef, useState } from "react"; import { useCallback, useEffect, useRef, useState } from "react";
import { Check, ChevronRight, Copy, FileIcon, ImageIcon, PlaySquare, Sparkles, Wrench } from "lucide-react"; import { Check, ChevronRight, Copy, FileIcon, ImageIcon, PlaySquare, Sparkles, Wrench } from "lucide-react";
import { useTranslation } from "react-i18next"; import { useTranslation } from "react-i18next";
@@ -85,12 +85,16 @@ export function MessageBubble({ message }: MessageBubbleProps) {
const empty = message.content.trim().length === 0; const empty = message.content.trim().length === 0;
const media = message.media ?? []; const media = message.media ?? [];
const reasoning = message.role === "assistant" ? message.reasoning ?? [] : []; const reasoning = message.role === "assistant" ? message.reasoning ?? "" : "";
const reasoningStreaming = !!(message.role === "assistant" && message.reasoningStreaming);
const hasReasoning = reasoning.length > 0 || reasoningStreaming;
const showAssistantActions = message.role === "assistant" && !message.isStreaming && !empty; const showAssistantActions = message.role === "assistant" && !message.isStreaming && !empty;
return ( return (
<div className={cn("w-full text-[15px]", baseAnim)} style={{ lineHeight: "var(--cjk-line-height)" }}> <div className={cn("w-full text-[15px]", baseAnim)} style={{ lineHeight: "var(--cjk-line-height)" }}>
{reasoning.length > 0 ? <ReasoningBubble lines={reasoning} /> : null} {hasReasoning ? (
{empty && message.isStreaming && reasoning.length === 0 ? ( <ReasoningBubble text={reasoning} streaming={reasoningStreaming} />
) : null}
{empty && message.isStreaming && !hasReasoning ? (
<TypingDots /> <TypingDots />
) : empty && message.isStreaming ? null : ( ) : empty && message.isStreaming ? null : (
<> <>
@@ -437,33 +441,52 @@ function TraceGroup({ message, animClass }: TraceGroupProps) {
} }
interface ReasoningBubbleProps { interface ReasoningBubbleProps {
lines: string[]; text: string;
streaming: boolean;
} }
/** /**
* Subordinate "thinking" trace shown above an assistant turn. Mirrors the * Subordinate "thinking" trace shown above an assistant turn.
* CLI's italic dim ``ChevronRight`` row visually; collapsible because *
* reasoning from models like DeepSeek-R1 / o-series can run long. Defaults * Lifecycle:
* to expanded while the answer is still streaming (so the user sees the * - While ``streaming`` is true (``reasoning_delta`` frames still arriving),
* model "thinking out loud"), but the toggle persists across rerenders. * the bubble defaults to open and the header runs a shimmer + pulse so
* the user sees the model "thinking out loud" in real time.
* - On ``reasoning_end`` the bubble auto-collapses for prose density;
* the user can re-expand to inspect the chain of thought. The local
* toggle persists once the user interacts.
*/ */
function ReasoningBubble({ lines }: ReasoningBubbleProps) { function ReasoningBubble({ text, streaming }: ReasoningBubbleProps) {
const { t } = useTranslation(); const { t } = useTranslation();
const [open, setOpen] = useState(true); const [userToggled, setUserToggled] = useState(false);
const text = useMemo(() => lines.join("\n\n"), [lines]); const [openLocal, setOpenLocal] = useState(true);
const open = userToggled ? openLocal : streaming;
const onToggle = () => {
setUserToggled(true);
setOpenLocal((v) => (userToggled ? !v : !open));
};
return ( return (
<div className="mb-2 w-full animate-in fade-in-0 slide-in-from-top-1 duration-200"> <div className="mb-2 w-full animate-in fade-in-0 slide-in-from-top-1 duration-200">
<button <button
type="button" type="button"
onClick={() => setOpen((v) => !v)} onClick={onToggle}
className={cn( className={cn(
"flex w-full items-center gap-2 rounded-md px-2 py-1.5", "flex w-full items-center gap-2 rounded-md px-2 py-1.5",
"text-xs text-muted-foreground transition-colors hover:bg-muted/45", "text-xs text-muted-foreground transition-colors hover:bg-muted/45",
streaming && "reasoning-shimmer",
)} )}
aria-expanded={open} aria-expanded={open}
aria-live={streaming ? "polite" : undefined}
> >
<Sparkles className="h-3.5 w-3.5" aria-hidden /> <Sparkles
<span className="font-medium">{t("message.reasoning", { defaultValue: "Thinking" })}</span> className={cn("h-3.5 w-3.5", streaming && "animate-pulse")}
aria-hidden
/>
<span className="font-medium">
{streaming
? t("message.reasoningStreaming", { defaultValue: "Thinking…" })
: t("message.reasoning", { defaultValue: "Thinking" })}
</span>
<ChevronRight <ChevronRight
aria-hidden aria-hidden
className={cn( className={cn(
@@ -472,7 +495,7 @@ function ReasoningBubble({ lines }: ReasoningBubbleProps) {
)} )}
/> />
</button> </button>
{open && ( {open && text.length > 0 && (
<div <div
className={cn( className={cn(
"mt-1 whitespace-pre-wrap break-words border-l border-muted-foreground/20 pl-3", "mt-1 whitespace-pre-wrap break-words border-l border-muted-foreground/20 pl-3",


@@ -117,6 +117,34 @@
--cjk-line-height: 1.625; --cjk-line-height: 1.625;
} }
/* Shimmer band sweeping across the reasoning header while
``reasoning_delta`` frames are arriving. Pure CSS, no JS animation,
respects ``prefers-reduced-motion``. */
@keyframes reasoning-shimmer-sweep {
0% {
background-position: -200% 0;
}
100% {
background-position: 200% 0;
}
}
.reasoning-shimmer {
background-image: linear-gradient(
90deg,
transparent 0%,
hsl(var(--muted-foreground) / 0.18) 50%,
transparent 100%
);
background-size: 200% 100%;
background-repeat: no-repeat;
animation: reasoning-shimmer-sweep 2.2s linear infinite;
}
@media (prefers-reduced-motion: reduce) {
.reasoning-shimmer {
animation: none;
}
}
/* Subtle scrollbar that doesn't fight the dark background. */ /* Subtle scrollbar that doesn't fight the dark background. */
.scrollbar-thin { .scrollbar-thin {
scrollbar-width: thin; scrollbar-width: thin;


@@ -18,6 +18,82 @@ interface StreamBuffer {
parts: string[]; parts: string[];
} }
/**
* Append a reasoning chunk to the last open reasoning stream in ``prev``.
*
* Lookup rule: find the most recent assistant turn that is either still
* streaming reasoning (``reasoningStreaming``) or has no answer text yet.
* Anything else starts a fresh streaming placeholder so a new turn's
* reasoning never bleeds into the previous answer.
*/
function attachReasoningChunk(prev: UIMessage[], chunk: string): UIMessage[] {
for (let i = prev.length - 1; i >= 0; i -= 1) {
const candidate = prev[i];
if (candidate.role !== "assistant" || candidate.kind === "trace") continue;
const hasAnswer = candidate.content.length > 0;
if (candidate.reasoningStreaming || (!hasAnswer && candidate.reasoning !== undefined)) {
const merged: UIMessage = {
...candidate,
reasoning: (candidate.reasoning ?? "") + chunk,
reasoningStreaming: true,
};
return [...prev.slice(0, i), merged, ...prev.slice(i + 1)];
}
if (!hasAnswer && candidate.isStreaming) {
const merged: UIMessage = {
...candidate,
reasoning: chunk,
reasoningStreaming: true,
};
return [...prev.slice(0, i), merged, ...prev.slice(i + 1)];
}
break;
}
return [
...prev,
{
id: crypto.randomUUID(),
role: "assistant",
content: "",
isStreaming: true,
reasoning: chunk,
reasoningStreaming: true,
createdAt: Date.now(),
},
];
}
/**
* Find the most recent assistant placeholder that an incoming answer
* delta should adopt instead of spawning a parallel row. We look for an
* empty-content assistant turn that is still marked ``isStreaming``,
* typically created earlier by ``reasoning_delta``. Anything else means
* the model already produced an answer in a previous turn, so the new
* delta belongs in a fresh row.
*/
function findActiveAssistantPlaceholder(prev: UIMessage[]): string | null {
const last = prev[prev.length - 1];
if (!last) return null;
if (last.role !== "assistant" || last.kind === "trace") return null;
if (last.content.length > 0) return null;
if (!last.isStreaming) return null;
return last.id;
}
/**
* Close the active reasoning stream segment, if any. Idempotent: a
* ``reasoning_end`` with no preceding deltas is a harmless no-op.
*/
function closeReasoningStream(prev: UIMessage[]): UIMessage[] {
for (let i = prev.length - 1; i >= 0; i -= 1) {
const candidate = prev[i];
if (!candidate.reasoningStreaming) continue;
const merged: UIMessage = { ...candidate, reasoningStreaming: false };
return [...prev.slice(0, i), merged, ...prev.slice(i + 1)];
}
return prev;
}
/** /**
* Subscribe to a chat by ID. Returns the in-memory message list for the chat, * Subscribe to a chat by ID. Returns the in-memory message list for the chat,
* a streaming flag, and a ``send`` function. Initial history must be seeded * a streaming flag, and a ``send`` function. Initial history must be seeded
@@ -122,27 +198,42 @@ export function useNanobotStream(
if (ev.event === "delta") { if (ev.event === "delta") {
if (suppressStreamUntilTurnEndRef.current) return; if (suppressStreamUntilTurnEndRef.current) return;
const id = buffer.current?.messageId ?? crypto.randomUUID(); const chunk = ev.text;
if (!buffer.current) { setIsStreaming(true);
buffer.current = { messageId: id, parts: [] }; setMessages((prev) => {
setMessages((prev) => [ // Reuse an in-flight assistant placeholder (typically created by
...prev, // ``reasoning_delta``) so the answer renders below its own
{ // thinking trace instead of in a parallel row.
id, const adopted = !buffer.current ? findActiveAssistantPlaceholder(prev) : null;
role: "assistant", let targetId: string;
content: "", let next: UIMessage[];
isStreaming: true, if (buffer.current) {
createdAt: Date.now(), targetId = buffer.current.messageId;
}, next = prev;
]); } else if (adopted) {
setIsStreaming(true); targetId = adopted;
} buffer.current = { messageId: targetId, parts: [] };
buffer.current.parts.push(ev.text); next = prev;
const combined = buffer.current.parts.join(""); } else {
const targetId = buffer.current.messageId; targetId = crypto.randomUUID();
setMessages((prev) => buffer.current = { messageId: targetId, parts: [] };
prev.map((m) => (m.id === targetId ? { ...m, content: combined } : m)), next = [
); ...prev,
{
id: targetId,
role: "assistant",
content: "",
isStreaming: true,
createdAt: Date.now(),
},
];
}
buffer.current.parts.push(chunk);
const combined = buffer.current.parts.join("");
return next.map((m) =>
m.id === targetId ? { ...m, content: combined, isStreaming: true } : m,
);
});
return; return;
} }
@@ -159,6 +250,21 @@ export function useNanobotStream(
return; return;
} }
if (ev.event === "reasoning_delta") {
if (suppressStreamUntilTurnEndRef.current) return;
const chunk = ev.text;
if (!chunk) return;
setMessages((prev) => attachReasoningChunk(prev, chunk));
setIsStreaming(true);
return;
}
if (ev.event === "reasoning_end") {
if (suppressStreamUntilTurnEndRef.current) return;
setMessages((prev) => closeReasoningStream(prev));
return;
}
if (ev.event === "turn_end") { if (ev.event === "turn_end") {
// Definitive signal that the turn is fully complete. Cancel any // Definitive signal that the turn is fully complete. Cancel any
// pending debounce timer and stop the loading indicator immediately. // pending debounce timer and stop the loading indicator immediately.
@@ -187,37 +293,13 @@ export function useNanobotStream(
) { ) {
return; return;
} }
// Model reasoning rides its own channel: stash it on the next // Back-compat: a legacy ``kind: "reasoning"`` message (no streaming
// assistant turn so the bubble renders it as a subordinate trace. // partner) is treated as one complete delta + immediate end so the
// If the assistant message hasn't materialized yet (typical, since // bubble renders identically to the streaming path.
// reasoning fires before tool calls/answers), park it on a sentinel
// pending row that the next assistant message absorbs.
if (ev.kind === "reasoning") { if (ev.kind === "reasoning") {
const line = ev.text; const line = ev.text;
if (!line) return; if (!line) return;
setMessages((prev) => { setMessages((prev) => closeReasoningStream(attachReasoningChunk(prev, line)));
for (let i = prev.length - 1; i >= 0; i -= 1) {
const candidate = prev[i];
if (candidate.role === "assistant" && candidate.kind !== "trace") {
const merged: UIMessage = {
...candidate,
reasoning: [...(candidate.reasoning ?? []), line],
};
return [...prev.slice(0, i), merged, ...prev.slice(i + 1)];
}
}
return [
...prev,
{
id: crypto.randomUUID(),
role: "assistant",
content: "",
isStreaming: true,
reasoning: [line],
createdAt: Date.now(),
},
];
});
return; return;
} }
// Intermediate agent breadcrumbs (tool-call hints, raw progress). // Intermediate agent breadcrumbs (tool-call hints, raw progress).


@@ -333,6 +333,7 @@
"toolSingle": "Using a tool", "toolSingle": "Using a tool",
"toolMany": "Used {{count}} tools", "toolMany": "Used {{count}} tools",
"reasoning": "Thinking", "reasoning": "Thinking",
"reasoningStreaming": "Thinking…",
"imageAttachment": "Image attachment", "imageAttachment": "Image attachment",
"copyReply": "Copy reply", "copyReply": "Copy reply",
"copiedReply": "Copied reply" "copiedReply": "Copied reply"


@@ -320,7 +320,8 @@
"assistantTyping": "助手正在输入", "assistantTyping": "助手正在输入",
"toolSingle": "正在使用工具", "toolSingle": "正在使用工具",
"toolMany": "已使用 {{count}} 个工具", "toolMany": "已使用 {{count}} 个工具",
"reasoning": "思考中", "reasoning": "思考过程",
"reasoningStreaming": "正在思考…",
"imageAttachment": "图片附件", "imageAttachment": "图片附件",
"copyReply": "复制回复", "copyReply": "复制回复",
"copiedReply": "已复制回复" "copiedReply": "已复制回复"


@@ -44,10 +44,13 @@ export interface UIMessage {
images?: UIImage[]; images?: UIImage[];
/** Signed or local UI-renderable media attachments. */ /** Signed or local UI-renderable media attachments. */
media?: UIMediaAttachment[]; media?: UIMediaAttachment[];
/** Assistant turn: model reasoning / thinking content collected from /** Assistant turn: accumulated model reasoning / thinking text. Built up
* `kind: "reasoning"` frames. Each entry is one emit cycle, joined with * incrementally from ``reasoning_delta`` frames; finalized when
* blank lines on render. */ * ``reasoning_end`` arrives. */
reasoning?: string[]; reasoning?: string;
/** True while ``reasoning_delta`` frames are still arriving for this turn.
* Drives the shimmer header on ``ReasoningBubble``. */
reasoningStreaming?: boolean;
} }
export interface ChatSummary { export interface ChatSummary {
@@ -158,6 +161,17 @@ export type InboundEvent =
chat_id: string; chat_id: string;
stream_id?: string; stream_id?: string;
} }
| {
event: "reasoning_delta";
chat_id: string;
text: string;
stream_id?: string;
}
| {
event: "reasoning_end";
chat_id: string;
stream_id?: string;
}
| { | {
event: "runtime_model_updated"; event: "runtime_model_updated";
model_name: string; model_name: string;


@@ -103,37 +103,41 @@ describe("MessageBubble", () => {
expect(container.querySelector("video[controls]")).toBeInTheDocument(); expect(container.querySelector("video[controls]")).toBeInTheDocument();
}); });
it("surfaces reasoning content above the assistant answer when provided", () => { it("auto-expands the reasoning trace while streaming with a shimmer header", () => {
const message: UIMessage = { const message: UIMessage = {
id: "a-reasoning", id: "a-reasoning-streaming",
role: "assistant",
content: "",
createdAt: Date.now(),
reasoning: "Step 1: parse intent. Step 2: compute.",
reasoningStreaming: true,
};
const { container } = render(<MessageBubble message={message} />);
expect(screen.getByText("Thinking…")).toBeInTheDocument();
expect(screen.getByText(/Step 1: parse intent\./)).toBeInTheDocument();
expect(container.querySelector(".reasoning-shimmer")).toBeInTheDocument();
});
it("collapses the reasoning section by default once streaming ends", () => {
const message: UIMessage = {
id: "a-reasoning-done",
role: "assistant", role: "assistant",
content: "The answer is 42.", content: "The answer is 42.",
createdAt: Date.now(), createdAt: Date.now(),
reasoning: ["Step 1: parse intent.", "Step 2: compute."], reasoning: "hidden until expanded",
reasoningStreaming: false,
}; };
render(<MessageBubble message={message} />); render(<MessageBubble message={message} />);
expect(screen.getByText("Thinking")).toBeInTheDocument(); expect(screen.getByText("Thinking")).toBeInTheDocument();
expect(screen.getByText(/Step 1: parse intent\./)).toBeInTheDocument();
expect(screen.getByText(/Step 2: compute\./)).toBeInTheDocument();
expect(screen.getByText("The answer is 42.")).toBeInTheDocument(); expect(screen.getByText("The answer is 42.")).toBeInTheDocument();
}); expect(screen.queryByText("hidden until expanded")).not.toBeInTheDocument();
it("collapses the reasoning section when toggled", () => {
const message: UIMessage = {
id: "a-reasoning-collapse",
role: "assistant",
content: "done",
createdAt: Date.now(),
reasoning: ["hidden after toggle"],
};
render(<MessageBubble message={message} />);
expect(screen.getByText("hidden after toggle")).toBeInTheDocument();
fireEvent.click(screen.getByRole("button", { name: /thinking/i })); fireEvent.click(screen.getByRole("button", { name: /thinking/i }));
expect(screen.queryByText("hidden after toggle")).not.toBeInTheDocument(); expect(screen.getByText("hidden until expanded")).toBeInTheDocument();
}); });
it("renders assistant image media as a larger generated result", () => { it("renders assistant image media as a larger generated result", () => {


@@ -113,7 +113,7 @@ describe("useNanobotStream", () => {
expect(result.current.messages[1].kind).toBeUndefined(); expect(result.current.messages[1].kind).toBeUndefined();
}); });
it("parks reasoning frames on a placeholder assistant message until the answer arrives", () => { it("accumulates reasoning_delta chunks on a placeholder until reasoning_end", () => {
const fake = fakeClient(); const fake = fakeClient();
const { result } = renderHook(() => useNanobotStream("chat-r", EMPTY_MESSAGES), { const { result } = renderHook(() => useNanobotStream("chat-r", EMPTY_MESSAGES), {
wrapper: wrap(fake.client), wrapper: wrap(fake.client),
@@ -121,28 +121,31 @@ describe("useNanobotStream", () => {
act(() => { act(() => {
fake.emit("chat-r", { fake.emit("chat-r", {
event: "message", event: "reasoning_delta",
chat_id: "chat-r", chat_id: "chat-r",
text: "Let me think step by step.", text: "Let me think ",
kind: "reasoning",
}); });
fake.emit("chat-r", { fake.emit("chat-r", {
event: "message", event: "reasoning_delta",
chat_id: "chat-r", chat_id: "chat-r",
text: "First, decompose the request.", text: "step by step.",
kind: "reasoning",
}); });
}); });
expect(result.current.messages).toHaveLength(1); expect(result.current.messages).toHaveLength(1);
expect(result.current.messages[0].role).toBe("assistant"); expect(result.current.messages[0].role).toBe("assistant");
expect(result.current.messages[0].reasoning).toEqual([ expect(result.current.messages[0].reasoning).toBe("Let me think step by step.");
"Let me think step by step.", expect(result.current.messages[0].reasoningStreaming).toBe(true);
"First, decompose the request.",
]); act(() => {
fake.emit("chat-r", { event: "reasoning_end", chat_id: "chat-r" });
});
expect(result.current.messages[0].reasoningStreaming).toBe(false);
expect(result.current.messages[0].reasoning).toBe("Let me think step by step.");
}); });
it("attaches reasoning to the latest assistant turn rather than spawning a new one", () => { it("absorbs a streaming reasoning placeholder into the answer turn that follows", () => {
const fake = fakeClient(); const fake = fakeClient();
const { result } = renderHook(() => useNanobotStream("chat-r2", EMPTY_MESSAGES), { const { result } = renderHook(() => useNanobotStream("chat-r2", EMPTY_MESSAGES), {
wrapper: wrap(fake.client), wrapper: wrap(fake.client),
@@ -150,24 +153,26 @@ describe("useNanobotStream", () => {
act(() => { act(() => {
fake.emit("chat-r2", { fake.emit("chat-r2", {
event: "message", event: "reasoning_delta",
chat_id: "chat-r2",
text: "Plan first.",
});
fake.emit("chat-r2", { event: "reasoning_end", chat_id: "chat-r2" });
fake.emit("chat-r2", {
event: "delta",
chat_id: "chat-r2", chat_id: "chat-r2",
text: "The answer is 42.", text: "The answer is 42.",
}); });
fake.emit("chat-r2", { fake.emit("chat-r2", { event: "stream_end", chat_id: "chat-r2" });
event: "message",
chat_id: "chat-r2",
text: "Reasoning surfaced post-hoc.",
kind: "reasoning",
});
}); });
expect(result.current.messages).toHaveLength(1); expect(result.current.messages).toHaveLength(1);
expect(result.current.messages[0].content).toBe("The answer is 42."); expect(result.current.messages[0].content).toBe("The answer is 42.");
expect(result.current.messages[0].reasoning).toEqual(["Reasoning surfaced post-hoc."]); expect(result.current.messages[0].reasoning).toBe("Plan first.");
expect(result.current.messages[0].reasoningStreaming).toBe(false);
}); });
it("ignores empty reasoning frames", () => { it("ignores empty reasoning_delta frames", () => {
const fake = fakeClient(); const fake = fakeClient();
const { result } = renderHook(() => useNanobotStream("chat-r3", EMPTY_MESSAGES), { const { result } = renderHook(() => useNanobotStream("chat-r3", EMPTY_MESSAGES), {
wrapper: wrap(fake.client), wrapper: wrap(fake.client),
@@ -175,16 +180,35 @@ describe("useNanobotStream", () => {
act(() => { act(() => {
fake.emit("chat-r3", { fake.emit("chat-r3", {
event: "message", event: "reasoning_delta",
chat_id: "chat-r3", chat_id: "chat-r3",
text: "", text: "",
kind: "reasoning",
}); });
}); });
expect(result.current.messages).toHaveLength(0); expect(result.current.messages).toHaveLength(0);
}); });
it("treats legacy kind=reasoning messages as a complete delta + end pair", () => {
const fake = fakeClient();
const { result } = renderHook(() => useNanobotStream("chat-r4", EMPTY_MESSAGES), {
wrapper: wrap(fake.client),
});
act(() => {
fake.emit("chat-r4", {
event: "message",
chat_id: "chat-r4",
text: "one-shot reasoning",
kind: "reasoning",
});
});
expect(result.current.messages).toHaveLength(1);
expect(result.current.messages[0].reasoning).toBe("one-shot reasoning");
expect(result.current.messages[0].reasoningStreaming).toBe(false);
});
it("attaches assistant media_urls to complete messages", () => { it("attaches assistant media_urls to complete messages", () => {
const fake = fakeClient(); const fake = fakeClient();
const { result } = renderHook(() => useNanobotStream("chat-m", EMPTY_MESSAGES), { const { result } = renderHook(() => useNanobotStream("chat-m", EMPTY_MESSAGES), {