test: speed up cron and restart timing tests

Replace fixed sleep-based waits with condition polling in cron tests and mock the restart delay in CLI restart tests to reduce suite runtime without changing behavior.
This commit is contained in:
Xubin Ren 2026-04-19 12:35:57 +00:00
parent b6d63fb1ec
commit 46e11a68a7
2 changed files with 47 additions and 11 deletions

View File

@ -5,6 +5,7 @@ from __future__ import annotations
import asyncio
import os
import time
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
@ -31,6 +32,15 @@ def _make_loop():
return loop, bus
async def _wait_until(predicate, *, timeout: float = 0.2, interval: float = 0.01) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return
await asyncio.sleep(interval)
assert predicate()
class TestRestartCommand:
@pytest.mark.asyncio
@ -47,7 +57,23 @@ class TestRestartCommand:
msg = InboundMessage(channel="cli", sender_id="user", chat_id="direct", content="/restart")
ctx = CommandContext(msg=msg, session=None, key=msg.session_key, raw="/restart", loop=loop)
async def _fast_sleep(_delay: float) -> None:
return None
scheduled: list[asyncio.Task] = []
def _capture_task(coro):
task = asyncio.create_task(coro)
scheduled.append(task)
return task
fake_asyncio = SimpleNamespace(
sleep=_fast_sleep,
create_task=_capture_task,
)
with patch.dict(os.environ, {}, clear=False), \
patch("nanobot.command.builtin.asyncio", new=fake_asyncio), \
patch("nanobot.command.builtin.os.execv") as mock_execv:
out = await cmd_restart(ctx)
assert "Restarting" in out.content
@ -55,7 +81,8 @@ class TestRestartCommand:
assert os.environ.get(RESTART_NOTIFY_CHAT_ID_ENV) == "direct"
assert os.environ.get(RESTART_STARTED_AT_ENV)
await asyncio.sleep(1.5)
assert scheduled
await scheduled[0]
mock_execv.assert_called_once()
@pytest.mark.asyncio

View File

@ -8,6 +8,15 @@ from nanobot.cron.service import CronService
from nanobot.cron.types import CronJob, CronPayload, CronSchedule
async def _wait_until(predicate, *, timeout: float = 1.0, interval: float = 0.01) -> None:
deadline = time.monotonic() + timeout
while time.monotonic() < deadline:
if predicate():
return
await asyncio.sleep(interval)
assert predicate()
def test_add_job_rejects_unknown_timezone(tmp_path) -> None:
service = CronService(tmp_path / "cron" / "jobs.json")
@ -201,18 +210,18 @@ async def test_start_server_not_jobs(tmp_path):
async def on_job(job):
called.append(job.name)
service = CronService(store_path, on_job=on_job, max_sleep_ms=1000)
service = CronService(store_path, on_job=on_job, max_sleep_ms=100)
await service.start()
assert len(service.list_jobs()) == 0
service2 = CronService(tmp_path / "cron" / "jobs.json")
service2.add_job(
name="hist",
schedule=CronSchedule(kind="every", every_ms=500),
schedule=CronSchedule(kind="every", every_ms=100),
message="hello",
)
assert len(service.list_jobs()) == 1
await asyncio.sleep(2)
await _wait_until(lambda: bool(called), timeout=0.8)
assert len(called) != 0
service.stop()
@ -248,10 +257,10 @@ async def test_running_service_picks_up_external_add(tmp_path):
async def on_job(job):
called.append(job.name)
service = CronService(store_path, on_job=on_job)
service = CronService(store_path, on_job=on_job, max_sleep_ms=100)
service.add_job(
name="heartbeat",
schedule=CronSchedule(kind="every", every_ms=150),
schedule=CronSchedule(kind="every", every_ms=100),
message="tick",
)
await service.start()
@ -261,11 +270,11 @@ async def test_running_service_picks_up_external_add(tmp_path):
external = CronService(store_path)
external.add_job(
name="external",
schedule=CronSchedule(kind="every", every_ms=150),
schedule=CronSchedule(kind="every", every_ms=100),
message="ping",
)
await asyncio.sleep(2)
await _wait_until(lambda: "external" in called, timeout=0.8)
assert "external" in called
finally:
service.stop()
@ -287,16 +296,16 @@ async def test_add_job_during_jobs_exec(tmp_path):
)
run_once = False
service = CronService(store_path, on_job=on_job)
service = CronService(store_path, on_job=on_job, max_sleep_ms=100)
service.add_job(
name="heartbeat",
schedule=CronSchedule(kind="every", every_ms=150),
schedule=CronSchedule(kind="every", every_ms=100),
message="tick",
)
assert len(service.list_jobs()) == 1
await service.start()
try:
await asyncio.sleep(3)
await _wait_until(lambda: len(service.list_jobs()) == 2, timeout=0.8)
jobs = service.list_jobs()
assert len(jobs) == 2
assert "test" in [j.name for j in jobs]