feat(scheduler): 为多目标定时任务添加固定间隔串行执行选项

This commit is contained in:
webjoin111 2025-10-29 15:49:37 +08:00
parent 6815caf805
commit 9bba83cfe6
8 changed files with 76 additions and 11 deletions

View File

@ -111,6 +111,14 @@ __plugin_meta__ = PluginMetadata(
default_value=300, default_value=300,
type=int, type=int,
), ),
RegisterConfig(
module="SchedulerManager",
key="DEFAULT_INTERVAL_SECONDS",
value=0,
help="为多目标定时任务设置的默认串行执行间隔秒数(大于0时生效),用于控制任务间的固定时间间隔。",
default_value=0,
type=int,
),
], ],
).to_dict(), ).to_dict(),
) )

View File

@ -128,6 +128,11 @@ schedule_cmd = on_alconna(
Args["spread_seconds", int], Args["spread_seconds", int],
help_text="设置多目标执行的分散延迟(秒)", help_text="设置多目标执行的分散延迟(秒)",
), ),
Option(
"--fixed-interval",
Args["interval_seconds", int],
help_text="设置任务间的固定执行间隔(秒),将强制串行",
),
Option( Option(
"--permission", "--permission",
Args["perm_level", int], Args["perm_level", int],

View File

@ -65,6 +65,7 @@ class SchedulerAdminService:
job_name: str | None, job_name: str | None,
jitter: int | None, jitter: int | None,
spread: int | None, spread: int | None,
interval: int | None,
created_by: str, created_by: str,
) -> str: ) -> str:
"""创建或更新一个定时任务""" """创建或更新一个定时任务"""
@ -77,6 +78,8 @@ class SchedulerAdminService:
execution_options["jitter"] = jitter execution_options["jitter"] = jitter
if spread is not None: if spread is not None:
execution_options["spread"] = spread execution_options["spread"] = spread
if interval is not None:
execution_options["interval"] = interval
for target_desc in targets: for target_desc in targets:
target_type, target_id = self._resolve_target_descriptor(target_desc) target_type, target_id = self._resolve_target_descriptor(target_desc)

View File

@ -63,6 +63,7 @@ async def handle_set(
tag_name: Match[str] = AlconnaMatch("tag_name"), tag_name: Match[str] = AlconnaMatch("tag_name"),
jitter: Match[int] = AlconnaMatch("jitter_seconds"), jitter: Match[int] = AlconnaMatch("jitter_seconds"),
spread: Match[int] = AlconnaMatch("spread_seconds"), spread: Match[int] = AlconnaMatch("spread_seconds"),
interval: Match[int] = AlconnaMatch("interval_seconds"),
job_name: Match[str] = AlconnaMatch("job_name"), job_name: Match[str] = AlconnaMatch("job_name"),
bot_id_to_operate: str = Depends(GetBotId), bot_id_to_operate: str = Depends(GetBotId),
trigger_info: tuple[str, dict] = Depends(GetTriggerInfo), trigger_info: tuple[str, dict] = Depends(GetTriggerInfo),
@ -74,6 +75,7 @@ async def handle_set(
p_name = plugin_name.result p_name = plugin_name.result
jitter_val: int | None = jitter.result if jitter.available else None jitter_val: int | None = jitter.result if jitter.available else None
spread_val: int | None = spread.result if spread.available else None spread_val: int | None = spread.result if spread.available else None
interval_val: int | None = interval.result if interval.available else None
is_multi_target = ( is_multi_target = (
len(target_groups) > 1 len(target_groups) > 1
@ -100,6 +102,14 @@ async def handle_set(
"SchedulerManager", "DEFAULT_SPREAD_SECONDS" "SchedulerManager", "DEFAULT_SPREAD_SECONDS"
) )
if interval_val is None:
if task_meta and task_meta.get("default_interval") is not None:
interval_val = cast(int | None, task_meta["default_interval"])
else:
interval_val = Config.get_config(
"SchedulerManager", "DEFAULT_INTERVAL_SECONDS"
)
result_message = await scheduler_admin_service.set_schedule( result_message = await scheduler_admin_service.set_schedule(
targets=target_groups, targets=target_groups,
creator_permission_level=creator_permission_level, creator_permission_level=creator_permission_level,
@ -111,6 +121,7 @@ async def handle_set(
job_name=job_name.result if job_name.available else None, job_name=job_name.result if job_name.available else None,
jitter=jitter_val, jitter=jitter_val,
spread=spread_val, spread=spread_val,
interval=interval_val,
created_by=session.user.id, created_by=session.user.id,
) )
await MessageUtils.build_message(result_message).send() await MessageUtils.build_message(result_message).send()

View File

@ -46,7 +46,8 @@ class ScheduledJob(Model):
consecutive_failures = fields.IntField(default=0, description="连续失败次数") consecutive_failures = fields.IntField(default=0, description="连续失败次数")
execution_options = fields.JSONField( execution_options = fields.JSONField(
null=True, null=True,
description="任务执行的额外选项 (例如: jitter, spread, concurrency_policy)", description="任务执行的额外选项 (例如: jitter, spread, "
"interval, concurrency_policy)",
) )
create_time = fields.DatetimeField(auto_now_add=True) create_time = fields.DatetimeField(auto_now_add=True)

View File

@ -412,16 +412,44 @@ async def _execute_job(
if isinstance(schedule.execution_options, dict) if isinstance(schedule.execution_options, dict)
else {} else {}
) )
spread_seconds = spread_config.get("spread", 1.0) interval_seconds = spread_config.get("interval")
async def worker(target_id: str | None): if interval_seconds is not None and interval_seconds > 0:
await asyncio.sleep(random.uniform(0.1, spread_seconds)) logger.debug(
async with semaphore: f"任务 {schedule.id}: 使用串行模式执行 {len(resolved_targets)} "
f"个目标,固定间隔 {interval_seconds} 秒。"
)
for i, target_id in enumerate(resolved_targets):
if i > 0:
logger.debug(
f"任务 {schedule.id} 目标 [{target_id or '全局'}]: "
f"等待 {interval_seconds} 秒后执行。"
)
await asyncio.sleep(interval_seconds)
await _execute_single_job_instance(schedule, bot, group_id=target_id) await _execute_single_job_instance(schedule, bot, group_id=target_id)
else:
spread_seconds = spread_config.get("spread", 1.0)
tasks_to_run = [worker(target_id) for target_id in resolved_targets] logger.debug(
if tasks_to_run: f"任务 {schedule.id}: 将在 {spread_seconds:.2f} 秒内分散执行 "
await asyncio.gather(*tasks_to_run, return_exceptions=True) f"{len(resolved_targets)} 个目标。"
)
async def worker(target_id: str | None):
delay = random.uniform(0.1, spread_seconds)
logger.debug(
f"任务 {schedule.id} 目标 [{target_id or '全局'}]: "
f"随机延迟 {delay:.2f} 秒后执行。"
)
await asyncio.sleep(delay)
async with semaphore:
await _execute_single_job_instance(
schedule, bot, group_id=target_id
)
tasks_to_run = [worker(target_id) for target_id in resolved_targets]
if tasks_to_run:
await asyncio.gather(*tasks_to_run, return_exceptions=True)
schedule.last_run_at = datetime.now() schedule.last_run_at = datetime.now()
schedule.last_run_status = "SUCCESS" schedule.last_run_status = "SUCCESS"

View File

@ -109,6 +109,7 @@ class SchedulerManager:
policy: ExecutionPolicy | None = None, policy: ExecutionPolicy | None = None,
default_jitter: int | None = None, default_jitter: int | None = None,
default_spread: int | None = None, default_spread: int | None = None,
default_interval: int | None = None,
): ):
""" """
声明式定时任务的统一装饰器 声明式定时任务的统一装饰器
@ -140,6 +141,7 @@ class SchedulerManager:
"model": params_model, "model": params_model,
"default_jitter": default_jitter, "default_jitter": default_jitter,
"default_spread": default_spread, "default_spread": default_spread,
"default_interval": default_interval,
} }
job_kwargs = model_dump(default_params) if default_params else {} job_kwargs = model_dump(default_params) if default_params else {}
@ -205,6 +207,7 @@ class SchedulerManager:
default_permission: int = 5, default_permission: int = 5,
default_jitter: int | None = None, default_jitter: int | None = None,
default_spread: int | None = None, default_spread: int | None = None,
default_interval: int | None = None,
) -> Callable: ) -> Callable:
""" """
注册可调度的任务函数 注册可调度的任务函数
@ -220,6 +223,7 @@ class SchedulerManager:
"default_permission": default_permission, "default_permission": default_permission,
"default_jitter": default_jitter, "default_jitter": default_jitter,
"default_spread": default_spread, "default_spread": default_spread,
"default_interval": default_interval,
} }
model_name = params_model.__name__ if params_model else "" model_name = params_model.__name__ if params_model else ""
logger.debug( logger.debug(

View File

@ -83,7 +83,12 @@ class ExecutionOptions(BaseModel):
""" """
jitter: int | None = Field(None, description="触发时间抖动(秒)") jitter: int | None = Field(None, description="触发时间抖动(秒)")
spread: int | None = Field(None, description="多目标执行的分散延迟(秒)") spread: int | None = Field(
None, description="(并发模式)多目标执行的最大分散延迟(秒)"
)
interval: int | None = Field(
None, description="多目标执行的固定间隔(秒),设置后将强制串行执行"
)
concurrency_policy: Literal["ALLOW", "SKIP", "QUEUE"] = Field( concurrency_policy: Literal["ALLOW", "SKIP", "QUEUE"] = Field(
"ALLOW", description="并发策略" "ALLOW", description="并发策略"
) )