fix(cron): 修复固定间隔任务因 store 并发替换导致的重复执行

_on_timer 中 await _execute_job 让出控制权期间,前端轮询触发的
list_jobs 调用 _load_store 从磁盘重新加载覆盖 self._store,
已执行任务的状态被旧值回退,导致再次触发。
引入 _timer_active 标志位,在任务执行期间阻止并发 _load_store
替换 store。同时修复 store 为空时未重新 arm timer 的问题。

Made-with: Cursor
This commit is contained in:
weitongtong 2026-04-11 14:34:45 +08:00 committed by Xubin Ren
parent 5932482d01
commit e0ba568089

View File

@ -80,6 +80,7 @@ class CronService:
self._store: CronStore | None = None
self._timer_task: asyncio.Task | None = None
self._running = False
self._timer_active = False
self.max_sleep_ms = max_sleep_ms
def _load_jobs(self) -> tuple[list[CronJob], int]:
@ -171,7 +172,11 @@ class CronService:
def _load_store(self) -> CronStore:
"""Load jobs from disk. Reloads automatically if file was modified externally.
- Reload every time because it needs to merge operations on the jobs object from other instances.
- During _on_timer execution, return the existing store to prevent concurrent
_load_store calls (e.g. from list_jobs polling) from replacing it mid-execution.
"""
if self._timer_active and self._store:
return self._store
jobs, version = self._load_jobs()
self._store = CronStore(version=version, jobs=jobs)
self._merge_action()
@ -290,18 +295,23 @@ class CronService:
"""Handle timer tick - run due jobs."""
self._load_store()
if not self._store:
self._arm_timer()
return
now = _now_ms()
due_jobs = [
j for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
self._timer_active = True
try:
now = _now_ms()
due_jobs = [
j for j in self._store.jobs
if j.enabled and j.state.next_run_at_ms and now >= j.state.next_run_at_ms
]
for job in due_jobs:
await self._execute_job(job)
for job in due_jobs:
await self._execute_job(job)
self._save_store()
self._save_store()
finally:
self._timer_active = False
self._arm_timer()
async def _execute_job(self, job: CronJob) -> None: