From 458b4ba235b40e00139386a2c767670b91384903 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Wed, 13 May 2026 07:13:43 +0000 Subject: [PATCH] feat(reasoning): stream reasoning content as a first-class channel MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Reasoning now flows as its own stream — symmetric to the answer's ``delta`` / ``stream_end`` pair — instead of being shipped as one oversized progress message. This lets WebUI render a live "Thinking…" bubble that updates in place, then auto-collapses when the stream closes. Other channels remain plugin no-ops by default. ## Protocol New metadata: ``_reasoning_delta`` (chunk) and ``_reasoning_end`` (close marker). ChannelManager routes both to the dedicated plugin hooks below; the legacy one-shot ``_reasoning`` is kept for back-compat and BaseChannel expands it into a single delta + end pair so plugins only ever implement the streaming primitives. WebSocket emits two new events: - ``reasoning_delta`` (event, chat_id, text, optional stream_id) - ``reasoning_end`` (event, chat_id, optional stream_id) ## BaseChannel surface - ``send_reasoning_delta(chat_id, delta, metadata)`` — no-op default - ``send_reasoning_end(chat_id, metadata)`` — no-op default - ``send_reasoning(msg)`` — back-compat wrapper, base impl forwards to the streaming primitives A channel adds reasoning support by overriding the two streaming primitives. Telegram / Slack / Discord / Feishu / WeChat / Matrix keep the base no-ops until their bubble UIs are adapted; reasoning silently drops at dispatch, never as a stray text message. ## AgentHook Adds ``emit_reasoning_end`` to the hook lifecycle. ``_LoopHook`` tracks whether a reasoning segment is open and closes it on: - the first answer delta arriving (so the UI locks the bubble before the answer renders below), - ``on_stream_end``, - one-shot ``reasoning_content`` / ``thinking_blocks`` after a single non-streaming response. ## WebUI - ``UIMessage.reasoning`` is now a single accumulated string with a companion ``reasoningStreaming`` flag. - ``useNanobotStream`` consumes ``reasoning_delta`` / ``reasoning_end``; legacy ``kind: "reasoning"`` is auto-translated to a delta + end. - New ``ReasoningBubble``: shimmer header + auto-expanded while streaming, collapses to a clickable "Thinking" pill once closed, respects ``prefers-reduced-motion``. - Answer deltas adopt the reasoning placeholder so the bubble and the answer share one assistant row. ## Tests - ``tests/channels/test_channel_manager_reasoning.py`` — manager routes delta + end, drops on channel opt-out, expands one-shot back-compat. - ``tests/channels/test_websocket_channel.py`` — new ``reasoning_delta`` / ``reasoning_end`` frames, empty-chunk safety, no-subscriber safety, back-compat expansion. - ``tests/agent/test_runner_reasoning.py`` — runner closes the segment on streaming answer start and after one-shot reasoning. - WebUI ``useNanobotStream`` + ``message-bubble`` cover the new protocol and the shimmer styling. ## Docs ``docs/configuration.md`` and ``docs/websocket.md`` document the new events and the plugin contract. Co-authored-by: Cursor --- docs/configuration.md | 2 +- docs/websocket.md | 23 +++ nanobot/agent/hook.py | 11 ++ nanobot/agent/loop.py | 36 +++- nanobot/agent/runner.py | 18 +- nanobot/channels/base.py | 45 ++++- nanobot/channels/manager.py | 28 ++- nanobot/channels/websocket.py | 60 ++++-- tests/agent/test_runner_reasoning.py | 42 ++++ .../test_channel_manager_reasoning.py | 139 +++++++++----- tests/channels/test_websocket_channel.py | 63 ++++-- webui/src/components/MessageBubble.tsx | 57 ++++-- webui/src/globals.css | 28 +++ webui/src/hooks/useNanobotStream.ts | 180 +++++++++++++----- webui/src/i18n/locales/en/common.json | 1 + webui/src/i18n/locales/zh-CN/common.json | 3 +- webui/src/lib/types.ts | 22 ++- webui/src/tests/message-bubble.test.tsx | 42 ++-- webui/src/tests/useNanobotStream.test.tsx | 70 ++++--- 19 files changed, 649 insertions(+), 221 deletions(-) diff --git a/docs/configuration.md b/docs/configuration.md index ed5a534cf..0123017d2 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -743,7 +743,7 @@ Global settings that apply to all channels. Configure under the `channels` secti |---------|---------|-------------| | `sendProgress` | `true` | Stream agent's text progress to the channel | | `sendToolHints` | `false` | Stream tool-call hints (e.g. `read_file("…")`) | -| `showReasoning` | `true` | Allow channels to surface model reasoning/thinking content (DeepSeek-R1 `reasoning_content`, Anthropic `thinking_blocks`, inline `` tags). The setting is a plugin opt-in: even when `true`, a channel only renders reasoning if it overrides `send_reasoning()`. Currently surfaced on CLI and WebSocket/WebUI; other channels (Telegram, Slack, Discord, ...) keep it as a silent no-op until their bubble UI is adapted. Independent of `sendProgress`. | +| `showReasoning` | `true` | Allow channels to surface model reasoning/thinking content (DeepSeek-R1 `reasoning_content`, Anthropic `thinking_blocks`, inline `` tags). Reasoning flows as a dedicated stream with `_reasoning_delta` / `_reasoning_end` markers — channels override `send_reasoning_delta` / `send_reasoning_end` to render in-place updates. Even with `true`, channels without those overrides stay no-op silently. Currently surfaced on CLI and WebSocket/WebUI (italic shimmer header, auto-collapses after the stream ends); Telegram / Slack / Discord / Feishu / WeChat / Matrix keep the base no-op until their bubble UI is adapted. Independent of `sendProgress`. | | `sendMaxRetries` | `3` | Max delivery attempts per outbound message, including the initial send (0-10 configured, minimum 1 actual attempt) | | `transcriptionProvider` | `"groq"` | Voice transcription backend: `"groq"` (free tier, default) or `"openai"`. API key is auto-resolved from the matching provider config. | | `transcriptionLanguage` | `null` | Optional ISO-639-1 language hint for audio transcription, e.g. `"en"`, `"ko"`, `"ja"`. | diff --git a/docs/websocket.md b/docs/websocket.md index 556bb5bb6..d6a816ac1 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -128,6 +128,29 @@ All frames are JSON text. Each message has an `event` field. } ``` +**`reasoning_delta`** — incremental model reasoning / thinking chunk for the active assistant turn. Mirrors `delta` but targets the reasoning bubble above the answer rather than the answer body: + +```json +{ + "event": "reasoning_delta", + "chat_id": "uuid-v4", + "text": "Let me decompose ", + "stream_id": "r1" +} +``` + +**`reasoning_end`** — close marker for the active reasoning stream. WebUI uses this to lock the in-place bubble and switch from the shimmer header to a static collapsed state: + +```json +{ + "event": "reasoning_end", + "chat_id": "uuid-v4", + "stream_id": "r1" +} +``` + +Reasoning frames only flow when the channel's `showReasoning` is `true` (default) and the model returns reasoning content (DeepSeek-R1 / Kimi / MiMo / OpenAI reasoning models, Anthropic extended thinking, or inline `` / `` tags). Models without reasoning produce zero `reasoning_delta` frames. + **`runtime_model_updated`** — broadcast when the gateway runtime model changes, for example after `/model `: ```json diff --git a/nanobot/agent/hook.py b/nanobot/agent/hook.py index 86775742d..5b6fed445 100644 --- a/nanobot/agent/hook.py +++ b/nanobot/agent/hook.py @@ -52,6 +52,14 @@ class AgentHook: async def emit_reasoning(self, reasoning_content: str | None) -> None: pass + async def emit_reasoning_end(self) -> None: + """Mark the end of an in-flight reasoning stream. + + Hooks that buffer ``emit_reasoning`` chunks (for in-place UI updates) + flush and freeze the rendered group here. One-shot hooks ignore. + """ + pass + async def after_iteration(self, context: AgentHookContext) -> None: pass @@ -102,6 +110,9 @@ class CompositeHook(AgentHook): async def emit_reasoning(self, reasoning_content: str | None) -> None: await self._for_each_hook_safe("emit_reasoning", reasoning_content) + async def emit_reasoning_end(self) -> None: + await self._for_each_hook_safe("emit_reasoning_end") + async def after_iteration(self, context: AgentHookContext) -> None: await self._for_each_hook_safe("after_iteration", context) diff --git a/nanobot/agent/loop.py b/nanobot/agent/loop.py index e7b045f01..7897f89dd 100644 --- a/nanobot/agent/loop.py +++ b/nanobot/agent/loop.py @@ -87,6 +87,7 @@ class _LoopHook(AgentHook): self._session_key = session_key self._stream_buf = "" self._think_extractor = IncrementalThinkExtractor() + self._reasoning_open = False def wants_streaming(self) -> bool: return self._on_stream is not None @@ -102,10 +103,15 @@ class _LoopHook(AgentHook): if await self._think_extractor.feed(self._stream_buf, self.emit_reasoning): context.streamed_reasoning = True - if incremental and self._on_stream: - await self._on_stream(incremental) + if incremental: + # Answer text has started — close any open reasoning segment so + # the UI can lock the bubble before the answer renders below it. + await self.emit_reasoning_end() + if self._on_stream: + await self._on_stream(incremental) async def on_stream_end(self, context: AgentHookContext, *, resuming: bool) -> None: + await self.emit_reasoning_end() if self._on_stream_end: await self._on_stream_end(resuming=resuming) self._stream_buf = "" @@ -147,16 +153,27 @@ class _LoopHook(AgentHook): ) async def emit_reasoning(self, reasoning_content: str | None) -> None: - """Publish reasoning content; channel plugins decide whether to render. + """Publish a reasoning chunk; channel plugins decide whether to render. - The loop is intentionally not the gate: ``ChannelsConfig.show_reasoning`` - is a default that ``ChannelManager`` and ``BaseChannel.send_reasoning`` - consult per channel. A channel without a low-emphasis UI primitive - keeps the base no-op and the content drops at the dispatch boundary. + Each call is one delta in a streaming session. ``emit_reasoning_end`` + closes the segment. The loop is intentionally not the gate: + ``ChannelsConfig.show_reasoning`` is a default that ``ChannelManager`` + and ``BaseChannel.send_reasoning_delta`` consult per channel — a + channel without a low-emphasis UI primitive keeps the base no-op + and the content drops at the dispatch boundary. """ if self._on_progress and reasoning_content: + self._reasoning_open = True await self._on_progress(reasoning_content, reasoning=True) + async def emit_reasoning_end(self) -> None: + """Close the current reasoning stream segment, if any was open.""" + if self._reasoning_open and self._on_progress: + self._reasoning_open = False + await self._on_progress("", reasoning_end=True) + else: + self._reasoning_open = False + async def after_iteration(self, context: AgentHookContext) -> None: if ( self._on_progress @@ -665,12 +682,15 @@ class AgentLoop: tool_hint: bool = False, tool_events: list[dict[str, Any]] | None = None, reasoning: bool = False, + reasoning_end: bool = False, ) -> None: meta = dict(msg.metadata or {}) meta["_progress"] = True meta["_tool_hint"] = tool_hint if reasoning: - meta["_reasoning"] = True + meta["_reasoning_delta"] = True + if reasoning_end: + meta["_reasoning_end"] = True if tool_events: meta["_tool_events"] = tool_events await self.bus.publish_outbound( diff --git a/nanobot/agent/runner.py b/nanobot/agent/runner.py index 6b8e5383c..37da63872 100644 --- a/nanobot/agent/runner.py +++ b/nanobot/agent/runner.py @@ -291,6 +291,7 @@ class AgentRunner: response.content = cleaned_content if reasoning_text and not context.streamed_reasoning: await hook.emit_reasoning(reasoning_text) + await hook.emit_reasoning_end() context.streamed_reasoning = True if response.should_execute_tools: @@ -617,6 +618,8 @@ class AgentRunner: and getattr(self.provider, "supports_progress_deltas", False) is True ) + progress_state: dict[str, bool] | None = None + if wants_streaming: async def _stream(delta: str) -> None: if delta: @@ -630,6 +633,7 @@ class AgentRunner: elif wants_progress_streaming: stream_buf = "" think_extractor = IncrementalThinkExtractor() + progress_state = {"reasoning_open": False} async def _stream_progress(delta: str) -> None: nonlocal stream_buf @@ -642,8 +646,12 @@ class AgentRunner: if await think_extractor.feed(stream_buf, hook.emit_reasoning): context.streamed_reasoning = True + progress_state["reasoning_open"] = True if incremental: + if progress_state["reasoning_open"]: + await hook.emit_reasoning_end() + progress_state["reasoning_open"] = False context.streamed_content = True await spec.progress_callback(incremental) @@ -654,16 +662,20 @@ class AgentRunner: else: coro = self.provider.chat_with_retry(**kwargs) - if timeout_s is None: - return await coro try: - return await asyncio.wait_for(coro, timeout=timeout_s) + response = ( + await coro if timeout_s is None + else await asyncio.wait_for(coro, timeout=timeout_s) + ) except asyncio.TimeoutError: return LLMResponse( content=f"Error calling LLM: timed out after {timeout_s:g}s", finish_reason="error", error_kind="timeout", ) + if progress_state and progress_state.get("reasoning_open"): + await hook.emit_reasoning_end() + return response async def _request_finalization_retry( self, diff --git a/nanobot/channels/base.py b/nanobot/channels/base.py index c82003d88..257127d5a 100644 --- a/nanobot/channels/base.py +++ b/nanobot/channels/base.py @@ -121,18 +121,53 @@ class BaseChannel(ABC): """ pass - async def send_reasoning(self, msg: OutboundMessage) -> None: - """Surface model reasoning/thinking content. + async def send_reasoning_delta( + self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None + ) -> None: + """Stream a chunk of model reasoning/thinking content. Default is no-op. Channels with a native low-emphasis primitive (Slack context block, Telegram expandable blockquote, Discord subtext, WebUI italic bubble, ...) override to render reasoning - as a subordinate trace. Channels without a suitable affordance - keep this no-op: silently dropping is better than leaking raw - model thoughts as regular conversational messages. + as a subordinate trace that updates in place as the model thinks. + + Streaming contract mirrors :meth:`send_delta`: ``_reasoning_delta`` + is a chunk, ``_reasoning_end`` ends the current reasoning segment, + and stateful implementations should key buffers by ``_stream_id`` + rather than only by ``chat_id``. """ return + async def send_reasoning_end( + self, chat_id: str, metadata: dict[str, Any] | None = None + ) -> None: + """Mark the end of a reasoning stream segment. + + Default is no-op. Channels that buffer ``send_reasoning_delta`` + chunks for in-place updates use this signal to flush and freeze + the rendered group; one-shot channels can ignore it entirely. + """ + return + + async def send_reasoning(self, msg: OutboundMessage) -> None: + """Deliver a complete reasoning block. + + Default implementation reuses the streaming pair so plugins only + need to override the delta/end methods. Equivalent to one delta + with the full content followed immediately by an end marker — + keeps a single rendering path for both streamed and one-shot + reasoning (e.g. DeepSeek-R1's final-response ``reasoning_content``). + """ + if not msg.content: + return + meta = dict(msg.metadata or {}) + meta.setdefault("_reasoning_delta", True) + await self.send_reasoning_delta(msg.chat_id, msg.content, meta) + end_meta = dict(meta) + end_meta.pop("_reasoning_delta", None) + end_meta["_reasoning_end"] = True + await self.send_reasoning_end(msg.chat_id, end_meta) + @property def supports_streaming(self) -> bool: """True when config enables streaming AND this subclass implements send_delta.""" diff --git a/nanobot/channels/manager.py b/nanobot/channels/manager.py index abf9bf043..3a6b6e50f 100644 --- a/nanobot/channels/manager.py +++ b/nanobot/channels/manager.py @@ -283,13 +283,18 @@ class ChannelManager: timeout=1.0 ) - if msg.metadata.get("_reasoning"): - # Reasoning rides its own plugin channel: only delivered when - # the destination channel both opts in (``show_reasoning``) - # and overrides ``send_reasoning``. Channels without a - # low-emphasis UI primitive keep the base no-op and the - # content silently drops here rather than leak as a - # conversational reply. + if ( + msg.metadata.get("_reasoning_delta") + or msg.metadata.get("_reasoning_end") + or msg.metadata.get("_reasoning") + ): + # Reasoning rides its own plugin channel: only delivered + # when the destination channel opts in via ``show_reasoning`` + # and overrides the streaming primitives. Channels without + # a low-emphasis UI affordance keep the base no-op and the + # content silently drops here. ``_reasoning`` (one-shot) + # is accepted for backward compatibility with hooks that + # haven't migrated to delta/end yet. channel = self.channels.get(msg.channel) if channel is not None and channel.show_reasoning: await self._send_with_retry(channel, msg) @@ -345,7 +350,14 @@ class ChannelManager: @staticmethod async def _send_once(channel: BaseChannel, msg: OutboundMessage) -> None: """Send one outbound message without retry policy.""" - if msg.metadata.get("_reasoning"): + if msg.metadata.get("_reasoning_end"): + await channel.send_reasoning_end(msg.chat_id, msg.metadata) + elif msg.metadata.get("_reasoning_delta"): + await channel.send_reasoning_delta(msg.chat_id, msg.content, msg.metadata) + elif msg.metadata.get("_reasoning"): + # Back-compat: one-shot reasoning. BaseChannel translates this + # to a single delta + end pair so plugins only implement the + # streaming primitives. await channel.send_reasoning(msg) elif msg.metadata.get("_stream_delta") or msg.metadata.get("_stream_end"): await channel.send_delta(msg.chat_id, msg.content, msg.metadata) diff --git a/nanobot/channels/websocket.py b/nanobot/channels/websocket.py index bba68397f..a77c8594f 100644 --- a/nanobot/channels/websocket.py +++ b/nanobot/channels/websocket.py @@ -1487,30 +1487,54 @@ class WebSocketChannel(BaseChannel): for connection in conns: await self._safe_send_to(connection, raw, label=" ") - async def send_reasoning(self, msg: OutboundMessage) -> None: - """Stream model reasoning as a subordinate trace frame. - - Renders as ``kind=reasoning`` alongside the existing ``tool_hint`` / - ``progress`` frames; the WebUI mounts these on the active assistant - bubble rather than as a conversational reply. + async def send_reasoning_delta( + self, + chat_id: str, + delta: str, + metadata: dict[str, Any] | None = None, + ) -> None: + """Push one chunk of model reasoning. Mirrors ``send_delta`` shape so + WebUI receives a stream that opens, updates in place, and closes — + rendered above the active assistant bubble with a shimmer header + until the matching ``reasoning_end`` arrives. """ - conns = list(self._subs.get(msg.chat_id, ())) - if not conns: + conns = list(self._subs.get(chat_id, ())) + if not conns or not delta: return - if not msg.content: - return - payload: dict[str, Any] = { - "event": "message", - "chat_id": msg.chat_id, - "text": msg.content, - "kind": "reasoning", + meta = metadata or {} + body: dict[str, Any] = { + "event": "reasoning_delta", + "chat_id": chat_id, + "text": delta, } - if msg.reply_to: - payload["reply_to"] = msg.reply_to - raw = json.dumps(payload, ensure_ascii=False) + stream_id = meta.get("_stream_id") + if stream_id is not None: + body["stream_id"] = stream_id + raw = json.dumps(body, ensure_ascii=False) for connection in conns: await self._safe_send_to(connection, raw, label=" reasoning ") + async def send_reasoning_end( + self, + chat_id: str, + metadata: dict[str, Any] | None = None, + ) -> None: + """Close the current reasoning stream segment for in-place renderers.""" + conns = list(self._subs.get(chat_id, ())) + if not conns: + return + meta = metadata or {} + body: dict[str, Any] = { + "event": "reasoning_end", + "chat_id": chat_id, + } + stream_id = meta.get("_stream_id") + if stream_id is not None: + body["stream_id"] = stream_id + raw = json.dumps(body, ensure_ascii=False) + for connection in conns: + await self._safe_send_to(connection, raw, label=" reasoning_end ") + async def send_delta( self, chat_id: str, diff --git a/tests/agent/test_runner_reasoning.py b/tests/agent/test_runner_reasoning.py index 512f3d2e9..d971e05a1 100644 --- a/tests/agent/test_runner_reasoning.py +++ b/tests/agent/test_runner_reasoning.py @@ -24,11 +24,15 @@ class _RecordingHook(AgentHook): def __init__(self) -> None: super().__init__() self.emitted: list[str] = [] + self.end_calls = 0 async def emit_reasoning(self, reasoning_content: str | None) -> None: if reasoning_content: self.emitted.append(reasoning_content) + async def emit_reasoning_end(self) -> None: + self.end_calls += 1 + @pytest.mark.asyncio async def test_runner_preserves_reasoning_fields_in_assistant_history(): @@ -277,3 +281,41 @@ async def test_runner_does_not_double_emit_when_inline_think_already_streamed(): assert result.final_content == "The answer." assert hook.emitted == ["working..."] + assert hook.end_calls >= 1, "reasoning stream must be closed once the answer starts" + + +@pytest.mark.asyncio +async def test_runner_closes_reasoning_stream_after_one_shot_response(): + """A non-streaming response carrying ``reasoning_content`` must emit + both a reasoning delta and an end marker so channels can finalize the + in-place bubble.""" + from nanobot.agent.runner import AgentRunSpec, AgentRunner + + provider = MagicMock() + + async def chat_with_retry(**kwargs): + return LLMResponse( + content="answer", + reasoning_content="hidden thought", + tool_calls=[], + usage={"prompt_tokens": 5, "completion_tokens": 3}, + ) + + provider.chat_with_retry = chat_with_retry + tools = MagicMock() + tools.get_definitions.return_value = [] + + hook = _RecordingHook() + runner = AgentRunner(provider) + result = await runner.run(AgentRunSpec( + initial_messages=[{"role": "user", "content": "q"}], + tools=tools, + model="test-model", + max_iterations=3, + max_tool_result_chars=_MAX_TOOL_RESULT_CHARS, + hook=hook, + )) + + assert result.final_content == "answer" + assert hook.emitted == ["hidden thought"] + assert hook.end_calls == 1 diff --git a/tests/channels/test_channel_manager_reasoning.py b/tests/channels/test_channel_manager_reasoning.py index 2200f4be2..bc2a640c6 100644 --- a/tests/channels/test_channel_manager_reasoning.py +++ b/tests/channels/test_channel_manager_reasoning.py @@ -1,14 +1,22 @@ """Tests for ChannelManager routing of model reasoning content. -Reasoning is delivered as a separate plugin action (``send_reasoning``) -rather than a metadata flag on a regular outbound. The manager routes -``_reasoning`` messages only to channels that opt in via -``channel.show_reasoning``; channels without a low-emphasis UI primitive -keep the base no-op and the content silently drops at dispatch. +Reasoning is delivered through plugin streaming primitives +(``send_reasoning_delta`` / ``send_reasoning_end``) so each channel +controls in-place rendering — mirroring the existing answer ``send_delta`` +/ ``stream_end`` pair. The manager forwards reasoning frames only to +channels that opt in via ``channel.show_reasoning``; plugins without a +low-emphasis UI primitive keep the base no-op and the content silently +drops at dispatch. + +One-shot ``_reasoning`` frames are accepted for back-compat with hooks +that haven't migrated yet — ``BaseChannel.send_reasoning`` expands them +to a single delta + end pair so plugins only implement the streaming +primitives. """ from __future__ import annotations +import asyncio from unittest.mock import AsyncMock import pytest @@ -27,7 +35,8 @@ class _MockChannel(BaseChannel): def __init__(self, config, bus): super().__init__(config, bus) self._send_mock = AsyncMock() - self._send_reasoning_mock = AsyncMock() + self._delta_mock = AsyncMock() + self._end_mock = AsyncMock() async def start(self): # pragma: no cover - not exercised pass @@ -38,8 +47,11 @@ class _MockChannel(BaseChannel): async def send(self, msg): return await self._send_mock(msg) - async def send_reasoning(self, msg): - return await self._send_reasoning_mock(msg) + async def send_reasoning_delta(self, chat_id, delta, metadata=None): + return await self._delta_mock(chat_id, delta, metadata) + + async def send_reasoning_end(self, chat_id, metadata=None): + return await self._end_mock(chat_id, metadata) @pytest.fixture @@ -50,17 +62,52 @@ def manager() -> ChannelManager: @pytest.mark.asyncio -async def test_reasoning_routes_to_send_reasoning_not_send(manager): +async def test_reasoning_delta_routes_to_send_reasoning_delta(manager): channel = manager.channels["mock"] msg = OutboundMessage( channel="mock", chat_id="c1", - content="step-by-step thinking", + content="step-by-step", + metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"}, + ) + await manager._send_once(channel, msg) + channel._delta_mock.assert_awaited_once() + args = channel._delta_mock.await_args.args + assert args[0] == "c1" + assert args[1] == "step-by-step" + channel._send_mock.assert_not_awaited() + channel._end_mock.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_reasoning_end_routes_to_send_reasoning_end(manager): + channel = manager.channels["mock"] + msg = OutboundMessage( + channel="mock", + chat_id="c1", + content="", + metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"}, + ) + await manager._send_once(channel, msg) + channel._end_mock.assert_awaited_once() + channel._delta_mock.assert_not_awaited() + + +@pytest.mark.asyncio +async def test_legacy_one_shot_reasoning_expands_to_delta_plus_end(manager): + """`_reasoning` (no delta/end pair) falls back through `send_reasoning` + which the base class expands to a single delta + end. Hooks that haven't + migrated still surface in WebUI as a complete stream segment.""" + channel = manager.channels["mock"] + msg = OutboundMessage( + channel="mock", + chat_id="c1", + content="one-shot reasoning", metadata={"_progress": True, "_reasoning": True}, ) await manager._send_once(channel, msg) - channel._send_reasoning_mock.assert_awaited_once_with(msg) - channel._send_mock.assert_not_awaited() + channel._delta_mock.assert_awaited_once() + channel._end_mock.assert_awaited_once() @pytest.mark.asyncio @@ -71,14 +118,14 @@ async def test_dispatch_drops_reasoning_when_channel_opts_out(manager): channel="mock", chat_id="c1", content="hidden thinking", - metadata={"_progress": True, "_reasoning": True}, + metadata={"_progress": True, "_reasoning_delta": True}, ) await manager.bus.publish_outbound(msg) - pumped = await _pump_one(manager) + await _pump_one(manager) - assert pumped is True - channel._send_reasoning_mock.assert_not_awaited() + channel._delta_mock.assert_not_awaited() + channel._end_mock.assert_not_awaited() channel._send_mock.assert_not_awaited() @@ -86,20 +133,24 @@ async def test_dispatch_drops_reasoning_when_channel_opts_out(manager): async def test_dispatch_delivers_reasoning_when_channel_opts_in(manager): channel = manager.channels["mock"] channel.show_reasoning = True - msg = OutboundMessage( + for chunk in ("first ", "second"): + await manager.bus.publish_outbound(OutboundMessage( + channel="mock", + chat_id="c1", + content=chunk, + metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"}, + )) + await manager.bus.publish_outbound(OutboundMessage( channel="mock", chat_id="c1", - content="visible thinking", - metadata={"_progress": True, "_reasoning": True}, - ) - await manager.bus.publish_outbound(msg) + content="", + metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"}, + )) - pumped = await _pump_one(manager) + await _pump_one(manager) - assert pumped is True - channel._send_reasoning_mock.assert_awaited_once() - delivered = channel._send_reasoning_mock.await_args.args[0] - assert delivered.content == "visible thinking" + assert channel._delta_mock.await_count == 2 + channel._end_mock.assert_awaited_once() @pytest.mark.asyncio @@ -108,21 +159,19 @@ async def test_dispatch_silently_drops_reasoning_for_unknown_channel(manager): channel="ghost", chat_id="c1", content="nobody home", - metadata={"_progress": True, "_reasoning": True}, + metadata={"_progress": True, "_reasoning_delta": True}, ) await manager.bus.publish_outbound(msg) - pumped = await _pump_one(manager) + await _pump_one(manager) - assert pumped is True - # Mock channel must not receive anything destined for a different channel. - manager.channels["mock"]._send_reasoning_mock.assert_not_awaited() + manager.channels["mock"]._delta_mock.assert_not_awaited() manager.channels["mock"]._send_mock.assert_not_awaited() @pytest.mark.asyncio -async def test_base_channel_send_reasoning_is_noop_safe(): - """Plugins that don't override `send_reasoning` must not blow up.""" +async def test_base_channel_reasoning_primitives_are_noop_safe(): + """Plugins that don't override the streaming primitives must not blow up.""" class _Plain(BaseChannel): name = "plain" @@ -138,7 +187,9 @@ async def test_base_channel_send_reasoning_is_noop_safe(): pass channel = _Plain({}, MessageBus()) - # No exception, returns None. + assert await channel.send_reasoning_delta("c", "x") is None + assert await channel.send_reasoning_end("c") is None + # And the one-shot wrapper translates without raising. assert await channel.send_reasoning( OutboundMessage(channel="plain", chat_id="c", content="x", metadata={}) ) is None @@ -151,26 +202,21 @@ async def test_reasoning_routing_does_not_consult_send_progress(manager): channel = manager.channels["mock"] channel.send_progress = False channel.show_reasoning = True - msg = OutboundMessage( + await manager.bus.publish_outbound(OutboundMessage( channel="mock", chat_id="c1", content="still surfaces", - metadata={"_progress": True, "_reasoning": True}, - ) - await manager.bus.publish_outbound(msg) + metadata={"_progress": True, "_reasoning_delta": True}, + )) - pumped = await _pump_one(manager) + await _pump_one(manager) - assert pumped is True - channel._send_reasoning_mock.assert_awaited_once() + channel._delta_mock.assert_awaited_once() -async def _pump_one(manager: ChannelManager) -> bool: - """Drive the dispatcher for exactly one message, then cancel.""" - import asyncio - +async def _pump_one(manager: ChannelManager) -> None: + """Drive the dispatcher until the outbound queue drains, then cancel.""" task = asyncio.create_task(manager._dispatch_outbound()) - # Yield control until the queue drains. for _ in range(50): await asyncio.sleep(0.01) if manager.bus.outbound.qsize() == 0: @@ -180,4 +226,3 @@ async def _pump_one(manager: ChannelManager) -> bool: await task except asyncio.CancelledError: pass - return True diff --git a/tests/channels/test_websocket_channel.py b/tests/channels/test_websocket_channel.py index 0e682ed0a..f11cb21b4 100644 --- a/tests/channels/test_websocket_channel.py +++ b/tests/channels/test_websocket_channel.py @@ -359,30 +359,44 @@ async def test_send_delta_emits_delta_and_stream_end() -> None: @pytest.mark.asyncio -async def test_send_reasoning_emits_reasoning_kind_frame() -> None: +async def test_send_reasoning_delta_emits_streaming_frame() -> None: bus = MagicMock() channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) mock_ws = AsyncMock() channel._attach(mock_ws, "chat-1") - await channel.send_reasoning(OutboundMessage( - channel="websocket", - chat_id="chat-1", - content="step-by-step thinking", - metadata={"_progress": True, "_reasoning": True}, - )) + await channel.send_reasoning_delta( + "chat-1", + "step-by-step thinking", + {"_reasoning_delta": True, "_stream_id": "r1"}, + ) mock_ws.send.assert_awaited_once() payload = json.loads(mock_ws.send.await_args.args[0]) - assert payload["event"] == "message" + assert payload["event"] == "reasoning_delta" assert payload["chat_id"] == "chat-1" assert payload["text"] == "step-by-step thinking" - assert payload["kind"] == "reasoning" + assert payload["stream_id"] == "r1" @pytest.mark.asyncio -async def test_send_reasoning_drops_empty_content() -> None: - """Empty reasoning emits nothing — keeps the frontend bubble clean.""" +async def test_send_reasoning_end_emits_close_frame() -> None: + bus = MagicMock() + channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) + mock_ws = AsyncMock() + channel._attach(mock_ws, "chat-1") + + await channel.send_reasoning_end("chat-1", {"_reasoning_end": True, "_stream_id": "r1"}) + + payload = json.loads(mock_ws.send.await_args.args[0]) + assert payload == {"event": "reasoning_end", "chat_id": "chat-1", "stream_id": "r1"} + + +@pytest.mark.asyncio +async def test_send_reasoning_one_shot_expands_to_delta_plus_end() -> None: + """``send_reasoning`` is back-compat for hooks that haven't migrated: + the base implementation must produce one delta and one end so the + WebUI sees the same shape either way.""" bus = MagicMock() channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) mock_ws = AsyncMock() @@ -391,10 +405,27 @@ async def test_send_reasoning_drops_empty_content() -> None: await channel.send_reasoning(OutboundMessage( channel="websocket", chat_id="chat-1", - content="", + content="thinking", metadata={"_reasoning": True}, )) + assert mock_ws.send.await_count == 2 + first = json.loads(mock_ws.send.call_args_list[0][0][0]) + second = json.loads(mock_ws.send.call_args_list[1][0][0]) + assert first["event"] == "reasoning_delta" + assert first["text"] == "thinking" + assert second["event"] == "reasoning_end" + + +@pytest.mark.asyncio +async def test_send_reasoning_delta_drops_empty_chunks() -> None: + bus = MagicMock() + channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) + mock_ws = AsyncMock() + channel._attach(mock_ws, "chat-1") + + await channel.send_reasoning_delta("chat-1", "", {"_reasoning_delta": True}) + mock_ws.send.assert_not_awaited() @@ -403,12 +434,8 @@ async def test_send_reasoning_without_subscribers_is_noop() -> None: bus = MagicMock() channel = WebSocketChannel({"enabled": True, "allowFrom": ["*"]}, bus) - await channel.send_reasoning(OutboundMessage( - channel="websocket", - chat_id="unattached", - content="thinking", - metadata={"_reasoning": True}, - )) + await channel.send_reasoning_delta("unattached", "thinking", None) + await channel.send_reasoning_end("unattached", None) # No subscribers, no exception, no send. diff --git a/webui/src/components/MessageBubble.tsx b/webui/src/components/MessageBubble.tsx index 556460824..9002ad500 100644 --- a/webui/src/components/MessageBubble.tsx +++ b/webui/src/components/MessageBubble.tsx @@ -1,4 +1,4 @@ -import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { Check, ChevronRight, Copy, FileIcon, ImageIcon, PlaySquare, Sparkles, Wrench } from "lucide-react"; import { useTranslation } from "react-i18next"; @@ -85,12 +85,16 @@ export function MessageBubble({ message }: MessageBubbleProps) { const empty = message.content.trim().length === 0; const media = message.media ?? []; - const reasoning = message.role === "assistant" ? message.reasoning ?? [] : []; + const reasoning = message.role === "assistant" ? message.reasoning ?? "" : ""; + const reasoningStreaming = !!(message.role === "assistant" && message.reasoningStreaming); + const hasReasoning = reasoning.length > 0 || reasoningStreaming; const showAssistantActions = message.role === "assistant" && !message.isStreaming && !empty; return (
- {reasoning.length > 0 ? : null} - {empty && message.isStreaming && reasoning.length === 0 ? ( + {hasReasoning ? ( + + ) : null} + {empty && message.isStreaming && !hasReasoning ? ( ) : empty && message.isStreaming ? null : ( <> @@ -437,33 +441,52 @@ function TraceGroup({ message, animClass }: TraceGroupProps) { } interface ReasoningBubbleProps { - lines: string[]; + text: string; + streaming: boolean; } /** - * Subordinate "thinking" trace shown above an assistant turn. Mirrors the - * CLI's italic dim ``ChevronRight`` row visually; collapsible because - * reasoning from models like DeepSeek-R1 / o-series can run long. Defaults - * to expanded while the answer is still streaming (so the user sees the - * model "thinking out loud"), but the toggle persists across rerenders. + * Subordinate "thinking" trace shown above an assistant turn. + * + * Lifecycle: + * - While ``streaming`` is true (``reasoning_delta`` frames still arriving), + * the bubble defaults to open and the header runs a shimmer + pulse so + * the user sees the model "thinking out loud" in real time. + * - On ``reasoning_end`` the bubble auto-collapses for prose density — + * the user can re-expand to inspect the chain of thought. The local + * toggle persists once the user interacts. */ -function ReasoningBubble({ lines }: ReasoningBubbleProps) { +function ReasoningBubble({ text, streaming }: ReasoningBubbleProps) { const { t } = useTranslation(); - const [open, setOpen] = useState(true); - const text = useMemo(() => lines.join("\n\n"), [lines]); + const [userToggled, setUserToggled] = useState(false); + const [openLocal, setOpenLocal] = useState(true); + const open = userToggled ? openLocal : streaming; + const onToggle = () => { + setUserToggled(true); + setOpenLocal((v) => (userToggled ? !v : !open)); + }; return (
- {open && ( + {open && text.length > 0 && (
= 0; i -= 1) { + const candidate = prev[i]; + if (candidate.role !== "assistant" || candidate.kind === "trace") continue; + const hasAnswer = candidate.content.length > 0; + if (candidate.reasoningStreaming || (!hasAnswer && candidate.reasoning !== undefined)) { + const merged: UIMessage = { + ...candidate, + reasoning: (candidate.reasoning ?? "") + chunk, + reasoningStreaming: true, + }; + return [...prev.slice(0, i), merged, ...prev.slice(i + 1)]; + } + if (!hasAnswer && candidate.isStreaming) { + const merged: UIMessage = { + ...candidate, + reasoning: chunk, + reasoningStreaming: true, + }; + return [...prev.slice(0, i), merged, ...prev.slice(i + 1)]; + } + break; + } + return [ + ...prev, + { + id: crypto.randomUUID(), + role: "assistant", + content: "", + isStreaming: true, + reasoning: chunk, + reasoningStreaming: true, + createdAt: Date.now(), + }, + ]; +} + +/** + * Find the most recent assistant placeholder that an incoming answer + * delta should adopt instead of spawning a parallel row. We look for an + * empty-content assistant turn that is still marked ``isStreaming`` — + * typically created earlier by ``reasoning_delta``. Anything else means + * the model already produced an answer in a previous turn, so the new + * delta belongs in a fresh row. + */ +function findActiveAssistantPlaceholder(prev: UIMessage[]): string | null { + const last = prev[prev.length - 1]; + if (!last) return null; + if (last.role !== "assistant" || last.kind === "trace") return null; + if (last.content.length > 0) return null; + if (!last.isStreaming) return null; + return last.id; +} + +/** + * Close the active reasoning stream segment, if any. Idempotent: a + * ``reasoning_end`` with no preceding deltas is a harmless no-op. + */ +function closeReasoningStream(prev: UIMessage[]): UIMessage[] { + for (let i = prev.length - 1; i >= 0; i -= 1) { + const candidate = prev[i]; + if (!candidate.reasoningStreaming) continue; + const merged: UIMessage = { ...candidate, reasoningStreaming: false }; + return [...prev.slice(0, i), merged, ...prev.slice(i + 1)]; + } + return prev; +} + /** * Subscribe to a chat by ID. Returns the in-memory message list for the chat, * a streaming flag, and a ``send`` function. Initial history must be seeded @@ -122,27 +198,42 @@ export function useNanobotStream( if (ev.event === "delta") { if (suppressStreamUntilTurnEndRef.current) return; - const id = buffer.current?.messageId ?? crypto.randomUUID(); - if (!buffer.current) { - buffer.current = { messageId: id, parts: [] }; - setMessages((prev) => [ - ...prev, - { - id, - role: "assistant", - content: "", - isStreaming: true, - createdAt: Date.now(), - }, - ]); - setIsStreaming(true); - } - buffer.current.parts.push(ev.text); - const combined = buffer.current.parts.join(""); - const targetId = buffer.current.messageId; - setMessages((prev) => - prev.map((m) => (m.id === targetId ? { ...m, content: combined } : m)), - ); + const chunk = ev.text; + setIsStreaming(true); + setMessages((prev) => { + // Reuse an in-flight assistant placeholder (typically created by + // ``reasoning_delta``) so the answer renders below its own + // thinking trace instead of in a parallel row. + const adopted = !buffer.current ? findActiveAssistantPlaceholder(prev) : null; + let targetId: string; + let next: UIMessage[]; + if (buffer.current) { + targetId = buffer.current.messageId; + next = prev; + } else if (adopted) { + targetId = adopted; + buffer.current = { messageId: targetId, parts: [] }; + next = prev; + } else { + targetId = crypto.randomUUID(); + buffer.current = { messageId: targetId, parts: [] }; + next = [ + ...prev, + { + id: targetId, + role: "assistant", + content: "", + isStreaming: true, + createdAt: Date.now(), + }, + ]; + } + buffer.current.parts.push(chunk); + const combined = buffer.current.parts.join(""); + return next.map((m) => + m.id === targetId ? { ...m, content: combined, isStreaming: true } : m, + ); + }); return; } @@ -159,6 +250,21 @@ export function useNanobotStream( return; } + if (ev.event === "reasoning_delta") { + if (suppressStreamUntilTurnEndRef.current) return; + const chunk = ev.text; + if (!chunk) return; + setMessages((prev) => attachReasoningChunk(prev, chunk)); + setIsStreaming(true); + return; + } + + if (ev.event === "reasoning_end") { + if (suppressStreamUntilTurnEndRef.current) return; + setMessages((prev) => closeReasoningStream(prev)); + return; + } + if (ev.event === "turn_end") { // Definitive signal that the turn is fully complete. Cancel any // pending debounce timer and stop the loading indicator immediately. @@ -187,37 +293,13 @@ export function useNanobotStream( ) { return; } - // Model reasoning rides its own channel: stash it on the next - // assistant turn so the bubble renders it as a subordinate trace. - // If the assistant message hasn't materialized yet (typical, since - // reasoning fires before tool calls/answers), park it on a sentinel - // pending row that the next assistant message absorbs. + // Back-compat: a legacy ``kind: "reasoning"`` message (no streaming + // partner) is treated as one complete delta + immediate end so the + // bubble renders identically to the streaming path. if (ev.kind === "reasoning") { const line = ev.text; if (!line) return; - setMessages((prev) => { - for (let i = prev.length - 1; i >= 0; i -= 1) { - const candidate = prev[i]; - if (candidate.role === "assistant" && candidate.kind !== "trace") { - const merged: UIMessage = { - ...candidate, - reasoning: [...(candidate.reasoning ?? []), line], - }; - return [...prev.slice(0, i), merged, ...prev.slice(i + 1)]; - } - } - return [ - ...prev, - { - id: crypto.randomUUID(), - role: "assistant", - content: "", - isStreaming: true, - reasoning: [line], - createdAt: Date.now(), - }, - ]; - }); + setMessages((prev) => closeReasoningStream(attachReasoningChunk(prev, line))); return; } // Intermediate agent breadcrumbs (tool-call hints, raw progress). diff --git a/webui/src/i18n/locales/en/common.json b/webui/src/i18n/locales/en/common.json index 1f6eb7b54..e82a8f5b7 100644 --- a/webui/src/i18n/locales/en/common.json +++ b/webui/src/i18n/locales/en/common.json @@ -333,6 +333,7 @@ "toolSingle": "Using a tool", "toolMany": "Used {{count}} tools", "reasoning": "Thinking", + "reasoningStreaming": "Thinking…", "imageAttachment": "Image attachment", "copyReply": "Copy reply", "copiedReply": "Copied reply" diff --git a/webui/src/i18n/locales/zh-CN/common.json b/webui/src/i18n/locales/zh-CN/common.json index 662a5f7bd..18d4b5e16 100644 --- a/webui/src/i18n/locales/zh-CN/common.json +++ b/webui/src/i18n/locales/zh-CN/common.json @@ -320,7 +320,8 @@ "assistantTyping": "助手正在输入", "toolSingle": "正在使用工具", "toolMany": "已使用 {{count}} 个工具", - "reasoning": "思考中", + "reasoning": "思考过程", + "reasoningStreaming": "正在思考…", "imageAttachment": "图片附件", "copyReply": "复制回复", "copiedReply": "已复制回复" diff --git a/webui/src/lib/types.ts b/webui/src/lib/types.ts index 0338b75f3..25c317753 100644 --- a/webui/src/lib/types.ts +++ b/webui/src/lib/types.ts @@ -44,10 +44,13 @@ export interface UIMessage { images?: UIImage[]; /** Signed or local UI-renderable media attachments. */ media?: UIMediaAttachment[]; - /** Assistant turn: model reasoning / thinking content collected from - * `kind: "reasoning"` frames. Each entry is one emit cycle, joined with - * blank lines on render. */ - reasoning?: string[]; + /** Assistant turn: accumulated model reasoning / thinking text. Built up + * incrementally from ``reasoning_delta`` frames; finalized when + * ``reasoning_end`` arrives. */ + reasoning?: string; + /** True while ``reasoning_delta`` frames are still arriving for this turn. + * Drives the shimmer header on ``ReasoningBubble``. */ + reasoningStreaming?: boolean; } export interface ChatSummary { @@ -158,6 +161,17 @@ export type InboundEvent = chat_id: string; stream_id?: string; } + | { + event: "reasoning_delta"; + chat_id: string; + text: string; + stream_id?: string; + } + | { + event: "reasoning_end"; + chat_id: string; + stream_id?: string; + } | { event: "runtime_model_updated"; model_name: string; diff --git a/webui/src/tests/message-bubble.test.tsx b/webui/src/tests/message-bubble.test.tsx index 77608b121..29c40a3b8 100644 --- a/webui/src/tests/message-bubble.test.tsx +++ b/webui/src/tests/message-bubble.test.tsx @@ -103,37 +103,41 @@ describe("MessageBubble", () => { expect(container.querySelector("video[controls]")).toBeInTheDocument(); }); - it("surfaces reasoning content above the assistant answer when provided", () => { + it("auto-expands the reasoning trace while streaming with a shimmer header", () => { const message: UIMessage = { - id: "a-reasoning", + id: "a-reasoning-streaming", + role: "assistant", + content: "", + createdAt: Date.now(), + reasoning: "Step 1: parse intent. Step 2: compute.", + reasoningStreaming: true, + }; + + const { container } = render(); + + expect(screen.getByText("Thinking…")).toBeInTheDocument(); + expect(screen.getByText(/Step 1: parse intent\./)).toBeInTheDocument(); + expect(container.querySelector(".reasoning-shimmer")).toBeInTheDocument(); + }); + + it("collapses the reasoning section by default once streaming ends", () => { + const message: UIMessage = { + id: "a-reasoning-done", role: "assistant", content: "The answer is 42.", createdAt: Date.now(), - reasoning: ["Step 1: parse intent.", "Step 2: compute."], + reasoning: "hidden until expanded", + reasoningStreaming: false, }; render(); expect(screen.getByText("Thinking")).toBeInTheDocument(); - expect(screen.getByText(/Step 1: parse intent\./)).toBeInTheDocument(); - expect(screen.getByText(/Step 2: compute\./)).toBeInTheDocument(); expect(screen.getByText("The answer is 42.")).toBeInTheDocument(); - }); + expect(screen.queryByText("hidden until expanded")).not.toBeInTheDocument(); - it("collapses the reasoning section when toggled", () => { - const message: UIMessage = { - id: "a-reasoning-collapse", - role: "assistant", - content: "done", - createdAt: Date.now(), - reasoning: ["hidden after toggle"], - }; - - render(); - - expect(screen.getByText("hidden after toggle")).toBeInTheDocument(); fireEvent.click(screen.getByRole("button", { name: /thinking/i })); - expect(screen.queryByText("hidden after toggle")).not.toBeInTheDocument(); + expect(screen.getByText("hidden until expanded")).toBeInTheDocument(); }); it("renders assistant image media as a larger generated result", () => { diff --git a/webui/src/tests/useNanobotStream.test.tsx b/webui/src/tests/useNanobotStream.test.tsx index 7fb94063c..145d36c1c 100644 --- a/webui/src/tests/useNanobotStream.test.tsx +++ b/webui/src/tests/useNanobotStream.test.tsx @@ -113,7 +113,7 @@ describe("useNanobotStream", () => { expect(result.current.messages[1].kind).toBeUndefined(); }); - it("parks reasoning frames on a placeholder assistant message until the answer arrives", () => { + it("accumulates reasoning_delta chunks on a placeholder until reasoning_end", () => { const fake = fakeClient(); const { result } = renderHook(() => useNanobotStream("chat-r", EMPTY_MESSAGES), { wrapper: wrap(fake.client), @@ -121,28 +121,31 @@ describe("useNanobotStream", () => { act(() => { fake.emit("chat-r", { - event: "message", + event: "reasoning_delta", chat_id: "chat-r", - text: "Let me think step by step.", - kind: "reasoning", + text: "Let me think ", }); fake.emit("chat-r", { - event: "message", + event: "reasoning_delta", chat_id: "chat-r", - text: "First, decompose the request.", - kind: "reasoning", + text: "step by step.", }); }); expect(result.current.messages).toHaveLength(1); expect(result.current.messages[0].role).toBe("assistant"); - expect(result.current.messages[0].reasoning).toEqual([ - "Let me think step by step.", - "First, decompose the request.", - ]); + expect(result.current.messages[0].reasoning).toBe("Let me think step by step."); + expect(result.current.messages[0].reasoningStreaming).toBe(true); + + act(() => { + fake.emit("chat-r", { event: "reasoning_end", chat_id: "chat-r" }); + }); + + expect(result.current.messages[0].reasoningStreaming).toBe(false); + expect(result.current.messages[0].reasoning).toBe("Let me think step by step."); }); - it("attaches reasoning to the latest assistant turn rather than spawning a new one", () => { + it("absorbs a streaming reasoning placeholder into the answer turn that follows", () => { const fake = fakeClient(); const { result } = renderHook(() => useNanobotStream("chat-r2", EMPTY_MESSAGES), { wrapper: wrap(fake.client), @@ -150,24 +153,26 @@ describe("useNanobotStream", () => { act(() => { fake.emit("chat-r2", { - event: "message", + event: "reasoning_delta", + chat_id: "chat-r2", + text: "Plan first.", + }); + fake.emit("chat-r2", { event: "reasoning_end", chat_id: "chat-r2" }); + fake.emit("chat-r2", { + event: "delta", chat_id: "chat-r2", text: "The answer is 42.", }); - fake.emit("chat-r2", { - event: "message", - chat_id: "chat-r2", - text: "Reasoning surfaced post-hoc.", - kind: "reasoning", - }); + fake.emit("chat-r2", { event: "stream_end", chat_id: "chat-r2" }); }); expect(result.current.messages).toHaveLength(1); expect(result.current.messages[0].content).toBe("The answer is 42."); - expect(result.current.messages[0].reasoning).toEqual(["Reasoning surfaced post-hoc."]); + expect(result.current.messages[0].reasoning).toBe("Plan first."); + expect(result.current.messages[0].reasoningStreaming).toBe(false); }); - it("ignores empty reasoning frames", () => { + it("ignores empty reasoning_delta frames", () => { const fake = fakeClient(); const { result } = renderHook(() => useNanobotStream("chat-r3", EMPTY_MESSAGES), { wrapper: wrap(fake.client), @@ -175,16 +180,35 @@ describe("useNanobotStream", () => { act(() => { fake.emit("chat-r3", { - event: "message", + event: "reasoning_delta", chat_id: "chat-r3", text: "", - kind: "reasoning", }); }); expect(result.current.messages).toHaveLength(0); }); + it("treats legacy kind=reasoning messages as a complete delta + end pair", () => { + const fake = fakeClient(); + const { result } = renderHook(() => useNanobotStream("chat-r4", EMPTY_MESSAGES), { + wrapper: wrap(fake.client), + }); + + act(() => { + fake.emit("chat-r4", { + event: "message", + chat_id: "chat-r4", + text: "one-shot reasoning", + kind: "reasoning", + }); + }); + + expect(result.current.messages).toHaveLength(1); + expect(result.current.messages[0].reasoning).toBe("one-shot reasoning"); + expect(result.current.messages[0].reasoningStreaming).toBe(false); + }); + it("attaches assistant media_urls to complete messages", () => { const fake = fakeClient(); const { result } = renderHook(() => useNanobotStream("chat-m", EMPTY_MESSAGES), {