diff --git a/zhenxun/builtin_plugins/scheduler_admin/__init__.py b/zhenxun/builtin_plugins/scheduler_admin/__init__.py index c4ffb468..adaaa621 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/__init__.py +++ b/zhenxun/builtin_plugins/scheduler_admin/__init__.py @@ -9,37 +9,42 @@ __plugin_meta__ = PluginMetadata( name="定时任务管理", description="查看和管理由 SchedulerManager 控制的定时任务。", usage=""" - 定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] : 查看定时任务 - 定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] : - 设置/开启任务 (SUPERUSER) +📋 定时任务管理 - 支持群聊和私聊操作 - 时间选项 (三选一): - --cron "<分> <时> <日> <月> <周>" : 设置 cron 表达式 - (例如: --cron "0 8 * * *") - --interval <时间间隔> : 设置时间间隔 - (例如: --interval 30m, --interval 2h, --interval 10s) - --date "" : 设置在特定时间执行一次 +🔍 查看任务: + 定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] + • 群聊中: 查看本群任务 + • 私聊中: 必须使用 -g <群号> 或 -all 选项 (SUPERUSER) - 定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all : 删除任务 (SUPERUSER) - 定时任务 暂停/恢复 <任务ID> | -p <插件> [-g <群号>] | -all : - 暂停/恢复任务 (SUPERUSER) - 定时任务 执行 <任务ID> : 立即手动执行一次任务 (SUPERUSER) - 定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] : 更新任务配置 (SUPERUSER) - 定时任务 插件列表 : 查看所有可设置定时任务的插件 (SUPERUSER) +📊 任务状态: + 定时任务 状态 <任务ID> 或 任务状态 <任务ID> + • 查看单个任务的详细信息和状态 - 别名支持: - - 查看: ls, list - - 设置: add, 开启 - - 删除: del, rm, remove, 关闭, 取消 - - 暂停: pause - - 恢复: resume - - 执行: trigger, run - - 更新: update, modify, 修改 - - 插件列表: plugins +⚙️ 任务管理 (SUPERUSER): + 定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] + 定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all + 定时任务 暂停 <任务ID> | -p <插件> [-g <群号>] | -all + 定时任务 恢复 <任务ID> | -p <插件> [-g <群号>] | -all + 定时任务 执行 <任务ID> + 定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] + +📝 时间选项 (三选一): + --cron "<分> <时> <日> <月> <周>" # 例: --cron "0 8 * * *" + --interval <时间间隔> # 例: --interval 30m, 2h, 10s + --date "" # 例: --date "2024-01-01 08:00:00" + --daily "" # 例: --daily "08:30" + +📚 其他功能: + 定时任务 插件列表 # 查看所有可设置定时任务的插件 (SUPERUSER) + +🏷️ 别名支持: + 查看: ls, list | 设置: add, 开启 | 删除: del, rm, remove, 关闭, 取消 + 暂停: pause | 恢复: resume | 执行: trigger, run | 状态: status, info + 更新: update, modify, 修改 | 插件列表: plugins """.strip(), extra=PluginExtraData( author="HibiKier", - version="0.1.1", + version="0.1.2", plugin_type=PluginType.SUPERUSER, is_show=False, ).to_dict(), diff --git a/zhenxun/builtin_plugins/scheduler_admin/command.py b/zhenxun/builtin_plugins/scheduler_admin/command.py index c9982440..08a085fb 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/command.py +++ b/zhenxun/builtin_plugins/scheduler_admin/command.py @@ -2,24 +2,39 @@ import asyncio from datetime import datetime import re -from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent +from nonebot.adapters import Event +from nonebot.adapters.onebot.v11 import Bot from nonebot.params import Depends from nonebot.permission import SUPERUSER from nonebot_plugin_alconna import ( Alconna, AlconnaMatch, Args, + Arparma, Match, Option, Query, Subcommand, on_alconna, ) +from pydantic import BaseModel, ValidationError from zhenxun.utils._image_template import ImageTemplate from zhenxun.utils.manager.schedule_manager import scheduler_manager + + +def _get_type_name(annotation) -> str: + """获取类型注解的名称""" + if hasattr(annotation, "__name__"): + return annotation.__name__ + elif hasattr(annotation, "_name"): + return annotation._name + else: + return str(annotation) + + from zhenxun.utils.message import MessageUtils -from zhenxun.utils.rules import admin_check, ensure_group +from zhenxun.utils.rules import admin_check def _format_trigger(schedule_status: dict) -> str: @@ -28,16 +43,26 @@ def _format_trigger(schedule_status: dict) -> str: config = schedule_status["trigger_config"] if trigger_type == "cron": - hour = config.get("hour") - minute = config.get("minute") - hour_str = f"{hour:02d}" if hour is not None else "*" - minute_str = f"{minute:02d}" if minute is not None else "*" - trigger_str = f"每天 {hour_str}:{minute_str}" + minute = config.get("minute", "*") + hour = config.get("hour", "*") + day = config.get("day", "*") + month = config.get("month", "*") + day_of_week = config.get("day_of_week", "*") + + if day == "*" and month == "*" and day_of_week == "*": + formatted_hour = hour if hour == "*" else f"{int(hour):02d}" + formatted_minute = minute if minute == "*" else f"{int(minute):02d}" + return f"每天 {formatted_hour}:{formatted_minute}" + else: + return f"Cron: {minute} {hour} {day} {month} {day_of_week}" elif trigger_type == "interval": seconds = config.get("seconds", 0) minutes = config.get("minutes", 0) hours = config.get("hours", 0) - if hours: + days = config.get("days", 0) + if days: + trigger_str = f"每 {days} 天" + elif hours: trigger_str = f"每 {hours} 小时" elif minutes: trigger_str = f"每 {minutes} 分钟" @@ -61,9 +86,10 @@ def _format_params(schedule_status: dict) -> str: def _parse_interval(interval_str: str) -> dict: - match = re.match(r"(\d+)([smh])", interval_str.lower()) + """增强版解析器,支持 d(天)""" + match = re.match(r"(\d+)([smhd])", interval_str.lower()) if not match: - raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '10s' 的格式。") + raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '1d', '10s' 的格式。") value, unit = int(match.group(1)), match.group(2) if unit == "s": @@ -72,9 +98,37 @@ def _parse_interval(interval_str: str) -> dict: return {"minutes": value} if unit == "h": return {"hours": value} + if unit == "d": + return {"days": value} return {} +def _parse_daily_time(time_str: str) -> dict: + """解析 HH:MM 或 HH:MM:SS 格式的时间为 cron 配置""" + if match := re.match(r"^(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?$", time_str): + hour, minute, second = match.groups() + hour, minute = int(hour), int(minute) + + if not (0 <= hour <= 23 and 0 <= minute <= 59): + raise ValueError("小时或分钟数值超出范围。") + + cron_config = { + "minute": str(minute), + "hour": str(hour), + "day": "*", + "month": "*", + "day_of_week": "*", + } + if second is not None: + if not (0 <= int(second) <= 59): + raise ValueError("秒数值超出范围。") + cron_config["second"] = str(second) + + return cron_config + else: + raise ValueError("时间格式错误,请使用 'HH:MM' 或 'HH:MM:SS' 格式。") + + async def GetBotId( bot: Bot, bot_id_match: Match[str] = AlconnaMatch("bot_id"), @@ -119,91 +173,45 @@ class TargetAll(ScheduleTarget): TargetScope = TargetByID | TargetByPlugin | TargetAll | None -async def ParseScheduleTargetForDelete( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), - all_enabled: Query[bool] = Query("删除.all"), -) -> TargetScope: - """解析删除命令的操作目标""" - if schedule_id.available: - return TargetByID(schedule_id.result) +def create_target_parser(subcommand_name: str): + """ + 创建一个依赖注入函数,用于解析删除、暂停、恢复等命令的操作目标。 + """ + + async def dependency( + event: Event, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query(f"{subcommand_name}.all"), + ) -> TargetScope: + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + return TargetByPlugin(plugin=p_name, all_groups=True) + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + current_group_id = getattr(event, "group_id", None) + if current_group_id: + return TargetByPlugin(plugin=p_name, group_id=str(current_group_id)) + else: + await schedule_cmd.finish( + "私聊中操作插件任务必须使用 -g <群号> 或 -all 选项。" + ) - if plugin_name.available: - p_name = plugin_name.result if all_enabled.available: - return TargetByPlugin(plugin=p_name, all_groups=True) - elif group_id.available: - gid = group_id.result - if gid.lower() == "all": - gid = "__ALL_GROUPS__" - return TargetByPlugin(plugin=p_name, group_id=gid) - else: - return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + return TargetAll(for_group=group_id.result if group_id.available else None) - if all_enabled.available: - return TargetAll(for_group=group_id.result if group_id.available else None) + return None - return None - - -async def ParseScheduleTargetForPause( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), - all_enabled: Query[bool] = Query("暂停.all"), -) -> TargetScope: - """解析暂停命令的操作目标""" - if schedule_id.available: - return TargetByID(schedule_id.result) - - if plugin_name.available: - p_name = plugin_name.result - if all_enabled.available: - return TargetByPlugin(plugin=p_name, all_groups=True) - elif group_id.available: - gid = group_id.result - if gid.lower() == "all": - gid = "__ALL_GROUPS__" - return TargetByPlugin(plugin=p_name, group_id=gid) - else: - return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) - - if all_enabled.available: - return TargetAll(for_group=group_id.result if group_id.available else None) - - return None - - -async def ParseScheduleTargetForResume( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), - all_enabled: Query[bool] = Query("恢复.all"), -) -> TargetScope: - """解析恢复命令的操作目标""" - if schedule_id.available: - return TargetByID(schedule_id.result) - - if plugin_name.available: - p_name = plugin_name.result - if all_enabled.available: - return TargetByPlugin(plugin=p_name, all_groups=True) - elif group_id.available: - gid = group_id.result - if gid.lower() == "all": - gid = "__ALL_GROUPS__" - return TargetByPlugin(plugin=p_name, group_id=gid) - else: - return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) - - if all_enabled.available: - return TargetAll(for_group=group_id.result if group_id.available else None) - - return None + return dependency schedule_cmd = on_alconna( @@ -224,6 +232,11 @@ schedule_cmd = on_alconna( Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option( + "--daily", + Args["daily_expr", str], + help_text="设置每天执行的时间 (如 08:20)", + ), Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"), Option("-all", help_text="对所有群生效 (等同于 -g all)"), Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"), @@ -281,10 +294,21 @@ schedule_cmd = on_alconna( Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option( + "--daily", + Args["daily_expr", str], + help_text="更新每天执行的时间 (如 08:20)", + ), Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"), alias=["update", "modify", "修改"], help_text="更新任务配置", ), + Subcommand( + "状态", + Args["schedule_id", int], + alias=["status", "info"], + help_text="查看单个任务的详细状态", + ), Subcommand( "插件列表", alias=["plugins"], @@ -293,14 +317,31 @@ schedule_cmd = on_alconna( ), priority=5, block=True, - rule=admin_check(1) & ensure_group, + rule=admin_check(1), ) +schedule_cmd.shortcut( + "任务状态", + command="定时任务", + arguments=["状态", "{%0}"], + prefix=True, +) + + +@schedule_cmd.handle() +async def _handle_time_options_mutex(arp: Arparma): + time_options = ["cron", "interval", "date", "daily"] + provided_options = [opt for opt in time_options if arp.query(opt) is not None] + if len(provided_options) > 1: + await schedule_cmd.finish( + f"时间选项 --{', --'.join(provided_options)} 不能同时使用,请只选择一个。" + ) + @schedule_cmd.assign("查看") async def _( bot: Bot, - event: GroupMessageEvent, + event: Event, target_group_id: Match[str] = AlconnaMatch("target_group_id"), all_groups: Query[bool] = Query("查看.all"), plugin_name: Match[str] = AlconnaMatch("plugin_name"), @@ -310,6 +351,10 @@ async def _( schedules = [] title = "" + current_group_id = getattr(event, "group_id", None) + if not (all_groups.available or target_group_id.available) and not current_group_id: + await schedule_cmd.finish("私聊中查看任务必须使用 -g <群号> 或 -all 选项。") + if all_groups.available: if not is_superuser: await schedule_cmd.finish("需要超级用户权限才能查看所有群组的定时任务。") @@ -324,7 +369,7 @@ async def _( ] title = f"群 {gid} 的定时任务" else: - gid = str(event.group_id) + gid = str(current_group_id) schedules = [ s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid ] @@ -391,12 +436,14 @@ async def _( @schedule_cmd.assign("设置") async def _( + event: Event, plugin_name: str, - cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), - interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), - date_expr: Match[str] = AlconnaMatch("date.date_expr"), - group_id: Match[str] = AlconnaMatch("group_id"), - kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), + cron_expr: str | None = None, + interval_expr: str | None = None, + date_expr: str | None = None, + daily_expr: str | None = None, + group_id: str | None = None, + kwargs_str: str | None = None, all_enabled: Query[bool] = Query("设置.all"), bot_id_to_operate: str = Depends(GetBotId), ): @@ -410,56 +457,91 @@ async def _( trigger_config = {} try: - if cron_expr.available: + if cron_expr: trigger_type = "cron" - parts = cron_expr.result.split() + parts = cron_expr.split() if len(parts) != 5: raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)") cron_keys = ["minute", "hour", "day", "month", "day_of_week"] trigger_config = dict(zip(cron_keys, parts)) - elif interval_expr.available: + elif interval_expr: trigger_type = "interval" - trigger_config = _parse_interval(interval_expr.result) - elif date_expr.available: + trigger_config = _parse_interval(interval_expr) + elif date_expr: trigger_type = "date" - trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + trigger_config = {"run_date": datetime.fromisoformat(date_expr)} + elif daily_expr: + trigger_type = "cron" + trigger_config = _parse_daily_time(daily_expr) else: await schedule_cmd.finish( - "必须提供一种时间选项: --cron, --interval, 或 --date。" + "必须提供一种时间选项: --cron, --interval, --date, 或 --daily。" ) except ValueError as e: await schedule_cmd.finish(f"时间参数解析错误: {e}") - job_kwargs = None - if kwargs_str.available: + job_kwargs = {} + if kwargs_str: task_meta = scheduler_manager._registered_tasks[plugin_name] - if not task_meta.get("params"): + params_model = task_meta.get("model") + if not params_model: await schedule_cmd.finish(f"插件 '{plugin_name}' 不支持设置额外参数。") - registered_params = task_meta["params"] - job_kwargs = {} + if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): + await schedule_cmd.finish(f"插件 '{plugin_name}' 的参数模型配置错误。") + + raw_kwargs = {} try: - for item in kwargs_str.result.split(","): + for item in kwargs_str.split(","): key, value = item.strip().split("=", 1) - key = key.strip() - if key not in registered_params: - await schedule_cmd.finish(f"错误:插件不支持参数 '{key}'。") - param_type = registered_params[key].get("type", str) - job_kwargs[key] = param_type(value) + raw_kwargs[key.strip()] = value except Exception as e: await schedule_cmd.finish( f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" ) - target_group_id: str | None = None - if group_id.available and group_id.result.lower() == "all": + try: + model_validate = getattr(params_model, "model_validate", None) + if not model_validate: + await schedule_cmd.finish( + f"插件 '{plugin_name}' 的参数模型不支持验证。" + ) + return + + validated_model = model_validate(raw_kwargs) + + model_dump = getattr(validated_model, "model_dump", None) + if not model_dump: + await schedule_cmd.finish( + f"插件 '{plugin_name}' 的参数模型不支持导出。" + ) + return + + job_kwargs = model_dump() + except ValidationError as e: + errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] + error_str = "\n".join(errors) + await schedule_cmd.finish( + f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}" + ) + return + + target_group_id: str | None + current_group_id = getattr(event, "group_id", None) + + if group_id and group_id.lower() == "all": target_group_id = "__ALL_GROUPS__" elif all_enabled.available: target_group_id = "__ALL_GROUPS__" - elif group_id.available: - target_group_id = group_id.result + elif group_id: + target_group_id = group_id + elif current_group_id: + target_group_id = str(current_group_id) else: - target_group_id = None + await schedule_cmd.finish( + "私聊中设置定时任务时,必须使用 -g <群号> 或 --all 选项指定目标。" + ) + return success, msg = await scheduler_manager.add_schedule( plugin_name, @@ -471,7 +553,7 @@ async def _( ) if target_group_id == "__ALL_GROUPS__": - target_desc = "所有群组" + target_desc = f"所有群组 (Bot: {bot_id_to_operate})" elif target_group_id is None: target_desc = "全局" else: @@ -485,7 +567,7 @@ async def _( @schedule_cmd.assign("删除") async def _( - target: TargetScope = Depends(ParseScheduleTargetForDelete), + target: TargetScope = Depends(create_target_parser("删除")), bot_id_to_operate: str = Depends(GetBotId), ): if isinstance(target, TargetByID): @@ -531,7 +613,7 @@ async def _( @schedule_cmd.assign("暂停") async def _( - target: TargetScope = Depends(ParseScheduleTargetForPause), + target: TargetScope = Depends(create_target_parser("暂停")), bot_id_to_operate: str = Depends(GetBotId), ): if isinstance(target, TargetByID): @@ -568,7 +650,7 @@ async def _( @schedule_cmd.assign("恢复") async def _( - target: TargetScope = Depends(ParseScheduleTargetForResume), + target: TargetScope = Depends(create_target_parser("恢复")), bot_id_to_operate: str = Depends(GetBotId), ): if isinstance(target, TargetByID): @@ -612,89 +694,92 @@ async def _(schedule_id: int): @schedule_cmd.assign("更新") async def _( schedule_id: int, - cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), - interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), - date_expr: Match[str] = AlconnaMatch("date.date_expr"), - kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), + cron_expr: str | None = None, + interval_expr: str | None = None, + date_expr: str | None = None, + daily_expr: str | None = None, + kwargs_str: str | None = None, ): - if not any( - [ - cron_expr.available, - interval_expr.available, - date_expr.available, - kwargs_str.available, - ] - ): + if not any([cron_expr, interval_expr, date_expr, daily_expr, kwargs_str]): await schedule_cmd.finish( - "请提供需要更新的时间 (--cron/--interval/--date) 或参数 (--kwargs)。" + "请提供需要更新的时间 (--cron/--interval/--date/--daily) 或参数 (--kwargs)" ) trigger_config = None + trigger_type = None try: - if cron_expr.available: - parts = cron_expr.result.split() + if cron_expr: + trigger_type = "cron" + parts = cron_expr.split() if len(parts) != 5: raise ValueError("Cron 表达式必须有5个部分") cron_keys = ["minute", "hour", "day", "month", "day_of_week"] trigger_config = dict(zip(cron_keys, parts)) - elif interval_expr.available: - trigger_config = _parse_interval(interval_expr.result) - elif date_expr.available: - trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + elif interval_expr: + trigger_type = "interval" + trigger_config = _parse_interval(interval_expr) + elif date_expr: + trigger_type = "date" + trigger_config = {"run_date": datetime.fromisoformat(date_expr)} + elif daily_expr: + trigger_type = "cron" + trigger_config = _parse_daily_time(daily_expr) except ValueError as e: await schedule_cmd.finish(f"时间参数解析错误: {e}") job_kwargs = None - if kwargs_str.available: + if kwargs_str: schedule = await scheduler_manager.get_schedule_by_id(schedule_id) if not schedule: await schedule_cmd.finish(f"未找到 ID 为 {schedule_id} 的任务。") - if schedule.plugin_name not in scheduler_manager._registered_tasks: - await schedule_cmd.finish(f"插件 '{schedule.plugin_name}' 未注册定时任务。") - - task_meta = scheduler_manager._registered_tasks[schedule.plugin_name] - if "params" not in task_meta or not task_meta["params"]: + task_meta = scheduler_manager._registered_tasks.get(schedule.plugin_name) + if not task_meta or not (params_model := task_meta.get("model")): await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 未定义参数元数据。" - f"请联系插件开发者更新插件注册代码。" + f"插件 '{schedule.plugin_name}' 未定义参数模型,无法更新参数。" ) - registered_params = task_meta["params"] - job_kwargs = {} + if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): + await schedule_cmd.finish( + f"插件 '{schedule.plugin_name}' 的参数模型配置错误。" + ) + + raw_kwargs = {} try: - for item in kwargs_str.result.split(","): + for item in kwargs_str.split(","): key, value = item.strip().split("=", 1) - key = key.strip() - - if key not in registered_params: - await schedule_cmd.finish( - f"错误:插件不支持参数 '{key}'。" - f"可用参数: {list(registered_params.keys())}" - ) - - param_meta = registered_params[key] - if "type" not in param_meta: - await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 的参数 '{key}' 未定义类型。" - f"请联系插件开发者更新参数元数据。" - ) - param_type = param_meta["type"] - try: - job_kwargs[key] = param_type(value) - except (ValueError, TypeError): - await schedule_cmd.finish( - f"参数 '{key}' 的值 '{value}' 格式不正确," - f"应为 {param_type.__name__} 类型。" - ) - + raw_kwargs[key.strip()] = value except Exception as e: await schedule_cmd.finish( f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" ) + try: + model_validate = getattr(params_model, "model_validate", None) + if not model_validate: + await schedule_cmd.finish( + f"插件 '{schedule.plugin_name}' 的参数模型不支持验证。" + ) + return + + validated_model = model_validate(raw_kwargs) + + model_dump = getattr(validated_model, "model_dump", None) + if not model_dump: + await schedule_cmd.finish( + f"插件 '{schedule.plugin_name}' 的参数模型不支持导出。" + ) + return + + job_kwargs = model_dump(exclude_unset=True) + except ValidationError as e: + errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] + error_str = "\n".join(errors) + await schedule_cmd.finish(f"更新的参数验证失败:\n{error_str}") + return + _, message = await scheduler_manager.update_schedule( - schedule_id, trigger_config, job_kwargs + schedule_id, trigger_type, trigger_config, job_kwargs ) await schedule_cmd.finish(message) @@ -708,17 +793,44 @@ async def _(): message_parts = ["📋 已注册的定时任务插件:"] for i, plugin_name in enumerate(registered_plugins, 1): task_meta = scheduler_manager._registered_tasks[plugin_name] - if "params" not in task_meta: - message_parts.append(f"{i}. {plugin_name} - ⚠️ 未定义参数元数据") + params_model = task_meta.get("model") + + if not params_model: + message_parts.append(f"{i}. {plugin_name} - 无参数") continue - params = task_meta["params"] - if params: + if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): + message_parts.append(f"{i}. {plugin_name} - ⚠️ 参数模型配置错误") + continue + + model_fields = getattr(params_model, "model_fields", None) + if model_fields: param_info = ", ".join( - f"{k}({v['type'].__name__})" for k, v in params.items() + f"{field_name}({_get_type_name(field_info.annotation)})" + for field_name, field_info in model_fields.items() ) message_parts.append(f"{i}. {plugin_name} - 参数: {param_info}") else: message_parts.append(f"{i}. {plugin_name} - 无参数") await schedule_cmd.finish("\n".join(message_parts)) + + +@schedule_cmd.assign("状态") +async def _(schedule_id: int): + status = await scheduler_manager.get_schedule_status(schedule_id) + if not status: + await schedule_cmd.finish(f"未找到ID为 {schedule_id} 的定时任务。") + + info_lines = [ + f"📋 定时任务详细信息 (ID: {schedule_id})", + "--------------------", + f"▫️ 插件: {status['plugin_name']}", + f"▫️ Bot ID: {status.get('bot_id') or '默认'}", + f"▫️ 目标: {status['group_id'] or '全局'}", + f"▫️ 状态: {'✔️ 已启用' if status['is_enabled'] else '⏸️ 已暂停'}", + f"▫️ 下次运行: {status['next_run_time']}", + f"▫️ 触发规则: {_format_trigger(status)}", + f"▫️ 任务参数: {_format_params(status)}", + ] + await schedule_cmd.finish("\n".join(info_lines)) diff --git a/zhenxun/utils/manager/schedule_manager.py b/zhenxun/utils/manager/schedule_manager.py index 0066d3a7..a3b21272 100644 --- a/zhenxun/utils/manager/schedule_manager.py +++ b/zhenxun/utils/manager/schedule_manager.py @@ -1,5 +1,6 @@ import asyncio from collections.abc import Callable, Coroutine +import copy import inspect import random from typing import ClassVar @@ -7,25 +8,31 @@ from typing import ClassVar import nonebot from nonebot import get_bots from nonebot_plugin_apscheduler import scheduler +from pydantic import BaseModel, ValidationError +from zhenxun.configs.config import Config from zhenxun.models.schedule_info import ScheduleInfo from zhenxun.services.log import logger from zhenxun.utils.common_utils import CommonUtils from zhenxun.utils.manager.priority_manager import PriorityLifecycle from zhenxun.utils.platform import PlatformUtils +SCHEDULE_CONCURRENCY_KEY = "all_groups_concurrency_limit" + class SchedulerManager: """ 一个通用的、持久化的定时任务管理器,供所有插件使用。 """ - _registered_tasks: ClassVar[dict[str, dict]] = {} + _registered_tasks: ClassVar[ + dict[str, dict[str, Callable | type[BaseModel] | None]] + ] = {} _JOB_PREFIX = "zhenxun_schedule_" _running_tasks: ClassVar[set] = set() def register( - self, plugin_name: str, params: dict[str, dict] | None = None + self, plugin_name: str, params_model: type[BaseModel] | None = None ) -> Callable: """ 注册一个可调度的任务函数。 @@ -33,8 +40,8 @@ class SchedulerManager: Args: plugin_name (str): 插件的唯一名称 (通常是模块名)。 - params (dict, optional): 任务函数接受的额外参数元数据,用于通用命令。 - 格式: {"param_name": {"type": str, "help": "描述", "default": ...}} + params_model (type[BaseModel], optional): 一个 Pydantic BaseModel 类, + 用于定义和验证任务函数接受的额外参数。 """ def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: @@ -42,9 +49,12 @@ class SchedulerManager: logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。") self._registered_tasks[plugin_name] = { "func": func, - "params": params, + "model": params_model, } - logger.debug(f"插件 '{plugin_name}' 的定时任务已注册,参数元数据: {params}") + model_name = params_model.__name__ if params_model else "无" + logger.debug( + f"插件 '{plugin_name}' 的定时任务已注册,参数模型: {model_name}" + ) return func return decorator @@ -107,9 +117,20 @@ class SchedulerManager: ): """为所有群组执行任务,并处理优先级覆盖。""" plugin_name = schedule.plugin_name + + concurrency_limit = Config.get_config( + "SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5 + ) + if not isinstance(concurrency_limit, int) or concurrency_limit <= 0: + logger.warning( + f"无效的定时任务并发限制配置 '{concurrency_limit}',将使用默认值 5。" + ) + concurrency_limit = 5 + logger.info( f"开始执行针对 [所有群组] 的任务 " - f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})" + f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})," + f"并发限制: {concurrency_limit}" ) all_gids = set() @@ -128,15 +149,25 @@ class SchedulerManager: ).values_list("group_id", flat=True) ) + semaphore = asyncio.Semaphore(concurrency_limit) + + async def worker(gid: str): + """使用 Semaphore 包装单个群组的任务执行""" + async with semaphore: + temp_schedule = copy.deepcopy(schedule) + temp_schedule.group_id = gid + await self._execute_for_single_target(temp_schedule, task_meta, bot) + await asyncio.sleep(random.uniform(0.1, 0.5)) + + tasks_to_run = [] for gid in all_gids: if gid in specific_tasks_gids: logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") continue + tasks_to_run.append(worker(gid)) - temp_schedule = schedule - temp_schedule.group_id = gid - await self._execute_for_single_target(temp_schedule, task_meta, bot) - await asyncio.sleep(random.uniform(0.1, 0.5)) + if tasks_to_run: + await asyncio.gather(*tasks_to_run) async def _execute_for_single_target( self, schedule: ScheduleInfo, task_meta: dict, bot @@ -182,6 +213,46 @@ class SchedulerManager: e=e, ) + def _validate_and_prepare_kwargs( + self, plugin_name: str, job_kwargs: dict | None + ) -> tuple[bool, str | dict]: + """验证并准备任务参数,应用默认值""" + task_meta = self._registered_tasks.get(plugin_name) + if not task_meta: + return False, f"插件 '{plugin_name}' 未注册。" + + params_model = task_meta.get("model") + job_kwargs = job_kwargs if job_kwargs is not None else {} + + if not params_model: + if job_kwargs: + logger.warning( + f"插件 '{plugin_name}' 未定义参数模型,但收到了参数: {job_kwargs}" + ) + return True, job_kwargs + + if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): + logger.error(f"插件 '{plugin_name}' 的参数模型不是有效的 BaseModel 类") + return False, f"插件 '{plugin_name}' 的参数模型配置错误" + + try: + model_validate = getattr(params_model, "model_validate", None) + if not model_validate: + return False, f"插件 '{plugin_name}' 的参数模型不支持验证" + + validated_model = model_validate(job_kwargs) + + model_dump = getattr(validated_model, "model_dump", None) + if not model_dump: + return False, f"插件 '{plugin_name}' 的参数模型不支持导出" + + return True, model_dump() + except ValidationError as e: + errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] + error_str = "\n".join(errors) + msg = f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}" + return False, msg + def _add_aps_job(self, schedule: ScheduleInfo): """根据 ScheduleInfo 对象添加或更新一个 APScheduler 任务。""" job_id = self._get_job_id(schedule.id) @@ -234,23 +305,46 @@ class SchedulerManager: if plugin_name not in self._registered_tasks: return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。" + is_valid, result = self._validate_and_prepare_kwargs(plugin_name, job_kwargs) + if not is_valid: + return False, str(result) + + validated_job_kwargs = result + + effective_bot_id = bot_id if group_id == "__ALL_GROUPS__" else None + search_kwargs = { "plugin_name": plugin_name, "group_id": group_id, - "bot_id": bot_id, } + if effective_bot_id: + search_kwargs["bot_id"] = effective_bot_id + else: + search_kwargs["bot_id__isnull"] = True defaults = { "trigger_type": trigger_type, "trigger_config": trigger_config, - "job_kwargs": job_kwargs if job_kwargs is not None else {}, + "job_kwargs": validated_job_kwargs, "is_enabled": True, } - schedule, created = await ScheduleInfo.update_or_create( - **search_kwargs, - defaults=defaults, - ) + schedule = await ScheduleInfo.filter(**search_kwargs).first() + created = False + + if schedule: + for key, value in defaults.items(): + setattr(schedule, key, value) + await schedule.save() + else: + creation_kwargs = { + "plugin_name": plugin_name, + "group_id": group_id, + "bot_id": effective_bot_id, + **defaults, + } + schedule = await ScheduleInfo.create(**creation_kwargs) + created = True self._add_aps_job(schedule) action = "设置" if created else "更新" return True, f"已成功{action}插件 '{plugin_name}' 的定时任务。" @@ -296,6 +390,7 @@ class SchedulerManager: async def update_schedule( self, schedule_id: int, + trigger_type: str | None = None, trigger_config: dict | None = None, job_kwargs: dict | None = None, ) -> tuple[bool, str]: @@ -306,15 +401,27 @@ class SchedulerManager: updated_fields = [] if trigger_config is not None: - if not isinstance(schedule.trigger_config, dict): - return False, f"任务 {schedule_id} 的 trigger_config 数据格式错误。" - schedule.trigger_config.update(trigger_config) + schedule.trigger_config = trigger_config updated_fields.append("trigger_config") + if trigger_type is not None and schedule.trigger_type != trigger_type: + schedule.trigger_type = trigger_type + updated_fields.append("trigger_type") + if job_kwargs is not None: if not isinstance(schedule.job_kwargs, dict): return False, f"任务 {schedule_id} 的 job_kwargs 数据格式错误。" - schedule.job_kwargs.update(job_kwargs) + + merged_kwargs = schedule.job_kwargs.copy() + merged_kwargs.update(job_kwargs) + + is_valid, result = self._validate_and_prepare_kwargs( + schedule.plugin_name, merged_kwargs + ) + if not is_valid: + return False, str(result) + + schedule.job_kwargs = result # type: ignore updated_fields.append("job_kwargs") if not updated_fields: @@ -683,6 +790,14 @@ scheduler_manager = SchedulerManager() @PriorityLifecycle.on_startup(priority=90) async def _load_schedules_from_db(): """在服务启动时从数据库加载并调度所有任务。""" + Config.add_plugin_config( + "SchedulerManager", + SCHEDULE_CONCURRENCY_KEY, + 5, + help="“所有群组”类型定时任务的并发执行数量限制", + type=int, + ) + logger.info("正在从数据库加载并调度所有定时任务...") schedules = await ScheduleInfo.filter(is_enabled=True).all() count = 0