add reload jobs test

This commit is contained in:
xinnan.hou 2026-04-07 17:44:21 +08:00 committed by Xubin Ren
parent fd2bb3bb7d
commit a982d9f9be

View File

@ -156,3 +156,23 @@ def test_remove_job_refuses_system_jobs(tmp_path) -> None:
assert result == "protected"
assert service.get_job("dream") is not None
def test_reload_jobs(tmp_path):
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
service.add_job(
name="hist",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
assert len(service.list_jobs()) == 1
service2 = CronService(tmp_path / "cron" / "jobs.json")
service2.add_job(
name="hist2",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello2",
)
assert len(service.list_jobs()) == 2