From 9bba83cfe60ecb4c3eba84370d31c943a90fc24c Mon Sep 17 00:00:00 2001 From: webjoin111 <455457521@qq.com> Date: Wed, 29 Oct 2025 15:49:37 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat(scheduler):=20=E4=B8=BA?= =?UTF-8?q?=E5=A4=9A=E7=9B=AE=E6=A0=87=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1?= =?UTF-8?q?=E6=B7=BB=E5=8A=A0=E5=9B=BA=E5=AE=9A=E9=97=B4=E9=9A=94=E4=B8=B2?= =?UTF-8?q?=E8=A1=8C=E6=89=A7=E8=A1=8C=E9=80=89=E9=A1=B9?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scheduler_admin/__init__.py | 12 +++++- .../scheduler_admin/commands.py | 5 +++ .../scheduler_admin/data_source.py | 3 ++ .../scheduler_admin/handlers.py | 11 +++++ zhenxun/models/scheduled_job.py | 3 +- zhenxun/services/scheduler/engine.py | 42 +++++++++++++++---- zhenxun/services/scheduler/manager.py | 4 ++ zhenxun/services/scheduler/types.py | 7 +++- 8 files changed, 76 insertions(+), 11 deletions(-) diff --git a/zhenxun/builtin_plugins/scheduler_admin/__init__.py b/zhenxun/builtin_plugins/scheduler_admin/__init__.py index f3ee4f99..47d01822 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/__init__.py +++ b/zhenxun/builtin_plugins/scheduler_admin/__init__.py @@ -111,6 +111,14 @@ __plugin_meta__ = PluginMetadata( default_value=300, type=int, ), + RegisterConfig( + module="SchedulerManager", + key="DEFAULT_INTERVAL_SECONDS", + value=0, + help="为多目标定时任务设置的默认串行执行间隔秒数(大于0时生效),用于控制任务间的固定时间间隔。", + default_value=0, + type=int, + ), ], - ).to_dict(), -) + ).to_dict(), + ) diff --git a/zhenxun/builtin_plugins/scheduler_admin/commands.py b/zhenxun/builtin_plugins/scheduler_admin/commands.py index 9bbcdbca..955875de 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/commands.py +++ b/zhenxun/builtin_plugins/scheduler_admin/commands.py @@ -128,6 +128,11 @@ schedule_cmd = on_alconna( Args["spread_seconds", int], help_text="设置多目标执行的分散延迟(秒)", ), + Option( + "--fixed-interval", + Args["interval_seconds", int], + help_text="设置任务间的固定执行间隔(秒),将强制串行", + ), Option( "--permission", Args["perm_level", int], diff --git a/zhenxun/builtin_plugins/scheduler_admin/data_source.py b/zhenxun/builtin_plugins/scheduler_admin/data_source.py index 06a67da3..bdb4b659 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/data_source.py +++ b/zhenxun/builtin_plugins/scheduler_admin/data_source.py @@ -65,6 +65,7 @@ class SchedulerAdminService: job_name: str | None, jitter: int | None, spread: int | None, + interval: int | None, created_by: str, ) -> str: """创建或更新一个定时任务""" @@ -77,6 +78,8 @@ class SchedulerAdminService: execution_options["jitter"] = jitter if spread is not None: execution_options["spread"] = spread + if interval is not None: + execution_options["interval"] = interval for target_desc in targets: target_type, target_id = self._resolve_target_descriptor(target_desc) diff --git a/zhenxun/builtin_plugins/scheduler_admin/handlers.py b/zhenxun/builtin_plugins/scheduler_admin/handlers.py index 6d02a30b..26b91a88 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/handlers.py +++ b/zhenxun/builtin_plugins/scheduler_admin/handlers.py @@ -63,6 +63,7 @@ async def handle_set( tag_name: Match[str] = AlconnaMatch("tag_name"), jitter: Match[int] = AlconnaMatch("jitter_seconds"), spread: Match[int] = AlconnaMatch("spread_seconds"), + interval: Match[int] = AlconnaMatch("interval_seconds"), job_name: Match[str] = AlconnaMatch("job_name"), bot_id_to_operate: str = Depends(GetBotId), trigger_info: tuple[str, dict] = Depends(GetTriggerInfo), @@ -74,6 +75,7 @@ async def handle_set( p_name = plugin_name.result jitter_val: int | None = jitter.result if jitter.available else None spread_val: int | None = spread.result if spread.available else None + interval_val: int | None = interval.result if interval.available else None is_multi_target = ( len(target_groups) > 1 @@ -100,6 +102,14 @@ async def handle_set( "SchedulerManager", "DEFAULT_SPREAD_SECONDS" ) + if interval_val is None: + if task_meta and task_meta.get("default_interval") is not None: + interval_val = cast(int | None, task_meta["default_interval"]) + else: + interval_val = Config.get_config( + "SchedulerManager", "DEFAULT_INTERVAL_SECONDS" + ) + result_message = await scheduler_admin_service.set_schedule( targets=target_groups, creator_permission_level=creator_permission_level, @@ -111,6 +121,7 @@ async def handle_set( job_name=job_name.result if job_name.available else None, jitter=jitter_val, spread=spread_val, + interval=interval_val, created_by=session.user.id, ) await MessageUtils.build_message(result_message).send() diff --git a/zhenxun/models/scheduled_job.py b/zhenxun/models/scheduled_job.py index 8bae484a..6ae0de60 100644 --- a/zhenxun/models/scheduled_job.py +++ b/zhenxun/models/scheduled_job.py @@ -46,7 +46,8 @@ class ScheduledJob(Model): consecutive_failures = fields.IntField(default=0, description="连续失败次数") execution_options = fields.JSONField( null=True, - description="任务执行的额外选项 (例如: jitter, spread, concurrency_policy)", + description="任务执行的额外选项 (例如: jitter, spread, " + "interval, concurrency_policy)", ) create_time = fields.DatetimeField(auto_now_add=True) diff --git a/zhenxun/services/scheduler/engine.py b/zhenxun/services/scheduler/engine.py index 33537496..7f711b9e 100644 --- a/zhenxun/services/scheduler/engine.py +++ b/zhenxun/services/scheduler/engine.py @@ -412,16 +412,44 @@ async def _execute_job( if isinstance(schedule.execution_options, dict) else {} ) - spread_seconds = spread_config.get("spread", 1.0) + interval_seconds = spread_config.get("interval") - async def worker(target_id: str | None): - await asyncio.sleep(random.uniform(0.1, spread_seconds)) - async with semaphore: + if interval_seconds is not None and interval_seconds > 0: + logger.debug( + f"任务 {schedule.id}: 使用串行模式执行 {len(resolved_targets)} " + f"个目标,固定间隔 {interval_seconds} 秒。" + ) + for i, target_id in enumerate(resolved_targets): + if i > 0: + logger.debug( + f"任务 {schedule.id} 目标 [{target_id or '全局'}]: " + f"等待 {interval_seconds} 秒后执行。" + ) + await asyncio.sleep(interval_seconds) await _execute_single_job_instance(schedule, bot, group_id=target_id) + else: + spread_seconds = spread_config.get("spread", 1.0) - tasks_to_run = [worker(target_id) for target_id in resolved_targets] - if tasks_to_run: - await asyncio.gather(*tasks_to_run, return_exceptions=True) + logger.debug( + f"任务 {schedule.id}: 将在 {spread_seconds:.2f} 秒内分散执行 " + f"{len(resolved_targets)} 个目标。" + ) + + async def worker(target_id: str | None): + delay = random.uniform(0.1, spread_seconds) + logger.debug( + f"任务 {schedule.id} 目标 [{target_id or '全局'}]: " + f"随机延迟 {delay:.2f} 秒后执行。" + ) + await asyncio.sleep(delay) + async with semaphore: + await _execute_single_job_instance( + schedule, bot, group_id=target_id + ) + + tasks_to_run = [worker(target_id) for target_id in resolved_targets] + if tasks_to_run: + await asyncio.gather(*tasks_to_run, return_exceptions=True) schedule.last_run_at = datetime.now() schedule.last_run_status = "SUCCESS" diff --git a/zhenxun/services/scheduler/manager.py b/zhenxun/services/scheduler/manager.py index b7ddd4e8..07369713 100644 --- a/zhenxun/services/scheduler/manager.py +++ b/zhenxun/services/scheduler/manager.py @@ -109,6 +109,7 @@ class SchedulerManager: policy: ExecutionPolicy | None = None, default_jitter: int | None = None, default_spread: int | None = None, + default_interval: int | None = None, ): """ 声明式定时任务的统一装饰器。 @@ -140,6 +141,7 @@ class SchedulerManager: "model": params_model, "default_jitter": default_jitter, "default_spread": default_spread, + "default_interval": default_interval, } job_kwargs = model_dump(default_params) if default_params else {} @@ -205,6 +207,7 @@ class SchedulerManager: default_permission: int = 5, default_jitter: int | None = None, default_spread: int | None = None, + default_interval: int | None = None, ) -> Callable: """ 注册可调度的任务函数 @@ -220,6 +223,7 @@ class SchedulerManager: "default_permission": default_permission, "default_jitter": default_jitter, "default_spread": default_spread, + "default_interval": default_interval, } model_name = params_model.__name__ if params_model else "无" logger.debug( diff --git a/zhenxun/services/scheduler/types.py b/zhenxun/services/scheduler/types.py index 415b8654..236770c9 100644 --- a/zhenxun/services/scheduler/types.py +++ b/zhenxun/services/scheduler/types.py @@ -83,7 +83,12 @@ class ExecutionOptions(BaseModel): """ jitter: int | None = Field(None, description="触发时间抖动(秒)") - spread: int | None = Field(None, description="多目标执行的分散延迟(秒)") + spread: int | None = Field( + None, description="(并发模式)多目标执行的最大分散延迟(秒)" + ) + interval: int | None = Field( + None, description="多目标执行的固定间隔(秒),设置后将强制串行执行" + ) concurrency_policy: Literal["ALLOW", "SKIP", "QUEUE"] = Field( "ALLOW", description="并发策略" )