mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-03 01:42:45 +00:00
Fixes #2591 The "nanobot is thinking..." spinner was printing ANSI escape codes literally in some terminals, causing garbled output like: ?[2K?[32m⠧?[0m ?[2mnanobot is thinking...?[0m Root causes: 1. Console created without force_terminal=True, so Rich couldn't reliably detect terminal capabilities 2. Spinner continued running during user input prompt, conflicting with prompt_toolkit Changes: - Set force_terminal=True in _make_console() for proper ANSI handling - Add stop_for_input() method to StreamRenderer - Call stop_for_input() before reading user input in interactive mode - Add tests for the new functionality
174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
import asyncio
|
|
from unittest.mock import AsyncMock, MagicMock, call, patch
|
|
|
|
import pytest
|
|
from prompt_toolkit.formatted_text import HTML
|
|
|
|
from nanobot.cli import commands
|
|
from nanobot.cli import stream as stream_mod
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_prompt_session():
|
|
"""Mock the global prompt session."""
|
|
mock_session = MagicMock()
|
|
mock_session.prompt_async = AsyncMock()
|
|
with patch("nanobot.cli.commands._PROMPT_SESSION", mock_session), \
|
|
patch("nanobot.cli.commands.patch_stdout"):
|
|
yield mock_session
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_interactive_input_async_returns_input(mock_prompt_session):
|
|
"""Test that _read_interactive_input_async returns the user input from prompt_session."""
|
|
mock_prompt_session.prompt_async.return_value = "hello world"
|
|
|
|
result = await commands._read_interactive_input_async()
|
|
|
|
assert result == "hello world"
|
|
mock_prompt_session.prompt_async.assert_called_once()
|
|
args, _ = mock_prompt_session.prompt_async.call_args
|
|
assert isinstance(args[0], HTML) # Verify HTML prompt is used
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_read_interactive_input_async_handles_eof(mock_prompt_session):
|
|
"""Test that EOFError converts to KeyboardInterrupt."""
|
|
mock_prompt_session.prompt_async.side_effect = EOFError()
|
|
|
|
with pytest.raises(KeyboardInterrupt):
|
|
await commands._read_interactive_input_async()
|
|
|
|
|
|
def test_init_prompt_session_creates_session():
|
|
"""Test that _init_prompt_session initializes the global session."""
|
|
# Ensure global is None before test
|
|
commands._PROMPT_SESSION = None
|
|
|
|
with patch("nanobot.cli.commands.PromptSession") as MockSession, \
|
|
patch("nanobot.cli.commands.FileHistory") as MockHistory, \
|
|
patch("pathlib.Path.home") as mock_home:
|
|
|
|
mock_home.return_value = MagicMock()
|
|
|
|
commands._init_prompt_session()
|
|
|
|
assert commands._PROMPT_SESSION is not None
|
|
MockSession.assert_called_once()
|
|
_, kwargs = MockSession.call_args
|
|
assert kwargs["multiline"] is False
|
|
assert kwargs["enable_open_in_editor"] is False
|
|
|
|
|
|
def test_thinking_spinner_pause_stops_and_restarts():
|
|
"""Pause should stop the active spinner and restart it afterward."""
|
|
spinner = MagicMock()
|
|
mock_console = MagicMock()
|
|
mock_console.status.return_value = spinner
|
|
|
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
|
with thinking:
|
|
with thinking.pause():
|
|
pass
|
|
|
|
assert spinner.method_calls == [
|
|
call.start(),
|
|
call.stop(),
|
|
call.start(),
|
|
call.stop(),
|
|
]
|
|
|
|
|
|
def test_print_cli_progress_line_pauses_spinner_before_printing():
|
|
"""CLI progress output should pause spinner to avoid garbled lines."""
|
|
order: list[str] = []
|
|
spinner = MagicMock()
|
|
spinner.start.side_effect = lambda: order.append("start")
|
|
spinner.stop.side_effect = lambda: order.append("stop")
|
|
mock_console = MagicMock()
|
|
mock_console.status.return_value = spinner
|
|
|
|
with patch.object(commands.console, "print", side_effect=lambda *_args, **_kwargs: order.append("print")):
|
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
|
with thinking:
|
|
commands._print_cli_progress_line("tool running", thinking)
|
|
|
|
assert order == ["start", "stop", "print", "start", "stop"]
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_print_interactive_progress_line_pauses_spinner_before_printing():
|
|
"""Interactive progress output should also pause spinner cleanly."""
|
|
order: list[str] = []
|
|
spinner = MagicMock()
|
|
spinner.start.side_effect = lambda: order.append("start")
|
|
spinner.stop.side_effect = lambda: order.append("stop")
|
|
mock_console = MagicMock()
|
|
mock_console.status.return_value = spinner
|
|
|
|
async def fake_print(_text: str) -> None:
|
|
order.append("print")
|
|
|
|
with patch("nanobot.cli.commands._print_interactive_line", side_effect=fake_print):
|
|
thinking = stream_mod.ThinkingSpinner(console=mock_console)
|
|
with thinking:
|
|
await commands._print_interactive_progress_line("tool running", thinking)
|
|
|
|
assert order == ["start", "stop", "print", "start", "stop"]
|
|
|
|
|
|
def test_response_renderable_uses_text_for_explicit_plain_rendering():
|
|
status = (
|
|
"🐈 nanobot v0.1.4.post5\n"
|
|
"🧠 Model: MiniMax-M2.7\n"
|
|
"📊 Tokens: 20639 in / 29 out"
|
|
)
|
|
|
|
renderable = commands._response_renderable(
|
|
status,
|
|
render_markdown=True,
|
|
metadata={"render_as": "text"},
|
|
)
|
|
|
|
assert renderable.__class__.__name__ == "Text"
|
|
|
|
|
|
def test_response_renderable_preserves_normal_markdown_rendering():
|
|
renderable = commands._response_renderable("**bold**", render_markdown=True)
|
|
|
|
assert renderable.__class__.__name__ == "Markdown"
|
|
|
|
|
|
def test_response_renderable_without_metadata_keeps_markdown_path():
|
|
help_text = "🐈 nanobot commands:\n/status — Show bot status\n/help — Show available commands"
|
|
|
|
renderable = commands._response_renderable(help_text, render_markdown=True)
|
|
|
|
assert renderable.__class__.__name__ == "Markdown"
|
|
|
|
|
|
def test_stream_renderer_stop_for_input_stops_spinner():
|
|
"""stop_for_input should stop the active spinner to avoid prompt_toolkit conflicts."""
|
|
spinner = MagicMock()
|
|
mock_console = MagicMock()
|
|
mock_console.status.return_value = spinner
|
|
|
|
# Create renderer with mocked console
|
|
with patch.object(stream_mod, "_make_console", return_value=mock_console):
|
|
renderer = stream_mod.StreamRenderer(show_spinner=True)
|
|
|
|
# Verify spinner started
|
|
spinner.start.assert_called_once()
|
|
|
|
# Stop for input
|
|
renderer.stop_for_input()
|
|
|
|
# Verify spinner stopped
|
|
spinner.stop.assert_called_once()
|
|
|
|
|
|
def test_make_console_uses_force_terminal():
|
|
"""Console should be created with force_terminal=True for proper ANSI handling."""
|
|
console = stream_mod._make_console()
|
|
assert console._force_terminal is True
|