refactor(cli): localize reasoning buffer state

This commit is contained in:
Xubin Ren 2026-05-17 23:58:03 +08:00
parent b67205f5aa
commit 4445fcc8b9
2 changed files with 132 additions and 38 deletions

View File

@ -22,11 +22,6 @@ if sys.platform == "win32":
import typer import typer
from loguru import logger from loguru import logger
# Buffered reasoning display: accumulate streaming tokens and flush
# on sentence/line boundaries so the user sees grouped text instead of
# one token per line. The empty string placeholder is the sentinel.
_reasoning_buf: str = ""
# Remove default handler and re-add with unified nanobot format # Remove default handler and re-add with unified nanobot format
logger.remove() logger.remove()
_log_handler_id = logger.add( _log_handler_id = logger.add(
@ -96,6 +91,8 @@ app = typer.Typer(
console = Console() console = Console()
EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"} EXIT_COMMANDS = {"exit", "quit", "/exit", "/quit", ":q"}
_REASONING_SENTENCE_ENDINGS = (".", "!", "?", "", "", "")
_REASONING_FLUSH_CHARS = 60
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
# CLI input: prompt_toolkit for editing, paste, history, and display # CLI input: prompt_toolkit for editing, paste, history, and display
@ -247,14 +244,39 @@ def _print_cli_progress_line(text: str, thinking: ThinkingSpinner | None, render
target.print(f" [dim]↳ {text}[/dim]") target.print(f" [dim]↳ {text}[/dim]")
def _flush_reasoning(thinking: ThinkingSpinner | None, renderer: StreamRenderer | None = None) -> None: class _ReasoningBuffer:
"""Flush accumulated reasoning buffer to the display.""" def __init__(self) -> None:
global _reasoning_buf self._text = ""
if not _reasoning_buf or not _reasoning_buf.strip():
_reasoning_buf = "" def add(self, text: str) -> str | None:
if not text:
return None
self._text += text
if self._should_flush(text):
return self.flush()
return None
def flush(self) -> str | None:
text = self._text.strip()
self._text = ""
return text or None
def clear(self) -> None:
self._text = ""
def _should_flush(self, text: str) -> bool:
stripped = text.rstrip()
return (
"\n" in text
or stripped.endswith(_REASONING_SENTENCE_ENDINGS)
or len(self._text) >= _REASONING_FLUSH_CHARS
)
def _print_cli_reasoning(text: str, thinking: ThinkingSpinner | None, renderer: StreamRenderer | None = None) -> None:
"""Print reasoning/thinking content in a distinct style."""
if not text.strip():
return return
text = _reasoning_buf.strip()
_reasoning_buf = ""
target = renderer.console if renderer else console target = renderer.console if renderer else console
pause = renderer.pause_spinner() if renderer else (thinking.pause() if thinking else nullcontext()) pause = renderer.pause_spinner() if renderer else (thinking.pause() if thinking else nullcontext())
with pause: with pause:
@ -263,26 +285,14 @@ def _flush_reasoning(thinking: ThinkingSpinner | None, renderer: StreamRenderer
target.print(f"[dim italic]✻ {text}[/dim italic]") target.print(f"[dim italic]✻ {text}[/dim italic]")
def _print_cli_reasoning(text: str, thinking: ThinkingSpinner | None, renderer: StreamRenderer | None = None) -> None: def _flush_cli_reasoning(
"""Accumulate reasoning tokens and flush on sentence / line boundaries. reasoning_buffer: _ReasoningBuffer,
thinking: ThinkingSpinner | None,
Without buffering, each streaming delta (often a single token) would be renderer: StreamRenderer | None = None,
printed as a separate ```` line. This version groups tokens into ) -> None:
natural chunks visible in the terminal. text = reasoning_buffer.flush()
""" if text:
global _reasoning_buf _print_cli_reasoning(text, thinking, renderer)
if not text:
return
_reasoning_buf += text
# Flush on newline, sentence-ending punctuation, or when the chunk is
# long enough to wrap meaningfully at typical terminal widths.
if (
text.endswith("\n")
or any(text.rstrip().endswith(p) for p in (".", "!", "?", "", "", ""))
or len(_reasoning_buf) >= 60
):
_flush_reasoning(thinking, renderer)
async def _print_interactive_progress_line(text: str, thinking: ThinkingSpinner | None, renderer: StreamRenderer | None = None) -> None: async def _print_interactive_progress_line(text: str, thinking: ThinkingSpinner | None, renderer: StreamRenderer | None = None) -> None:
@ -303,6 +313,7 @@ async def _maybe_print_interactive_progress(
thinking: ThinkingSpinner | None, thinking: ThinkingSpinner | None,
channels_config: Any, channels_config: Any,
renderer: StreamRenderer | None = None, renderer: StreamRenderer | None = None,
reasoning_buffer: _ReasoningBuffer | None = None,
) -> bool: ) -> bool:
metadata = msg.metadata or {} metadata = msg.metadata or {}
if metadata.get("_retry_wait"): if metadata.get("_retry_wait"):
@ -312,17 +323,24 @@ async def _maybe_print_interactive_progress(
if not metadata.get("_progress"): if not metadata.get("_progress"):
return False return False
# Flush reasoning buffer when the reasoning stream ends (bus path). reasoning_buffer = reasoning_buffer or _ReasoningBuffer()
if metadata.get("_reasoning_end"): if metadata.get("_reasoning_end"):
_flush_reasoning(thinking, renderer) if channels_config and not channels_config.show_reasoning:
reasoning_buffer.clear()
else:
_flush_cli_reasoning(reasoning_buffer, thinking, renderer)
return True return True
is_tool_hint = metadata.get("_tool_hint", False) is_tool_hint = metadata.get("_tool_hint", False)
is_reasoning = metadata.get("_reasoning", False) or metadata.get("_reasoning_delta", False) is_reasoning = metadata.get("_reasoning", False) or metadata.get("_reasoning_delta", False)
if is_reasoning: if is_reasoning:
if channels_config and not channels_config.show_reasoning: if channels_config and not channels_config.show_reasoning:
reasoning_buffer.clear()
return True return True
_print_cli_reasoning(msg.content, thinking, renderer) text = reasoning_buffer.add(msg.content)
if text:
_print_cli_reasoning(text, thinking, renderer)
return True return True
if channels_config and is_tool_hint and not channels_config.send_tool_hints: if channels_config and is_tool_hint and not channels_config.send_tool_hints:
return True return True
@ -1143,18 +1161,25 @@ def agent(
_thinking: ThinkingSpinner | None = None _thinking: ThinkingSpinner | None = None
def _make_progress(renderer: StreamRenderer | None = None): def _make_progress(renderer: StreamRenderer | None = None):
reasoning_buffer = _ReasoningBuffer()
async def _cli_progress(content: str, *, tool_hint: bool = False, reasoning: bool = False, **_kwargs: Any) -> None: async def _cli_progress(content: str, *, tool_hint: bool = False, reasoning: bool = False, **_kwargs: Any) -> None:
ch = agent_loop.channels_config ch = agent_loop.channels_config
# Flush remaining reasoning buffer when the stream ends.
if _kwargs.get("reasoning_end"): if _kwargs.get("reasoning_end"):
_flush_reasoning(_thinking, renderer) if ch and not ch.show_reasoning:
reasoning_buffer.clear()
else:
_flush_cli_reasoning(reasoning_buffer, _thinking, renderer)
return return
if reasoning: if reasoning:
if ch and not ch.show_reasoning: if ch and not ch.show_reasoning:
reasoning_buffer.clear()
return return
_print_cli_reasoning(content, _thinking, renderer) text = reasoning_buffer.add(content)
if text:
_print_cli_reasoning(text, _thinking, renderer)
return return
if ch and tool_hint and not ch.send_tool_hints: if ch and tool_hint and not ch.send_tool_hints:
return return
@ -1225,6 +1250,7 @@ def agent(
turn_done.set() turn_done.set()
turn_response: list[tuple[str, dict]] = [] turn_response: list[tuple[str, dict]] = []
renderer: StreamRenderer | None = None renderer: StreamRenderer | None = None
reasoning_buffer = _ReasoningBuffer()
async def _consume_outbound(): async def _consume_outbound():
while True: while True:
@ -1250,6 +1276,7 @@ def agent(
renderer, renderer,
agent_loop.channels_config, agent_loop.channels_config,
renderer, renderer,
reasoning_buffer,
): ):
continue continue
@ -1290,6 +1317,7 @@ def agent(
turn_done.clear() turn_done.clear()
turn_response.clear() turn_response.clear()
reasoning_buffer.clear()
renderer = StreamRenderer( renderer = StreamRenderer(
render_markdown=markdown, render_markdown=markdown,
bot_name=config.agents.defaults.bot_name, bot_name=config.agents.defaults.bot_name,

View File

@ -69,6 +69,72 @@ async def test_reasoning_delta_displayed_when_show_reasoning_enabled():
assert calls == ["I should search first."] assert calls == ["I should search first."]
@pytest.mark.asyncio
async def test_reasoning_delta_buffers_until_sentence_boundary():
calls: list[str] = []
channels_config = SimpleNamespace(
send_progress=True, send_tool_hints=False, show_reasoning=True,
)
reasoning_buffer = commands._ReasoningBuffer()
with patch("nanobot.cli.commands._print_cli_reasoning", side_effect=lambda t, th, r=None: calls.append(t)):
first = await commands._maybe_print_interactive_progress(
SimpleNamespace(
content="The",
metadata={"_progress": True, "_reasoning_delta": True},
),
None,
channels_config,
reasoning_buffer=reasoning_buffer,
)
second = await commands._maybe_print_interactive_progress(
SimpleNamespace(
content=" user asked.",
metadata={"_progress": True, "_reasoning_delta": True},
),
None,
channels_config,
reasoning_buffer=reasoning_buffer,
)
assert first is True
assert second is True
assert calls == ["The user asked."]
@pytest.mark.asyncio
async def test_reasoning_end_flushes_buffered_delta():
calls: list[str] = []
channels_config = SimpleNamespace(
send_progress=True, send_tool_hints=False, show_reasoning=True,
)
reasoning_buffer = commands._ReasoningBuffer()
with patch("nanobot.cli.commands._print_cli_reasoning", side_effect=lambda t, th, r=None: calls.append(t)):
delta = await commands._maybe_print_interactive_progress(
SimpleNamespace(
content="The user asked",
metadata={"_progress": True, "_reasoning_delta": True},
),
None,
channels_config,
reasoning_buffer=reasoning_buffer,
)
end = await commands._maybe_print_interactive_progress(
SimpleNamespace(
content="",
metadata={"_progress": True, "_reasoning_end": True},
),
None,
channels_config,
reasoning_buffer=reasoning_buffer,
)
assert delta is True
assert end is True
assert calls == ["The user asked"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_reasoning_hidden_when_show_reasoning_disabled(): async def test_reasoning_hidden_when_show_reasoning_disabled():
"""Reasoning content should be suppressed when show_reasoning is False.""" """Reasoning content should be suppressed when show_reasoning is False."""