From 8b9ae7255b843e8de972605bc3ce6b44ba99dad5 Mon Sep 17 00:00:00 2001 From: Rumio <32546670+webjoin111@users.noreply.github.com> Date: Thu, 26 Jun 2025 11:13:36 +0800 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8=20feat!(scheduler):=20=E5=BC=95?= =?UTF-8?q?=E5=85=A5=E9=80=9A=E7=94=A8=E6=8C=81=E4=B9=85=E5=8C=96=E5=AE=9A?= =?UTF-8?q?=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=B3=BB=E7=BB=9F=20(#1933)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ✨ feat!(scheduler): 引入通用持久化定时任务系统 * ✨ feat(scheduler): 支持全局定时任务(__ALL_GROUPS__)和多Bot管理 --------- Co-authored-by: webjoin111 <455457521@qq.com> --- .../scheduler_admin/__init__.py | 46 ++ .../scheduler_admin/command.py | 724 ++++++++++++++++++ zhenxun/models/schedule_info.py | 38 + zhenxun/services/db_context.py | 3 +- zhenxun/utils/manager/schedule_manager.py | 695 +++++++++++++++++ 5 files changed, 1505 insertions(+), 1 deletion(-) create mode 100644 zhenxun/builtin_plugins/scheduler_admin/__init__.py create mode 100644 zhenxun/builtin_plugins/scheduler_admin/command.py create mode 100644 zhenxun/models/schedule_info.py create mode 100644 zhenxun/utils/manager/schedule_manager.py diff --git a/zhenxun/builtin_plugins/scheduler_admin/__init__.py b/zhenxun/builtin_plugins/scheduler_admin/__init__.py new file mode 100644 index 00000000..c4ffb468 --- /dev/null +++ b/zhenxun/builtin_plugins/scheduler_admin/__init__.py @@ -0,0 +1,46 @@ +from nonebot.plugin import PluginMetadata + +from zhenxun.configs.utils import PluginExtraData +from zhenxun.utils.enum import PluginType + +from . import command # noqa: F401 + +__plugin_meta__ = PluginMetadata( + name="定时任务管理", + description="查看和管理由 SchedulerManager 控制的定时任务。", + usage=""" + 定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] : 查看定时任务 + 定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] : + 设置/开启任务 (SUPERUSER) + + 时间选项 (三选一): + --cron "<分> <时> <日> <月> <周>" : 设置 cron 表达式 + (例如: --cron "0 8 * * *") + --interval <时间间隔> : 设置时间间隔 + (例如: --interval 30m, --interval 2h, --interval 10s) + --date "" : 设置在特定时间执行一次 + + 定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all : 删除任务 (SUPERUSER) + 定时任务 暂停/恢复 <任务ID> | -p <插件> [-g <群号>] | -all : + 暂停/恢复任务 (SUPERUSER) + 定时任务 执行 <任务ID> : 立即手动执行一次任务 (SUPERUSER) + 定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] : 更新任务配置 (SUPERUSER) + 定时任务 插件列表 : 查看所有可设置定时任务的插件 (SUPERUSER) + + 别名支持: + - 查看: ls, list + - 设置: add, 开启 + - 删除: del, rm, remove, 关闭, 取消 + - 暂停: pause + - 恢复: resume + - 执行: trigger, run + - 更新: update, modify, 修改 + - 插件列表: plugins + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1.1", + plugin_type=PluginType.SUPERUSER, + is_show=False, + ).to_dict(), +) diff --git a/zhenxun/builtin_plugins/scheduler_admin/command.py b/zhenxun/builtin_plugins/scheduler_admin/command.py new file mode 100644 index 00000000..c9982440 --- /dev/null +++ b/zhenxun/builtin_plugins/scheduler_admin/command.py @@ -0,0 +1,724 @@ +import asyncio +from datetime import datetime +import re + +from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent +from nonebot.params import Depends +from nonebot.permission import SUPERUSER +from nonebot_plugin_alconna import ( + Alconna, + AlconnaMatch, + Args, + Match, + Option, + Query, + Subcommand, + on_alconna, +) + +from zhenxun.utils._image_template import ImageTemplate +from zhenxun.utils.manager.schedule_manager import scheduler_manager +from zhenxun.utils.message import MessageUtils +from zhenxun.utils.rules import admin_check, ensure_group + + +def _format_trigger(schedule_status: dict) -> str: + """将触发器配置格式化为人类可读的字符串""" + trigger_type = schedule_status["trigger_type"] + config = schedule_status["trigger_config"] + + if trigger_type == "cron": + hour = config.get("hour") + minute = config.get("minute") + hour_str = f"{hour:02d}" if hour is not None else "*" + minute_str = f"{minute:02d}" if minute is not None else "*" + trigger_str = f"每天 {hour_str}:{minute_str}" + elif trigger_type == "interval": + seconds = config.get("seconds", 0) + minutes = config.get("minutes", 0) + hours = config.get("hours", 0) + if hours: + trigger_str = f"每 {hours} 小时" + elif minutes: + trigger_str = f"每 {minutes} 分钟" + else: + trigger_str = f"每 {seconds} 秒" + elif trigger_type == "date": + run_date = config.get("run_date", "未知时间") + trigger_str = f"在 {run_date}" + else: + trigger_str = f"{trigger_type}: {config}" + + return trigger_str + + +def _format_params(schedule_status: dict) -> str: + """将任务参数格式化为人类可读的字符串""" + if kwargs := schedule_status.get("job_kwargs"): + kwargs_str = " | ".join(f"{k}: {v}" for k, v in kwargs.items()) + return kwargs_str + return "-" + + +def _parse_interval(interval_str: str) -> dict: + match = re.match(r"(\d+)([smh])", interval_str.lower()) + if not match: + raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '10s' 的格式。") + + value, unit = int(match.group(1)), match.group(2) + if unit == "s": + return {"seconds": value} + if unit == "m": + return {"minutes": value} + if unit == "h": + return {"hours": value} + return {} + + +async def GetBotId( + bot: Bot, + bot_id_match: Match[str] = AlconnaMatch("bot_id"), +) -> str: + """获取要操作的Bot ID""" + if bot_id_match.available: + return bot_id_match.result + return bot.self_id + + +class ScheduleTarget: + """定时任务操作目标的基类""" + + pass + + +class TargetByID(ScheduleTarget): + """按任务ID操作""" + + def __init__(self, id: int): + self.id = id + + +class TargetByPlugin(ScheduleTarget): + """按插件名操作""" + + def __init__( + self, plugin: str, group_id: str | None = None, all_groups: bool = False + ): + self.plugin = plugin + self.group_id = group_id + self.all_groups = all_groups + + +class TargetAll(ScheduleTarget): + """操作所有任务""" + + def __init__(self, for_group: str | None = None): + self.for_group = for_group + + +TargetScope = TargetByID | TargetByPlugin | TargetAll | None + + +async def ParseScheduleTargetForDelete( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("删除.all"), +) -> TargetScope: + """解析删除命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + +async def ParseScheduleTargetForPause( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("暂停.all"), +) -> TargetScope: + """解析暂停命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + +async def ParseScheduleTargetForResume( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("恢复.all"), +) -> TargetScope: + """解析恢复命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + +schedule_cmd = on_alconna( + Alconna( + "定时任务", + Subcommand( + "查看", + Option("-g", Args["target_group_id", str]), + Option("-all", help_text="查看所有群聊 (SUPERUSER)"), + Option("-p", Args["plugin_name", str], help_text="按插件名筛选"), + Option("--page", Args["page", int, 1], help_text="指定页码"), + alias=["ls", "list"], + help_text="查看定时任务", + ), + Subcommand( + "设置", + Args["plugin_name", str], + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"), + Option("-all", help_text="对所有群生效 (等同于 -g all)"), + Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["add", "开启"], + help_text="设置/开启一个定时任务", + ), + Subcommand( + "删除", + Args["schedule_id?", int], + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID"), + Option("-all", help_text="对所有群生效"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["del", "rm", "remove", "关闭", "取消"], + help_text="删除一个或多个定时任务", + ), + Subcommand( + "暂停", + Args["schedule_id?", int], + Option("-all", help_text="对当前群所有任务生效"), + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["pause"], + help_text="暂停一个或多个定时任务", + ), + Subcommand( + "恢复", + Args["schedule_id?", int], + Option("-all", help_text="对当前群所有任务生效"), + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["resume"], + help_text="恢复一个或多个定时任务", + ), + Subcommand( + "执行", + Args["schedule_id", int], + alias=["trigger", "run"], + help_text="立即执行一次任务", + ), + Subcommand( + "更新", + Args["schedule_id", int], + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"), + alias=["update", "modify", "修改"], + help_text="更新任务配置", + ), + Subcommand( + "插件列表", + alias=["plugins"], + help_text="列出所有可用的插件", + ), + ), + priority=5, + block=True, + rule=admin_check(1) & ensure_group, +) + + +@schedule_cmd.assign("查看") +async def _( + bot: Bot, + event: GroupMessageEvent, + target_group_id: Match[str] = AlconnaMatch("target_group_id"), + all_groups: Query[bool] = Query("查看.all"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + page: Match[int] = AlconnaMatch("page"), +): + is_superuser = await SUPERUSER(bot, event) + schedules = [] + title = "" + + if all_groups.available: + if not is_superuser: + await schedule_cmd.finish("需要超级用户权限才能查看所有群组的定时任务。") + schedules = await scheduler_manager.get_all_schedules() + title = "所有群组的定时任务" + elif target_group_id.available: + if not is_superuser: + await schedule_cmd.finish("需要超级用户权限才能查看指定群组的定时任务。") + gid = target_group_id.result + schedules = [ + s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid + ] + title = f"群 {gid} 的定时任务" + else: + gid = str(event.group_id) + schedules = [ + s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid + ] + title = "本群的定时任务" + + if plugin_name.available: + schedules = [s for s in schedules if s.plugin_name == plugin_name.result] + title += f" [插件: {plugin_name.result}]" + + if not schedules: + await schedule_cmd.finish("没有找到任何相关的定时任务。") + + page_size = 15 + current_page = page.result + total_items = len(schedules) + total_pages = (total_items + page_size - 1) // page_size + start_index = (current_page - 1) * page_size + end_index = start_index + page_size + paginated_schedules = schedules[start_index:end_index] + + if not paginated_schedules: + await schedule_cmd.finish("这一页没有内容了哦~") + + status_tasks = [ + scheduler_manager.get_schedule_status(s.id) for s in paginated_schedules + ] + all_statuses = await asyncio.gather(*status_tasks) + data_list = [ + [ + s["id"], + s["plugin_name"], + s.get("bot_id") or "N/A", + s["group_id"] or "全局", + s["next_run_time"], + _format_trigger(s), + _format_params(s), + "✔️ 已启用" if s["is_enabled"] else "⏸️ 已暂停", + ] + for s in all_statuses + if s + ] + + if not data_list: + await schedule_cmd.finish("没有找到任何相关的定时任务。") + + img = await ImageTemplate.table_page( + head_text=title, + tip_text=f"第 {current_page}/{total_pages} 页,共 {total_items} 条任务", + column_name=[ + "ID", + "插件", + "Bot ID", + "群组/目标", + "下次运行", + "触发规则", + "参数", + "状态", + ], + data_list=data_list, + column_space=20, + ) + await MessageUtils.build_message(img).send(reply_to=True) + + +@schedule_cmd.assign("设置") +async def _( + plugin_name: str, + cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), + date_expr: Match[str] = AlconnaMatch("date.date_expr"), + group_id: Match[str] = AlconnaMatch("group_id"), + kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), + all_enabled: Query[bool] = Query("设置.all"), + bot_id_to_operate: str = Depends(GetBotId), +): + if plugin_name not in scheduler_manager._registered_tasks: + await schedule_cmd.finish( + f"插件 '{plugin_name}' 没有注册可用的定时任务。\n" + f"可用插件: {list(scheduler_manager._registered_tasks.keys())}" + ) + + trigger_type = "" + trigger_config = {} + + try: + if cron_expr.available: + trigger_type = "cron" + parts = cron_expr.result.split() + if len(parts) != 5: + raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)") + cron_keys = ["minute", "hour", "day", "month", "day_of_week"] + trigger_config = dict(zip(cron_keys, parts)) + elif interval_expr.available: + trigger_type = "interval" + trigger_config = _parse_interval(interval_expr.result) + elif date_expr.available: + trigger_type = "date" + trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + else: + await schedule_cmd.finish( + "必须提供一种时间选项: --cron, --interval, 或 --date。" + ) + except ValueError as e: + await schedule_cmd.finish(f"时间参数解析错误: {e}") + + job_kwargs = None + if kwargs_str.available: + task_meta = scheduler_manager._registered_tasks[plugin_name] + if not task_meta.get("params"): + await schedule_cmd.finish(f"插件 '{plugin_name}' 不支持设置额外参数。") + + registered_params = task_meta["params"] + job_kwargs = {} + try: + for item in kwargs_str.result.split(","): + key, value = item.strip().split("=", 1) + key = key.strip() + if key not in registered_params: + await schedule_cmd.finish(f"错误:插件不支持参数 '{key}'。") + param_type = registered_params[key].get("type", str) + job_kwargs[key] = param_type(value) + except Exception as e: + await schedule_cmd.finish( + f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" + ) + + target_group_id: str | None = None + if group_id.available and group_id.result.lower() == "all": + target_group_id = "__ALL_GROUPS__" + elif all_enabled.available: + target_group_id = "__ALL_GROUPS__" + elif group_id.available: + target_group_id = group_id.result + else: + target_group_id = None + + success, msg = await scheduler_manager.add_schedule( + plugin_name, + target_group_id, + trigger_type, + trigger_config, + job_kwargs, + bot_id=bot_id_to_operate, + ) + + if target_group_id == "__ALL_GROUPS__": + target_desc = "所有群组" + elif target_group_id is None: + target_desc = "全局" + else: + target_desc = f"群组 {target_group_id}" + + if success: + await schedule_cmd.finish(f"已成功为 [{target_desc}] {msg}") + else: + await schedule_cmd.finish(f"为 [{target_desc}] 设置任务失败: {msg}") + + +@schedule_cmd.assign("删除") +async def _( + target: TargetScope = Depends(ParseScheduleTargetForDelete), + bot_id_to_operate: str = Depends(GetBotId), +): + if isinstance(target, TargetByID): + _, message = await scheduler_manager.remove_schedule_by_id(target.id) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetByPlugin): + p_name = target.plugin + if p_name not in scheduler_manager.get_registered_plugins(): + await schedule_cmd.finish(f"未找到插件 '{p_name}'。") + + if target.all_groups: + removed_count = await scheduler_manager.remove_schedule_for_all( + p_name, bot_id=bot_id_to_operate + ) + message = ( + f"已取消了 {removed_count} 个群组的插件 '{p_name}' 定时任务。" + if removed_count > 0 + else f"没有找到插件 '{p_name}' 的定时任务。" + ) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.remove_schedule( + p_name, target.group_id, bot_id=bot_id_to_operate + ) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.remove_schedules_by_group( + target.for_group + ) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.remove_all_schedules() + await schedule_cmd.finish(message) + + else: + await schedule_cmd.finish( + "删除任务失败:请提供任务ID,或通过 -p <插件> 或 -all 指定要删除的任务。" + ) + + +@schedule_cmd.assign("暂停") +async def _( + target: TargetScope = Depends(ParseScheduleTargetForPause), + bot_id_to_operate: str = Depends(GetBotId), +): + if isinstance(target, TargetByID): + _, message = await scheduler_manager.pause_schedule(target.id) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetByPlugin): + p_name = target.plugin + if p_name not in scheduler_manager.get_registered_plugins(): + await schedule_cmd.finish(f"未找到插件 '{p_name}'。") + + if target.all_groups: + _, message = await scheduler_manager.pause_schedules_by_plugin(p_name) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.pause_schedule_by_plugin_group( + p_name, target.group_id, bot_id=bot_id_to_operate + ) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.pause_schedules_by_group( + target.for_group + ) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.pause_all_schedules() + await schedule_cmd.finish(message) + + else: + await schedule_cmd.finish("请提供任务ID、使用 -p <插件> 或 -all 选项。") + + +@schedule_cmd.assign("恢复") +async def _( + target: TargetScope = Depends(ParseScheduleTargetForResume), + bot_id_to_operate: str = Depends(GetBotId), +): + if isinstance(target, TargetByID): + _, message = await scheduler_manager.resume_schedule(target.id) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetByPlugin): + p_name = target.plugin + if p_name not in scheduler_manager.get_registered_plugins(): + await schedule_cmd.finish(f"未找到插件 '{p_name}'。") + + if target.all_groups: + _, message = await scheduler_manager.resume_schedules_by_plugin(p_name) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.resume_schedule_by_plugin_group( + p_name, target.group_id, bot_id=bot_id_to_operate + ) + await schedule_cmd.finish(message) + + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.resume_schedules_by_group( + target.for_group + ) + await schedule_cmd.finish(message) + else: + _, message = await scheduler_manager.resume_all_schedules() + await schedule_cmd.finish(message) + + else: + await schedule_cmd.finish("请提供任务ID、使用 -p <插件> 或 -all 选项。") + + +@schedule_cmd.assign("执行") +async def _(schedule_id: int): + _, message = await scheduler_manager.trigger_now(schedule_id) + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("更新") +async def _( + schedule_id: int, + cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), + date_expr: Match[str] = AlconnaMatch("date.date_expr"), + kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), +): + if not any( + [ + cron_expr.available, + interval_expr.available, + date_expr.available, + kwargs_str.available, + ] + ): + await schedule_cmd.finish( + "请提供需要更新的时间 (--cron/--interval/--date) 或参数 (--kwargs)。" + ) + + trigger_config = None + try: + if cron_expr.available: + parts = cron_expr.result.split() + if len(parts) != 5: + raise ValueError("Cron 表达式必须有5个部分") + cron_keys = ["minute", "hour", "day", "month", "day_of_week"] + trigger_config = dict(zip(cron_keys, parts)) + elif interval_expr.available: + trigger_config = _parse_interval(interval_expr.result) + elif date_expr.available: + trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + except ValueError as e: + await schedule_cmd.finish(f"时间参数解析错误: {e}") + + job_kwargs = None + if kwargs_str.available: + schedule = await scheduler_manager.get_schedule_by_id(schedule_id) + if not schedule: + await schedule_cmd.finish(f"未找到 ID 为 {schedule_id} 的任务。") + + if schedule.plugin_name not in scheduler_manager._registered_tasks: + await schedule_cmd.finish(f"插件 '{schedule.plugin_name}' 未注册定时任务。") + + task_meta = scheduler_manager._registered_tasks[schedule.plugin_name] + if "params" not in task_meta or not task_meta["params"]: + await schedule_cmd.finish( + f"插件 '{schedule.plugin_name}' 未定义参数元数据。" + f"请联系插件开发者更新插件注册代码。" + ) + + registered_params = task_meta["params"] + job_kwargs = {} + try: + for item in kwargs_str.result.split(","): + key, value = item.strip().split("=", 1) + key = key.strip() + + if key not in registered_params: + await schedule_cmd.finish( + f"错误:插件不支持参数 '{key}'。" + f"可用参数: {list(registered_params.keys())}" + ) + + param_meta = registered_params[key] + if "type" not in param_meta: + await schedule_cmd.finish( + f"插件 '{schedule.plugin_name}' 的参数 '{key}' 未定义类型。" + f"请联系插件开发者更新参数元数据。" + ) + param_type = param_meta["type"] + try: + job_kwargs[key] = param_type(value) + except (ValueError, TypeError): + await schedule_cmd.finish( + f"参数 '{key}' 的值 '{value}' 格式不正确," + f"应为 {param_type.__name__} 类型。" + ) + + except Exception as e: + await schedule_cmd.finish( + f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" + ) + + _, message = await scheduler_manager.update_schedule( + schedule_id, trigger_config, job_kwargs + ) + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("插件列表") +async def _(): + registered_plugins = scheduler_manager.get_registered_plugins() + if not registered_plugins: + await schedule_cmd.finish("当前没有已注册的定时任务插件。") + + message_parts = ["📋 已注册的定时任务插件:"] + for i, plugin_name in enumerate(registered_plugins, 1): + task_meta = scheduler_manager._registered_tasks[plugin_name] + if "params" not in task_meta: + message_parts.append(f"{i}. {plugin_name} - ⚠️ 未定义参数元数据") + continue + + params = task_meta["params"] + if params: + param_info = ", ".join( + f"{k}({v['type'].__name__})" for k, v in params.items() + ) + message_parts.append(f"{i}. {plugin_name} - 参数: {param_info}") + else: + message_parts.append(f"{i}. {plugin_name} - 无参数") + + await schedule_cmd.finish("\n".join(message_parts)) diff --git a/zhenxun/models/schedule_info.py b/zhenxun/models/schedule_info.py new file mode 100644 index 00000000..c7583078 --- /dev/null +++ b/zhenxun/models/schedule_info.py @@ -0,0 +1,38 @@ +from tortoise import fields + +from zhenxun.services.db_context import Model + + +class ScheduleInfo(Model): + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + bot_id = fields.CharField( + 255, null=True, default=None, description="任务关联的Bot ID" + ) + """任务关联的Bot ID""" + plugin_name = fields.CharField(255, description="插件模块名") + """插件模块名""" + group_id = fields.CharField( + 255, + null=True, + description="群组ID, '__ALL_GROUPS__' 表示所有群, 为空表示全局任务", + ) + """群组ID, 为空表示全局任务""" + trigger_type = fields.CharField( + max_length=20, default="cron", description="触发器类型 (cron, interval, date)" + ) + """触发器类型 (cron, interval, date)""" + trigger_config = fields.JSONField(description="触发器具体配置") + """触发器具体配置""" + job_kwargs = fields.JSONField( + default=dict, description="传递给任务函数的额外关键字参数" + ) + """传递给任务函数的额外关键字参数""" + is_enabled = fields.BooleanField(default=True, description="是否启用") + """是否启用""" + create_time = fields.DatetimeField(auto_now_add=True) + """创建时间""" + + class Meta: # pyright: ignore [reportIncompatibleVariableOverride] + table = "schedule_info" + table_description = "通用定时任务表" diff --git a/zhenxun/services/db_context.py b/zhenxun/services/db_context.py index 33678965..4543b262 100644 --- a/zhenxun/services/db_context.py +++ b/zhenxun/services/db_context.py @@ -26,7 +26,8 @@ class Model(Model_): """ def __init_subclass__(cls, **kwargs): - MODELS.append(cls.__module__) + if cls.__module__ not in MODELS: + MODELS.append(cls.__module__) if func := getattr(cls, "_run_script", None): SCRIPT_METHOD.append((cls.__module__, func)) diff --git a/zhenxun/utils/manager/schedule_manager.py b/zhenxun/utils/manager/schedule_manager.py new file mode 100644 index 00000000..0066d3a7 --- /dev/null +++ b/zhenxun/utils/manager/schedule_manager.py @@ -0,0 +1,695 @@ +import asyncio +from collections.abc import Callable, Coroutine +import inspect +import random +from typing import ClassVar + +import nonebot +from nonebot import get_bots +from nonebot_plugin_apscheduler import scheduler + +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.log import logger +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.manager.priority_manager import PriorityLifecycle +from zhenxun.utils.platform import PlatformUtils + + +class SchedulerManager: + """ + 一个通用的、持久化的定时任务管理器,供所有插件使用。 + """ + + _registered_tasks: ClassVar[dict[str, dict]] = {} + _JOB_PREFIX = "zhenxun_schedule_" + _running_tasks: ClassVar[set] = set() + + def register( + self, plugin_name: str, params: dict[str, dict] | None = None + ) -> Callable: + """ + 注册一个可调度的任务函数。 + 被装饰的函数签名应为 `async def func(group_id: str | None, **kwargs)` + + Args: + plugin_name (str): 插件的唯一名称 (通常是模块名)。 + params (dict, optional): 任务函数接受的额外参数元数据,用于通用命令。 + 格式: {"param_name": {"type": str, "help": "描述", "default": ...}} + """ + + def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: + if plugin_name in self._registered_tasks: + logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。") + self._registered_tasks[plugin_name] = { + "func": func, + "params": params, + } + logger.debug(f"插件 '{plugin_name}' 的定时任务已注册,参数元数据: {params}") + return func + + return decorator + + def get_registered_plugins(self) -> list[str]: + """获取所有已注册定时任务的插件列表。""" + return list(self._registered_tasks.keys()) + + def _get_job_id(self, schedule_id: int) -> str: + """根据数据库ID生成唯一的 APScheduler Job ID。""" + return f"{self._JOB_PREFIX}{schedule_id}" + + async def _execute_job(self, schedule_id: int): + """ + APScheduler 调度的入口函数。 + 根据 schedule_id 处理特定任务、所有群组任务或全局任务。 + """ + schedule = await ScheduleInfo.get_or_none(id=schedule_id) + if not schedule or not schedule.is_enabled: + logger.warning(f"定时任务 {schedule_id} 不存在或已禁用,跳过执行。") + return + + plugin_name = schedule.plugin_name + + task_meta = self._registered_tasks.get(plugin_name) + if not task_meta: + logger.error( + f"无法执行定时任务:插件 '{plugin_name}' 未注册或已卸载。将禁用该任务。" + ) + schedule.is_enabled = False + await schedule.save(update_fields=["is_enabled"]) + self._remove_aps_job(schedule.id) + return + + try: + if schedule.bot_id: + bot = nonebot.get_bot(schedule.bot_id) + else: + bot = nonebot.get_bot() + logger.debug( + f"任务 {schedule_id} 未关联特定Bot,使用默认Bot {bot.self_id}" + ) + except KeyError: + logger.warning( + f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} " + f"不在线,本次执行跳过。" + ) + return + except ValueError: + logger.warning(f"当前没有Bot在线,定时任务 {schedule_id} 跳过。") + return + + if schedule.group_id == "__ALL_GROUPS__": + await self._execute_for_all_groups(schedule, task_meta, bot) + else: + await self._execute_for_single_target(schedule, task_meta, bot) + + async def _execute_for_all_groups( + self, schedule: ScheduleInfo, task_meta: dict, bot + ): + """为所有群组执行任务,并处理优先级覆盖。""" + plugin_name = schedule.plugin_name + logger.info( + f"开始执行针对 [所有群组] 的任务 " + f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})" + ) + + all_gids = set() + try: + group_list, _ = await PlatformUtils.get_group_list(bot) + all_gids.update( + g.group_id for g in group_list if g.group_id and not g.channel_id + ) + except Exception as e: + logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) + return + + specific_tasks_gids = set( + await ScheduleInfo.filter( + plugin_name=plugin_name, group_id__in=list(all_gids) + ).values_list("group_id", flat=True) + ) + + for gid in all_gids: + if gid in specific_tasks_gids: + logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") + continue + + temp_schedule = schedule + temp_schedule.group_id = gid + await self._execute_for_single_target(temp_schedule, task_meta, bot) + await asyncio.sleep(random.uniform(0.1, 0.5)) + + async def _execute_for_single_target( + self, schedule: ScheduleInfo, task_meta: dict, bot + ): + """为单个目标(具体群组或全局)执行任务。""" + plugin_name = schedule.plugin_name + group_id = schedule.group_id + + try: + is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) + if is_blocked: + target_desc = f"群 {group_id}" if group_id else "全局" + logger.info( + f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]" + "因功能被禁用而跳过执行。" + ) + return + + task_func = task_meta["func"] + job_kwargs = schedule.job_kwargs + if not isinstance(job_kwargs, dict): + logger.error( + f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" + ) + return + + sig = inspect.signature(task_func) + if "bot" in sig.parameters: + job_kwargs["bot"] = bot + + logger.info( + f"插件 '{plugin_name}' 开始为目标 [{group_id or '全局'}] " + f"执行定时任务 (ID: {schedule.id})。" + ) + task = asyncio.create_task(task_func(group_id, **job_kwargs)) + self._running_tasks.add(task) + task.add_done_callback(self._running_tasks.discard) + await task + except Exception as e: + logger.error( + f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, " + f"目标: {group_id or '全局'}) 时发生异常", + e=e, + ) + + def _add_aps_job(self, schedule: ScheduleInfo): + """根据 ScheduleInfo 对象添加或更新一个 APScheduler 任务。""" + job_id = self._get_job_id(schedule.id) + try: + scheduler.remove_job(job_id) + except Exception: + pass + + if not isinstance(schedule.trigger_config, dict): + logger.error( + f"任务 {schedule.id} 的 trigger_config 不是字典类型: " + f"{type(schedule.trigger_config)}" + ) + return + + scheduler.add_job( + self._execute_job, + trigger=schedule.trigger_type, + id=job_id, + misfire_grace_time=300, + args=[schedule.id], + **schedule.trigger_config, + ) + logger.debug( + f"已在 APScheduler 中添加/更新任务: {job_id} " + f"with trigger: {schedule.trigger_config}" + ) + + def _remove_aps_job(self, schedule_id: int): + """移除一个 APScheduler 任务。""" + job_id = self._get_job_id(schedule_id) + try: + scheduler.remove_job(job_id) + logger.debug(f"已从 APScheduler 中移除任务: {job_id}") + except Exception: + pass + + async def add_schedule( + self, + plugin_name: str, + group_id: str | None, + trigger_type: str, + trigger_config: dict, + job_kwargs: dict | None = None, + bot_id: str | None = None, + ) -> tuple[bool, str]: + """ + 添加或更新一个定时任务。 + """ + if plugin_name not in self._registered_tasks: + return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。" + + search_kwargs = { + "plugin_name": plugin_name, + "group_id": group_id, + "bot_id": bot_id, + } + + defaults = { + "trigger_type": trigger_type, + "trigger_config": trigger_config, + "job_kwargs": job_kwargs if job_kwargs is not None else {}, + "is_enabled": True, + } + + schedule, created = await ScheduleInfo.update_or_create( + **search_kwargs, + defaults=defaults, + ) + self._add_aps_job(schedule) + action = "设置" if created else "更新" + return True, f"已成功{action}插件 '{plugin_name}' 的定时任务。" + + async def add_schedule_for_all( + self, + plugin_name: str, + trigger_type: str, + trigger_config: dict, + job_kwargs: dict | None = None, + ) -> tuple[int, int]: + """为所有机器人所在的群组添加定时任务。""" + if plugin_name not in self._registered_tasks: + raise ValueError(f"插件 '{plugin_name}' 没有注册可用的定时任务。") + + groups = set() + for bot in get_bots().values(): + try: + group_list, _ = await PlatformUtils.get_group_list(bot) + groups.update( + g.group_id for g in group_list if g.group_id and not g.channel_id + ) + except Exception as e: + logger.error(f"获取 Bot {bot.self_id} 的群列表失败", e=e) + + success_count = 0 + fail_count = 0 + for gid in groups: + try: + success, _ = await self.add_schedule( + plugin_name, gid, trigger_type, trigger_config, job_kwargs + ) + if success: + success_count += 1 + else: + fail_count += 1 + except Exception as e: + logger.error(f"为群 {gid} 添加定时任务失败: {e}", e=e) + fail_count += 1 + await asyncio.sleep(0.05) + return success_count, fail_count + + async def update_schedule( + self, + schedule_id: int, + trigger_config: dict | None = None, + job_kwargs: dict | None = None, + ) -> tuple[bool, str]: + """部分更新一个已存在的定时任务。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule: + return False, f"未找到 ID 为 {schedule_id} 的任务。" + + updated_fields = [] + if trigger_config is not None: + if not isinstance(schedule.trigger_config, dict): + return False, f"任务 {schedule_id} 的 trigger_config 数据格式错误。" + schedule.trigger_config.update(trigger_config) + updated_fields.append("trigger_config") + + if job_kwargs is not None: + if not isinstance(schedule.job_kwargs, dict): + return False, f"任务 {schedule_id} 的 job_kwargs 数据格式错误。" + schedule.job_kwargs.update(job_kwargs) + updated_fields.append("job_kwargs") + + if not updated_fields: + return True, "没有任何需要更新的配置。" + + await schedule.save(update_fields=updated_fields) + self._add_aps_job(schedule) + return True, f"成功更新了任务 ID: {schedule_id} 的配置。" + + async def remove_schedule( + self, plugin_name: str, group_id: str | None, bot_id: str | None = None + ) -> tuple[bool, str]: + """移除指定插件和群组的定时任务。""" + query = {"plugin_name": plugin_name, "group_id": group_id} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) + if not schedules: + msg = ( + f"未找到与 Bot {bot_id} 相关的群 {group_id} " + f"的插件 '{plugin_name}' 定时任务。" + ) + return (False, msg) + + for schedule in schedules: + self._remove_aps_job(schedule.id) + await schedule.delete() + + target_desc = f"群 {group_id}" if group_id else "全局" + msg = ( + f"已取消 Bot {bot_id} 在 [{target_desc}] " + f"的插件 '{plugin_name}' 所有定时任务。" + ) + return (True, msg) + + async def remove_schedule_for_all( + self, plugin_name: str, bot_id: str | None = None + ) -> int: + """移除指定插件在所有群组的定时任务。""" + query = {"plugin_name": plugin_name} + if bot_id: + query["bot_id"] = bot_id + + schedules_to_delete = await ScheduleInfo.filter(**query).all() + if not schedules_to_delete: + return 0 + + for schedule in schedules_to_delete: + self._remove_aps_job(schedule.id) + await schedule.delete() + await asyncio.sleep(0.01) + + return len(schedules_to_delete) + + async def remove_schedules_by_group(self, group_id: str) -> tuple[bool, str]: + """移除指定群组的所有定时任务。""" + schedules = await ScheduleInfo.filter(group_id=group_id) + if not schedules: + return False, f"群 {group_id} 没有任何定时任务。" + + count = 0 + for schedule in schedules: + self._remove_aps_job(schedule.id) + await schedule.delete() + count += 1 + await asyncio.sleep(0.01) + + return True, f"已成功移除群 {group_id} 的 {count} 个定时任务。" + + async def pause_schedules_by_group(self, group_id: str) -> tuple[int, str]: + """暂停指定群组的所有定时任务。""" + schedules = await ScheduleInfo.filter(group_id=group_id, is_enabled=True) + if not schedules: + return 0, f"群 {group_id} 没有正在运行的定时任务可暂停。" + + count = 0 + for schedule in schedules: + success, _ = await self.pause_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return count, f"已成功暂停群 {group_id} 的 {count} 个定时任务。" + + async def resume_schedules_by_group(self, group_id: str) -> tuple[int, str]: + """恢复指定群组的所有定时任务。""" + schedules = await ScheduleInfo.filter(group_id=group_id, is_enabled=False) + if not schedules: + return 0, f"群 {group_id} 没有已暂停的定时任务可恢复。" + + count = 0 + for schedule in schedules: + success, _ = await self.resume_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return count, f"已成功恢复群 {group_id} 的 {count} 个定时任务。" + + async def pause_schedules_by_plugin(self, plugin_name: str) -> tuple[int, str]: + """暂停指定插件在所有群组的定时任务。""" + schedules = await ScheduleInfo.filter(plugin_name=plugin_name, is_enabled=True) + if not schedules: + return 0, f"插件 '{plugin_name}' 没有正在运行的定时任务可暂停。" + + count = 0 + for schedule in schedules: + success, _ = await self.pause_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return ( + count, + f"已成功暂停插件 '{plugin_name}' 在所有群组的 {count} 个定时任务。", + ) + + async def resume_schedules_by_plugin(self, plugin_name: str) -> tuple[int, str]: + """恢复指定插件在所有群组的定时任务。""" + schedules = await ScheduleInfo.filter(plugin_name=plugin_name, is_enabled=False) + if not schedules: + return 0, f"插件 '{plugin_name}' 没有已暂停的定时任务可恢复。" + + count = 0 + for schedule in schedules: + success, _ = await self.resume_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return ( + count, + f"已成功恢复插件 '{plugin_name}' 在所有群组的 {count} 个定时任务。", + ) + + async def pause_schedule_by_plugin_group( + self, plugin_name: str, group_id: str | None, bot_id: str | None = None + ) -> tuple[bool, str]: + """暂停指定插件在指定群组的定时任务。""" + query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": True} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) + if not schedules: + return ( + False, + f"群 {group_id} 未设置插件 '{plugin_name}' 的定时任务或任务已暂停。", + ) + + count = 0 + for schedule in schedules: + success, _ = await self.pause_schedule(schedule.id) + if success: + count += 1 + + return ( + True, + f"已成功暂停群 {group_id} 的插件 '{plugin_name}' 共 {count} 个定时任务。", + ) + + async def resume_schedule_by_plugin_group( + self, plugin_name: str, group_id: str | None, bot_id: str | None = None + ) -> tuple[bool, str]: + """恢复指定插件在指定群组的定时任务。""" + query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": False} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) + if not schedules: + return ( + False, + f"群 {group_id} 未设置插件 '{plugin_name}' 的定时任务或任务已启用。", + ) + + count = 0 + for schedule in schedules: + success, _ = await self.resume_schedule(schedule.id) + if success: + count += 1 + + return ( + True, + f"已成功恢复群 {group_id} 的插件 '{plugin_name}' 共 {count} 个定时任务。", + ) + + async def remove_all_schedules(self) -> tuple[int, str]: + """移除所有群组的所有定时任务。""" + schedules = await ScheduleInfo.all() + if not schedules: + return 0, "当前没有任何定时任务。" + + count = 0 + for schedule in schedules: + self._remove_aps_job(schedule.id) + await schedule.delete() + count += 1 + await asyncio.sleep(0.01) + + return count, f"已成功移除所有群组的 {count} 个定时任务。" + + async def pause_all_schedules(self) -> tuple[int, str]: + """暂停所有群组的所有定时任务。""" + schedules = await ScheduleInfo.filter(is_enabled=True) + if not schedules: + return 0, "当前没有正在运行的定时任务可暂停。" + + count = 0 + for schedule in schedules: + success, _ = await self.pause_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return count, f"已成功暂停所有群组的 {count} 个定时任务。" + + async def resume_all_schedules(self) -> tuple[int, str]: + """恢复所有群组的所有定时任务。""" + schedules = await ScheduleInfo.filter(is_enabled=False) + if not schedules: + return 0, "当前没有已暂停的定时任务可恢复。" + + count = 0 + for schedule in schedules: + success, _ = await self.resume_schedule(schedule.id) + if success: + count += 1 + await asyncio.sleep(0.01) + + return count, f"已成功恢复所有群组的 {count} 个定时任务。" + + async def remove_schedule_by_id(self, schedule_id: int) -> tuple[bool, str]: + """通过ID移除指定的定时任务。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule: + return False, f"未找到 ID 为 {schedule_id} 的定时任务。" + + self._remove_aps_job(schedule.id) + await schedule.delete() + + return ( + True, + f"已删除插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " + f"的定时任务 (ID: {schedule.id})。", + ) + + async def get_schedule_by_id(self, schedule_id: int) -> ScheduleInfo | None: + """通过ID获取定时任务信息。""" + return await ScheduleInfo.get_or_none(id=schedule_id) + + async def get_schedules( + self, plugin_name: str, group_id: str | None + ) -> list[ScheduleInfo]: + """获取特定群组特定插件的所有定时任务。""" + return await ScheduleInfo.filter(plugin_name=plugin_name, group_id=group_id) + + async def get_schedule( + self, plugin_name: str, group_id: str | None + ) -> ScheduleInfo | None: + """获取特定群组的定时任务信息。""" + return await ScheduleInfo.get_or_none( + plugin_name=plugin_name, group_id=group_id + ) + + async def get_all_schedules( + self, plugin_name: str | None = None + ) -> list[ScheduleInfo]: + """获取所有定时任务信息,可按插件名过滤。""" + if plugin_name: + return await ScheduleInfo.filter(plugin_name=plugin_name).all() + return await ScheduleInfo.all() + + async def get_schedule_status(self, schedule_id: int) -> dict | None: + """获取任务的详细状态。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule: + return None + + job_id = self._get_job_id(schedule.id) + job = scheduler.get_job(job_id) + + status = { + "id": schedule.id, + "bot_id": schedule.bot_id, + "plugin_name": schedule.plugin_name, + "group_id": schedule.group_id, + "is_enabled": schedule.is_enabled, + "trigger_type": schedule.trigger_type, + "trigger_config": schedule.trigger_config, + "job_kwargs": schedule.job_kwargs, + "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") + if job and job.next_run_time + else "N/A", + "is_paused_in_scheduler": not bool(job.next_run_time) if job else "N/A", + } + return status + + async def pause_schedule(self, schedule_id: int) -> tuple[bool, str]: + """暂停一个定时任务。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule or not schedule.is_enabled: + return False, "任务不存在或已暂停。" + + schedule.is_enabled = False + await schedule.save(update_fields=["is_enabled"]) + + job_id = self._get_job_id(schedule.id) + try: + scheduler.pause_job(job_id) + except Exception: + pass + + return ( + True, + f"已暂停插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " + f"的定时任务 (ID: {schedule.id})。", + ) + + async def resume_schedule(self, schedule_id: int) -> tuple[bool, str]: + """恢复一个定时任务。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule or schedule.is_enabled: + return False, "任务不存在或已启用。" + + schedule.is_enabled = True + await schedule.save(update_fields=["is_enabled"]) + + job_id = self._get_job_id(schedule.id) + try: + scheduler.resume_job(job_id) + except Exception: + self._add_aps_job(schedule) + + return ( + True, + f"已恢复插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " + f"的定时任务 (ID: {schedule.id})。", + ) + + async def trigger_now(self, schedule_id: int) -> tuple[bool, str]: + """手动触发一个定时任务。""" + schedule = await self.get_schedule_by_id(schedule_id) + if not schedule: + return False, f"未找到 ID 为 {schedule_id} 的定时任务。" + + if schedule.plugin_name not in self._registered_tasks: + return False, f"插件 '{schedule.plugin_name}' 没有注册可用的定时任务。" + + try: + await self._execute_job(schedule.id) + return ( + True, + f"已手动触发插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " + f"的定时任务 (ID: {schedule.id})。", + ) + except Exception as e: + logger.error(f"手动触发任务失败: {e}") + return False, f"手动触发任务失败: {e}" + + +scheduler_manager = SchedulerManager() + + +@PriorityLifecycle.on_startup(priority=90) +async def _load_schedules_from_db(): + """在服务启动时从数据库加载并调度所有任务。""" + logger.info("正在从数据库加载并调度所有定时任务...") + schedules = await ScheduleInfo.filter(is_enabled=True).all() + count = 0 + for schedule in schedules: + if schedule.plugin_name in scheduler_manager._registered_tasks: + scheduler_manager._add_aps_job(schedule) + count += 1 + else: + logger.warning(f"跳过加载定时任务:插件 '{schedule.plugin_name}' 未注册。") + logger.info(f"定时任务加载完成,共成功加载 {count} 个任务。")