feat(scheduler): 增强定时任务管理系统 (#1940)

*  feat(scheduler): 增强定时任务管理系统

- 新增状态查看、每日定时、私聊操作等功能
- 引入 Pydantic 参数验证,重构目标解析逻辑
- 添加并发控制,优化触发器显示格式
- 修复 ORM KeyError 问题,确保数据一致性
- 支持私聊通过 -g/-all 参数操作群组任务

* 🎨 修复pyright报错

* 🚨 auto fix by pre-commit hooks

---------

Co-authored-by: webjoin111 <455457521@qq.com>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
This commit is contained in:
Rumio 2025-07-01 10:31:38 +08:00 committed by GitHub
parent 87f02fd0ef
commit 8996cdf8f1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 463 additions and 231 deletions

View File

@ -9,37 +9,42 @@ __plugin_meta__ = PluginMetadata(
name="定时任务管理",
description="查看和管理由 SchedulerManager 控制的定时任务。",
usage="""
定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] : 查看定时任务
定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] :
设置/开启任务 (SUPERUSER)
📋 定时任务管理 - 支持群聊和私聊操作
时间选项 (三选一):
--cron "<分> <时> <日> <月> <周>" : 设置 cron 表达式
(例如: --cron "0 8 * * *")
--interval <时间间隔> : 设置时间间隔
(例如: --interval 30m, --interval 2h, --interval 10s)
--date "<YYYY-MM-DD HH:MM:SS>" : 设置在特定时间执行一次
🔍 查看任务:
定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>]
群聊中: 查看本群任务
私聊中: 必须使用 -g <群号> -all 选项 (SUPERUSER)
定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all : 删除任务 (SUPERUSER)
定时任务 暂停/恢复 <任务ID> | -p <插件> [-g <群号>] | -all :
暂停/恢复任务 (SUPERUSER)
定时任务 执行 <任务ID> : 立即手动执行一次任务 (SUPERUSER)
定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] : 更新任务配置 (SUPERUSER)
定时任务 插件列表 : 查看所有可设置定时任务的插件 (SUPERUSER)
📊 任务状态:
定时任务 状态 <任务ID> 任务状态 <任务ID>
查看单个任务的详细信息和状态
别名支持:
- 查看: ls, list
- 设置: add, 开启
- 删除: del, rm, remove, 关闭, 取消
- 暂停: pause
- 恢复: resume
- 执行: trigger, run
- 更新: update, modify, 修改
- 插件列表: plugins
任务管理 (SUPERUSER):
定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>]
定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all
定时任务 暂停 <任务ID> | -p <插件> [-g <群号>] | -all
定时任务 恢复 <任务ID> | -p <插件> [-g <群号>] | -all
定时任务 执行 <任务ID>
定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>]
📝 时间选项 (三选一):
--cron "<分> <时> <日> <月> <周>" # 例: --cron "0 8 * * *"
--interval <时间间隔> # 例: --interval 30m, 2h, 10s
--date "<YYYY-MM-DD HH:MM:SS>" # 例: --date "2024-01-01 08:00:00"
--daily "<HH:MM>" # 例: --daily "08:30"
📚 其他功能:
定时任务 插件列表 # 查看所有可设置定时任务的插件 (SUPERUSER)
🏷 别名支持:
查看: ls, list | 设置: add, 开启 | 删除: del, rm, remove, 关闭, 取消
暂停: pause | 恢复: resume | 执行: trigger, run | 状态: status, info
更新: update, modify, 修改 | 插件列表: plugins
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1.1",
version="0.1.2",
plugin_type=PluginType.SUPERUSER,
is_show=False,
).to_dict(),

View File

@ -2,24 +2,39 @@ import asyncio
from datetime import datetime
import re
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent
from nonebot.adapters import Event
from nonebot.adapters.onebot.v11 import Bot
from nonebot.params import Depends
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import (
Alconna,
AlconnaMatch,
Args,
Arparma,
Match,
Option,
Query,
Subcommand,
on_alconna,
)
from pydantic import BaseModel, ValidationError
from zhenxun.utils._image_template import ImageTemplate
from zhenxun.utils.manager.schedule_manager import scheduler_manager
def _get_type_name(annotation) -> str:
"""获取类型注解的名称"""
if hasattr(annotation, "__name__"):
return annotation.__name__
elif hasattr(annotation, "_name"):
return annotation._name
else:
return str(annotation)
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.rules import admin_check, ensure_group
from zhenxun.utils.rules import admin_check
def _format_trigger(schedule_status: dict) -> str:
@ -28,16 +43,26 @@ def _format_trigger(schedule_status: dict) -> str:
config = schedule_status["trigger_config"]
if trigger_type == "cron":
hour = config.get("hour")
minute = config.get("minute")
hour_str = f"{hour:02d}" if hour is not None else "*"
minute_str = f"{minute:02d}" if minute is not None else "*"
trigger_str = f"每天 {hour_str}:{minute_str}"
minute = config.get("minute", "*")
hour = config.get("hour", "*")
day = config.get("day", "*")
month = config.get("month", "*")
day_of_week = config.get("day_of_week", "*")
if day == "*" and month == "*" and day_of_week == "*":
formatted_hour = hour if hour == "*" else f"{int(hour):02d}"
formatted_minute = minute if minute == "*" else f"{int(minute):02d}"
return f"每天 {formatted_hour}:{formatted_minute}"
else:
return f"Cron: {minute} {hour} {day} {month} {day_of_week}"
elif trigger_type == "interval":
seconds = config.get("seconds", 0)
minutes = config.get("minutes", 0)
hours = config.get("hours", 0)
if hours:
days = config.get("days", 0)
if days:
trigger_str = f"{days}"
elif hours:
trigger_str = f"{hours} 小时"
elif minutes:
trigger_str = f"{minutes} 分钟"
@ -61,9 +86,10 @@ def _format_params(schedule_status: dict) -> str:
def _parse_interval(interval_str: str) -> dict:
match = re.match(r"(\d+)([smh])", interval_str.lower())
"""增强版解析器,支持 d(天)"""
match = re.match(r"(\d+)([smhd])", interval_str.lower())
if not match:
raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '10s' 的格式。")
raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '1d', '10s' 的格式。")
value, unit = int(match.group(1)), match.group(2)
if unit == "s":
@ -72,9 +98,37 @@ def _parse_interval(interval_str: str) -> dict:
return {"minutes": value}
if unit == "h":
return {"hours": value}
if unit == "d":
return {"days": value}
return {}
def _parse_daily_time(time_str: str) -> dict:
"""解析 HH:MM 或 HH:MM:SS 格式的时间为 cron 配置"""
if match := re.match(r"^(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?$", time_str):
hour, minute, second = match.groups()
hour, minute = int(hour), int(minute)
if not (0 <= hour <= 23 and 0 <= minute <= 59):
raise ValueError("小时或分钟数值超出范围。")
cron_config = {
"minute": str(minute),
"hour": str(hour),
"day": "*",
"month": "*",
"day_of_week": "*",
}
if second is not None:
if not (0 <= int(second) <= 59):
raise ValueError("秒数值超出范围。")
cron_config["second"] = str(second)
return cron_config
else:
raise ValueError("时间格式错误,请使用 'HH:MM''HH:MM:SS' 格式。")
async def GetBotId(
bot: Bot,
bot_id_match: Match[str] = AlconnaMatch("bot_id"),
@ -119,91 +173,45 @@ class TargetAll(ScheduleTarget):
TargetScope = TargetByID | TargetByPlugin | TargetAll | None
async def ParseScheduleTargetForDelete(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("删除.all"),
) -> TargetScope:
"""解析删除命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
def create_target_parser(subcommand_name: str):
"""
创建一个依赖注入函数用于解析删除暂停恢复等命令的操作目标
"""
async def dependency(
event: Event,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query(f"{subcommand_name}.all"),
) -> TargetScope:
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
return TargetByPlugin(plugin=p_name, all_groups=True)
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
current_group_id = getattr(event, "group_id", None)
if current_group_id:
return TargetByPlugin(plugin=p_name, group_id=str(current_group_id))
else:
await schedule_cmd.finish(
"私聊中操作插件任务必须使用 -g <群号> 或 -all 选项。"
)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
return TargetAll(for_group=group_id.result if group_id.available else None)
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
return None
async def ParseScheduleTargetForPause(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("暂停.all"),
) -> TargetScope:
"""解析暂停命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
async def ParseScheduleTargetForResume(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("恢复.all"),
) -> TargetScope:
"""解析恢复命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
return dependency
schedule_cmd = on_alconna(
@ -224,6 +232,11 @@ schedule_cmd = on_alconna(
Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"),
Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"),
Option("--date", Args["date_expr", str], help_text="设置特定执行日期"),
Option(
"--daily",
Args["daily_expr", str],
help_text="设置每天执行的时间 (如 08:20)",
),
Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"),
Option("-all", help_text="对所有群生效 (等同于 -g all)"),
Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"),
@ -281,10 +294,21 @@ schedule_cmd = on_alconna(
Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"),
Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"),
Option("--date", Args["date_expr", str], help_text="设置特定执行日期"),
Option(
"--daily",
Args["daily_expr", str],
help_text="更新每天执行的时间 (如 08:20)",
),
Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"),
alias=["update", "modify", "修改"],
help_text="更新任务配置",
),
Subcommand(
"状态",
Args["schedule_id", int],
alias=["status", "info"],
help_text="查看单个任务的详细状态",
),
Subcommand(
"插件列表",
alias=["plugins"],
@ -293,14 +317,31 @@ schedule_cmd = on_alconna(
),
priority=5,
block=True,
rule=admin_check(1) & ensure_group,
rule=admin_check(1),
)
schedule_cmd.shortcut(
"任务状态",
command="定时任务",
arguments=["状态", "{%0}"],
prefix=True,
)
@schedule_cmd.handle()
async def _handle_time_options_mutex(arp: Arparma):
time_options = ["cron", "interval", "date", "daily"]
provided_options = [opt for opt in time_options if arp.query(opt) is not None]
if len(provided_options) > 1:
await schedule_cmd.finish(
f"时间选项 --{', --'.join(provided_options)} 不能同时使用,请只选择一个。"
)
@schedule_cmd.assign("查看")
async def _(
bot: Bot,
event: GroupMessageEvent,
event: Event,
target_group_id: Match[str] = AlconnaMatch("target_group_id"),
all_groups: Query[bool] = Query("查看.all"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
@ -310,6 +351,10 @@ async def _(
schedules = []
title = ""
current_group_id = getattr(event, "group_id", None)
if not (all_groups.available or target_group_id.available) and not current_group_id:
await schedule_cmd.finish("私聊中查看任务必须使用 -g <群号> 或 -all 选项。")
if all_groups.available:
if not is_superuser:
await schedule_cmd.finish("需要超级用户权限才能查看所有群组的定时任务。")
@ -324,7 +369,7 @@ async def _(
]
title = f"{gid} 的定时任务"
else:
gid = str(event.group_id)
gid = str(current_group_id)
schedules = [
s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid
]
@ -391,12 +436,14 @@ async def _(
@schedule_cmd.assign("设置")
async def _(
event: Event,
plugin_name: str,
cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"),
interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"),
date_expr: Match[str] = AlconnaMatch("date.date_expr"),
group_id: Match[str] = AlconnaMatch("group_id"),
kwargs_str: Match[str] = AlconnaMatch("kwargs_str"),
cron_expr: str | None = None,
interval_expr: str | None = None,
date_expr: str | None = None,
daily_expr: str | None = None,
group_id: str | None = None,
kwargs_str: str | None = None,
all_enabled: Query[bool] = Query("设置.all"),
bot_id_to_operate: str = Depends(GetBotId),
):
@ -410,56 +457,91 @@ async def _(
trigger_config = {}
try:
if cron_expr.available:
if cron_expr:
trigger_type = "cron"
parts = cron_expr.result.split()
parts = cron_expr.split()
if len(parts) != 5:
raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)")
cron_keys = ["minute", "hour", "day", "month", "day_of_week"]
trigger_config = dict(zip(cron_keys, parts))
elif interval_expr.available:
elif interval_expr:
trigger_type = "interval"
trigger_config = _parse_interval(interval_expr.result)
elif date_expr.available:
trigger_config = _parse_interval(interval_expr)
elif date_expr:
trigger_type = "date"
trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)}
trigger_config = {"run_date": datetime.fromisoformat(date_expr)}
elif daily_expr:
trigger_type = "cron"
trigger_config = _parse_daily_time(daily_expr)
else:
await schedule_cmd.finish(
"必须提供一种时间选项: --cron, --interval, 或 --date。"
"必须提供一种时间选项: --cron, --interval, --date, 或 --daily"
)
except ValueError as e:
await schedule_cmd.finish(f"时间参数解析错误: {e}")
job_kwargs = None
if kwargs_str.available:
job_kwargs = {}
if kwargs_str:
task_meta = scheduler_manager._registered_tasks[plugin_name]
if not task_meta.get("params"):
params_model = task_meta.get("model")
if not params_model:
await schedule_cmd.finish(f"插件 '{plugin_name}' 不支持设置额外参数。")
registered_params = task_meta["params"]
job_kwargs = {}
if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)):
await schedule_cmd.finish(f"插件 '{plugin_name}' 的参数模型配置错误。")
raw_kwargs = {}
try:
for item in kwargs_str.result.split(","):
for item in kwargs_str.split(","):
key, value = item.strip().split("=", 1)
key = key.strip()
if key not in registered_params:
await schedule_cmd.finish(f"错误:插件不支持参数 '{key}'")
param_type = registered_params[key].get("type", str)
job_kwargs[key] = param_type(value)
raw_kwargs[key.strip()] = value
except Exception as e:
await schedule_cmd.finish(
f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}"
)
target_group_id: str | None = None
if group_id.available and group_id.result.lower() == "all":
try:
model_validate = getattr(params_model, "model_validate", None)
if not model_validate:
await schedule_cmd.finish(
f"插件 '{plugin_name}' 的参数模型不支持验证。"
)
return
validated_model = model_validate(raw_kwargs)
model_dump = getattr(validated_model, "model_dump", None)
if not model_dump:
await schedule_cmd.finish(
f"插件 '{plugin_name}' 的参数模型不支持导出。"
)
return
job_kwargs = model_dump()
except ValidationError as e:
errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()]
error_str = "\n".join(errors)
await schedule_cmd.finish(
f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}"
)
return
target_group_id: str | None
current_group_id = getattr(event, "group_id", None)
if group_id and group_id.lower() == "all":
target_group_id = "__ALL_GROUPS__"
elif all_enabled.available:
target_group_id = "__ALL_GROUPS__"
elif group_id.available:
target_group_id = group_id.result
elif group_id:
target_group_id = group_id
elif current_group_id:
target_group_id = str(current_group_id)
else:
target_group_id = None
await schedule_cmd.finish(
"私聊中设置定时任务时,必须使用 -g <群号> 或 --all 选项指定目标。"
)
return
success, msg = await scheduler_manager.add_schedule(
plugin_name,
@ -471,7 +553,7 @@ async def _(
)
if target_group_id == "__ALL_GROUPS__":
target_desc = "所有群组"
target_desc = f"所有群组 (Bot: {bot_id_to_operate})"
elif target_group_id is None:
target_desc = "全局"
else:
@ -485,7 +567,7 @@ async def _(
@schedule_cmd.assign("删除")
async def _(
target: TargetScope = Depends(ParseScheduleTargetForDelete),
target: TargetScope = Depends(create_target_parser("删除")),
bot_id_to_operate: str = Depends(GetBotId),
):
if isinstance(target, TargetByID):
@ -531,7 +613,7 @@ async def _(
@schedule_cmd.assign("暂停")
async def _(
target: TargetScope = Depends(ParseScheduleTargetForPause),
target: TargetScope = Depends(create_target_parser("暂停")),
bot_id_to_operate: str = Depends(GetBotId),
):
if isinstance(target, TargetByID):
@ -568,7 +650,7 @@ async def _(
@schedule_cmd.assign("恢复")
async def _(
target: TargetScope = Depends(ParseScheduleTargetForResume),
target: TargetScope = Depends(create_target_parser("恢复")),
bot_id_to_operate: str = Depends(GetBotId),
):
if isinstance(target, TargetByID):
@ -612,89 +694,92 @@ async def _(schedule_id: int):
@schedule_cmd.assign("更新")
async def _(
schedule_id: int,
cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"),
interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"),
date_expr: Match[str] = AlconnaMatch("date.date_expr"),
kwargs_str: Match[str] = AlconnaMatch("kwargs_str"),
cron_expr: str | None = None,
interval_expr: str | None = None,
date_expr: str | None = None,
daily_expr: str | None = None,
kwargs_str: str | None = None,
):
if not any(
[
cron_expr.available,
interval_expr.available,
date_expr.available,
kwargs_str.available,
]
):
if not any([cron_expr, interval_expr, date_expr, daily_expr, kwargs_str]):
await schedule_cmd.finish(
"请提供需要更新的时间 (--cron/--interval/--date) 或参数 (--kwargs)"
"请提供需要更新的时间 (--cron/--interval/--date/--daily) 或参数 (--kwargs)"
)
trigger_config = None
trigger_type = None
try:
if cron_expr.available:
parts = cron_expr.result.split()
if cron_expr:
trigger_type = "cron"
parts = cron_expr.split()
if len(parts) != 5:
raise ValueError("Cron 表达式必须有5个部分")
cron_keys = ["minute", "hour", "day", "month", "day_of_week"]
trigger_config = dict(zip(cron_keys, parts))
elif interval_expr.available:
trigger_config = _parse_interval(interval_expr.result)
elif date_expr.available:
trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)}
elif interval_expr:
trigger_type = "interval"
trigger_config = _parse_interval(interval_expr)
elif date_expr:
trigger_type = "date"
trigger_config = {"run_date": datetime.fromisoformat(date_expr)}
elif daily_expr:
trigger_type = "cron"
trigger_config = _parse_daily_time(daily_expr)
except ValueError as e:
await schedule_cmd.finish(f"时间参数解析错误: {e}")
job_kwargs = None
if kwargs_str.available:
if kwargs_str:
schedule = await scheduler_manager.get_schedule_by_id(schedule_id)
if not schedule:
await schedule_cmd.finish(f"未找到 ID 为 {schedule_id} 的任务。")
if schedule.plugin_name not in scheduler_manager._registered_tasks:
await schedule_cmd.finish(f"插件 '{schedule.plugin_name}' 未注册定时任务。")
task_meta = scheduler_manager._registered_tasks[schedule.plugin_name]
if "params" not in task_meta or not task_meta["params"]:
task_meta = scheduler_manager._registered_tasks.get(schedule.plugin_name)
if not task_meta or not (params_model := task_meta.get("model")):
await schedule_cmd.finish(
f"插件 '{schedule.plugin_name}' 未定义参数元数据。"
f"请联系插件开发者更新插件注册代码。"
f"插件 '{schedule.plugin_name}' 未定义参数模型,无法更新参数。"
)
registered_params = task_meta["params"]
job_kwargs = {}
if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)):
await schedule_cmd.finish(
f"插件 '{schedule.plugin_name}' 的参数模型配置错误。"
)
raw_kwargs = {}
try:
for item in kwargs_str.result.split(","):
for item in kwargs_str.split(","):
key, value = item.strip().split("=", 1)
key = key.strip()
if key not in registered_params:
await schedule_cmd.finish(
f"错误:插件不支持参数 '{key}'"
f"可用参数: {list(registered_params.keys())}"
)
param_meta = registered_params[key]
if "type" not in param_meta:
await schedule_cmd.finish(
f"插件 '{schedule.plugin_name}' 的参数 '{key}' 未定义类型。"
f"请联系插件开发者更新参数元数据。"
)
param_type = param_meta["type"]
try:
job_kwargs[key] = param_type(value)
except (ValueError, TypeError):
await schedule_cmd.finish(
f"参数 '{key}' 的值 '{value}' 格式不正确,"
f"应为 {param_type.__name__} 类型。"
)
raw_kwargs[key.strip()] = value
except Exception as e:
await schedule_cmd.finish(
f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}"
)
try:
model_validate = getattr(params_model, "model_validate", None)
if not model_validate:
await schedule_cmd.finish(
f"插件 '{schedule.plugin_name}' 的参数模型不支持验证。"
)
return
validated_model = model_validate(raw_kwargs)
model_dump = getattr(validated_model, "model_dump", None)
if not model_dump:
await schedule_cmd.finish(
f"插件 '{schedule.plugin_name}' 的参数模型不支持导出。"
)
return
job_kwargs = model_dump(exclude_unset=True)
except ValidationError as e:
errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()]
error_str = "\n".join(errors)
await schedule_cmd.finish(f"更新的参数验证失败:\n{error_str}")
return
_, message = await scheduler_manager.update_schedule(
schedule_id, trigger_config, job_kwargs
schedule_id, trigger_type, trigger_config, job_kwargs
)
await schedule_cmd.finish(message)
@ -708,17 +793,44 @@ async def _():
message_parts = ["📋 已注册的定时任务插件:"]
for i, plugin_name in enumerate(registered_plugins, 1):
task_meta = scheduler_manager._registered_tasks[plugin_name]
if "params" not in task_meta:
message_parts.append(f"{i}. {plugin_name} - ⚠️ 未定义参数元数据")
params_model = task_meta.get("model")
if not params_model:
message_parts.append(f"{i}. {plugin_name} - 无参数")
continue
params = task_meta["params"]
if params:
if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)):
message_parts.append(f"{i}. {plugin_name} - ⚠️ 参数模型配置错误")
continue
model_fields = getattr(params_model, "model_fields", None)
if model_fields:
param_info = ", ".join(
f"{k}({v['type'].__name__})" for k, v in params.items()
f"{field_name}({_get_type_name(field_info.annotation)})"
for field_name, field_info in model_fields.items()
)
message_parts.append(f"{i}. {plugin_name} - 参数: {param_info}")
else:
message_parts.append(f"{i}. {plugin_name} - 无参数")
await schedule_cmd.finish("\n".join(message_parts))
@schedule_cmd.assign("状态")
async def _(schedule_id: int):
status = await scheduler_manager.get_schedule_status(schedule_id)
if not status:
await schedule_cmd.finish(f"未找到ID为 {schedule_id} 的定时任务。")
info_lines = [
f"📋 定时任务详细信息 (ID: {schedule_id})",
"--------------------",
f"▫️ 插件: {status['plugin_name']}",
f"▫️ Bot ID: {status.get('bot_id') or '默认'}",
f"▫️ 目标: {status['group_id'] or '全局'}",
f"▫️ 状态: {'✔️ 已启用' if status['is_enabled'] else '⏸️ 已暂停'}",
f"▫️ 下次运行: {status['next_run_time']}",
f"▫️ 触发规则: {_format_trigger(status)}",
f"▫️ 任务参数: {_format_params(status)}",
]
await schedule_cmd.finish("\n".join(info_lines))

View File

@ -1,5 +1,6 @@
import asyncio
from collections.abc import Callable, Coroutine
import copy
import inspect
import random
from typing import ClassVar
@ -7,25 +8,31 @@ from typing import ClassVar
import nonebot
from nonebot import get_bots
from nonebot_plugin_apscheduler import scheduler
from pydantic import BaseModel, ValidationError
from zhenxun.configs.config import Config
from zhenxun.models.schedule_info import ScheduleInfo
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.manager.priority_manager import PriorityLifecycle
from zhenxun.utils.platform import PlatformUtils
SCHEDULE_CONCURRENCY_KEY = "all_groups_concurrency_limit"
class SchedulerManager:
"""
一个通用的持久化的定时任务管理器供所有插件使用
"""
_registered_tasks: ClassVar[dict[str, dict]] = {}
_registered_tasks: ClassVar[
dict[str, dict[str, Callable | type[BaseModel] | None]]
] = {}
_JOB_PREFIX = "zhenxun_schedule_"
_running_tasks: ClassVar[set] = set()
def register(
self, plugin_name: str, params: dict[str, dict] | None = None
self, plugin_name: str, params_model: type[BaseModel] | None = None
) -> Callable:
"""
注册一个可调度的任务函数
@ -33,8 +40,8 @@ class SchedulerManager:
Args:
plugin_name (str): 插件的唯一名称 (通常是模块名)
params (dict, optional): 任务函数接受的额外参数元数据用于通用命令
格式: {"param_name": {"type": str, "help": "描述", "default": ...}}
params_model (type[BaseModel], optional): 一个 Pydantic BaseModel
用于定义和验证任务函数接受的额外参数
"""
def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]:
@ -42,9 +49,12 @@ class SchedulerManager:
logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。")
self._registered_tasks[plugin_name] = {
"func": func,
"params": params,
"model": params_model,
}
logger.debug(f"插件 '{plugin_name}' 的定时任务已注册,参数元数据: {params}")
model_name = params_model.__name__ if params_model else ""
logger.debug(
f"插件 '{plugin_name}' 的定时任务已注册,参数模型: {model_name}"
)
return func
return decorator
@ -107,9 +117,20 @@ class SchedulerManager:
):
"""为所有群组执行任务,并处理优先级覆盖。"""
plugin_name = schedule.plugin_name
concurrency_limit = Config.get_config(
"SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5
)
if not isinstance(concurrency_limit, int) or concurrency_limit <= 0:
logger.warning(
f"无效的定时任务并发限制配置 '{concurrency_limit}',将使用默认值 5。"
)
concurrency_limit = 5
logger.info(
f"开始执行针对 [所有群组] 的任务 "
f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})"
f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})"
f"并发限制: {concurrency_limit}"
)
all_gids = set()
@ -128,15 +149,25 @@ class SchedulerManager:
).values_list("group_id", flat=True)
)
semaphore = asyncio.Semaphore(concurrency_limit)
async def worker(gid: str):
"""使用 Semaphore 包装单个群组的任务执行"""
async with semaphore:
temp_schedule = copy.deepcopy(schedule)
temp_schedule.group_id = gid
await self._execute_for_single_target(temp_schedule, task_meta, bot)
await asyncio.sleep(random.uniform(0.1, 0.5))
tasks_to_run = []
for gid in all_gids:
if gid in specific_tasks_gids:
logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。")
continue
tasks_to_run.append(worker(gid))
temp_schedule = schedule
temp_schedule.group_id = gid
await self._execute_for_single_target(temp_schedule, task_meta, bot)
await asyncio.sleep(random.uniform(0.1, 0.5))
if tasks_to_run:
await asyncio.gather(*tasks_to_run)
async def _execute_for_single_target(
self, schedule: ScheduleInfo, task_meta: dict, bot
@ -182,6 +213,46 @@ class SchedulerManager:
e=e,
)
def _validate_and_prepare_kwargs(
self, plugin_name: str, job_kwargs: dict | None
) -> tuple[bool, str | dict]:
"""验证并准备任务参数,应用默认值"""
task_meta = self._registered_tasks.get(plugin_name)
if not task_meta:
return False, f"插件 '{plugin_name}' 未注册。"
params_model = task_meta.get("model")
job_kwargs = job_kwargs if job_kwargs is not None else {}
if not params_model:
if job_kwargs:
logger.warning(
f"插件 '{plugin_name}' 未定义参数模型,但收到了参数: {job_kwargs}"
)
return True, job_kwargs
if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)):
logger.error(f"插件 '{plugin_name}' 的参数模型不是有效的 BaseModel 类")
return False, f"插件 '{plugin_name}' 的参数模型配置错误"
try:
model_validate = getattr(params_model, "model_validate", None)
if not model_validate:
return False, f"插件 '{plugin_name}' 的参数模型不支持验证"
validated_model = model_validate(job_kwargs)
model_dump = getattr(validated_model, "model_dump", None)
if not model_dump:
return False, f"插件 '{plugin_name}' 的参数模型不支持导出"
return True, model_dump()
except ValidationError as e:
errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()]
error_str = "\n".join(errors)
msg = f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}"
return False, msg
def _add_aps_job(self, schedule: ScheduleInfo):
"""根据 ScheduleInfo 对象添加或更新一个 APScheduler 任务。"""
job_id = self._get_job_id(schedule.id)
@ -234,23 +305,46 @@ class SchedulerManager:
if plugin_name not in self._registered_tasks:
return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。"
is_valid, result = self._validate_and_prepare_kwargs(plugin_name, job_kwargs)
if not is_valid:
return False, str(result)
validated_job_kwargs = result
effective_bot_id = bot_id if group_id == "__ALL_GROUPS__" else None
search_kwargs = {
"plugin_name": plugin_name,
"group_id": group_id,
"bot_id": bot_id,
}
if effective_bot_id:
search_kwargs["bot_id"] = effective_bot_id
else:
search_kwargs["bot_id__isnull"] = True
defaults = {
"trigger_type": trigger_type,
"trigger_config": trigger_config,
"job_kwargs": job_kwargs if job_kwargs is not None else {},
"job_kwargs": validated_job_kwargs,
"is_enabled": True,
}
schedule, created = await ScheduleInfo.update_or_create(
**search_kwargs,
defaults=defaults,
)
schedule = await ScheduleInfo.filter(**search_kwargs).first()
created = False
if schedule:
for key, value in defaults.items():
setattr(schedule, key, value)
await schedule.save()
else:
creation_kwargs = {
"plugin_name": plugin_name,
"group_id": group_id,
"bot_id": effective_bot_id,
**defaults,
}
schedule = await ScheduleInfo.create(**creation_kwargs)
created = True
self._add_aps_job(schedule)
action = "设置" if created else "更新"
return True, f"已成功{action}插件 '{plugin_name}' 的定时任务。"
@ -296,6 +390,7 @@ class SchedulerManager:
async def update_schedule(
self,
schedule_id: int,
trigger_type: str | None = None,
trigger_config: dict | None = None,
job_kwargs: dict | None = None,
) -> tuple[bool, str]:
@ -306,15 +401,27 @@ class SchedulerManager:
updated_fields = []
if trigger_config is not None:
if not isinstance(schedule.trigger_config, dict):
return False, f"任务 {schedule_id} 的 trigger_config 数据格式错误。"
schedule.trigger_config.update(trigger_config)
schedule.trigger_config = trigger_config
updated_fields.append("trigger_config")
if trigger_type is not None and schedule.trigger_type != trigger_type:
schedule.trigger_type = trigger_type
updated_fields.append("trigger_type")
if job_kwargs is not None:
if not isinstance(schedule.job_kwargs, dict):
return False, f"任务 {schedule_id} 的 job_kwargs 数据格式错误。"
schedule.job_kwargs.update(job_kwargs)
merged_kwargs = schedule.job_kwargs.copy()
merged_kwargs.update(job_kwargs)
is_valid, result = self._validate_and_prepare_kwargs(
schedule.plugin_name, merged_kwargs
)
if not is_valid:
return False, str(result)
schedule.job_kwargs = result # type: ignore
updated_fields.append("job_kwargs")
if not updated_fields:
@ -683,6 +790,14 @@ scheduler_manager = SchedulerManager()
@PriorityLifecycle.on_startup(priority=90)
async def _load_schedules_from_db():
"""在服务启动时从数据库加载并调度所有任务。"""
Config.add_plugin_config(
"SchedulerManager",
SCHEDULE_CONCURRENCY_KEY,
5,
help="“所有群组”类型定时任务的并发执行数量限制",
type=int,
)
logger.info("正在从数据库加载并调度所有定时任务...")
schedules = await ScheduleInfo.filter(is_enabled=True).all()
count = 0