mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
Compare commits
3 Commits
e6f6ad0559
...
7b04d81074
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7b04d81074 | ||
|
|
b74fb66939 | ||
|
|
9bba83cfe6 |
@ -111,6 +111,14 @@ __plugin_meta__ = PluginMetadata(
|
|||||||
default_value=300,
|
default_value=300,
|
||||||
type=int,
|
type=int,
|
||||||
),
|
),
|
||||||
|
RegisterConfig(
|
||||||
|
module="SchedulerManager",
|
||||||
|
key="DEFAULT_INTERVAL_SECONDS",
|
||||||
|
value=0,
|
||||||
|
help="为多目标定时任务设置的默认串行执行间隔秒数(大于0时生效),用于控制任务间的固定时间间隔。",
|
||||||
|
default_value=0,
|
||||||
|
type=int,
|
||||||
|
),
|
||||||
],
|
],
|
||||||
).to_dict(),
|
).to_dict(),
|
||||||
)
|
)
|
||||||
|
|||||||
@ -128,6 +128,11 @@ schedule_cmd = on_alconna(
|
|||||||
Args["spread_seconds", int],
|
Args["spread_seconds", int],
|
||||||
help_text="设置多目标执行的分散延迟(秒)",
|
help_text="设置多目标执行的分散延迟(秒)",
|
||||||
),
|
),
|
||||||
|
Option(
|
||||||
|
"--fixed-interval",
|
||||||
|
Args["interval_seconds", int],
|
||||||
|
help_text="设置任务间的固定执行间隔(秒),将强制串行",
|
||||||
|
),
|
||||||
Option(
|
Option(
|
||||||
"--permission",
|
"--permission",
|
||||||
Args["perm_level", int],
|
Args["perm_level", int],
|
||||||
@ -142,8 +147,8 @@ schedule_cmd = on_alconna(
|
|||||||
Subcommand(
|
Subcommand(
|
||||||
"删除",
|
"删除",
|
||||||
Args[
|
Args[
|
||||||
"schedule_id?",
|
"schedule_ids?",
|
||||||
int,
|
MultiVar(int),
|
||||||
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
||||||
],
|
],
|
||||||
*create_targeting_options(),
|
*create_targeting_options(),
|
||||||
@ -153,8 +158,8 @@ schedule_cmd = on_alconna(
|
|||||||
Subcommand(
|
Subcommand(
|
||||||
"暂停",
|
"暂停",
|
||||||
Args[
|
Args[
|
||||||
"schedule_id?",
|
"schedule_ids?",
|
||||||
int,
|
MultiVar(int),
|
||||||
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
||||||
],
|
],
|
||||||
*create_targeting_options(),
|
*create_targeting_options(),
|
||||||
@ -164,8 +169,8 @@ schedule_cmd = on_alconna(
|
|||||||
Subcommand(
|
Subcommand(
|
||||||
"恢复",
|
"恢复",
|
||||||
Args[
|
Args[
|
||||||
"schedule_id?",
|
"schedule_ids?",
|
||||||
int,
|
MultiVar(int),
|
||||||
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
Field(unmatch_tips=lambda text: f"任务ID '{text}' 必须是数字!"),
|
||||||
],
|
],
|
||||||
*create_targeting_options(),
|
*create_targeting_options(),
|
||||||
|
|||||||
@ -65,6 +65,7 @@ class SchedulerAdminService:
|
|||||||
job_name: str | None,
|
job_name: str | None,
|
||||||
jitter: int | None,
|
jitter: int | None,
|
||||||
spread: int | None,
|
spread: int | None,
|
||||||
|
interval: int | None,
|
||||||
created_by: str,
|
created_by: str,
|
||||||
) -> str:
|
) -> str:
|
||||||
"""创建或更新一个定时任务"""
|
"""创建或更新一个定时任务"""
|
||||||
@ -77,6 +78,8 @@ class SchedulerAdminService:
|
|||||||
execution_options["jitter"] = jitter
|
execution_options["jitter"] = jitter
|
||||||
if spread is not None:
|
if spread is not None:
|
||||||
execution_options["spread"] = spread
|
execution_options["spread"] = spread
|
||||||
|
if interval is not None:
|
||||||
|
execution_options["interval"] = interval
|
||||||
|
|
||||||
for target_desc in targets:
|
for target_desc in targets:
|
||||||
target_type, target_id = self._resolve_target_descriptor(target_desc)
|
target_type, target_id = self._resolve_target_descriptor(target_desc)
|
||||||
|
|||||||
@ -158,7 +158,7 @@ async def GetTargeter(
|
|||||||
event: Event,
|
event: Event,
|
||||||
bot: Bot,
|
bot: Bot,
|
||||||
arp: Arparma = AlconnaMatches(),
|
arp: Arparma = AlconnaMatches(),
|
||||||
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
|
schedule_ids: Match[list[int]] = AlconnaMatch("schedule_ids"),
|
||||||
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
|
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
|
||||||
group_ids: Match[list[str]] = AlconnaMatch("group_ids"),
|
group_ids: Match[list[str]] = AlconnaMatch("group_ids"),
|
||||||
user_id: Match[str] = AlconnaMatch("user_id"),
|
user_id: Match[str] = AlconnaMatch("user_id"),
|
||||||
@ -172,8 +172,8 @@ async def GetTargeter(
|
|||||||
if not subcommand:
|
if not subcommand:
|
||||||
await matcher.finish("内部错误:无法解析子命令。")
|
await matcher.finish("内部错误:无法解析子命令。")
|
||||||
|
|
||||||
if schedule_id.available:
|
if schedule_ids.available:
|
||||||
return scheduler_manager.target(id=schedule_id.result)
|
return scheduler_manager.target(id__in=schedule_ids.result)
|
||||||
|
|
||||||
all_enabled = arp.query(f"{subcommand}.all.value", False)
|
all_enabled = arp.query(f"{subcommand}.all.value", False)
|
||||||
global_flag = arp.query(f"{subcommand}.global.value", False)
|
global_flag = arp.query(f"{subcommand}.global.value", False)
|
||||||
|
|||||||
@ -63,6 +63,7 @@ async def handle_set(
|
|||||||
tag_name: Match[str] = AlconnaMatch("tag_name"),
|
tag_name: Match[str] = AlconnaMatch("tag_name"),
|
||||||
jitter: Match[int] = AlconnaMatch("jitter_seconds"),
|
jitter: Match[int] = AlconnaMatch("jitter_seconds"),
|
||||||
spread: Match[int] = AlconnaMatch("spread_seconds"),
|
spread: Match[int] = AlconnaMatch("spread_seconds"),
|
||||||
|
interval: Match[int] = AlconnaMatch("interval_seconds"),
|
||||||
job_name: Match[str] = AlconnaMatch("job_name"),
|
job_name: Match[str] = AlconnaMatch("job_name"),
|
||||||
bot_id_to_operate: str = Depends(GetBotId),
|
bot_id_to_operate: str = Depends(GetBotId),
|
||||||
trigger_info: tuple[str, dict] = Depends(GetTriggerInfo),
|
trigger_info: tuple[str, dict] = Depends(GetTriggerInfo),
|
||||||
@ -74,6 +75,7 @@ async def handle_set(
|
|||||||
p_name = plugin_name.result
|
p_name = plugin_name.result
|
||||||
jitter_val: int | None = jitter.result if jitter.available else None
|
jitter_val: int | None = jitter.result if jitter.available else None
|
||||||
spread_val: int | None = spread.result if spread.available else None
|
spread_val: int | None = spread.result if spread.available else None
|
||||||
|
interval_val: int | None = interval.result if interval.available else None
|
||||||
|
|
||||||
is_multi_target = (
|
is_multi_target = (
|
||||||
len(target_groups) > 1
|
len(target_groups) > 1
|
||||||
@ -100,6 +102,14 @@ async def handle_set(
|
|||||||
"SchedulerManager", "DEFAULT_SPREAD_SECONDS"
|
"SchedulerManager", "DEFAULT_SPREAD_SECONDS"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if interval_val is None:
|
||||||
|
if task_meta and task_meta.get("default_interval") is not None:
|
||||||
|
interval_val = cast(int | None, task_meta["default_interval"])
|
||||||
|
else:
|
||||||
|
interval_val = Config.get_config(
|
||||||
|
"SchedulerManager", "DEFAULT_INTERVAL_SECONDS"
|
||||||
|
)
|
||||||
|
|
||||||
result_message = await scheduler_admin_service.set_schedule(
|
result_message = await scheduler_admin_service.set_schedule(
|
||||||
targets=target_groups,
|
targets=target_groups,
|
||||||
creator_permission_level=creator_permission_level,
|
creator_permission_level=creator_permission_level,
|
||||||
@ -111,6 +121,7 @@ async def handle_set(
|
|||||||
job_name=job_name.result if job_name.available else None,
|
job_name=job_name.result if job_name.available else None,
|
||||||
jitter=jitter_val,
|
jitter=jitter_val,
|
||||||
spread=spread_val,
|
spread=spread_val,
|
||||||
|
interval=interval_val,
|
||||||
created_by=session.user.id,
|
created_by=session.user.id,
|
||||||
)
|
)
|
||||||
await MessageUtils.build_message(result_message).send()
|
await MessageUtils.build_message(result_message).send()
|
||||||
|
|||||||
@ -46,7 +46,8 @@ class ScheduledJob(Model):
|
|||||||
consecutive_failures = fields.IntField(default=0, description="连续失败次数")
|
consecutive_failures = fields.IntField(default=0, description="连续失败次数")
|
||||||
execution_options = fields.JSONField(
|
execution_options = fields.JSONField(
|
||||||
null=True,
|
null=True,
|
||||||
description="任务执行的额外选项 (例如: jitter, spread, concurrency_policy)",
|
description="任务执行的额外选项 (例如: jitter, spread, "
|
||||||
|
"interval, concurrency_policy)",
|
||||||
)
|
)
|
||||||
create_time = fields.DatetimeField(auto_now_add=True)
|
create_time = fields.DatetimeField(auto_now_add=True)
|
||||||
|
|
||||||
|
|||||||
@ -412,16 +412,44 @@ async def _execute_job(
|
|||||||
if isinstance(schedule.execution_options, dict)
|
if isinstance(schedule.execution_options, dict)
|
||||||
else {}
|
else {}
|
||||||
)
|
)
|
||||||
spread_seconds = spread_config.get("spread", 1.0)
|
interval_seconds = spread_config.get("interval")
|
||||||
|
|
||||||
async def worker(target_id: str | None):
|
if interval_seconds is not None and interval_seconds > 0:
|
||||||
await asyncio.sleep(random.uniform(0.1, spread_seconds))
|
logger.debug(
|
||||||
async with semaphore:
|
f"任务 {schedule.id}: 使用串行模式执行 {len(resolved_targets)} "
|
||||||
|
f"个目标,固定间隔 {interval_seconds} 秒。"
|
||||||
|
)
|
||||||
|
for i, target_id in enumerate(resolved_targets):
|
||||||
|
if i > 0:
|
||||||
|
logger.debug(
|
||||||
|
f"任务 {schedule.id} 目标 [{target_id or '全局'}]: "
|
||||||
|
f"等待 {interval_seconds} 秒后执行。"
|
||||||
|
)
|
||||||
|
await asyncio.sleep(interval_seconds)
|
||||||
await _execute_single_job_instance(schedule, bot, group_id=target_id)
|
await _execute_single_job_instance(schedule, bot, group_id=target_id)
|
||||||
|
else:
|
||||||
|
spread_seconds = spread_config.get("spread", 1.0)
|
||||||
|
|
||||||
tasks_to_run = [worker(target_id) for target_id in resolved_targets]
|
logger.debug(
|
||||||
if tasks_to_run:
|
f"任务 {schedule.id}: 将在 {spread_seconds:.2f} 秒内分散执行 "
|
||||||
await asyncio.gather(*tasks_to_run, return_exceptions=True)
|
f"{len(resolved_targets)} 个目标。"
|
||||||
|
)
|
||||||
|
|
||||||
|
async def worker(target_id: str | None):
|
||||||
|
delay = random.uniform(0.1, spread_seconds)
|
||||||
|
logger.debug(
|
||||||
|
f"任务 {schedule.id} 目标 [{target_id or '全局'}]: "
|
||||||
|
f"随机延迟 {delay:.2f} 秒后执行。"
|
||||||
|
)
|
||||||
|
await asyncio.sleep(delay)
|
||||||
|
async with semaphore:
|
||||||
|
await _execute_single_job_instance(
|
||||||
|
schedule, bot, group_id=target_id
|
||||||
|
)
|
||||||
|
|
||||||
|
tasks_to_run = [worker(target_id) for target_id in resolved_targets]
|
||||||
|
if tasks_to_run:
|
||||||
|
await asyncio.gather(*tasks_to_run, return_exceptions=True)
|
||||||
|
|
||||||
schedule.last_run_at = datetime.now()
|
schedule.last_run_at = datetime.now()
|
||||||
schedule.last_run_status = "SUCCESS"
|
schedule.last_run_status = "SUCCESS"
|
||||||
|
|||||||
@ -109,6 +109,7 @@ class SchedulerManager:
|
|||||||
policy: ExecutionPolicy | None = None,
|
policy: ExecutionPolicy | None = None,
|
||||||
default_jitter: int | None = None,
|
default_jitter: int | None = None,
|
||||||
default_spread: int | None = None,
|
default_spread: int | None = None,
|
||||||
|
default_interval: int | None = None,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
声明式定时任务的统一装饰器。
|
声明式定时任务的统一装饰器。
|
||||||
@ -140,6 +141,7 @@ class SchedulerManager:
|
|||||||
"model": params_model,
|
"model": params_model,
|
||||||
"default_jitter": default_jitter,
|
"default_jitter": default_jitter,
|
||||||
"default_spread": default_spread,
|
"default_spread": default_spread,
|
||||||
|
"default_interval": default_interval,
|
||||||
}
|
}
|
||||||
|
|
||||||
job_kwargs = model_dump(default_params) if default_params else {}
|
job_kwargs = model_dump(default_params) if default_params else {}
|
||||||
@ -205,6 +207,7 @@ class SchedulerManager:
|
|||||||
default_permission: int = 5,
|
default_permission: int = 5,
|
||||||
default_jitter: int | None = None,
|
default_jitter: int | None = None,
|
||||||
default_spread: int | None = None,
|
default_spread: int | None = None,
|
||||||
|
default_interval: int | None = None,
|
||||||
) -> Callable:
|
) -> Callable:
|
||||||
"""
|
"""
|
||||||
注册可调度的任务函数
|
注册可调度的任务函数
|
||||||
@ -220,6 +223,7 @@ class SchedulerManager:
|
|||||||
"default_permission": default_permission,
|
"default_permission": default_permission,
|
||||||
"default_jitter": default_jitter,
|
"default_jitter": default_jitter,
|
||||||
"default_spread": default_spread,
|
"default_spread": default_spread,
|
||||||
|
"default_interval": default_interval,
|
||||||
}
|
}
|
||||||
model_name = params_model.__name__ if params_model else "无"
|
model_name = params_model.__name__ if params_model else "无"
|
||||||
logger.debug(
|
logger.debug(
|
||||||
@ -468,9 +472,7 @@ class SchedulerManager:
|
|||||||
"required_permission": required_permission,
|
"required_permission": required_permission,
|
||||||
"source": source,
|
"source": source,
|
||||||
"is_one_off": is_one_off,
|
"is_one_off": is_one_off,
|
||||||
"execution_options": model_dump(
|
"execution_options": model_dump(validated_options, exclude_none=True),
|
||||||
validated_options, exclude_none=True
|
|
||||||
),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
defaults = {k: v for k, v in defaults.items() if v is not None}
|
defaults = {k: v for k, v in defaults.items() if v is not None}
|
||||||
|
|||||||
@ -77,13 +77,19 @@ class Trigger:
|
|||||||
"""创建一个 Date 触发器配置。"""
|
"""创建一个 Date 触发器配置。"""
|
||||||
return DateTrigger(**kwargs)
|
return DateTrigger(**kwargs)
|
||||||
|
|
||||||
|
|
||||||
class ExecutionOptions(BaseModel):
|
class ExecutionOptions(BaseModel):
|
||||||
"""
|
"""
|
||||||
封装定时任务的执行策略,包括重试和回调。
|
封装定时任务的执行策略,包括重试和回调。
|
||||||
"""
|
"""
|
||||||
|
|
||||||
jitter: int | None = Field(None, description="触发时间抖动(秒)")
|
jitter: int | None = Field(None, description="触发时间抖动(秒)")
|
||||||
spread: int | None = Field(None, description="多目标执行的分散延迟(秒)")
|
spread: int | None = Field(
|
||||||
|
None, description="(并发模式)多目标执行的最大分散延迟(秒)"
|
||||||
|
)
|
||||||
|
interval: int | None = Field(
|
||||||
|
None, description="多目标执行的固定间隔(秒),设置后将强制串行执行"
|
||||||
|
)
|
||||||
concurrency_policy: Literal["ALLOW", "SKIP", "QUEUE"] = Field(
|
concurrency_policy: Literal["ALLOW", "SKIP", "QUEUE"] = Field(
|
||||||
"ALLOW", description="并发策略"
|
"ALLOW", description="并发策略"
|
||||||
)
|
)
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user