From 142cb46956db801b213280afb91ebdf3deeca892 Mon Sep 17 00:00:00 2001 From: Xubin Ren Date: Wed, 8 Apr 2026 15:27:20 +0000 Subject: [PATCH] fix(cron): preserve manual run state and merged history Keep manual runs from flipping the scheduler's running flag, rebuild merged run history records from action logs, and avoid delaying sub-second jobs to a one-second floor. Add regression coverage for disabled/manual runs, merged history persistence, and sub-second timers. Made-with: Cursor --- nanobot/cron/service.py | 39 ++++++++-------- nanobot/cron/types.py | 7 ++- tests/cron/test_cron_service.py | 82 +++++++++++++++++++++++++++++++++ 3 files changed, 109 insertions(+), 19 deletions(-) diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 1807c2926..1259d3d72 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -71,7 +71,7 @@ class CronService: self, store_path: Path, on_job: Callable[[CronJob], Coroutine[Any, Any, str | None]] | None = None, - max_sleep_ms: int = 300_000 # 5 minutes + max_sleep_ms: int = 300_000, # 5 minutes ): self.store_path = store_path self._action_path = store_path.parent / "action.jsonl" @@ -272,8 +272,11 @@ class CronService: if not self._running: return - next_wake = self._get_next_wake_ms() or 0 - delay_ms = min(self.max_sleep_ms ,max(1000, next_wake - _now_ms())) + next_wake = self._get_next_wake_ms() + if next_wake is None: + delay_ms = self.max_sleep_ms + else: + delay_ms = min(self.max_sleep_ms, max(0, next_wake - _now_ms())) delay_s = delay_ms / 1000 async def tick(): @@ -458,23 +461,23 @@ class CronService: return None async def run_job(self, job_id: str, force: bool = False) -> bool: - """Manually run a job. For testing purposes - - It's not that the gateway instance cannot run because it doesn't have the on_job method. - - There may be concurrency issues. - """ + """Manually run a job without disturbing the service's running state.""" + was_running = self._running self._running = True - store = self._load_store() - for job in store.jobs: - if job.id == job_id: - if not force and not job.enabled: - return False - await self._execute_job(job) - self._save_store() - self._running = False + try: + store = self._load_store() + for job in store.jobs: + if job.id == job_id: + if not force and not job.enabled: + return False + await self._execute_job(job) + self._save_store() + return True + return False + finally: + self._running = was_running + if was_running: self._arm_timer() - return True - self._running = False - return False def get_job(self, job_id: str) -> CronJob | None: """Get a job by ID.""" diff --git a/nanobot/cron/types.py b/nanobot/cron/types.py index 8a1d1e0f1..c38542e17 100644 --- a/nanobot/cron/types.py +++ b/nanobot/cron/types.py @@ -63,9 +63,14 @@ class CronJob: @classmethod def from_dict(cls, kwargs: dict): + state_kwargs = dict(kwargs.get("state", {})) + state_kwargs["run_history"] = [ + record if isinstance(record, CronRunRecord) else CronRunRecord(**record) + for record in state_kwargs.get("run_history", []) + ] kwargs["schedule"] = CronSchedule(**kwargs.get("schedule", {"kind": "every"})) kwargs["payload"] = CronPayload(**kwargs.get("payload", {})) - kwargs["state"] = CronJobState(**kwargs.get("state", {})) + kwargs["state"] = CronJobState(**state_kwargs) return cls(**kwargs) diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py index 51cff228c..b54cf5e20 100644 --- a/tests/cron/test_cron_service.py +++ b/tests/cron/test_cron_service.py @@ -115,6 +115,41 @@ async def test_run_history_persisted_to_disk(tmp_path) -> None: assert loaded.state.run_history[0].status == "ok" +@pytest.mark.asyncio +async def test_run_job_disabled_does_not_flip_running_state(tmp_path) -> None: + store_path = tmp_path / "cron" / "jobs.json" + service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) + job = service.add_job( + name="disabled", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + service.enable_job(job.id, enabled=False) + + result = await service.run_job(job.id) + + assert result is False + assert service._running is False + + +@pytest.mark.asyncio +async def test_run_job_preserves_running_service_state(tmp_path) -> None: + store_path = tmp_path / "cron" / "jobs.json" + service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) + service._running = True + job = service.add_job( + name="manual", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + + result = await service.run_job(job.id, force=True) + + assert result is True + assert service._running is True + service.stop() + + @pytest.mark.asyncio async def test_running_service_honors_external_disable(tmp_path) -> None: store_path = tmp_path / "cron" / "jobs.json" @@ -182,6 +217,28 @@ async def test_start_server_not_jobs(tmp_path): service.stop() +@pytest.mark.asyncio +async def test_subsecond_job_not_delayed_to_one_second(tmp_path): + store_path = tmp_path / "cron" / "jobs.json" + called = [] + + async def on_job(job): + called.append(job.name) + + service = CronService(store_path, on_job=on_job, max_sleep_ms=5000) + service.add_job( + name="fast", + schedule=CronSchedule(kind="every", every_ms=100), + message="hello", + ) + await service.start() + try: + await asyncio.sleep(0.35) + assert called + finally: + service.stop() + + @pytest.mark.asyncio async def test_running_service_picks_up_external_add(tmp_path): """A running service should detect and execute a job added by another instance.""" @@ -245,3 +302,28 @@ async def test_add_job_during_jobs_exec(tmp_path): assert "test" in [j.name for j in jobs] finally: service.stop() + + +@pytest.mark.asyncio +async def test_external_update_preserves_run_history_records(tmp_path): + store_path = tmp_path / "cron" / "jobs.json" + service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) + job = service.add_job( + name="history", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + await service.run_job(job.id, force=True) + + external = CronService(store_path) + updated = external.enable_job(job.id, enabled=False) + assert updated is not None + + fresh = CronService(store_path) + loaded = fresh.get_job(job.id) + assert loaded is not None + assert loaded.state.run_history + assert loaded.state.run_history[0].status == "ok" + + fresh._running = True + fresh._save_store()