fix(cron): preserve manual run state and merged history

Keep manual runs from flipping the scheduler's running flag, rebuild merged run history records from action logs, and avoid delaying sub-second jobs to a one-second floor. Add regression coverage for disabled/manual runs, merged history persistence, and sub-second timers.

Made-with: Cursor
This commit is contained in:
Xubin Ren 2026-04-08 15:27:20 +00:00 committed by Xubin Ren
parent 0f1e3aa151
commit 142cb46956
3 changed files with 109 additions and 19 deletions

View File

@ -71,7 +71,7 @@ class CronService:
self,
store_path: Path,
on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None,
max_sleep_ms: int = 300_000 # 5 minutes
max_sleep_ms: int = 300_000, # 5 minutes
):
self.store_path = store_path
self._action_path = store_path.parent / "action.jsonl"
@ -272,8 +272,11 @@ class CronService:
if not self._running:
return
next_wake = self._get_next_wake_ms() or 0
delay_ms = min(self.max_sleep_ms ,max(1000, next_wake - _now_ms()))
next_wake = self._get_next_wake_ms()
if next_wake is None:
delay_ms = self.max_sleep_ms
else:
delay_ms = min(self.max_sleep_ms, max(0, next_wake - _now_ms()))
delay_s = delay_ms / 1000
async def tick():
@ -458,23 +461,23 @@ class CronService:
return None
async def run_job(self, job_id: str, force: bool = False) -> bool:
"""Manually run a job. For testing purposes
- It's not that the gateway instance cannot run because it doesn't have the on_job method.
- There may be concurrency issues.
"""
"""Manually run a job without disturbing the service's running state."""
was_running = self._running
self._running = True
store = self._load_store()
for job in store.jobs:
if job.id == job_id:
if not force and not job.enabled:
return False
await self._execute_job(job)
self._save_store()
self._running = False
try:
store = self._load_store()
for job in store.jobs:
if job.id == job_id:
if not force and not job.enabled:
return False
await self._execute_job(job)
self._save_store()
return True
return False
finally:
self._running = was_running
if was_running:
self._arm_timer()
return True
self._running = False
return False
def get_job(self, job_id: str) -> CronJob | None:
"""Get a job by ID."""

View File

@ -63,9 +63,14 @@ class CronJob:
@classmethod
def from_dict(cls, kwargs: dict):
state_kwargs = dict(kwargs.get("state", {}))
state_kwargs["run_history"] = [
record if isinstance(record, CronRunRecord) else CronRunRecord(**record)
for record in state_kwargs.get("run_history", [])
]
kwargs["schedule"] = CronSchedule(**kwargs.get("schedule", {"kind": "every"}))
kwargs["payload"] = CronPayload(**kwargs.get("payload", {}))
kwargs["state"] = CronJobState(**kwargs.get("state", {}))
kwargs["state"] = CronJobState(**state_kwargs)
return cls(**kwargs)

View File

@ -115,6 +115,41 @@ async def test_run_history_persisted_to_disk(tmp_path) -> None:
assert loaded.state.run_history[0].status == "ok"
@pytest.mark.asyncio
async def test_run_job_disabled_does_not_flip_running_state(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
job = service.add_job(
name="disabled",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
service.enable_job(job.id, enabled=False)
result = await service.run_job(job.id)
assert result is False
assert service._running is False
@pytest.mark.asyncio
async def test_run_job_preserves_running_service_state(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
service._running = True
job = service.add_job(
name="manual",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
result = await service.run_job(job.id, force=True)
assert result is True
assert service._running is True
service.stop()
@pytest.mark.asyncio
async def test_running_service_honors_external_disable(tmp_path) -> None:
store_path = tmp_path / "cron" / "jobs.json"
@ -182,6 +217,28 @@ async def test_start_server_not_jobs(tmp_path):
service.stop()
@pytest.mark.asyncio
async def test_subsecond_job_not_delayed_to_one_second(tmp_path):
store_path = tmp_path / "cron" / "jobs.json"
called = []
async def on_job(job):
called.append(job.name)
service = CronService(store_path, on_job=on_job, max_sleep_ms=5000)
service.add_job(
name="fast",
schedule=CronSchedule(kind="every", every_ms=100),
message="hello",
)
await service.start()
try:
await asyncio.sleep(0.35)
assert called
finally:
service.stop()
@pytest.mark.asyncio
async def test_running_service_picks_up_external_add(tmp_path):
"""A running service should detect and execute a job added by another instance."""
@ -245,3 +302,28 @@ async def test_add_job_during_jobs_exec(tmp_path):
assert "test" in [j.name for j in jobs]
finally:
service.stop()
@pytest.mark.asyncio
async def test_external_update_preserves_run_history_records(tmp_path):
store_path = tmp_path / "cron" / "jobs.json"
service = CronService(store_path, on_job=lambda _: asyncio.sleep(0))
job = service.add_job(
name="history",
schedule=CronSchedule(kind="every", every_ms=60_000),
message="hello",
)
await service.run_job(job.id, force=True)
external = CronService(store_path)
updated = external.enable_job(job.id, enabled=False)
assert updated is not None
fresh = CronService(store_path)
loaded = fresh.get_job(job.id)
assert loaded is not None
assert loaded.state.run_history
assert loaded.state.run_history[0].status == "ok"
fresh._running = True
fresh._save_store()