mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-20 16:42:25 +00:00
Reasoning now flows as its own stream — symmetric to the answer's ``delta`` / ``stream_end`` pair — instead of being shipped as one oversized progress message. This lets WebUI render a live "Thinking…" bubble that updates in place, then auto-collapses when the stream closes. Other channels remain plugin no-ops by default. ## Protocol New metadata: ``_reasoning_delta`` (chunk) and ``_reasoning_end`` (close marker). ChannelManager routes both to the dedicated plugin hooks below; the legacy one-shot ``_reasoning`` is kept for back-compat and BaseChannel expands it into a single delta + end pair so plugins only ever implement the streaming primitives. WebSocket emits two new events: - ``reasoning_delta`` (event, chat_id, text, optional stream_id) - ``reasoning_end`` (event, chat_id, optional stream_id) ## BaseChannel surface - ``send_reasoning_delta(chat_id, delta, metadata)`` — no-op default - ``send_reasoning_end(chat_id, metadata)`` — no-op default - ``send_reasoning(msg)`` — back-compat wrapper, base impl forwards to the streaming primitives A channel adds reasoning support by overriding the two streaming primitives. Telegram / Slack / Discord / Feishu / WeChat / Matrix keep the base no-ops until their bubble UIs are adapted; reasoning silently drops at dispatch, never as a stray text message. ## AgentHook Adds ``emit_reasoning_end`` to the hook lifecycle. ``_LoopHook`` tracks whether a reasoning segment is open and closes it on: - the first answer delta arriving (so the UI locks the bubble before the answer renders below), - ``on_stream_end``, - one-shot ``reasoning_content`` / ``thinking_blocks`` after a single non-streaming response. ## WebUI - ``UIMessage.reasoning`` is now a single accumulated string with a companion ``reasoningStreaming`` flag. - ``useNanobotStream`` consumes ``reasoning_delta`` / ``reasoning_end``; legacy ``kind: "reasoning"`` is auto-translated to a delta + end. 
- New ``ReasoningBubble``: shimmer header + auto-expanded while streaming, collapses to a clickable "Thinking" pill once closed, respects ``prefers-reduced-motion``. - Answer deltas adopt the reasoning placeholder so the bubble and the answer share one assistant row. ## Tests - ``tests/channels/test_channel_manager_reasoning.py`` — manager routes delta + end, drops on channel opt-out, expands one-shot back-compat. - ``tests/channels/test_websocket_channel.py`` — new ``reasoning_delta`` / ``reasoning_end`` frames, empty-chunk safety, no-subscriber safety, back-compat expansion. - ``tests/agent/test_runner_reasoning.py`` — runner closes the segment on streaming answer start and after one-shot reasoning. - WebUI ``useNanobotStream`` + ``message-bubble`` cover the new protocol and the shimmer styling. ## Docs ``docs/configuration.md`` and ``docs/websocket.md`` document the new events and the plugin contract. Co-authored-by: Cursor <cursoragent@cursor.com>
229 lines
7.2 KiB
Python
229 lines
7.2 KiB
Python
"""Tests for ChannelManager routing of model reasoning content.
|
|
|
|
Reasoning is delivered through plugin streaming primitives
|
|
(``send_reasoning_delta`` / ``send_reasoning_end``) so each channel
|
|
controls in-place rendering — mirroring the existing answer ``send_delta``
|
|
/ ``stream_end`` pair. The manager forwards reasoning frames only to
|
|
channels that opt in via ``channel.show_reasoning``; plugins without a
|
|
low-emphasis UI primitive keep the base no-op and the content silently
|
|
drops at dispatch.
|
|
|
|
One-shot ``_reasoning`` frames are accepted for back-compat with hooks
|
|
that haven't migrated yet — ``BaseChannel.send_reasoning`` expands them
|
|
to a single delta + end pair so plugins only implement the streaming
|
|
primitives.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
from unittest.mock import AsyncMock
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import OutboundMessage
|
|
from nanobot.bus.queue import MessageBus
|
|
from nanobot.channels.base import BaseChannel
|
|
from nanobot.channels.manager import ChannelManager
|
|
from nanobot.config.schema import Config
|
|
|
|
|
|
class _MockChannel(BaseChannel):
    """Channel test double that records every outbound hook on an ``AsyncMock``.

    ``send`` is recorded on ``_send_mock``; the reasoning streaming primitives
    are recorded on ``_delta_mock`` and ``_end_mock``. Arguments are always
    forwarded positionally, so assertions can index ``await_args.args`` no
    matter how the caller passed them.
    """

    name = "mock"
    display_name = "Mock"

    def __init__(self, config, bus):
        super().__init__(config, bus)
        # One recorder per hook the tests inspect.
        self._send_mock = AsyncMock()
        self._delta_mock = AsyncMock()
        self._end_mock = AsyncMock()

    # Lifecycle hooks are required by BaseChannel but unused in these tests.
    async def start(self):  # pragma: no cover - not exercised
        return None

    async def stop(self):  # pragma: no cover - not exercised
        return None

    async def send(self, msg):
        outcome = await self._send_mock(msg)
        return outcome

    async def send_reasoning_delta(self, chat_id, delta, metadata=None):
        outcome = await self._delta_mock(chat_id, delta, metadata)
        return outcome

    async def send_reasoning_end(self, chat_id, metadata=None):
        outcome = await self._end_mock(chat_id, metadata)
        return outcome
|
|
|
|
|
|
@pytest.fixture
def manager() -> ChannelManager:
    """Build a default-config ChannelManager with a single ``mock`` channel.

    The mock channel shares the manager's own bus so published messages and
    the channel observe the same queue.
    """
    channel_manager = ChannelManager(Config(), MessageBus())
    mock_channel = _MockChannel({}, channel_manager.bus)
    channel_manager.channels["mock"] = mock_channel
    return channel_manager
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reasoning_delta_routes_to_send_reasoning_delta(manager):
    """A ``_reasoning_delta`` frame reaches ``send_reasoning_delta`` and nothing else."""
    mock_channel = manager.channels["mock"]
    frame = OutboundMessage(
        channel="mock",
        chat_id="c1",
        content="step-by-step",
        metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"},
    )

    await manager._send_once(mock_channel, frame)

    mock_channel._delta_mock.assert_awaited_once()
    forwarded = mock_channel._delta_mock.await_args.args
    # (chat_id, delta) must be passed through unchanged.
    assert forwarded[:2] == ("c1", "step-by-step")
    mock_channel._send_mock.assert_not_awaited()
    mock_channel._end_mock.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reasoning_end_routes_to_send_reasoning_end(manager):
    """A ``_reasoning_end`` frame closes the segment via ``send_reasoning_end``.

    The close marker carries empty content. It must not be treated as an
    (empty) reasoning chunk, and it must not leak out as a plain ``send``
    text message.
    """
    channel = manager.channels["mock"]
    msg = OutboundMessage(
        channel="mock",
        chat_id="c1",
        content="",
        metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"},
    )
    await manager._send_once(channel, msg)

    channel._end_mock.assert_awaited_once()
    # _MockChannel forwards (chat_id, metadata) positionally to the recorder.
    assert channel._end_mock.await_args.args[0] == "c1"
    channel._delta_mock.assert_not_awaited()
    # Mirrors the delta test: reasoning must never surface as a stray message.
    channel._send_mock.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_legacy_one_shot_reasoning_expands_to_delta_plus_end(manager):
    """A bare ``_reasoning`` frame (no delta/end markers) still renders fully.

    The manager falls back to ``send_reasoning``, whose base implementation
    emits one delta followed by one end. Hooks that have not migrated to the
    streaming protocol therefore still appear in WebUI as a complete segment.
    """
    mock_channel = manager.channels["mock"]
    legacy_frame = OutboundMessage(
        channel="mock",
        chat_id="c1",
        content="one-shot reasoning",
        metadata={"_progress": True, "_reasoning": True},
    )

    await manager._send_once(mock_channel, legacy_frame)

    for primitive in (mock_channel._delta_mock, mock_channel._end_mock):
        primitive.assert_awaited_once()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_drops_reasoning_when_channel_opts_out(manager):
    """With ``show_reasoning`` disabled, a reasoning delta reaches no channel hook."""
    mock_channel = manager.channels["mock"]
    mock_channel.show_reasoning = False

    await manager.bus.publish_outbound(
        OutboundMessage(
            channel="mock",
            chat_id="c1",
            content="hidden thinking",
            metadata={"_progress": True, "_reasoning_delta": True},
        )
    )
    await _pump_one(manager)

    for hook in (
        mock_channel._delta_mock,
        mock_channel._end_mock,
        mock_channel._send_mock,
    ):
        hook.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_delivers_reasoning_when_channel_opts_in(manager):
    """An opted-in channel receives every chunk, in order, then one close marker.

    Counting calls alone would not catch reordered or mangled chunks, so the
    delivered text is checked explicitly. Reasoning must also never fall back
    to a plain ``send``.
    """
    channel = manager.channels["mock"]
    channel.show_reasoning = True
    chunks = ("first ", "second")
    for chunk in chunks:
        await manager.bus.publish_outbound(OutboundMessage(
            channel="mock",
            chat_id="c1",
            content=chunk,
            metadata={"_progress": True, "_reasoning_delta": True, "_stream_id": "r1"},
        ))
    await manager.bus.publish_outbound(OutboundMessage(
        channel="mock",
        chat_id="c1",
        content="",
        metadata={"_progress": True, "_reasoning_end": True, "_stream_id": "r1"},
    ))

    await _pump_one(manager)

    assert channel._delta_mock.await_count == 2
    # _MockChannel forwards (chat_id, delta, metadata) positionally.
    delivered = [call.args[1] for call in channel._delta_mock.await_args_list]
    assert delivered == list(chunks)
    channel._end_mock.assert_awaited_once()
    channel._send_mock.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_dispatch_silently_drops_reasoning_for_unknown_channel(manager):
    """Reasoning addressed to an unregistered channel is dropped without error."""
    await manager.bus.publish_outbound(
        OutboundMessage(
            channel="ghost",
            chat_id="c1",
            content="nobody home",
            metadata={"_progress": True, "_reasoning_delta": True},
        )
    )
    await _pump_one(manager)

    # The only registered channel must not be picked as a fallback target.
    bystander = manager.channels["mock"]
    bystander._delta_mock.assert_not_awaited()
    bystander._send_mock.assert_not_awaited()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_base_channel_reasoning_primitives_are_noop_safe():
    """A plugin that keeps the default reasoning primitives must not raise."""

    class _Plain(BaseChannel):
        name = "plain"
        display_name = "Plain"

        async def start(self):  # pragma: no cover
            return None

        async def stop(self):  # pragma: no cover
            return None

        async def send(self, msg):  # pragma: no cover
            return None

    plain = _Plain({}, MessageBus())

    delta_result = await plain.send_reasoning_delta("c", "x")
    assert delta_result is None

    end_result = await plain.send_reasoning_end("c")
    assert end_result is None

    # The one-shot back-compat wrapper must translate without raising as well.
    legacy_frame = OutboundMessage(channel="plain", chat_id="c", content="x", metadata={})
    legacy_result = await plain.send_reasoning(legacy_frame)
    assert legacy_result is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_reasoning_routing_does_not_consult_send_progress(manager):
    """``show_reasoning`` and ``send_progress`` are independent switches.

    Turning progress streaming off must leave reasoning delivery untouched.
    """
    mock_channel = manager.channels["mock"]
    mock_channel.send_progress, mock_channel.show_reasoning = False, True

    frame = OutboundMessage(
        channel="mock",
        chat_id="c1",
        content="still surfaces",
        metadata={"_progress": True, "_reasoning_delta": True},
    )
    await manager.bus.publish_outbound(frame)
    await _pump_one(manager)

    mock_channel._delta_mock.assert_awaited_once()
|
|
|
|
|
|
async def _pump_one(
    manager: "ChannelManager",
    *,
    max_polls: int = 50,
    poll_interval: float = 0.01,
) -> None:
    """Run the outbound dispatcher until the queued messages are handled, then stop it.

    Despite its name, this drains the *whole* outbound queue, not a single
    message. ``manager._dispatch_outbound()`` is started as a background task.
    ``manager.bus.outbound.qsize()`` is polled every ``poll_interval`` seconds,
    at most ``max_polls`` times, and the task is cancelled afterwards.

    Assuming ``bus.outbound`` behaves like an ``asyncio.Queue``, its size drops
    to zero as soon as the last message is *dequeued*. That happens before the
    dispatcher has necessarily finished delivering it. One extra
    ``poll_interval`` is therefore granted after the drain so that in-flight
    delivery can complete before cancellation.

    Args:
        manager: Manager whose outbound queue should be drained.
        max_polls: Upper bound on polling rounds. When it is exhausted the
            dispatcher is cancelled anyway, and the caller's assertions report
            what was (not) delivered.
        poll_interval: Seconds to sleep between queue-size checks.

    Raises:
        Whatever ``_dispatch_outbound`` raised, if the task failed on its own.
        Only the cancellation requested here is swallowed.
    """
    task = asyncio.create_task(manager._dispatch_outbound())
    try:
        for _ in range(max_polls):
            await asyncio.sleep(poll_interval)
            if manager.bus.outbound.qsize() == 0:
                # Drained != delivered: let the last dequeued message finish.
                await asyncio.sleep(poll_interval)
                break
    finally:
        # Always stop the dispatcher, even if polling raised, so no pending
        # task outlives the test.
        task.cancel()
        try:
            await task
        except asyncio.CancelledError:
            pass
|