nanobot/tests/cli/test_restart_command.py
Xubin Ren 46e11a68a7 test: speed up cron and restart timing tests
Replace fixed sleep-based waits with condition polling in cron tests and mock the restart delay in CLI restart tests to reduce suite runtime without changing behavior.
2026-04-19 12:35:57 +00:00

259 lines
9.7 KiB
Python

"""Tests for /restart slash command."""
from __future__ import annotations
import asyncio
import os
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from nanobot.bus.events import InboundMessage, OutboundMessage
from nanobot.providers.base import LLMResponse
def _make_loop():
"""Create a minimal AgentLoop with mocked dependencies."""
from nanobot.agent.loop import AgentLoop
from nanobot.bus.queue import MessageBus
bus = MessageBus()
provider = MagicMock()
provider.get_default_model.return_value = "test-model"
workspace = MagicMock()
workspace.__truediv__ = MagicMock(return_value=MagicMock())
with patch("nanobot.agent.loop.ContextBuilder"), \
patch("nanobot.agent.loop.SessionManager"), \
patch("nanobot.agent.loop.SubagentManager"):
loop = AgentLoop(bus=bus, provider=provider, workspace=workspace)
return loop, bus
async def _wait_until(predicate, *, timeout: float = 0.2, interval: float = 0.01) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return
await asyncio.sleep(interval)
assert predicate()
class TestRestartCommand:
@pytest.mark.asyncio
async def test_restart_sends_message_and_calls_execv(self):
from nanobot.command.builtin import cmd_restart
from nanobot.command.router import CommandContext
from nanobot.utils.restart import (
RESTART_NOTIFY_CHANNEL_ENV,
RESTART_NOTIFY_CHAT_ID_ENV,
RESTART_STARTED_AT_ENV,
)
loop, bus = _make_loop()
msg = InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/restart")
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/restart", loop=loop)
async def _fast_sleep(_delay: float) -> None:
return None
scheduled: list[asyncio.Task] = []
def _capture_task(coro):
task = asyncio.create_task(coro)
scheduled.append(task)
return task
fake_asyncio = SimpleNamespace(
sleep=_fast_sleep,
create_task=_capture_task,
)
with patch.dict(os.environ, {}, clear=False), \
patch("nanobot.command.builtin.asyncio", new=fake_asyncio), \
patch("nanobot.command.builtin.os.execv") as mock_execv:
out = await cmd_restart(ctx)
assert "Restarting" in out.content
assert os.environ.get(RESTART_NOTIFY_CHANNEL_ENV) == "cli"
assert os.environ.get(RESTART_NOTIFY_CHAT_ID_ENV) == "direct"
assert os.environ.get(RESTART_STARTED_AT_ENV)
assert scheduled
await scheduled[0]
mock_execv.assert_called_once()
@pytest.mark.asyncio
async def test_restart_intercepted_in_run_loop(self):
"""Verify /restart is handled at the run-loop level, not inside _dispatch."""
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/restart")
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch, \
patch("nanobot.command.builtin.os.execv"):
await bus.publish_inbound(msg)
loop._running = True
run_task = asyncio.create_task(loop.run())
await asyncio.sleep(0.1)
loop._running = False
run_task.cancel()
try:
await run_task
except asyncio.CancelledError:
pass
mock_dispatch.assert_not_called()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "Restarting" in out.content
@pytest.mark.asyncio
async def test_status_intercepted_in_run_loop(self):
"""Verify /status is handled at the run-loop level for immediate replies."""
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch:
await bus.publish_inbound(msg)
loop._running = True
run_task = asyncio.create_task(loop.run())
await asyncio.sleep(0.1)
loop._running = False
run_task.cancel()
try:
await run_task
except asyncio.CancelledError:
pass
mock_dispatch.assert_not_called()
out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
assert "nanobot" in out.content.lower() or "Model" in out.content
@pytest.mark.asyncio
async def test_run_propagates_external_cancellation(self):
"""External task cancellation should not be swallowed by the inbound wait loop."""
loop, _bus = _make_loop()
run_task = asyncio.create_task(loop.run())
await asyncio.sleep(0.1)
run_task.cancel()
with pytest.raises(asyncio.CancelledError):
await asyncio.wait_for(run_task, timeout=1.0)
@pytest.mark.asyncio
async def test_help_includes_restart(self):
loop, bus = _make_loop()
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/help")
response = await loop._process_message(msg)
assert response is not None
assert "/restart" in response.content
assert "/status" in response.content
assert response.metadata == {"render_as": "text"}
@pytest.mark.asyncio
async def test_status_reports_runtime_info(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = [{"role": "user"}] * 3
loop.sessions.get_or_create.return_value = session
loop._start_time = time.time() - 125
loop._last_usage = {"prompt_tokens": 0, "completion_tokens": 0}
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
return_value=(20500, "tiktoken")
)
loop.subagents.get_running_count_by_session.return_value = 0
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
response = await loop._process_message(msg)
assert response is not None
assert "Model: test-model" in response.content
assert "Tokens: 0 in / 0 out" in response.content
assert "Context: 20k/65k (31% of input budget)" in response.content
assert "Session: 3 messages" in response.content
assert "Uptime: 2m 5s" in response.content
assert "Tasks: 0 active" in response.content
assert response.metadata == {"render_as": "text"}
@pytest.mark.asyncio
async def test_status_counts_running_dispatch_and_subagent_tasks(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = [{"role": "user"}]
loop.sessions.get_or_create.return_value = session
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
return_value=(1000, "tiktoken")
)
running_task = MagicMock()
running_task.done.return_value = False
finished_task = MagicMock()
finished_task.done.return_value = True
msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
loop._active_tasks[msg.session_key] = [running_task, finished_task]
loop.subagents.get_running_count_by_session.return_value = 2
response = await loop._process_message(msg)
assert response is not None
assert "Tasks: 3 active" in response.content
@pytest.mark.asyncio
async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
loop, _bus = _make_loop()
loop.provider.chat_with_retry = AsyncMock(side_effect=[
LLMResponse(content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}),
LLMResponse(content="second", usage={}),
])
await loop._run_agent_loop([])
assert loop._last_usage["prompt_tokens"] == 9
assert loop._last_usage["completion_tokens"] == 4
await loop._run_agent_loop([])
assert loop._last_usage["prompt_tokens"] == 0
assert loop._last_usage["completion_tokens"] == 0
@pytest.mark.asyncio
async def test_status_falls_back_to_last_usage_when_context_estimate_missing(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = [{"role": "user"}]
loop.sessions.get_or_create.return_value = session
loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
loop.consolidator.estimate_session_prompt_tokens = MagicMock(
return_value=(0, "none")
)
loop.subagents.get_running_count_by_session.return_value = 0
response = await loop._process_message(
InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
)
assert response is not None
assert "Tokens: 1200 in / 34 out" in response.content
assert "Context: 1k/65k (1% of input budget)" in response.content
assert "Tasks: 0 active" in response.content
@pytest.mark.asyncio
async def test_process_direct_preserves_render_metadata(self):
loop, _bus = _make_loop()
session = MagicMock()
session.get_history.return_value = []
loop.sessions.get_or_create.return_value = session
loop.subagents.get_running_count.return_value = 0
loop.subagents.get_running_count_by_session.return_value = 0
response = await loop.process_direct("/status", session_key="cli:test")
assert response is not None
assert response.metadata == {"render_as": "text"}