From 1a51f907aa2ca578101faafa4458953d5c34c1fa Mon Sep 17 00:00:00 2001 From: weitongtong Date: Fri, 10 Apr 2026 16:10:15 +0800 Subject: [PATCH] =?UTF-8?q?feat(cron):=20=E6=B7=BB=E5=8A=A0=20CronService.?= =?UTF-8?q?update=5Fjob=20=E6=96=B9=E6=B3=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 支持更新已有定时任务的名称、调度计划、消息内容、投递配置等可变字段。 系统任务(system_event)受保护不可编辑。包含完整的单元测试覆盖。 Made-with: Cursor --- nanobot/cron/service.py | 53 +++++++++++++ tests/cron/test_cron_service.py | 127 ++++++++++++++++++++++++++++++++ 2 files changed, 180 insertions(+) diff --git a/nanobot/cron/service.py b/nanobot/cron/service.py index 1259d3d72..267613012 100644 --- a/nanobot/cron/service.py +++ b/nanobot/cron/service.py @@ -460,6 +460,59 @@ class CronService: return job return None + def update_job( + self, + job_id: str, + *, + name: str | None = None, + schedule: CronSchedule | None = None, + message: str | None = None, + deliver: bool | None = None, + channel: str | None = ..., + to: str | None = ..., + delete_after_run: bool | None = None, + ) -> CronJob | Literal["not_found", "protected"]: + """Update mutable fields of an existing job. System jobs cannot be updated. + + For ``channel`` and ``to``, pass an explicit value (including ``None``) + to update; omit (sentinel ``...``) to leave unchanged. + """ + store = self._load_store() + job = next((j for j in store.jobs if j.id == job_id), None) + if job is None: + return "not_found" + if job.payload.kind == "system_event": + return "protected" + + if schedule is not None: + _validate_schedule_for_add(schedule) + job.schedule = schedule + if name is not None: + job.name = name + if message is not None: + job.payload.message = message + if deliver is not None: + job.payload.deliver = deliver + if channel is not ...: + job.payload.channel = channel + if to is not ...: + job.payload.to = to + if delete_after_run is not None: + job.delete_after_run = delete_after_run + + job.updated_at_ms = _now_ms() + if job.enabled: + job.state.next_run_at_ms = _compute_next_run(job.schedule, _now_ms()) + + if self._running: + self._save_store() + self._arm_timer() + else: + self._append_action("update", asdict(job)) + + logger.info("Cron: updated job '{}' ({})", job.name, job.id) + return job + async def run_job(self, job_id: str, force: bool = False) -> bool: """Manually run a job without disturbing the service's running state.""" was_running = self._running diff --git a/tests/cron/test_cron_service.py b/tests/cron/test_cron_service.py index b54cf5e20..d10d9569f 100644 --- a/tests/cron/test_cron_service.py +++ b/tests/cron/test_cron_service.py @@ -327,3 +327,130 @@ async def test_external_update_preserves_run_history_records(tmp_path): fresh._running = True fresh._save_store() + + +# ── update_job tests ── + + +def test_update_job_changes_name(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="old name", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + result = service.update_job(job.id, name="new name") + assert isinstance(result, CronJob) + assert result.name == "new name" + assert result.payload.message == "hello" + + +def test_update_job_changes_schedule(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="sched", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + old_next = job.state.next_run_at_ms + + new_sched = CronSchedule(kind="every", every_ms=120_000) + result = service.update_job(job.id, schedule=new_sched) + assert isinstance(result, CronJob) + assert result.schedule.every_ms == 120_000 + assert result.state.next_run_at_ms != old_next + + +def test_update_job_changes_message(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="msg", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="old message", + ) + result = service.update_job(job.id, message="new message") + assert isinstance(result, CronJob) + assert result.payload.message == "new message" + + +def test_update_job_changes_cron_expression(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="cron-job", + schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="UTC"), + message="hello", + ) + result = service.update_job( + job.id, + schedule=CronSchedule(kind="cron", expr="0 18 * * *", tz="UTC"), + ) + assert isinstance(result, CronJob) + assert result.schedule.expr == "0 18 * * *" + assert result.state.next_run_at_ms is not None + + +def test_update_job_not_found(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + result = service.update_job("nonexistent", name="x") + assert result == "not_found" + + +def test_update_job_rejects_system_job(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + service.register_system_job(CronJob( + id="dream", + name="dream", + schedule=CronSchedule(kind="cron", expr="0 */2 * * *", tz="UTC"), + payload=CronPayload(kind="system_event"), + )) + result = service.update_job("dream", name="hacked") + assert result == "protected" + assert service.get_job("dream").name == "dream" + + +def test_update_job_validates_schedule(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="validate", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + with pytest.raises(ValueError, match="unknown timezone"): + service.update_job( + job.id, + schedule=CronSchedule(kind="cron", expr="0 9 * * *", tz="Bad/Zone"), + ) + + +def test_update_job_preserves_run_history(tmp_path) -> None: + import asyncio + store_path = tmp_path / "cron" / "jobs.json" + service = CronService(store_path, on_job=lambda _: asyncio.sleep(0)) + job = service.add_job( + name="hist", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + asyncio.get_event_loop().run_until_complete(service.run_job(job.id)) + + result = service.update_job(job.id, name="renamed") + assert isinstance(result, CronJob) + assert len(result.state.run_history) == 1 + assert result.state.run_history[0].status == "ok" + + +def test_update_job_offline_writes_action(tmp_path) -> None: + service = CronService(tmp_path / "cron" / "jobs.json") + job = service.add_job( + name="offline", + schedule=CronSchedule(kind="every", every_ms=60_000), + message="hello", + ) + service.update_job(job.id, name="updated-offline") + + action_path = tmp_path / "cron" / "action.jsonl" + assert action_path.exists() + lines = [l for l in action_path.read_text().strip().split("\n") if l] + last = json.loads(lines[-1]) + assert last["action"] == "update" + assert last["params"]["name"] == "updated-offline"