mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-30 23:05:51 +00:00
Replace fixed sleep-based waits with condition polling in cron tests and mock the restart delay in CLI restart tests to reduce suite runtime without changing behavior.
259 lines
9.7 KiB
Python
259 lines
9.7 KiB
Python
"""Tests for /restart slash command."""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import os
|
|
import time
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
from nanobot.bus.events import InboundMessage, OutboundMessage
|
|
from nanobot.providers.base import LLMResponse
|
|
|
|
|
|
def _make_loop():
    """Build an ``AgentLoop`` on a real ``MessageBus`` with mocked collaborators.

    The provider and workspace are ``MagicMock`` objects, and the
    ``ContextBuilder``, ``SessionManager`` and ``SubagentManager`` names in
    ``nanobot.agent.loop`` are patched while the loop is constructed.

    Returns:
        A ``(loop, bus)`` tuple.
    """
    # Imported inside the helper rather than at module import time.
    from nanobot.agent.loop import AgentLoop
    from nanobot.bus.queue import MessageBus

    message_bus = MessageBus()

    llm_provider = MagicMock()
    llm_provider.get_default_model.return_value = "test-model"

    # ``workspace / "something"`` must work, so give the mock a ``/`` operator
    # that returns another mock.
    fake_workspace = MagicMock()
    fake_workspace.__truediv__ = MagicMock(return_value=MagicMock())

    with patch("nanobot.agent.loop.ContextBuilder"):
        with patch("nanobot.agent.loop.SessionManager"):
            with patch("nanobot.agent.loop.SubagentManager"):
                agent_loop = AgentLoop(
                    bus=message_bus,
                    provider=llm_provider,
                    workspace=fake_workspace,
                )

    return agent_loop, message_bus
|
|
|
|
|
|
async def _wait_until(predicate, *, timeout: float = 0.2, interval: float = 0.01) -> None:
    """Poll *predicate* until it is truthy or *timeout* seconds have elapsed.

    Args:
        predicate: Zero-argument synchronous callable. Polling stops as soon
            as it returns a truthy value.
        timeout: Maximum number of seconds to keep polling.
        interval: Seconds to sleep between two polls.

    Raises:
        AssertionError: If *predicate* is still falsy once the deadline has
            passed.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return
        await asyncio.sleep(interval)

    # One last check after the deadline: the condition may have become true
    # during the final sleep, and with ``timeout <= 0`` the loop above never
    # runs at all. Raised explicitly (instead of a bare ``assert``) so the
    # failure names the timeout and is not stripped under ``python -O``.
    if not predicate():
        raise AssertionError(f"condition not met within {timeout}s")
|
|
|
|
|
|
class TestRestartCommand:
    """Tests for ``/restart``, ``/status`` and ``/help`` handling on ``AgentLoop``."""

    @staticmethod
    async def _run_loop_briefly(loop, *, duration: float = 0.1) -> None:
        """Run ``loop.run()`` for *duration* seconds, then stop and cancel it.

        The ``CancelledError`` produced by our own ``cancel()`` call is
        swallowed so callers can go straight to their assertions.
        """
        loop._running = True
        run_task = asyncio.create_task(loop.run())
        await asyncio.sleep(duration)
        loop._running = False
        run_task.cancel()
        try:
            await run_task
        except asyncio.CancelledError:
            pass

    @pytest.mark.asyncio
    async def test_restart_sends_message_and_calls_execv(self):
        """``cmd_restart`` replies, records restart env vars and schedules ``execv``."""
        from nanobot.command.builtin import cmd_restart
        from nanobot.command.router import CommandContext
        from nanobot.utils.restart import (
            RESTART_NOTIFY_CHANNEL_ENV,
            RESTART_NOTIFY_CHAT_ID_ENV,
            RESTART_STARTED_AT_ENV,
        )

        loop, bus = _make_loop()
        msg = InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/restart")
        ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/restart", loop=loop)

        # Stand-in for ``asyncio.sleep`` so the restart delay costs no time.
        async def _fast_sleep(_delay: float) -> None:
            return None

        scheduled: list[asyncio.Task] = []

        # Stand-in for ``asyncio.create_task`` that remembers the task so the
        # test can await it deterministically.
        def _capture_task(coro):
            task = asyncio.create_task(coro)
            scheduled.append(task)
            return task

        # Only the two asyncio names replaced here are provided to the
        # command module for the duration of the test.
        fake_asyncio = SimpleNamespace(
            sleep=_fast_sleep,
            create_task=_capture_task,
        )

        # ``patch.dict`` restores ``os.environ`` on exit, so the variables the
        # command writes do not leak into other tests.
        with patch.dict(os.environ, {}, clear=False), \
             patch("nanobot.command.builtin.asyncio", new=fake_asyncio), \
             patch("nanobot.command.builtin.os.execv") as mock_execv:
            out = await cmd_restart(ctx)
            assert "Restarting" in out.content
            assert os.environ.get(RESTART_NOTIFY_CHANNEL_ENV) == "cli"
            assert os.environ.get(RESTART_NOTIFY_CHAT_ID_ENV) == "direct"
            assert os.environ.get(RESTART_STARTED_AT_ENV)

            assert scheduled
            await scheduled[0]
            mock_execv.assert_called_once()

    @pytest.mark.asyncio
    async def test_restart_intercepted_in_run_loop(self):
        """Verify /restart is handled at the run-loop level, not inside _dispatch."""
        loop, bus = _make_loop()
        msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/restart")

        # Snapshot of the tasks alive before the loop runs, so anything the
        # restart command schedules can be identified and cleaned up below.
        tasks_before = asyncio.all_tasks()

        # The restart command writes restart-notification variables into
        # ``os.environ``; ``patch.dict`` restores the environment afterwards.
        with patch.dict(os.environ, {}, clear=False), \
             patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch, \
             patch("nanobot.command.builtin.os.execv"):
            await bus.publish_inbound(msg)
            try:
                await self._run_loop_briefly(loop)
            finally:
                # The command schedules a delayed task that ends in
                # ``os.execv``. Cancel whatever is still pending while
                # ``execv`` is patched so it can never fire for real later.
                leftover = asyncio.all_tasks() - tasks_before
                for task in leftover:
                    task.cancel()
                await asyncio.gather(*leftover, return_exceptions=True)

        mock_dispatch.assert_not_called()
        out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        assert "Restarting" in out.content

    @pytest.mark.asyncio
    async def test_status_intercepted_in_run_loop(self):
        """Verify /status is handled at the run-loop level for immediate replies."""
        loop, bus = _make_loop()
        msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")

        with patch.object(loop, "_dispatch", new_callable=AsyncMock) as mock_dispatch:
            await bus.publish_inbound(msg)
            await self._run_loop_briefly(loop)

        mock_dispatch.assert_not_called()
        out = await asyncio.wait_for(bus.consume_outbound(), timeout=1.0)
        assert "nanobot" in out.content.lower() or "Model" in out.content

    @pytest.mark.asyncio
    async def test_run_propagates_external_cancellation(self):
        """External task cancellation should not be swallowed by the inbound wait loop."""
        loop, _bus = _make_loop()

        run_task = asyncio.create_task(loop.run())
        # Give the run loop time to start before cancelling it from outside.
        await asyncio.sleep(0.1)
        run_task.cancel()

        with pytest.raises(asyncio.CancelledError):
            await asyncio.wait_for(run_task, timeout=1.0)

    @pytest.mark.asyncio
    async def test_help_includes_restart(self):
        """``/help`` lists ``/restart`` and ``/status`` and is rendered as text."""
        loop, bus = _make_loop()
        msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/help")

        response = await loop._process_message(msg)

        assert response is not None
        assert "/restart" in response.content
        assert "/status" in response.content
        assert response.metadata == {"render_as": "text"}

    @pytest.mark.asyncio
    async def test_status_reports_runtime_info(self):
        """``/status`` reports model, tokens, context, session, uptime and tasks."""
        loop, _bus = _make_loop()
        session = MagicMock()
        session.get_history.return_value = [{"role": "user"}] * 3
        loop.sessions.get_or_create.return_value = session
        # 125 seconds in the past, matching the "2m 5s" uptime asserted below.
        loop._start_time = time.time() - 125
        loop._last_usage = {"prompt_tokens": 0, "completion_tokens": 0}
        loop.consolidator.estimate_session_prompt_tokens = MagicMock(
            return_value=(20500, "tiktoken")
        )
        loop.subagents.get_running_count_by_session.return_value = 0

        msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")

        response = await loop._process_message(msg)

        assert response is not None
        assert "Model: test-model" in response.content
        assert "Tokens: 0 in / 0 out" in response.content
        assert "Context: 20k/65k (31% of input budget)" in response.content
        assert "Session: 3 messages" in response.content
        assert "Uptime: 2m 5s" in response.content
        assert "Tasks: 0 active" in response.content
        assert response.metadata == {"render_as": "text"}

    @pytest.mark.asyncio
    async def test_status_counts_running_dispatch_and_subagent_tasks(self):
        """``/status`` counts unfinished dispatch tasks plus running subagents."""
        loop, _bus = _make_loop()
        session = MagicMock()
        session.get_history.return_value = [{"role": "user"}]
        loop.sessions.get_or_create.return_value = session
        loop.consolidator.estimate_session_prompt_tokens = MagicMock(
            return_value=(1000, "tiktoken")
        )

        running_task = MagicMock()
        running_task.done.return_value = False
        finished_task = MagicMock()
        finished_task.done.return_value = True

        msg = InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
        loop._active_tasks[msg.session_key] = [running_task, finished_task]
        loop.subagents.get_running_count_by_session.return_value = 2

        response = await loop._process_message(msg)

        assert response is not None
        # One unfinished dispatch task plus two running subagents.
        assert "Tasks: 3 active" in response.content

    @pytest.mark.asyncio
    async def test_run_agent_loop_resets_usage_when_provider_omits_it(self):
        """``_last_usage`` drops back to zero when a response carries no usage."""
        loop, _bus = _make_loop()
        loop.provider.chat_with_retry = AsyncMock(side_effect=[
            LLMResponse(content="first", usage={"prompt_tokens": 9, "completion_tokens": 4}),
            LLMResponse(content="second", usage={}),
        ])

        await loop._run_agent_loop([])
        assert loop._last_usage["prompt_tokens"] == 9
        assert loop._last_usage["completion_tokens"] == 4

        await loop._run_agent_loop([])
        assert loop._last_usage["prompt_tokens"] == 0
        assert loop._last_usage["completion_tokens"] == 0

    @pytest.mark.asyncio
    async def test_status_falls_back_to_last_usage_when_context_estimate_missing(self):
        """With a zero context estimate, ``/status`` uses the last prompt tokens."""
        loop, _bus = _make_loop()
        session = MagicMock()
        session.get_history.return_value = [{"role": "user"}]
        loop.sessions.get_or_create.return_value = session
        loop._last_usage = {"prompt_tokens": 1200, "completion_tokens": 34}
        loop.consolidator.estimate_session_prompt_tokens = MagicMock(
            return_value=(0, "none")
        )
        loop.subagents.get_running_count_by_session.return_value = 0

        response = await loop._process_message(
            InboundMessage(channel="telegram", sender_id="u1", chat_id="c1", content="/status")
        )

        assert response is not None
        assert "Tokens: 1200 in / 34 out" in response.content
        assert "Context: 1k/65k (1% of input budget)" in response.content
        assert "Tasks: 0 active" in response.content

    @pytest.mark.asyncio
    async def test_process_direct_preserves_render_metadata(self):
        """``process_direct`` keeps the ``render_as`` metadata of a ``/status`` reply."""
        loop, _bus = _make_loop()
        session = MagicMock()
        session.get_history.return_value = []
        loop.sessions.get_or_create.return_value = session
        loop.subagents.get_running_count.return_value = 0
        loop.subagents.get_running_count_by_session.return_value = 0

        response = await loop.process_direct("/status", session_key="cli:test")

        assert response is not None
        assert response.metadata == {"render_as": "text"}
|