test(cron): add regression test for running service picking up external adds

Co-authored-by: chengyongru
Made-with: Cursor
This commit is contained in:
Xubin Ren 2026-04-07 14:47:34 +00:00 committed by Xubin Ren
parent a982d9f9be
commit 423aab09dd

View File

@ -176,3 +176,35 @@ def test_reload_jobs(tmp_path):
message="hello2",
)
assert len(service.list_jobs()) == 2
@pytest.mark.asyncio
async def test_running_service_picks_up_external_add(tmp_path):
"""A running service should detect and execute a job added by another instance."""
store_path = tmp_path / "cron" / "jobs.json"
called: list[str] = []
async def on_job(job):
called.append(job.name)
service = CronService(store_path, on_job=on_job)
service.add_job(
name="heartbeat",
schedule=CronSchedule(kind="every", every_ms=150),
message="tick",
)
await service.start()
try:
await asyncio.sleep(0.05)
external = CronService(store_path)
external.add_job(
name="external",
schedule=CronSchedule(kind="every", every_ms=150),
message="ping",
)
await asyncio.sleep(0.6)
assert "external" in called
finally:
service.stop()