mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-05-05 01:05:51 +00:00
346 lines
13 KiB
Python
346 lines
13 KiB
Python
"""Tests for /restart slash command."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import InboundMessage
|
|
from nanobot.providers.base import LLMResponse
|
|
|
|
|
|
def _make_loop():
|
|
"""Create a minimal AgentLoop with mocked dependencies."""
|
|
from nanobot.agent.loop import AgentLoop
|
|
from nanobot.bus.queue import MessageBus
|
|
|
|
bus = MessageBus()
|
|
provider = MagicMock()
|
|
provider.get_default_model.return_value = "test-model"
|
|
workspace = MagicMock()
|
|
workspace.__truediv__ = MagicMock(return_value=MagicMock())
|
|
|
|
with patch("nanobot.agent.loop.ContextBuilder"), \
|
|
patch("nanobot.agent.loop.SessionManager"), \
|
|
patch("nanobot.agent.loop.SubagentManager"):
|
|
loop = AgentLoop(bus=bus, provider=provider, workspace=workspace)
|
|
return loop, bus
|
|
|
|
|
|
async def _wait_until(predicate, *, timeout: float = 0.2, interval: float = 0.01) -> None:
|
|
deadline = time.monotonic() + timeout
|
|
while time.monotonic() < deadline:
|
|
if predicate():
|
|
return
|
|
await asyncio.sleep(interval)
|
|
assert predicate()
|
|
|
|
|
|
class TestRestartCommand:
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_restart_sends_message_and_calls_execv(self):
|
|
from nanobot.command.builtin import cmd_restart
|
|
from nanobot.command.router import CommandContext
|
|
from nanobot.utils.restart import (
|
|
RESTART_NOTIFY_CHANNEL_ENV,
|
|
RESTART_NOTIFY_CHAT_ID_ENV,
|
|
RESTART_STARTED_AT_ENV,
|
|
)
|
|
|
|
loop, bus = _make_loop()
|
|
msg = InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/restart")
|
|
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/restart", loop=loop)
|
|
|
|
async def _fast_sleep(_delay: float) -> None:
|
|
return None
|
|
|
|
scheduled: list[asyncio.Task] = []
|
|
|
|
def _capture_task(coro):
|
|
task = asyncio.create_task(coro)
|
|
scheduled.append(task)
|
|
return task
|
|
|
|
fake_asyncio = SimpleNamespace(
|
|
sleep=_fast_sleep,
|
|
create_task=_capture_task,
|
|
)
|
|
|
|
with patch.dict(os.environ, {}, clear=False), \
|
|
patch("nanobot.command.builtin.asyncio", new=fake_asyncio), \
|
|
patch("nanobot.command.builtin.os.execv") as mock_execv:
|
|
out = await cmd_restart(ctx)
|
|
assert "Restarting" in out.content
|
|
assert os.environ.get(RESTART_NOTIFY_CHANNEL_ENV) == "cli"
|
|
assert os.environ.get(RESTART_NOTIFY_CHAT_ID_ENV) == "direct"
|
|
assert os.environ.get(RESTART_STARTED_AT_ENV)
|
|
|
|
assert scheduled
|
|
await scheduled[0]
|
|
mock_execv.assert_called_once()
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_restart_intercepted_in_run_loop(self):
|
|
"""Verify /restart is handled at the run-loop level, not inside _dispatch."""
|
|
loop, bus = _make_loop()
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/restart")
|
|
|
|
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch, \
|
|
patch("nanobot.command.builtin.os.execv"):
|
|
await bus.publish_inbound(msg)
|
|
|
|
loop._running = True
|
|
run_task = asyncio.create_task(loop.run())
|
|
await asyncio.sleep(0.1)
|
|
loop._running = False
|
|
run_task.cancel()
|
|
try:
|
|
await run_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
mock_dispatch.assert_not_called()
|
|
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
|
|
assert "Restarting" in out.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_intercepted_in_run_loop(self):
|
|
"""Verify /status is handled at the run-loop level for immediate replies."""
|
|
loop, bus = _make_loop()
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
|
|
|
|
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch:
|
|
await bus.publish_inbound(msg)
|
|
|
|
loop._running = True
|
|
run_task = asyncio.create_task(loop.run())
|
|
await asyncio.sleep(0.1)
|
|
loop._running = False
|
|
run_task.cancel()
|
|
try:
|
|
await run_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
|
|
mock_dispatch.assert_not_called()
|
|
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
|
|
assert "nanobot" in out.content.lower() or "Model" in out.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_propagates_external_cancellation(self):
|
|
"""External task cancellation should not be swallowed by the inbound wait loop."""
|
|
loop, _bus = _make_loop()
|
|
|
|
run_task = asyncio.create_task(loop.run())
|
|
await asyncio.sleep(0.1)
|
|
run_task.cancel()
|
|
|
|
with pytest.raises(asyncio.CancelledError):
|
|
await asyncio.wait_for(run_task, timeout=1.0)
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_help_includes_restart(self):
|
|
loop, bus = _make_loop()
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/help")
|
|
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "/restart" in response.content
|
|
assert "/status" in response.content
|
|
assert response.metadata == {"render_as": "text"}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_reports_runtime_info(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [{"role": "user"}] * 3
|
|
loop.sessions.get_or_create.return_value = session
|
|
loop._start_time = time.time() - 125
|
|
loop._last_usage = {"prompt_tokens": 0, "completion_tokens": 0}
|
|
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
|
|
return_value=(20500, "tiktoken")
|
|
)
|
|
loop.subagents.get_running_count_by_session.return_value = 0
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
|
|
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "Model: test-model" in response.content
|
|
assert "Tokens: 0 in / 0 out" in response.content
|
|
assert "Context: 20k/65k (31% of input budget)" in response.content
|
|
assert "Session: 3 messages" in response.content
|
|
assert "Uptime: 2m 5s" in response.content
|
|
assert "Tasks: 0 active" in response.content
|
|
assert response.metadata == {"render_as": "text"}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_counts_running_dispatch_and_subagent_tasks(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [{"role": "user"}]
|
|
loop.sessions.get_or_create.return_value = session
|
|
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
|
|
return_value=(1000, "tiktoken")
|
|
)
|
|
|
|
running_task = MagicMock()
|
|
running_task.done.return_value = False
|
|
finished_task = MagicMock()
|
|
finished_task.done.return_value = True
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
|
|
loop._active_tasks[msg.session_key] = [running_task, finished_task]
|
|
loop.subagents.get_running_count_by_session.return_value = 2
|
|
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "Tasks: 3 active" in response.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
|
|
loop, _bus = _make_loop()
|
|
loop.provider.chat_with_retry = AsyncMock(side_effect=[
|
|
LLMResponse(content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}),
|
|
LLMResponse(content="second", usage={}),
|
|
])
|
|
|
|
await loop._run_agent_loop([])
|
|
assert loop._last_usage["prompt_tokens"] == 9
|
|
assert loop._last_usage["completion_tokens"] == 4
|
|
|
|
await loop._run_agent_loop([])
|
|
assert loop._last_usage["prompt_tokens"] == 0
|
|
assert loop._last_usage["completion_tokens"] == 0
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_status_falls_back_to_last_usage_when_context_estimate_missing(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [{"role": "user"}]
|
|
loop.sessions.get_or_create.return_value = session
|
|
loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
|
|
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
|
|
return_value=(0, "none")
|
|
)
|
|
loop.subagents.get_running_count_by_session.return_value = 0
|
|
|
|
response = await loop._process_message(
|
|
InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
|
|
)
|
|
|
|
assert response is not None
|
|
assert "Tokens: 1200 in / 34 out" in response.content
|
|
assert "Context: 1k/65k (1% of input budget)" in response.content
|
|
assert "Tasks: 0 active" in response.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_shows_recent_messages(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [
|
|
{"role": "user", "content": "Hello"},
|
|
{"role": "assistant", "content": "Hi there!"},
|
|
{"role": "tool", "content": "tool result"}, # should be filtered out
|
|
{"role": "user", "content": "How are you?"},
|
|
{"role": "assistant", "content": "I am doing well."},
|
|
]
|
|
loop.sessions.get_or_create.return_value = session
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/history")
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "👤 You: Hello" in response.content
|
|
assert "🤖 Bot: Hi there!" in response.content
|
|
assert "tool result" not in response.content # tool messages filtered
|
|
assert response.metadata == {"render_as": "text"}
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_respects_count_argument(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [
|
|
{"role": "user", "content": f"message {i}"} for i in range(20)
|
|
]
|
|
loop.sessions.get_or_create.return_value = session
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/history 3")
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "Last 3 message(s)" in response.content
|
|
assert "message 19" in response.content # most recent
|
|
assert "message 0" not in response.content # too old
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_clamps_count_and_extracts_text_blocks(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = [
|
|
{
|
|
"role": "user",
|
|
"content": [
|
|
{"type": "text", "text": "visible text"},
|
|
{"type": "image_url", "image_url": {"url": "data:image/png;base64,..."}},
|
|
],
|
|
},
|
|
*({"role": "assistant", "content": f"reply {i}"} for i in range(60)),
|
|
]
|
|
loop.sessions.get_or_create.return_value = session
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/history 999")
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "Last 50 message(s)" in response.content
|
|
assert "visible text" not in response.content
|
|
assert "reply 59" in response.content
|
|
assert "reply 9" not in response.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_invalid_count_returns_usage(self):
|
|
loop, _bus = _make_loop()
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/history nope")
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert response.content.startswith("Usage: /history [count]")
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_history_empty_session(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = []
|
|
loop.sessions.get_or_create.return_value = session
|
|
|
|
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/history")
|
|
response = await loop._process_message(msg)
|
|
|
|
assert response is not None
|
|
assert "No conversation history yet." in response.content
|
|
|
|
@pytest.mark.asyncio
|
|
async def test_process_direct_preserves_render_metadata(self):
|
|
loop, _bus = _make_loop()
|
|
session = MagicMock()
|
|
session.get_history.return_value = []
|
|
loop.sessions.get_or_create.return_value = session
|
|
loop.subagents.get_running_count.return_value = 0
|
|
loop.subagents.get_running_count_by_session.return_value = 0
|
|
|
|
response = await loop.process_direct("/status", session_key="cli:test")
|
|
|
|
assert response is not None
|
|
assert response.metadata == {"render_as": "text"}
|