fix(cron): guard _load_store against reentrant reload during job execution

When on_job callbacks call list_jobs() (which triggers _load_store),
the in-memory state is reloaded from disk, discarding the next_run_at_ms
updates that _on_timer is actively computing. This causes jobs to
re-trigger indefinitely on the next tick.

Add an _executing flag around the job execution loop. While set,
_load_store returns the cached store instead of reloading from disk.

Includes a regression test (test_list_jobs_during_on_job_does_not_cause_stale_reload).

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Daniel Phang 2026-04-11 00:34:48 -07:00
parent e392c27f7e
commit b52bfddf16
2 changed files with 57 additions and 2 deletions

View File

@ -80,6 +80,7 @@ class CronService:
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._running = False
self._executing = False
self.max_sleep_ms = max_sleep_ms
def _load_jobs(self) -> tuple[list[CronJob], int]:
@ -171,7 +172,11 @@ class CronService:
def _load_store(self) -> CronStore:
"""Load jobs from disk. Reloads automatically if file was modified externally.
- Reload every time because it needs to merge operations on the jobs object from other instances.
- Skip reload when _executing to prevent on_job callbacks (e.g. list_jobs)
from replacing in-memory state that _on_timer is actively modifying.
"""
if self._executing and self._store is not None:
return self._store
jobs, version = self._load_jobs()
self._store = CronStore(version=version, jobs=jobs)
self._merge_action()
@ -298,8 +303,12 @@ class CronService:
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
for job in due_jobs:
await self._execute_job(job)
self._executing = True
try:
for job in due_jobs:
await self._execute_job(job)
finally:
self._executing = False
self._save_store()
self._arm_timer()

View File

@ -479,3 +479,49 @@ def test_update_job_sentinel_channel_and_to(tmp_path) -> None:
assert isinstance(result, CronJob)
assert result.payload.channel is None
assert result.payload.to is None
@pytest.mark.asyncio
async def test_list_jobs_during_on_job_does_not_cause_stale_reload(tmp_path) -> None:
    """Regression test for a reentrant store reload during job execution.

    The on_job callback calls ``list_jobs()`` while a job is being executed.
    The test then checks that each due job ran exactly once and that the
    persisted ``nextRunAtMs`` of every job lies in the future, i.e. the
    in-memory ``next_run_at_ms`` update was not lost (which previously caused
    an infinite re-trigger loop).
    """
    one_hour_ms = 3_600_000
    jobs_file = tmp_path / "cron" / "jobs.json"
    runs = 0

    async def listing_callback(job):
        # Mimic the bot issuing cron(action=list) in the middle of a run.
        nonlocal runs
        runs += 1
        service.list_jobs()

    service = CronService(jobs_file, on_job=listing_callback, max_sleep_ms=100)
    await service.start()

    # Register two hourly jobs; they are made due immediately further down.
    now_ms = int(time.time() * 1000)
    for job_name in ("job-a", "job-b"):
        hourly = CronSchedule(kind="every", every_ms=one_hour_ms)
        service.add_job(name=job_name, schedule=hourly, message="test")

    # Push next_run into the past so that _on_timer treats both jobs as due.
    overdue_ms = now_ms - 1000
    for pending in service._store.jobs:
        pending.state.next_run_at_ms = overdue_ms
    service._save_store()
    service._arm_timer()

    # Give the timer time to fire, then shut the service down.
    await asyncio.sleep(0.3)
    service.stop()

    # Exactly one execution per job: no re-trigger loop.
    assert runs == 2

    # The rescheduled (future) next-run time must have reached the disk.
    persisted = json.loads(jobs_file.read_text())
    for j in persisted["jobs"]:
        next_run = j["state"]["nextRunAtMs"]
        assert next_run is not None
        assert next_run > now_ms, f"Job '{j['name']}' next_run should be in the future"