mirror of
https://github.com/HKUDS/nanobot.git
synced 2026-04-10 13:13:39 +00:00
482 lines
15 KiB
Python
482 lines
15 KiB
Python
import asyncio
|
|
import json
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from nanobot.cron.service import CronService
|
|
from nanobot.cron.types import CronJob, CronPayload, CronSchedule
|
|
|
|
|
|
def test_add_job_rejects_unknown_timezone(tmp_path) -> None:
    """add_job raises ValueError for a misspelled timezone and stores nothing."""
    jobs_file = tmp_path / "cron" / "jobs.json"
    cron = CronService(jobs_file)

    with pytest.raises(ValueError, match="unknown timezone 'America/Vancovuer'"):
        cron.add_job(
            name="tz typo",
            schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="America/Vancovuer"),
            message="hello",
        )

    # The rejected job must not appear, even when disabled jobs are listed.
    remaining = cron.list_jobs(include_disabled=True)
    assert remaining == []
|
|
|
|
|
|
def test_add_job_accepts_valid_timezone(tmp_path) -> None:
    """A valid timezone is kept on the stored job and a next run time is set."""
    vancouver = CronSchedule(kind="cron", expr="0 9 * * *", tz="America/Vancouver")
    cron = CronService(tmp_path / "cron" / "jobs.json")

    created = cron.add_job(name="tz ok", schedule=vancouver, message="hello")

    assert created.schedule.tz == "America/Vancouver"
    assert created.state.next_run_at_ms is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_execute_job_records_run_history(tmp_path) -> None:
    """One manual run leaves exactly one successful run-history record."""
    jobs_file = tmp_path / "cron" / "jobs.json"
    cron = CronService(jobs_file, on_job=lambda _: asyncio.sleep(0))
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="hist", schedule=every_minute, message="hello")

    await cron.run_job(created.id)

    reloaded = cron.get_job(created.id)
    assert reloaded is not None
    assert len(reloaded.state.run_history) == 1

    record = reloaded.state.run_history[0]
    assert record.status == "ok"
    assert record.duration_ms >= 0
    assert record.error is None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_history_records_errors(tmp_path) -> None:
    """A raising callback is recorded as an "error" run carrying its message."""
    jobs_file = tmp_path / "cron" / "jobs.json"

    async def explode(_):
        raise RuntimeError("boom")

    cron = CronService(jobs_file, on_job=explode)
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="fail", schedule=every_minute, message="hello")

    await cron.run_job(created.id)

    reloaded = cron.get_job(created.id)
    assert len(reloaded.state.run_history) == 1

    # Index the history on each assert, exactly as the checks are stated.
    assert reloaded.state.run_history[0].status == "error"
    assert reloaded.state.run_history[0].error == "boom"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_history_trimmed_to_max(tmp_path) -> None:
    """Run history never grows beyond ``CronService._MAX_RUN_HISTORY`` entries.

    The number of runs is derived from the limit itself (limit + 5) rather
    than a hard-coded count, so the test keeps exercising the trimming path
    if the limit is ever raised.
    """
    store_path = tmp_path / "cron" / "jobs.json"
    service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
    job = service.add_job(
        name="trim",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
    )

    # Always overshoot the cap so that trimming must actually happen.
    total_runs = CronService._MAX_RUN_HISTORY + 5
    for _ in range(total_runs):
        await service.run_job(job.id)

    loaded = service.get_job(job.id)
    assert loaded is not None
    assert len(loaded.state.run_history) == CronService._MAX_RUN_HISTORY
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_history_persisted_to_disk(tmp_path) -> None:
    """Run history is written to the JSON store and read back by a new instance."""
    store_path = tmp_path / "cron" / "jobs.json"
    writer = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
    created = writer.add_job(
        name="persist",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
    )
    await writer.run_job(created.id)

    # Inspect the raw file: the on-disk record uses camelCase keys.
    on_disk = json.loads(store_path.read_text())
    persisted = on_disk["jobs"][0]["state"]["runHistory"]
    assert len(persisted) == 1
    first_entry = persisted[0]
    assert first_entry["status"] == "ok"
    assert "runAtMs" in first_entry
    assert "durationMs" in first_entry

    # A brand-new service pointed at the same file sees the same history.
    reader = CronService(store_path)
    reloaded = reader.get_job(created.id)
    assert len(reloaded.state.run_history) == 1
    assert reloaded.state.run_history[0].status == "ok"
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_job_disabled_does_not_flip_running_state(tmp_path) -> None:
    """run_job on a disabled job returns False and leaves ``_running`` False."""
    jobs_file = tmp_path / "cron" / "jobs.json"
    cron = CronService(jobs_file, on_job=lambda _: asyncio.sleep(0))
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="disabled", schedule=every_minute, message="hello")
    cron.enable_job(created.id, enabled=False)

    outcome = await cron.run_job(created.id)

    assert outcome is False
    assert cron._running is False
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_run_job_preserves_running_service_state(tmp_path) -> None:
    """A forced manual run on a service flagged as running keeps ``_running`` set.

    The flag is set by hand rather than through ``start()``. ``stop()`` is
    called in a ``finally`` clause so the service is shut down even when one
    of the assertions fails.
    """
    store_path = tmp_path / "cron" / "jobs.json"
    service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
    service._running = True
    try:
        job = service.add_job(
            name="manual",
            schedule=CronSchedule(kind="every", every_ms=60_000),
            message="hello",
        )

        result = await service.run_job(job.id, force=True)

        assert result is True
        assert service._running is True
    finally:
        # Always stop the service so a failed assertion cannot skip cleanup.
        service.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_running_service_honors_external_disable(tmp_path) -> None:
    """A job disabled via a second instance must not fire on the running one."""
    store_path = tmp_path / "cron" / "jobs.json"
    fired: list[str] = []

    async def on_job(job) -> None:
        fired.append(job.id)

    runner = CronService(store_path, on_job=on_job)
    target = runner.add_job(
        name="external-disable",
        schedule=CronSchedule(kind="every", every_ms=200),
        message="hello",
    )
    await runner.start()
    try:
        # Brief pause so the external write lands with a distinct file mtime.
        await asyncio.sleep(0.05)
        other = CronService(store_path)
        disabled = other.enable_job(target.id, enabled=False)
        assert disabled is not None
        assert disabled.enabled is False

        # Wait longer than the 200 ms interval; the callback must stay silent.
        await asyncio.sleep(0.35)
        assert fired == []
    finally:
        runner.stop()
|
|
|
|
|
|
def test_remove_job_refuses_system_jobs(tmp_path) -> None:
    """remove_job returns "protected" for a system job and keeps it registered."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    dream_job = CronJob(
        id="dream",
        name="dream",
        schedule=CronSchedule(kind="cron", expr="0 */2 * * *", tz="UTC"),
        payload=CronPayload(kind="system_event"),
    )
    cron.register_system_job(dream_job)

    outcome = cron.remove_job("dream")

    assert outcome == "protected"
    assert cron.get_job("dream") is not None
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_start_server_not_jobs(tmp_path):
    """A service started with no jobs runs a job added later by another instance.

    The service is started against an empty store; a second ``CronService``
    bound to the same file then adds a 500 ms job. The running service must
    list that job and invoke the callback within the two-second wait.
    ``stop()`` is called in ``finally`` so the started service is shut down
    even when an assertion fails.
    """
    store_path = tmp_path / "cron" / "jobs.json"
    called = []

    async def on_job(job):
        called.append(job.name)

    service = CronService(store_path, on_job=on_job, max_sleep_ms=1000)
    await service.start()
    try:
        assert len(service.list_jobs()) == 0

        # Second instance shares the same store file as the running service.
        service2 = CronService(store_path)
        service2.add_job(
            name="hist",
            schedule=CronSchedule(kind="every", every_ms=500),
            message="hello",
        )
        assert len(service.list_jobs()) == 1

        await asyncio.sleep(2)
        assert called
    finally:
        service.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_subsecond_job_not_delayed_to_one_second(tmp_path):
    """A 100 ms job fires within 0.35 s even with ``max_sleep_ms`` of 5000."""
    jobs_file = tmp_path / "cron" / "jobs.json"
    fired = []

    async def on_job(job):
        fired.append(job.name)

    cron = CronService(jobs_file, on_job=on_job, max_sleep_ms=5000)
    fast_schedule = CronSchedule(kind="every", every_ms=100)
    cron.add_job(name="fast", schedule=fast_schedule, message="hello")

    await cron.start()
    try:
        await asyncio.sleep(0.35)
        # At least one invocation must have happened in that window.
        assert fired
    finally:
        cron.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_running_service_picks_up_external_add(tmp_path):
    """An already-running service notices and runs a job added by a second instance."""
    store_path = tmp_path / "cron" / "jobs.json"
    fired: list[str] = []

    async def on_job(job):
        fired.append(job.name)

    runner = CronService(store_path, on_job=on_job)
    runner.add_job(
        name="heartbeat",
        schedule=CronSchedule(kind="every", every_ms=150),
        message="tick",
    )
    await runner.start()
    try:
        await asyncio.sleep(0.05)

        # A separate instance bound to the same store adds a second job.
        outsider = CronService(store_path)
        outsider.add_job(
            name="external",
            schedule=CronSchedule(kind="every", every_ms=150),
            message="ping",
        )

        await asyncio.sleep(2)
        assert "external" in fired
    finally:
        runner.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_add_job_during_jobs_exec(tmp_path):
    """A job added by another instance from inside a callback shows up in the runner."""
    store_path = tmp_path / "cron" / "jobs.json"
    run_once = True

    async def on_job(job):
        nonlocal run_once
        if not run_once:
            return
        # First invocation only: add a job through a separate instance.
        injector = CronService(store_path, on_job=lambda x: asyncio.sleep(0))
        injector.add_job(
            name="test",
            schedule=CronSchedule(kind="every", every_ms=150),
            message="tick",
        )
        run_once = False

    runner = CronService(store_path, on_job=on_job)
    runner.add_job(
        name="heartbeat",
        schedule=CronSchedule(kind="every", every_ms=150),
        message="tick",
    )
    assert len(runner.list_jobs()) == 1

    await runner.start()
    try:
        await asyncio.sleep(3)
        current = runner.list_jobs()
        assert len(current) == 2
        assert "test" in [entry.name for entry in current]
    finally:
        runner.stop()
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_external_update_preserves_run_history_records(tmp_path):
    """Run history survives an update made by a second instance and a reload."""
    store_path = tmp_path / "cron" / "jobs.json"
    origin = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
    created = origin.add_job(
        name="history",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
    )
    await origin.run_job(created.id, force=True)

    # Modify the job through a different instance bound to the same store.
    outsider = CronService(store_path)
    changed = outsider.enable_job(created.id, enabled=False)
    assert changed is not None

    # A third, freshly constructed instance must still see the recorded run.
    reloader = CronService(store_path)
    reloaded = reloader.get_job(created.id)
    assert reloaded is not None
    assert reloaded.state.run_history
    assert reloaded.state.run_history[0].status == "ok"

    # Smoke check: saving from the reloaded instance while flagged as running
    # must not raise. NOTE(review): presumably guards serialization of the
    # reloaded history records; confirm against CronService._save_store.
    reloader._running = True
    reloader._save_store()
|
|
|
|
|
|
# ── update_job tests ──
|
|
|
|
|
|
def test_update_job_changes_name(tmp_path) -> None:
    """Updating only the name renames the job and leaves its message alone."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="old name", schedule=every_minute, message="hello")

    updated = cron.update_job(created.id, name="new name")

    assert isinstance(updated, CronJob)
    assert updated.name == "new name"
    assert updated.payload.message == "hello"
|
|
|
|
|
|
def test_update_job_changes_schedule(tmp_path) -> None:
    """Replacing the schedule updates the interval and the next run time."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    created = cron.add_job(
        name="sched",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
    )
    # Capture the value before the update so the comparison below is meaningful.
    previous_next_run = created.state.next_run_at_ms

    two_minutes = CronSchedule(kind="every", every_ms=120_000)
    updated = cron.update_job(created.id, schedule=two_minutes)

    assert isinstance(updated, CronJob)
    assert updated.schedule.every_ms == 120_000
    assert updated.state.next_run_at_ms != previous_next_run
|
|
|
|
|
|
def test_update_job_changes_message(tmp_path) -> None:
    """Updating the message replaces the payload message on the returned job."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="msg", schedule=every_minute, message="old message")

    updated = cron.update_job(created.id, message="new message")

    assert isinstance(updated, CronJob)
    assert updated.payload.message == "new message"
|
|
|
|
|
|
def test_update_job_changes_cron_expression(tmp_path) -> None:
    """Swapping a cron expression is applied and a next run time is still set."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    morning = CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC")
    created = cron.add_job(name="cron-job", schedule=morning, message="hello")

    evening = CronSchedule(kind="cron", expr="0 18 * * *", tz="UTC")
    updated = cron.update_job(created.id, schedule=evening)

    assert isinstance(updated, CronJob)
    assert updated.schedule.expr == "0 18 * * *"
    assert updated.state.next_run_at_ms is not None
|
|
|
|
|
|
def test_update_job_not_found(tmp_path) -> None:
    """update_job returns the string "not_found" for an unknown job id."""
    jobs_file = tmp_path / "cron" / "jobs.json"
    cron = CronService(jobs_file)

    outcome = cron.update_job("nonexistent", name="x")

    assert outcome == "not_found"
|
|
|
|
|
|
def test_update_job_rejects_system_job(tmp_path) -> None:
    """update_job returns "protected" for a system job and leaves its name intact."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    dream_job = CronJob(
        id="dream",
        name="dream",
        schedule=CronSchedule(kind="cron", expr="0 */2 * * *", tz="UTC"),
        payload=CronPayload(kind="system_event"),
    )
    cron.register_system_job(dream_job)

    outcome = cron.update_job("dream", name="hacked")

    assert outcome == "protected"
    assert cron.get_job("dream").name == "dream"
|
|
|
|
|
|
def test_update_job_validates_schedule(tmp_path) -> None:
    """update_job raises ValueError when the new schedule names a bad timezone."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="validate", schedule=every_minute, message="hello")

    with pytest.raises(ValueError, match="unknown timezone"):
        cron.update_job(
            created.id,
            schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="Bad/Zone"),
        )
|
|
|
|
|
|
@pytest.mark.asyncio
async def test_update_job_preserves_run_history(tmp_path) -> None:
    """Renaming a job through update_job keeps its existing run-history record.

    Relies on the module-level ``asyncio`` import; the redundant
    function-local re-import has been dropped.
    """
    store_path = tmp_path / "cron" / "jobs.json"
    service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
    job = service.add_job(
        name="hist",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
    )
    # Produce one history record before the update.
    await service.run_job(job.id)

    result = service.update_job(job.id, name="renamed")

    assert isinstance(result, CronJob)
    assert len(result.state.run_history) == 1
    assert result.state.run_history[0].status == "ok"
|
|
|
|
|
|
def test_update_job_offline_writes_action(tmp_path) -> None:
    """Updating a job on a non-started service appends an "update" action record."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    every_minute = CronSchedule(kind="every", every_ms=60_000)
    created = cron.add_job(name="offline", schedule=every_minute, message="hello")
    cron.update_job(created.id, name="updated-offline")

    # The action log is expected next to the job store, one JSON object per line.
    action_path = tmp_path / "cron" / "action.jsonl"
    assert action_path.exists()

    raw_lines = action_path.read_text().strip().split("\n")
    entries = [entry for entry in raw_lines if entry]
    newest = json.loads(entries[-1])
    assert newest["action"] == "update"
    assert newest["params"]["name"] == "updated-offline"
|
|
|
|
|
|
def test_update_job_sentinel_channel_and_to(tmp_path) -> None:
    """Omitted channel/to stay untouched; an explicit None clears them."""
    cron = CronService(tmp_path / "cron" / "jobs.json")
    created = cron.add_job(
        name="sentinel",
        schedule=CronSchedule(kind="every", every_ms=60_000),
        message="hello",
        channel="telegram",
        to="user123",
    )
    assert created.payload.channel == "telegram"
    assert created.payload.to == "user123"

    # Case 1: channel/to not passed at all, so both must be preserved.
    renamed = cron.update_job(created.id, name="renamed")
    assert isinstance(renamed, CronJob)
    assert renamed.payload.channel == "telegram"
    assert renamed.payload.to == "user123"

    # Case 2: channel/to passed explicitly as None, so both must be cleared.
    cleared = cron.update_job(created.id, channel=None, to=None)
    assert isinstance(cleared, CronJob)
    assert cleared.payload.channel is None
    assert cleared.payload.to is None
|