Xubin Ren 458b4ba235 feat(reasoning): stream reasoning content as a first-class channel
Reasoning now flows as its own stream — symmetric to the answer's
``delta`` / ``stream_end`` pair — instead of being shipped as one
oversized progress message. This lets WebUI render a live "Thinking…"
bubble that updates in place, then auto-collapses when the stream
closes. Other channels remain plugin no-ops by default.

## Protocol

New metadata: ``_reasoning_delta`` (chunk) and ``_reasoning_end``
(close marker). ChannelManager routes both to the dedicated plugin
hooks below; the legacy one-shot ``_reasoning`` is kept for back-compat
and BaseChannel expands it into a single delta + end pair so plugins
only ever implement the streaming primitives.

WebSocket emits two new events:

- ``reasoning_delta`` (event, chat_id, text, optional stream_id)
- ``reasoning_end`` (event, chat_id, optional stream_id)

## BaseChannel surface

- ``send_reasoning_delta(chat_id, delta, metadata)`` — no-op default
- ``send_reasoning_end(chat_id, metadata)`` — no-op default
- ``send_reasoning(msg)`` — back-compat wrapper, base impl forwards
  to the streaming primitives

A channel adds reasoning support by overriding the two streaming
primitives. Telegram / Slack / Discord / Feishu / WeChat / Matrix keep
the base no-ops until their bubble UIs are adapted; reasoning silently
drops at dispatch, never as a stray text message.

## AgentHook

Adds ``emit_reasoning_end`` to the hook lifecycle. ``_LoopHook`` tracks
whether a reasoning segment is open and closes it on:

- the first answer delta arriving (so the UI locks the bubble before
  the answer renders below),
- ``on_stream_end``,
- one-shot ``reasoning_content`` / ``thinking_blocks`` after a single
  non-streaming response.

## WebUI

- ``UIMessage.reasoning`` is now a single accumulated string with a
  companion ``reasoningStreaming`` flag.
- ``useNanobotStream`` consumes ``reasoning_delta`` / ``reasoning_end``;
  legacy ``kind: "reasoning"`` is auto-translated to a delta + end.
- New ``ReasoningBubble``: shimmer header + auto-expanded while
  streaming, collapses to a clickable "Thinking" pill once closed,
  respects ``prefers-reduced-motion``.
- Answer deltas adopt the reasoning placeholder so the bubble and the
  answer share one assistant row.

## Tests

- ``tests/channels/test_channel_manager_reasoning.py`` — manager routes
  delta + end, drops on channel opt-out, expands one-shot back-compat.
- ``tests/channels/test_websocket_channel.py`` — new ``reasoning_delta``
  / ``reasoning_end`` frames, empty-chunk safety, no-subscriber safety,
  back-compat expansion.
- ``tests/agent/test_runner_reasoning.py`` — runner closes the segment
  on streaming answer start and after one-shot reasoning.
- WebUI ``useNanobotStream`` + ``message-bubble`` cover the new
  protocol and the shimmer styling.

## Docs

``docs/configuration.md`` and ``docs/websocket.md`` document the new
events and the plugin contract.

Co-authored-by: Cursor <cursoragent@cursor.com>
2026-05-13 07:13:43 +00:00

249 lines
8.8 KiB
Python

"""Base channel interface for chat platforms."""
from __future__ import annotations
from abc import ABC, abstractmethod
from pathlib import Path
from typing import Any
from loguru import logger
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.bus.queue import MessageBus
class BaseChannel(ABC):
"""
Abstract base class for chat channel implementations.
Each channel (Telegram, Discord, etc.) should implement this interface
to integrate with the nanobot message bus.
"""
name: str = "base"
display_name: str = "Base"
transcription_provider: str = "groq"
transcription_api_key: str = ""
transcription_api_base: str = ""
transcription_language: str | None = None
send_progress: bool = True
send_tool_hints: bool = False
show_reasoning: bool = True
def __init__(self, config: Any, bus: MessageBus):
"""
Initialize the channel.
Args:
config: Channel-specific configuration.
bus: The message bus for communication.
"""
self.config = config
self.logger = logger.bind(channel=self.name)
self.bus = bus
self._running = False
async def transcribe_audio(self, file_path: str | Path) -> str:
"""Transcribe an audio file via Whisper (OpenAI or Groq). Returns empty string on failure."""
if not self.transcription_api_key:
return ""
try:
if self.transcription_provider == "openai":
from nanobot.providers.transcription import OpenAITranscriptionProvider
provider = OpenAITranscriptionProvider(
api_key=self.transcription_api_key,
api_base=self.transcription_api_base or None,
language=self.transcription_language or None,
)
else:
from nanobot.providers.transcription import GroqTranscriptionProvider
provider = GroqTranscriptionProvider(
api_key=self.transcription_api_key,
api_base=self.transcription_api_base or None,
language=self.transcription_language or None,
)
return await provider.transcribe(file_path)
except Exception:
self.logger.exception("Audio transcription failed")
return ""
async def login(self, force: bool = False) -> bool:
"""
Perform channel-specific interactive login (e.g. QR code scan).
Args:
force: If True, ignore existing credentials and force re-authentication.
Returns True if already authenticated or login succeeds.
Override in subclasses that support interactive login.
"""
return True
@abstractmethod
async def start(self) -> None:
"""
Start the channel and begin listening for messages.
This should be a long-running async task that:
1. Connects to the chat platform
2. Listens for incoming messages
3. Forwards messages to the bus via _handle_message()
"""
pass
@abstractmethod
async def stop(self) -> None:
"""Stop the channel and clean up resources."""
pass
@abstractmethod
async def send(self, msg: OutboundMessage) -> None:
"""
Send a message through this channel.
Args:
msg: The message to send.
Implementations should raise on delivery failure so the channel manager
can apply any retry policy in one place.
"""
pass
async def send_delta(self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None) -> None:
"""Deliver a streaming text chunk.
Override in subclasses to enable streaming. Implementations should
raise on delivery failure so the channel manager can retry.
Streaming contract: ``_stream_delta`` is a chunk, ``_stream_end`` ends
the current segment, and stateful implementations must key buffers by
``_stream_id`` rather than only by ``chat_id``.
"""
pass
async def send_reasoning_delta(
self, chat_id: str, delta: str, metadata: dict[str, Any] | None = None
) -> None:
"""Stream a chunk of model reasoning/thinking content.
Default is no-op. Channels with a native low-emphasis primitive
(Slack context block, Telegram expandable blockquote, Discord
subtext, WebUI italic bubble, ...) override to render reasoning
as a subordinate trace that updates in place as the model thinks.
Streaming contract mirrors :meth:`send_delta`: ``_reasoning_delta``
is a chunk, ``_reasoning_end`` ends the current reasoning segment,
and stateful implementations should key buffers by ``_stream_id``
rather than only by ``chat_id``.
"""
return
async def send_reasoning_end(
self, chat_id: str, metadata: dict[str, Any] | None = None
) -> None:
"""Mark the end of a reasoning stream segment.
Default is no-op. Channels that buffer ``send_reasoning_delta``
chunks for in-place updates use this signal to flush and freeze
the rendered group; one-shot channels can ignore it entirely.
"""
return
async def send_reasoning(self, msg: OutboundMessage) -> None:
"""Deliver a complete reasoning block.
Default implementation reuses the streaming pair so plugins only
need to override the delta/end methods. Equivalent to one delta
with the full content followed immediately by an end marker —
keeps a single rendering path for both streamed and one-shot
reasoning (e.g. DeepSeek-R1's final-response ``reasoning_content``).
"""
if not msg.content:
return
meta = dict(msg.metadata or {})
meta.setdefault("_reasoning_delta", True)
await self.send_reasoning_delta(msg.chat_id, msg.content, meta)
end_meta = dict(meta)
end_meta.pop("_reasoning_delta", None)
end_meta["_reasoning_end"] = True
await self.send_reasoning_end(msg.chat_id, end_meta)
@property
def supports_streaming(self) -> bool:
"""True when config enables streaming AND this subclass implements send_delta."""
cfg = self.config
streaming = cfg.get("streaming", False) if isinstance(cfg, dict) else getattr(cfg, "streaming", False)
return bool(streaming) and type(self).send_delta is not BaseChannel.send_delta
def is_allowed(self, sender_id: str) -> bool:
"""Check if *sender_id* is permitted. Empty list → deny all; ``"*"`` → allow all."""
if isinstance(self.config, dict):
if "allow_from" in self.config:
allow_list = self.config.get("allow_from")
else:
allow_list = self.config.get("allowFrom", [])
else:
allow_list = getattr(self.config, "allow_from", [])
if not allow_list:
self.logger.warning("allow_from is empty — all access denied")
return False
if "*" in allow_list:
return True
return str(sender_id) in allow_list
async def _handle_message(
self,
sender_id: str,
chat_id: str,
content: str,
media: list[str] | None = None,
metadata: dict[str, Any] | None = None,
session_key: str | None = None,
) -> None:
"""
Handle an incoming message from the chat platform.
This method checks permissions and forwards to the bus.
Args:
sender_id: The sender's identifier.
chat_id: The chat/channel identifier.
content: Message text content.
media: Optional list of media URLs.
metadata: Optional channel-specific metadata.
session_key: Optional session key override (e.g. thread-scoped sessions).
"""
if not self.is_allowed(sender_id):
self.logger.warning(
"Access denied for sender {}. "
"Add them to allowFrom list in config to grant access.",
sender_id,
)
return
meta = metadata or {}
if self.supports_streaming:
meta = {**meta, "_wants_stream": True}
msg = InboundMessage(
channel=self.name,
sender_id=str(sender_id),
chat_id=str(chat_id),
content=content,
media=media or [],
metadata=meta,
session_key_override=session_key,
)
await self.bus.publish_inbound(msg)
@classmethod
def default_config(cls) -> dict[str, Any]:
"""Return default config for onboard. Override in plugins to auto-populate config.json."""
return {"enabled": False}
@property
def is_running(self) -> bool:
"""Check if the channel is running."""
return self._running