diff --git a/zhenxun/builtin_plugins/scheduler_admin/__init__.py b/zhenxun/builtin_plugins/scheduler_admin/__init__.py index adaaa621..eb71bafb 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/__init__.py +++ b/zhenxun/builtin_plugins/scheduler_admin/__init__.py @@ -1,9 +1,11 @@ from nonebot.plugin import PluginMetadata -from zhenxun.configs.utils import PluginExtraData +from zhenxun.configs.utils import PluginExtraData, RegisterConfig from zhenxun.utils.enum import PluginType -from . import command # noqa: F401 +from . import commands, handlers + +__all__ = ["commands", "handlers"] __plugin_meta__ = PluginMetadata( name="定时任务管理", @@ -27,6 +29,8 @@ __plugin_meta__ = PluginMetadata( 定时任务 恢复 <任务ID> | -p <插件> [-g <群号>] | -all 定时任务 执行 <任务ID> 定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] + # [修改] 增加说明 + • 说明: -p 选项可单独使用,用于操作指定插件的所有任务 📝 时间选项 (三选一): --cron "<分> <时> <日> <月> <周>" # 例: --cron "0 8 * * *" @@ -47,5 +51,35 @@ __plugin_meta__ = PluginMetadata( version="0.1.2", plugin_type=PluginType.SUPERUSER, is_show=False, + configs=[ + RegisterConfig( + module="SchedulerManager", + key="ALL_GROUPS_CONCURRENCY_LIMIT", + value=5, + help="“所有群组”类型定时任务的并发执行数量限制", + type=int, + ), + RegisterConfig( + module="SchedulerManager", + key="JOB_MAX_RETRIES", + value=2, + help="定时任务执行失败时的最大重试次数", + type=int, + ), + RegisterConfig( + module="SchedulerManager", + key="JOB_RETRY_DELAY", + value=10, + help="定时任务执行重试的间隔时间(秒)", + type=int, + ), + RegisterConfig( + module="SchedulerManager", + key="SCHEDULER_TIMEZONE", + value="Asia/Shanghai", + help="定时任务使用的时区,默认为 Asia/Shanghai", + type=str, + ), + ], ).to_dict(), ) diff --git a/zhenxun/builtin_plugins/scheduler_admin/command.py b/zhenxun/builtin_plugins/scheduler_admin/command.py deleted file mode 100644 index 08a085fb..00000000 --- a/zhenxun/builtin_plugins/scheduler_admin/command.py +++ /dev/null @@ -1,836 +0,0 @@ -import asyncio -from datetime import datetime -import re - -from nonebot.adapters import Event -from nonebot.adapters.onebot.v11 import Bot -from nonebot.params import Depends -from nonebot.permission import SUPERUSER -from nonebot_plugin_alconna import ( - Alconna, - AlconnaMatch, - Args, - Arparma, - Match, - Option, - Query, - Subcommand, - on_alconna, -) -from pydantic import BaseModel, ValidationError - -from zhenxun.utils._image_template import ImageTemplate -from zhenxun.utils.manager.schedule_manager import scheduler_manager - - -def _get_type_name(annotation) -> str: - """获取类型注解的名称""" - if hasattr(annotation, "__name__"): - return annotation.__name__ - elif hasattr(annotation, "_name"): - return annotation._name - else: - return str(annotation) - - -from zhenxun.utils.message import MessageUtils -from zhenxun.utils.rules import admin_check - - -def _format_trigger(schedule_status: dict) -> str: - """将触发器配置格式化为人类可读的字符串""" - trigger_type = schedule_status["trigger_type"] - config = schedule_status["trigger_config"] - - if trigger_type == "cron": - minute = config.get("minute", "*") - hour = config.get("hour", "*") - day = config.get("day", "*") - month = config.get("month", "*") - day_of_week = config.get("day_of_week", "*") - - if day == "*" and month == "*" and day_of_week == "*": - formatted_hour = hour if hour == "*" else f"{int(hour):02d}" - formatted_minute = minute if minute == "*" else f"{int(minute):02d}" - return f"每天 {formatted_hour}:{formatted_minute}" - else: - return f"Cron: {minute} {hour} {day} {month} {day_of_week}" - elif trigger_type == "interval": - seconds = config.get("seconds", 0) - minutes = config.get("minutes", 0) - hours = config.get("hours", 0) - days = config.get("days", 0) - if days: - trigger_str = f"每 {days} 天" - elif hours: - trigger_str = f"每 {hours} 小时" - elif minutes: - trigger_str = f"每 {minutes} 分钟" - else: - trigger_str = f"每 {seconds} 秒" - elif trigger_type == "date": - run_date = config.get("run_date", "未知时间") - trigger_str = f"在 {run_date}" - else: - trigger_str = f"{trigger_type}: {config}" - - return trigger_str - - -def _format_params(schedule_status: dict) -> str: - """将任务参数格式化为人类可读的字符串""" - if kwargs := schedule_status.get("job_kwargs"): - kwargs_str = " | ".join(f"{k}: {v}" for k, v in kwargs.items()) - return kwargs_str - return "-" - - -def _parse_interval(interval_str: str) -> dict: - """增强版解析器,支持 d(天)""" - match = re.match(r"(\d+)([smhd])", interval_str.lower()) - if not match: - raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '1d', '10s' 的格式。") - - value, unit = int(match.group(1)), match.group(2) - if unit == "s": - return {"seconds": value} - if unit == "m": - return {"minutes": value} - if unit == "h": - return {"hours": value} - if unit == "d": - return {"days": value} - return {} - - -def _parse_daily_time(time_str: str) -> dict: - """解析 HH:MM 或 HH:MM:SS 格式的时间为 cron 配置""" - if match := re.match(r"^(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?$", time_str): - hour, minute, second = match.groups() - hour, minute = int(hour), int(minute) - - if not (0 <= hour <= 23 and 0 <= minute <= 59): - raise ValueError("小时或分钟数值超出范围。") - - cron_config = { - "minute": str(minute), - "hour": str(hour), - "day": "*", - "month": "*", - "day_of_week": "*", - } - if second is not None: - if not (0 <= int(second) <= 59): - raise ValueError("秒数值超出范围。") - cron_config["second"] = str(second) - - return cron_config - else: - raise ValueError("时间格式错误,请使用 'HH:MM' 或 'HH:MM:SS' 格式。") - - -async def GetBotId( - bot: Bot, - bot_id_match: Match[str] = AlconnaMatch("bot_id"), -) -> str: - """获取要操作的Bot ID""" - if bot_id_match.available: - return bot_id_match.result - return bot.self_id - - -class ScheduleTarget: - """定时任务操作目标的基类""" - - pass - - -class TargetByID(ScheduleTarget): - """按任务ID操作""" - - def __init__(self, id: int): - self.id = id - - -class TargetByPlugin(ScheduleTarget): - """按插件名操作""" - - def __init__( - self, plugin: str, group_id: str | None = None, all_groups: bool = False - ): - self.plugin = plugin - self.group_id = group_id - self.all_groups = all_groups - - -class TargetAll(ScheduleTarget): - """操作所有任务""" - - def __init__(self, for_group: str | None = None): - self.for_group = for_group - - -TargetScope = TargetByID | TargetByPlugin | TargetAll | None - - -def create_target_parser(subcommand_name: str): - """ - 创建一个依赖注入函数,用于解析删除、暂停、恢复等命令的操作目标。 - """ - - async def dependency( - event: Event, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), - all_enabled: Query[bool] = Query(f"{subcommand_name}.all"), - ) -> TargetScope: - if schedule_id.available: - return TargetByID(schedule_id.result) - - if plugin_name.available: - p_name = plugin_name.result - if all_enabled.available: - return TargetByPlugin(plugin=p_name, all_groups=True) - elif group_id.available: - gid = group_id.result - if gid.lower() == "all": - return TargetByPlugin(plugin=p_name, all_groups=True) - return TargetByPlugin(plugin=p_name, group_id=gid) - else: - current_group_id = getattr(event, "group_id", None) - if current_group_id: - return TargetByPlugin(plugin=p_name, group_id=str(current_group_id)) - else: - await schedule_cmd.finish( - "私聊中操作插件任务必须使用 -g <群号> 或 -all 选项。" - ) - - if all_enabled.available: - return TargetAll(for_group=group_id.result if group_id.available else None) - - return None - - return dependency - - -schedule_cmd = on_alconna( - Alconna( - "定时任务", - Subcommand( - "查看", - Option("-g", Args["target_group_id", str]), - Option("-all", help_text="查看所有群聊 (SUPERUSER)"), - Option("-p", Args["plugin_name", str], help_text="按插件名筛选"), - Option("--page", Args["page", int, 1], help_text="指定页码"), - alias=["ls", "list"], - help_text="查看定时任务", - ), - Subcommand( - "设置", - Args["plugin_name", str], - Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), - Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), - Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), - Option( - "--daily", - Args["daily_expr", str], - help_text="设置每天执行的时间 (如 08:20)", - ), - Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"), - Option("-all", help_text="对所有群生效 (等同于 -g all)"), - Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"), - Option( - "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" - ), - alias=["add", "开启"], - help_text="设置/开启一个定时任务", - ), - Subcommand( - "删除", - Args["schedule_id?", int], - Option("-p", Args["plugin_name", str], help_text="指定插件名"), - Option("-g", Args["group_id", str], help_text="指定群组ID"), - Option("-all", help_text="对所有群生效"), - Option( - "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" - ), - alias=["del", "rm", "remove", "关闭", "取消"], - help_text="删除一个或多个定时任务", - ), - Subcommand( - "暂停", - Args["schedule_id?", int], - Option("-all", help_text="对当前群所有任务生效"), - Option("-p", Args["plugin_name", str], help_text="指定插件名"), - Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), - Option( - "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" - ), - alias=["pause"], - help_text="暂停一个或多个定时任务", - ), - Subcommand( - "恢复", - Args["schedule_id?", int], - Option("-all", help_text="对当前群所有任务生效"), - Option("-p", Args["plugin_name", str], help_text="指定插件名"), - Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), - Option( - "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" - ), - alias=["resume"], - help_text="恢复一个或多个定时任务", - ), - Subcommand( - "执行", - Args["schedule_id", int], - alias=["trigger", "run"], - help_text="立即执行一次任务", - ), - Subcommand( - "更新", - Args["schedule_id", int], - Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), - Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), - Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), - Option( - "--daily", - Args["daily_expr", str], - help_text="更新每天执行的时间 (如 08:20)", - ), - Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"), - alias=["update", "modify", "修改"], - help_text="更新任务配置", - ), - Subcommand( - "状态", - Args["schedule_id", int], - alias=["status", "info"], - help_text="查看单个任务的详细状态", - ), - Subcommand( - "插件列表", - alias=["plugins"], - help_text="列出所有可用的插件", - ), - ), - priority=5, - block=True, - rule=admin_check(1), -) - -schedule_cmd.shortcut( - "任务状态", - command="定时任务", - arguments=["状态", "{%0}"], - prefix=True, -) - - -@schedule_cmd.handle() -async def _handle_time_options_mutex(arp: Arparma): - time_options = ["cron", "interval", "date", "daily"] - provided_options = [opt for opt in time_options if arp.query(opt) is not None] - if len(provided_options) > 1: - await schedule_cmd.finish( - f"时间选项 --{', --'.join(provided_options)} 不能同时使用,请只选择一个。" - ) - - -@schedule_cmd.assign("查看") -async def _( - bot: Bot, - event: Event, - target_group_id: Match[str] = AlconnaMatch("target_group_id"), - all_groups: Query[bool] = Query("查看.all"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - page: Match[int] = AlconnaMatch("page"), -): - is_superuser = await SUPERUSER(bot, event) - schedules = [] - title = "" - - current_group_id = getattr(event, "group_id", None) - if not (all_groups.available or target_group_id.available) and not current_group_id: - await schedule_cmd.finish("私聊中查看任务必须使用 -g <群号> 或 -all 选项。") - - if all_groups.available: - if not is_superuser: - await schedule_cmd.finish("需要超级用户权限才能查看所有群组的定时任务。") - schedules = await scheduler_manager.get_all_schedules() - title = "所有群组的定时任务" - elif target_group_id.available: - if not is_superuser: - await schedule_cmd.finish("需要超级用户权限才能查看指定群组的定时任务。") - gid = target_group_id.result - schedules = [ - s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid - ] - title = f"群 {gid} 的定时任务" - else: - gid = str(current_group_id) - schedules = [ - s for s in await scheduler_manager.get_all_schedules() if s.group_id == gid - ] - title = "本群的定时任务" - - if plugin_name.available: - schedules = [s for s in schedules if s.plugin_name == plugin_name.result] - title += f" [插件: {plugin_name.result}]" - - if not schedules: - await schedule_cmd.finish("没有找到任何相关的定时任务。") - - page_size = 15 - current_page = page.result - total_items = len(schedules) - total_pages = (total_items + page_size - 1) // page_size - start_index = (current_page - 1) * page_size - end_index = start_index + page_size - paginated_schedules = schedules[start_index:end_index] - - if not paginated_schedules: - await schedule_cmd.finish("这一页没有内容了哦~") - - status_tasks = [ - scheduler_manager.get_schedule_status(s.id) for s in paginated_schedules - ] - all_statuses = await asyncio.gather(*status_tasks) - data_list = [ - [ - s["id"], - s["plugin_name"], - s.get("bot_id") or "N/A", - s["group_id"] or "全局", - s["next_run_time"], - _format_trigger(s), - _format_params(s), - "✔️ 已启用" if s["is_enabled"] else "⏸️ 已暂停", - ] - for s in all_statuses - if s - ] - - if not data_list: - await schedule_cmd.finish("没有找到任何相关的定时任务。") - - img = await ImageTemplate.table_page( - head_text=title, - tip_text=f"第 {current_page}/{total_pages} 页,共 {total_items} 条任务", - column_name=[ - "ID", - "插件", - "Bot ID", - "群组/目标", - "下次运行", - "触发规则", - "参数", - "状态", - ], - data_list=data_list, - column_space=20, - ) - await MessageUtils.build_message(img).send(reply_to=True) - - -@schedule_cmd.assign("设置") -async def _( - event: Event, - plugin_name: str, - cron_expr: str | None = None, - interval_expr: str | None = None, - date_expr: str | None = None, - daily_expr: str | None = None, - group_id: str | None = None, - kwargs_str: str | None = None, - all_enabled: Query[bool] = Query("设置.all"), - bot_id_to_operate: str = Depends(GetBotId), -): - if plugin_name not in scheduler_manager._registered_tasks: - await schedule_cmd.finish( - f"插件 '{plugin_name}' 没有注册可用的定时任务。\n" - f"可用插件: {list(scheduler_manager._registered_tasks.keys())}" - ) - - trigger_type = "" - trigger_config = {} - - try: - if cron_expr: - trigger_type = "cron" - parts = cron_expr.split() - if len(parts) != 5: - raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)") - cron_keys = ["minute", "hour", "day", "month", "day_of_week"] - trigger_config = dict(zip(cron_keys, parts)) - elif interval_expr: - trigger_type = "interval" - trigger_config = _parse_interval(interval_expr) - elif date_expr: - trigger_type = "date" - trigger_config = {"run_date": datetime.fromisoformat(date_expr)} - elif daily_expr: - trigger_type = "cron" - trigger_config = _parse_daily_time(daily_expr) - else: - await schedule_cmd.finish( - "必须提供一种时间选项: --cron, --interval, --date, 或 --daily。" - ) - except ValueError as e: - await schedule_cmd.finish(f"时间参数解析错误: {e}") - - job_kwargs = {} - if kwargs_str: - task_meta = scheduler_manager._registered_tasks[plugin_name] - params_model = task_meta.get("model") - if not params_model: - await schedule_cmd.finish(f"插件 '{plugin_name}' 不支持设置额外参数。") - - if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): - await schedule_cmd.finish(f"插件 '{plugin_name}' 的参数模型配置错误。") - - raw_kwargs = {} - try: - for item in kwargs_str.split(","): - key, value = item.strip().split("=", 1) - raw_kwargs[key.strip()] = value - except Exception as e: - await schedule_cmd.finish( - f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" - ) - - try: - model_validate = getattr(params_model, "model_validate", None) - if not model_validate: - await schedule_cmd.finish( - f"插件 '{plugin_name}' 的参数模型不支持验证。" - ) - return - - validated_model = model_validate(raw_kwargs) - - model_dump = getattr(validated_model, "model_dump", None) - if not model_dump: - await schedule_cmd.finish( - f"插件 '{plugin_name}' 的参数模型不支持导出。" - ) - return - - job_kwargs = model_dump() - except ValidationError as e: - errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] - error_str = "\n".join(errors) - await schedule_cmd.finish( - f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}" - ) - return - - target_group_id: str | None - current_group_id = getattr(event, "group_id", None) - - if group_id and group_id.lower() == "all": - target_group_id = "__ALL_GROUPS__" - elif all_enabled.available: - target_group_id = "__ALL_GROUPS__" - elif group_id: - target_group_id = group_id - elif current_group_id: - target_group_id = str(current_group_id) - else: - await schedule_cmd.finish( - "私聊中设置定时任务时,必须使用 -g <群号> 或 --all 选项指定目标。" - ) - return - - success, msg = await scheduler_manager.add_schedule( - plugin_name, - target_group_id, - trigger_type, - trigger_config, - job_kwargs, - bot_id=bot_id_to_operate, - ) - - if target_group_id == "__ALL_GROUPS__": - target_desc = f"所有群组 (Bot: {bot_id_to_operate})" - elif target_group_id is None: - target_desc = "全局" - else: - target_desc = f"群组 {target_group_id}" - - if success: - await schedule_cmd.finish(f"已成功为 [{target_desc}] {msg}") - else: - await schedule_cmd.finish(f"为 [{target_desc}] 设置任务失败: {msg}") - - -@schedule_cmd.assign("删除") -async def _( - target: TargetScope = Depends(create_target_parser("删除")), - bot_id_to_operate: str = Depends(GetBotId), -): - if isinstance(target, TargetByID): - _, message = await scheduler_manager.remove_schedule_by_id(target.id) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetByPlugin): - p_name = target.plugin - if p_name not in scheduler_manager.get_registered_plugins(): - await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - - if target.all_groups: - removed_count = await scheduler_manager.remove_schedule_for_all( - p_name, bot_id=bot_id_to_operate - ) - message = ( - f"已取消了 {removed_count} 个群组的插件 '{p_name}' 定时任务。" - if removed_count > 0 - else f"没有找到插件 '{p_name}' 的定时任务。" - ) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.remove_schedule( - p_name, target.group_id, bot_id=bot_id_to_operate - ) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetAll): - if target.for_group: - _, message = await scheduler_manager.remove_schedules_by_group( - target.for_group - ) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.remove_all_schedules() - await schedule_cmd.finish(message) - - else: - await schedule_cmd.finish( - "删除任务失败:请提供任务ID,或通过 -p <插件> 或 -all 指定要删除的任务。" - ) - - -@schedule_cmd.assign("暂停") -async def _( - target: TargetScope = Depends(create_target_parser("暂停")), - bot_id_to_operate: str = Depends(GetBotId), -): - if isinstance(target, TargetByID): - _, message = await scheduler_manager.pause_schedule(target.id) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetByPlugin): - p_name = target.plugin - if p_name not in scheduler_manager.get_registered_plugins(): - await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - - if target.all_groups: - _, message = await scheduler_manager.pause_schedules_by_plugin(p_name) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.pause_schedule_by_plugin_group( - p_name, target.group_id, bot_id=bot_id_to_operate - ) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetAll): - if target.for_group: - _, message = await scheduler_manager.pause_schedules_by_group( - target.for_group - ) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.pause_all_schedules() - await schedule_cmd.finish(message) - - else: - await schedule_cmd.finish("请提供任务ID、使用 -p <插件> 或 -all 选项。") - - -@schedule_cmd.assign("恢复") -async def _( - target: TargetScope = Depends(create_target_parser("恢复")), - bot_id_to_operate: str = Depends(GetBotId), -): - if isinstance(target, TargetByID): - _, message = await scheduler_manager.resume_schedule(target.id) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetByPlugin): - p_name = target.plugin - if p_name not in scheduler_manager.get_registered_plugins(): - await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - - if target.all_groups: - _, message = await scheduler_manager.resume_schedules_by_plugin(p_name) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.resume_schedule_by_plugin_group( - p_name, target.group_id, bot_id=bot_id_to_operate - ) - await schedule_cmd.finish(message) - - elif isinstance(target, TargetAll): - if target.for_group: - _, message = await scheduler_manager.resume_schedules_by_group( - target.for_group - ) - await schedule_cmd.finish(message) - else: - _, message = await scheduler_manager.resume_all_schedules() - await schedule_cmd.finish(message) - - else: - await schedule_cmd.finish("请提供任务ID、使用 -p <插件> 或 -all 选项。") - - -@schedule_cmd.assign("执行") -async def _(schedule_id: int): - _, message = await scheduler_manager.trigger_now(schedule_id) - await schedule_cmd.finish(message) - - -@schedule_cmd.assign("更新") -async def _( - schedule_id: int, - cron_expr: str | None = None, - interval_expr: str | None = None, - date_expr: str | None = None, - daily_expr: str | None = None, - kwargs_str: str | None = None, -): - if not any([cron_expr, interval_expr, date_expr, daily_expr, kwargs_str]): - await schedule_cmd.finish( - "请提供需要更新的时间 (--cron/--interval/--date/--daily) 或参数 (--kwargs)" - ) - - trigger_config = None - trigger_type = None - try: - if cron_expr: - trigger_type = "cron" - parts = cron_expr.split() - if len(parts) != 5: - raise ValueError("Cron 表达式必须有5个部分") - cron_keys = ["minute", "hour", "day", "month", "day_of_week"] - trigger_config = dict(zip(cron_keys, parts)) - elif interval_expr: - trigger_type = "interval" - trigger_config = _parse_interval(interval_expr) - elif date_expr: - trigger_type = "date" - trigger_config = {"run_date": datetime.fromisoformat(date_expr)} - elif daily_expr: - trigger_type = "cron" - trigger_config = _parse_daily_time(daily_expr) - except ValueError as e: - await schedule_cmd.finish(f"时间参数解析错误: {e}") - - job_kwargs = None - if kwargs_str: - schedule = await scheduler_manager.get_schedule_by_id(schedule_id) - if not schedule: - await schedule_cmd.finish(f"未找到 ID 为 {schedule_id} 的任务。") - - task_meta = scheduler_manager._registered_tasks.get(schedule.plugin_name) - if not task_meta or not (params_model := task_meta.get("model")): - await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 未定义参数模型,无法更新参数。" - ) - - if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): - await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 的参数模型配置错误。" - ) - - raw_kwargs = {} - try: - for item in kwargs_str.split(","): - key, value = item.strip().split("=", 1) - raw_kwargs[key.strip()] = value - except Exception as e: - await schedule_cmd.finish( - f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" - ) - - try: - model_validate = getattr(params_model, "model_validate", None) - if not model_validate: - await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 的参数模型不支持验证。" - ) - return - - validated_model = model_validate(raw_kwargs) - - model_dump = getattr(validated_model, "model_dump", None) - if not model_dump: - await schedule_cmd.finish( - f"插件 '{schedule.plugin_name}' 的参数模型不支持导出。" - ) - return - - job_kwargs = model_dump(exclude_unset=True) - except ValidationError as e: - errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] - error_str = "\n".join(errors) - await schedule_cmd.finish(f"更新的参数验证失败:\n{error_str}") - return - - _, message = await scheduler_manager.update_schedule( - schedule_id, trigger_type, trigger_config, job_kwargs - ) - await schedule_cmd.finish(message) - - -@schedule_cmd.assign("插件列表") -async def _(): - registered_plugins = scheduler_manager.get_registered_plugins() - if not registered_plugins: - await schedule_cmd.finish("当前没有已注册的定时任务插件。") - - message_parts = ["📋 已注册的定时任务插件:"] - for i, plugin_name in enumerate(registered_plugins, 1): - task_meta = scheduler_manager._registered_tasks[plugin_name] - params_model = task_meta.get("model") - - if not params_model: - message_parts.append(f"{i}. {plugin_name} - 无参数") - continue - - if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): - message_parts.append(f"{i}. {plugin_name} - ⚠️ 参数模型配置错误") - continue - - model_fields = getattr(params_model, "model_fields", None) - if model_fields: - param_info = ", ".join( - f"{field_name}({_get_type_name(field_info.annotation)})" - for field_name, field_info in model_fields.items() - ) - message_parts.append(f"{i}. {plugin_name} - 参数: {param_info}") - else: - message_parts.append(f"{i}. {plugin_name} - 无参数") - - await schedule_cmd.finish("\n".join(message_parts)) - - -@schedule_cmd.assign("状态") -async def _(schedule_id: int): - status = await scheduler_manager.get_schedule_status(schedule_id) - if not status: - await schedule_cmd.finish(f"未找到ID为 {schedule_id} 的定时任务。") - - info_lines = [ - f"📋 定时任务详细信息 (ID: {schedule_id})", - "--------------------", - f"▫️ 插件: {status['plugin_name']}", - f"▫️ Bot ID: {status.get('bot_id') or '默认'}", - f"▫️ 目标: {status['group_id'] or '全局'}", - f"▫️ 状态: {'✔️ 已启用' if status['is_enabled'] else '⏸️ 已暂停'}", - f"▫️ 下次运行: {status['next_run_time']}", - f"▫️ 触发规则: {_format_trigger(status)}", - f"▫️ 任务参数: {_format_params(status)}", - ] - await schedule_cmd.finish("\n".join(info_lines)) diff --git a/zhenxun/builtin_plugins/scheduler_admin/commands.py b/zhenxun/builtin_plugins/scheduler_admin/commands.py new file mode 100644 index 00000000..8a565dab --- /dev/null +++ b/zhenxun/builtin_plugins/scheduler_admin/commands.py @@ -0,0 +1,298 @@ +import re + +from nonebot.adapters import Event +from nonebot.adapters.onebot.v11 import Bot +from nonebot.params import Depends +from nonebot.permission import SUPERUSER +from nonebot_plugin_alconna import ( + Alconna, + AlconnaMatch, + Args, + Match, + Option, + Query, + Subcommand, + on_alconna, +) + +from zhenxun.configs.config import Config +from zhenxun.services.scheduler import scheduler_manager +from zhenxun.services.scheduler.targeter import ScheduleTargeter +from zhenxun.utils.rules import admin_check + +schedule_cmd = on_alconna( + Alconna( + "定时任务", + Subcommand( + "查看", + Option("-g", Args["target_group_id", str]), + Option("-all", help_text="查看所有群聊 (SUPERUSER)"), + Option("-p", Args["plugin_name", str], help_text="按插件名筛选"), + Option("--page", Args["page", int, 1], help_text="指定页码"), + alias=["ls", "list"], + help_text="查看定时任务", + ), + Subcommand( + "设置", + Args["plugin_name", str], + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option( + "--daily", + Args["daily_expr", str], + help_text="设置每天执行的时间 (如 08:20)", + ), + Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"), + Option("-all", help_text="对所有群生效 (等同于 -g all)"), + Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["add", "开启"], + help_text="设置/开启一个定时任务", + ), + Subcommand( + "删除", + Args["schedule_id?", int], + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID"), + Option("-all", help_text="对所有群生效"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["del", "rm", "remove", "关闭", "取消"], + help_text="删除一个或多个定时任务", + ), + Subcommand( + "暂停", + Args["schedule_id?", int], + Option("-all", help_text="对当前群所有任务生效"), + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["pause"], + help_text="暂停一个或多个定时任务", + ), + Subcommand( + "恢复", + Args["schedule_id?", int], + Option("-all", help_text="对当前群所有任务生效"), + Option("-p", Args["plugin_name", str], help_text="指定插件名"), + Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), + alias=["resume"], + help_text="恢复一个或多个定时任务", + ), + Subcommand( + "执行", + Args["schedule_id", int], + alias=["trigger", "run"], + help_text="立即执行一次任务", + ), + Subcommand( + "更新", + Args["schedule_id", int], + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option( + "--daily", + Args["daily_expr", str], + help_text="更新每天执行的时间 (如 08:20)", + ), + Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"), + alias=["update", "modify", "修改"], + help_text="更新任务配置", + ), + Subcommand( + "状态", + Args["schedule_id", int], + alias=["status", "info"], + help_text="查看单个任务的详细状态", + ), + Subcommand( + "插件列表", + alias=["plugins"], + help_text="列出所有可用的插件", + ), + ), + priority=5, + block=True, + rule=admin_check(1), +) + +schedule_cmd.shortcut( + "任务状态", + command="定时任务", + arguments=["状态", "{%0}"], + prefix=True, +) + + +class ScheduleTarget: + pass + + +class TargetByID(ScheduleTarget): + def __init__(self, id: int): + self.id = id + + +class TargetByPlugin(ScheduleTarget): + def __init__( + self, plugin: str, group_id: str | None = None, all_groups: bool = False + ): + self.plugin = plugin + self.group_id = group_id + self.all_groups = all_groups + + +class TargetAll(ScheduleTarget): + def __init__(self, for_group: str | None = None): + self.for_group = for_group + + +TargetScope = TargetByID | TargetByPlugin | TargetAll | None + + +def create_target_parser(subcommand_name: str): + async def dependency( + event: Event, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query(f"{subcommand_name}.all"), + ) -> TargetScope: + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + return TargetByPlugin(plugin=p_name, all_groups=True) + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + current_group_id = getattr(event, "group_id", None) + return TargetByPlugin( + plugin=p_name, + group_id=str(current_group_id) if current_group_id else None, + ) + + if all_enabled.available: + current_group_id = getattr(event, "group_id", None) + if not current_group_id: + await schedule_cmd.finish( + "私聊中单独使用 -all 选项时,必须使用 -g <群号> 指定目标。" + ) + return TargetAll(for_group=str(current_group_id)) + + return None + + return dependency + + +def parse_interval(interval_str: str) -> dict: + match = re.match(r"(\d+)([smhd])", interval_str.lower()) + if not match: + raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '1d', '10s' 的格式。") + value, unit = int(match.group(1)), match.group(2) + if unit == "s": + return {"seconds": value} + if unit == "m": + return {"minutes": value} + if unit == "h": + return {"hours": value} + if unit == "d": + return {"days": value} + return {} + + +def parse_daily_time(time_str: str) -> dict: + if match := re.match(r"^(\d{1,2}):(\d{1,2})(?::(\d{1,2}))?$", time_str): + hour, minute, second = match.groups() + hour, minute = int(hour), int(minute) + if not (0 <= hour <= 23 and 0 <= minute <= 59): + raise ValueError("小时或分钟数值超出范围。") + cron_config = { + "minute": str(minute), + "hour": str(hour), + "day": "*", + "month": "*", + "day_of_week": "*", + "timezone": Config.get_config("SchedulerManager", "SCHEDULER_TIMEZONE"), + } + if second is not None: + if not (0 <= int(second) <= 59): + raise ValueError("秒数值超出范围。") + cron_config["second"] = str(second) + return cron_config + else: + raise ValueError("时间格式错误,请使用 'HH:MM' 或 'HH:MM:SS' 格式。") + + +async def GetBotId(bot: Bot, bot_id_match: Match[str] = AlconnaMatch("bot_id")) -> str: + if bot_id_match.available: + return bot_id_match.result + return bot.self_id + + +def GetTargeter(subcommand: str): + """ + 依赖注入函数,用于解析命令参数并返回一个配置好的 ScheduleTargeter 实例。 + """ + + async def dependency( + event: Event, + bot: Bot, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query(f"{subcommand}.all"), + bot_id_to_operate: str = Depends(GetBotId), + ) -> ScheduleTargeter: + if schedule_id.available: + return scheduler_manager.target(id=schedule_id.result) + + if plugin_name.available: + if all_enabled.available: + return scheduler_manager.target(plugin_name=plugin_name.result) + + current_group_id = getattr(event, "group_id", None) + gid = group_id.result if group_id.available else current_group_id + return scheduler_manager.target( + plugin_name=plugin_name.result, + group_id=str(gid) if gid else None, + bot_id=bot_id_to_operate, + ) + + if all_enabled.available: + current_group_id = getattr(event, "group_id", None) + gid = group_id.result if group_id.available else current_group_id + is_su = await SUPERUSER(bot, event) + if not gid and not is_su: + await schedule_cmd.finish( + f"在私聊中对所有任务进行'{subcommand}'操作需要超级用户权限。" + ) + + if (gid and str(gid).lower() == "all") or (not gid and is_su): + return scheduler_manager.target() + + return scheduler_manager.target( + group_id=str(gid) if gid else None, bot_id=bot_id_to_operate + ) + + await schedule_cmd.finish( + f"'{subcommand}'操作失败:请提供任务ID," + f"或通过 -p <插件名> 或 -all 指定要操作的任务。" + ) + + return Depends(dependency) diff --git a/zhenxun/builtin_plugins/scheduler_admin/handlers.py b/zhenxun/builtin_plugins/scheduler_admin/handlers.py new file mode 100644 index 00000000..839ece12 --- /dev/null +++ b/zhenxun/builtin_plugins/scheduler_admin/handlers.py @@ -0,0 +1,380 @@ +from datetime import datetime + +from nonebot.adapters import Event +from nonebot.adapters.onebot.v11 import Bot +from nonebot.params import Depends +from nonebot.permission import SUPERUSER +from nonebot_plugin_alconna import AlconnaMatch, Arparma, Match, Query +from pydantic import BaseModel, ValidationError + +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.scheduler import scheduler_manager +from zhenxun.services.scheduler.targeter import ScheduleTargeter +from zhenxun.utils.message import MessageUtils + +from . import presenters +from .commands import ( + GetBotId, + GetTargeter, + parse_daily_time, + parse_interval, + schedule_cmd, +) + + +@schedule_cmd.handle() +async def _handle_time_options_mutex(arp: Arparma): + time_options = ["cron", "interval", "date", "daily"] + provided_options = [opt for opt in time_options if arp.query(opt) is not None] + if len(provided_options) > 1: + await schedule_cmd.finish( + f"时间选项 --{', --'.join(provided_options)} 不能同时使用,请只选择一个。" + ) + + +@schedule_cmd.assign("查看") +async def handle_view( + bot: Bot, + event: Event, + target_group_id: Match[str] = AlconnaMatch("target_group_id"), + all_groups: Query[bool] = Query("查看.all"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + page: Match[int] = AlconnaMatch("page"), +): + is_superuser = await SUPERUSER(bot, event) + title = "" + gid_filter = None + + current_group_id = getattr(event, "group_id", None) + if not (all_groups.available or target_group_id.available) and not current_group_id: + await schedule_cmd.finish("私聊中查看任务必须使用 -g <群号> 或 -all 选项。") + + if all_groups.available: + if not is_superuser: + await schedule_cmd.finish("需要超级用户权限才能查看所有群组的定时任务。") + title = "所有群组的定时任务" + elif target_group_id.available: + if not is_superuser: + await schedule_cmd.finish("需要超级用户权限才能查看指定群组的定时任务。") + gid_filter = target_group_id.result + title = f"群 {gid_filter} 的定时任务" + else: + gid_filter = str(current_group_id) + title = "本群的定时任务" + + p_name_filter = plugin_name.result if plugin_name.available else None + + schedules = await scheduler_manager.get_schedules( + plugin_name=p_name_filter, group_id=gid_filter + ) + + if p_name_filter: + title += f" [插件: {p_name_filter}]" + + if not schedules: + await schedule_cmd.finish("没有找到任何相关的定时任务。") + + img = await presenters.format_schedule_list_as_image( + schedules=schedules, title=title, current_page=page.result + ) + await MessageUtils.build_message(img).send(reply_to=True) + + +@schedule_cmd.assign("设置") +async def handle_set( + event: Event, + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + cron_expr: Match[str] = AlconnaMatch("cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval_expr"), + date_expr: Match[str] = AlconnaMatch("date_expr"), + daily_expr: Match[str] = AlconnaMatch("daily_expr"), + group_id: Match[str] = AlconnaMatch("group_id"), + kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), + all_enabled: Query[bool] = Query("设置.all"), + bot_id_to_operate: str = Depends(GetBotId), +): + if not plugin_name.available: + await schedule_cmd.finish("设置任务时必须提供插件名称。") + + has_time_option = any( + [ + cron_expr.available, + interval_expr.available, + date_expr.available, + daily_expr.available, + ] + ) + if not has_time_option: + await schedule_cmd.finish( + "必须提供一种时间选项: --cron, --interval, --date, 或 --daily。" + ) + + p_name = plugin_name.result + if p_name not in scheduler_manager.get_registered_plugins(): + await schedule_cmd.finish( + f"插件 '{p_name}' 没有注册可用的定时任务。\n" + f"可用插件: {list(scheduler_manager.get_registered_plugins())}" + ) + + trigger_type, trigger_config = "", {} + try: + if cron_expr.available: + trigger_type, trigger_config = ( + "cron", + dict( + zip( + ["minute", "hour", "day", "month", "day_of_week"], + cron_expr.result.split(), + ) + ), + ) + elif interval_expr.available: + trigger_type, trigger_config = ( + "interval", + parse_interval(interval_expr.result), + ) + elif date_expr.available: + trigger_type, trigger_config = ( + "date", + {"run_date": datetime.fromisoformat(date_expr.result)}, + ) + elif daily_expr.available: + trigger_type, trigger_config = "cron", parse_daily_time(daily_expr.result) + else: + await schedule_cmd.finish( + "必须提供一种时间选项: --cron, --interval, --date, 或 --daily。" + ) + except ValueError as e: + await schedule_cmd.finish(f"时间参数解析错误: {e}") + + job_kwargs = {} + if kwargs_str.available: + task_meta = scheduler_manager._registered_tasks[p_name] + params_model = task_meta.get("model") + if not ( + params_model + and isinstance(params_model, type) + and issubclass(params_model, BaseModel) + ): + await schedule_cmd.finish(f"插件 '{p_name}' 不支持或配置了无效的参数模型。") + try: + raw_kwargs = dict( + item.strip().split("=", 1) for item in kwargs_str.result.split(",") + ) + + model_validate = getattr(params_model, "model_validate", None) + if not model_validate: + await schedule_cmd.finish(f"插件 '{p_name}' 的参数模型不支持验证") + + validated_model = model_validate(raw_kwargs) + + model_dump = getattr(validated_model, "model_dump", None) + if not model_dump: + await schedule_cmd.finish(f"插件 '{p_name}' 的参数模型不支持导出") + + job_kwargs = model_dump() + except ValidationError as e: + errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] + await schedule_cmd.finish( + f"插件 '{p_name}' 的任务参数验证失败:\n" + "\n".join(errors) + ) + except Exception as e: + await schedule_cmd.finish( + f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" + ) + + gid_str = group_id.result if group_id.available else None + target_group_id = ( + scheduler_manager.ALL_GROUPS + if (gid_str and gid_str.lower() == "all") or all_enabled.available + else gid_str or getattr(event, "group_id", None) + ) + if not target_group_id: + await schedule_cmd.finish( + "私聊中设置定时任务时,必须使用 -g <群号> 或 --all 选项指定目标。" + ) + + schedule = await scheduler_manager.add_schedule( + p_name, + str(target_group_id), + trigger_type, + trigger_config, + job_kwargs, + bot_id=bot_id_to_operate, + ) + + target_desc = ( + f"所有群组 (Bot: {bot_id_to_operate})" + if target_group_id == scheduler_manager.ALL_GROUPS + else f"群组 {target_group_id}" + ) + + if schedule: + await schedule_cmd.finish( + f"为 [{target_desc}] 已成功设置插件 '{p_name}' 的定时任务 " + f"(ID: {schedule.id})。" + ) + else: + await schedule_cmd.finish(f"为 [{target_desc}] 设置任务失败。") + + +@schedule_cmd.assign("删除") +async def handle_delete(targeter: ScheduleTargeter = GetTargeter("删除")): + schedules_to_remove: list[ScheduleInfo] = await targeter._get_schedules() + if not schedules_to_remove: + await schedule_cmd.finish("没有找到可删除的任务。") + + count, _ = await targeter.remove() + + if count > 0 and schedules_to_remove: + if len(schedules_to_remove) == 1: + message = presenters.format_remove_success(schedules_to_remove[0]) + else: + target_desc = targeter._generate_target_description() + message = f"✅ 成功移除了{target_desc} {count} 个任务。" + else: + message = "没有任务被移除。" + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("暂停") +async def handle_pause(targeter: ScheduleTargeter = GetTargeter("暂停")): + schedules_to_pause: list[ScheduleInfo] = await targeter._get_schedules() + if not schedules_to_pause: + await schedule_cmd.finish("没有找到可暂停的任务。") + + count, _ = await targeter.pause() + + if count > 0 and schedules_to_pause: + if len(schedules_to_pause) == 1: + message = presenters.format_pause_success(schedules_to_pause[0]) + else: + target_desc = targeter._generate_target_description() + message = f"✅ 成功暂停了{target_desc} {count} 个任务。" + else: + message = "没有任务被暂停。" + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("恢复") +async def handle_resume(targeter: ScheduleTargeter = GetTargeter("恢复")): + schedules_to_resume: list[ScheduleInfo] = await targeter._get_schedules() + if not schedules_to_resume: + await schedule_cmd.finish("没有找到可恢复的任务。") + + count, _ = await targeter.resume() + + if count > 0 and schedules_to_resume: + if len(schedules_to_resume) == 1: + message = presenters.format_resume_success(schedules_to_resume[0]) + else: + target_desc = targeter._generate_target_description() + message = f"✅ 成功恢复了{target_desc} {count} 个任务。" + else: + message = "没有任务被恢复。" + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("执行") +async def handle_trigger(schedule_id: Match[int] = AlconnaMatch("schedule_id")): + from zhenxun.services.scheduler.repository import ScheduleRepository + + schedule_info = await ScheduleRepository.get_by_id(schedule_id.result) + if not schedule_info: + await schedule_cmd.finish(f"未找到 ID 为 {schedule_id.result} 的任务。") + + success, message = await scheduler_manager.trigger_now(schedule_id.result) + + if success: + final_message = presenters.format_trigger_success(schedule_info) + else: + final_message = f"❌ 手动触发失败: {message}" + await schedule_cmd.finish(final_message) + + +@schedule_cmd.assign("更新") +async def handle_update( + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + cron_expr: Match[str] = AlconnaMatch("cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval_expr"), + date_expr: Match[str] = AlconnaMatch("date_expr"), + daily_expr: Match[str] = AlconnaMatch("daily_expr"), + kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), +): + if not any( + [ + cron_expr.available, + interval_expr.available, + date_expr.available, + daily_expr.available, + kwargs_str.available, + ] + ): + await schedule_cmd.finish( + "请提供需要更新的时间 (--cron/--interval/--date/--daily) 或参数 (--kwargs)" + ) + + trigger_type, trigger_config, job_kwargs = None, None, None + try: + if cron_expr.available: + trigger_type, trigger_config = ( + "cron", + dict( + zip( + ["minute", "hour", "day", "month", "day_of_week"], + cron_expr.result.split(), + ) + ), + ) + elif interval_expr.available: + trigger_type, trigger_config = ( + "interval", + parse_interval(interval_expr.result), + ) + elif date_expr.available: + trigger_type, trigger_config = ( + "date", + {"run_date": datetime.fromisoformat(date_expr.result)}, + ) + elif daily_expr.available: + trigger_type, trigger_config = "cron", parse_daily_time(daily_expr.result) + except ValueError as e: + await schedule_cmd.finish(f"时间参数解析错误: {e}") + + if kwargs_str.available: + job_kwargs = dict( + item.strip().split("=", 1) for item in kwargs_str.result.split(",") + ) + + success, message = await scheduler_manager.update_schedule( + schedule_id.result, trigger_type, trigger_config, job_kwargs + ) + + if success: + from zhenxun.services.scheduler.repository import ScheduleRepository + + updated_schedule = await ScheduleRepository.get_by_id(schedule_id.result) + if updated_schedule: + final_message = presenters.format_update_success(updated_schedule) + else: + final_message = "✅ 更新成功,但无法获取更新后的任务详情。" + else: + final_message = f"❌ 更新失败: {message}" + + await schedule_cmd.finish(final_message) + + +@schedule_cmd.assign("插件列表") +async def handle_plugins_list(): + message = await presenters.format_plugins_list() + await schedule_cmd.finish(message) + + +@schedule_cmd.assign("状态") +async def handle_status(schedule_id: Match[int] = AlconnaMatch("schedule_id")): + status = await scheduler_manager.get_schedule_status(schedule_id.result) + if not status: + await schedule_cmd.finish(f"未找到ID为 {schedule_id.result} 的定时任务。") + + message = presenters.format_single_status_message(status) + await schedule_cmd.finish(message) diff --git a/zhenxun/builtin_plugins/scheduler_admin/presenters.py b/zhenxun/builtin_plugins/scheduler_admin/presenters.py new file mode 100644 index 00000000..ef6785bd --- /dev/null +++ b/zhenxun/builtin_plugins/scheduler_admin/presenters.py @@ -0,0 +1,274 @@ +import asyncio + +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.scheduler import scheduler_manager +from zhenxun.utils._image_template import ImageTemplate, RowStyle + + +def _get_type_name(annotation) -> str: + """获取类型注解的名称""" + if hasattr(annotation, "__name__"): + return annotation.__name__ + elif hasattr(annotation, "_name"): + return annotation._name + else: + return str(annotation) + + +def _format_trigger(schedule: dict) -> str: + """格式化触发器信息为可读字符串""" + trigger_type = schedule.get("trigger_type") + config = schedule.get("trigger_config") + + if not isinstance(config, dict): + return f"配置错误: {config}" + + if trigger_type == "cron": + hour = config.get("hour", "??") + minute = config.get("minute", "??") + try: + hour_int = int(hour) + minute_int = int(minute) + return f"每天 {hour_int:02d}:{minute_int:02d}" + except (ValueError, TypeError): + return f"每天 {hour}:{minute}" + elif trigger_type == "interval": + units = { + "weeks": "周", + "days": "天", + "hours": "小时", + "minutes": "分钟", + "seconds": "秒", + } + for unit, unit_name in units.items(): + if value := config.get(unit): + return f"每 {value} {unit_name}" + return "未知间隔" + elif trigger_type == "date": + run_date = config.get("run_date", "N/A") + return f"特定时间 {run_date}" + else: + return f"未知触发器类型: {trigger_type}" + + +def _format_trigger_for_card(schedule_info: ScheduleInfo | dict) -> str: + """为信息卡片格式化触发器规则""" + trigger_type = ( + schedule_info.get("trigger_type") + if isinstance(schedule_info, dict) + else schedule_info.trigger_type + ) + config = ( + schedule_info.get("trigger_config") + if isinstance(schedule_info, dict) + else schedule_info.trigger_config + ) + + if not isinstance(config, dict): + return f"配置错误: {config}" + + if trigger_type == "cron": + hour = config.get("hour", "??") + minute = config.get("minute", "??") + try: + hour_int = int(hour) + minute_int = int(minute) + return f"每天 {hour_int:02d}:{minute_int:02d}" + except (ValueError, TypeError): + return f"每天 {hour}:{minute}" + elif trigger_type == "interval": + units = { + "weeks": "周", + "days": "天", + "hours": "小时", + "minutes": "分钟", + "seconds": "秒", + } + for unit, unit_name in units.items(): + if value := config.get(unit): + return f"每 {value} {unit_name}" + return "未知间隔" + elif trigger_type == "date": + run_date = config.get("run_date", "N/A") + return f"特定时间 {run_date}" + else: + return f"未知规则: {trigger_type}" + + +def _format_operation_result_card( + title: str, schedule_info: ScheduleInfo, extra_info: list[str] | None = None +) -> str: + """ + 生成一个标准的操作结果信息卡片。 + + 参数: + title: 卡片的标题 (例如 "✅ 成功暂停定时任务!") + schedule_info: 相关的 ScheduleInfo 对象 + extra_info: (可选) 额外的补充信息行 + """ + target_desc = ( + f"群组 {schedule_info.group_id}" + if schedule_info.group_id + and schedule_info.group_id != scheduler_manager.ALL_GROUPS + else "所有群组" + if schedule_info.group_id == scheduler_manager.ALL_GROUPS + else "全局" + ) + + info_lines = [ + title, + f"✓ 任务 ID: {schedule_info.id}", + f"🖋 插件: {schedule_info.plugin_name}", + f"🎯 目标: {target_desc}", + f"⏰ 时间: {_format_trigger_for_card(schedule_info)}", + ] + if extra_info: + info_lines.extend(extra_info) + + return "\n".join(info_lines) + + +def format_pause_success(schedule_info: ScheduleInfo) -> str: + """格式化暂停成功的消息""" + return _format_operation_result_card("✅ 成功暂停定时任务!", schedule_info) + + +def format_resume_success(schedule_info: ScheduleInfo) -> str: + """格式化恢复成功的消息""" + return _format_operation_result_card("▶️ 成功恢复定时任务!", schedule_info) + + +def format_remove_success(schedule_info: ScheduleInfo) -> str: + """格式化删除成功的消息""" + return _format_operation_result_card("❌ 成功删除定时任务!", schedule_info) + + +def format_trigger_success(schedule_info: ScheduleInfo) -> str: + """格式化手动触发成功的消息""" + return _format_operation_result_card("🚀 成功手动触发定时任务!", schedule_info) + + +def format_update_success(schedule_info: ScheduleInfo) -> str: + """格式化更新成功的消息""" + return _format_operation_result_card("🔄️ 成功更新定时任务配置!", schedule_info) + + +def _status_row_style(column: str, text: str) -> RowStyle: + """为状态列设置颜色""" + style = RowStyle() + if column == "状态": + if text == "启用": + style.font_color = "#67C23A" + elif text == "暂停": + style.font_color = "#F56C6C" + elif text == "运行中": + style.font_color = "#409EFF" + return style + + +def _format_params(schedule_status: dict) -> str: + """将任务参数格式化为人类可读的字符串""" + if kwargs := schedule_status.get("job_kwargs"): + return " | ".join(f"{k}: {v}" for k, v in kwargs.items()) + return "-" + + +async def format_schedule_list_as_image( + schedules: list[ScheduleInfo], title: str, current_page: int +): + """将任务列表格式化为图片""" + page_size = 15 + total_items = len(schedules) + total_pages = (total_items + page_size - 1) // page_size + start_index = (current_page - 1) * page_size + end_index = start_index + page_size + paginated_schedules = schedules[start_index:end_index] + + if not paginated_schedules: + return "这一页没有内容了哦~" + + status_tasks = [ + scheduler_manager.get_schedule_status(s.id) for s in paginated_schedules + ] + all_statuses = await asyncio.gather(*status_tasks) + + def get_status_text(status_value): + if isinstance(status_value, bool): + return "启用" if status_value else "暂停" + return str(status_value) + + data_list = [ + [ + s["id"], + s["plugin_name"], + s.get("bot_id") or "N/A", + s["group_id"] or "全局", + s["next_run_time"], + _format_trigger(s), + _format_params(s), + get_status_text(s["is_enabled"]), + ] + for s in all_statuses + if s + ] + + if not data_list: + return "没有找到任何相关的定时任务。" + + return await ImageTemplate.table_page( + head_text=title, + tip_text=f"第 {current_page}/{total_pages} 页,共 {total_items} 条任务", + column_name=["ID", "插件", "Bot", "目标", "下次运行", "规则", "参数", "状态"], + data_list=data_list, + column_space=20, + text_style=_status_row_style, + ) + + +def format_single_status_message(status: dict) -> str: + """格式化单个任务状态为文本消息""" + info_lines = [ + f"📋 定时任务详细信息 (ID: {status['id']})", + "--------------------", + f"▫️ 插件: {status['plugin_name']}", + f"▫️ Bot ID: {status.get('bot_id') or '默认'}", + f"▫️ 目标: {status['group_id'] or '全局'}", + f"▫️ 状态: {'✔️ 已启用' if status['is_enabled'] else '⏸️ 已暂停'}", + f"▫️ 下次运行: {status['next_run_time']}", + f"▫️ 触发规则: {_format_trigger(status)}", + f"▫️ 任务参数: {_format_params(status)}", + ] + return "\n".join(info_lines) + + +async def format_plugins_list() -> str: + """格式化可用插件列表为文本消息""" + from pydantic import BaseModel + + registered_plugins = scheduler_manager.get_registered_plugins() + if not registered_plugins: + return "当前没有已注册的定时任务插件。" + + message_parts = ["📋 已注册的定时任务插件:"] + for i, plugin_name in enumerate(registered_plugins, 1): + task_meta = scheduler_manager._registered_tasks[plugin_name] + params_model = task_meta.get("model") + + param_info_str = "无参数" + if ( + params_model + and isinstance(params_model, type) + and issubclass(params_model, BaseModel) + ): + model_fields = getattr(params_model, "model_fields", None) + if model_fields: + param_info_str = "参数: " + ", ".join( + f"{field_name}({_get_type_name(field_info.annotation)})" + for field_name, field_info in model_fields.items() + ) + elif params_model: + param_info_str = "⚠️ 参数模型配置错误" + + message_parts.append(f"{i}. {plugin_name} - {param_info_str}") + + return "\n".join(message_parts) diff --git a/zhenxun/services/__init__.py b/zhenxun/services/__init__.py index 14ae227f..6af390a8 100644 --- a/zhenxun/services/__init__.py +++ b/zhenxun/services/__init__.py @@ -1,3 +1,14 @@ +""" +Zhenxun Bot - 核心服务模块 + +主要服务包括: +- 数据库上下文 (db_context): 提供数据库模型基类和连接管理。 +- 日志服务 (log): 提供增强的、带上下文的日志记录器。 +- LLM服务 (llm): 提供与大语言模型交互的统一API。 +- 插件生命周期管理 (plugin_init): 支持插件安装和卸载时的钩子函数。 +- 定时任务调度器 (scheduler): 提供持久化的、可管理的定时任务服务。 +""" + from nonebot import require require("nonebot_plugin_apscheduler") @@ -6,3 +17,33 @@ require("nonebot_plugin_session") require("nonebot_plugin_htmlrender") require("nonebot_plugin_uninfo") require("nonebot_plugin_waiter") + +from .db_context import Model, disconnect +from .llm import ( + AI, + LLMContentPart, + LLMException, + LLMMessage, + get_model_instance, + list_available_models, + tool_registry, +) +from .log import logger +from .plugin_init import PluginInit, PluginInitManager +from .scheduler import scheduler_manager + +__all__ = [ + "AI", + "LLMContentPart", + "LLMException", + "LLMMessage", + "Model", + "PluginInit", + "PluginInitManager", + "disconnect", + "get_model_instance", + "list_available_models", + "logger", + "scheduler_manager", + "tool_registry", +] diff --git a/zhenxun/services/scheduler/__init__.py b/zhenxun/services/scheduler/__init__.py new file mode 100644 index 00000000..603339fd --- /dev/null +++ b/zhenxun/services/scheduler/__init__.py @@ -0,0 +1,12 @@ +""" +定时调度服务模块 + +提供一个统一的、持久化的定时任务管理器,供所有插件使用。 +""" + +from .lifecycle import _load_schedules_from_db +from .service import scheduler_manager + +_ = _load_schedules_from_db + +__all__ = ["scheduler_manager"] diff --git a/zhenxun/services/scheduler/adapter.py b/zhenxun/services/scheduler/adapter.py new file mode 100644 index 00000000..104a0457 --- /dev/null +++ b/zhenxun/services/scheduler/adapter.py @@ -0,0 +1,102 @@ +""" +引擎适配层 (Adapter) + +封装所有对具体调度器引擎 (APScheduler) 的操作, +使上层服务与调度器实现解耦。 +""" + +from nonebot_plugin_apscheduler import scheduler + +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.log import logger + +from .job import _execute_job + +JOB_PREFIX = "zhenxun_schedule_" + + +class APSchedulerAdapter: + """封装对 APScheduler 的操作""" + + @staticmethod + def _get_job_id(schedule_id: int) -> str: + """生成 APScheduler 的 Job ID""" + return f"{JOB_PREFIX}{schedule_id}" + + @staticmethod + def add_or_reschedule_job(schedule: ScheduleInfo): + """根据 ScheduleInfo 添加或重新调度一个 APScheduler 任务""" + job_id = APSchedulerAdapter._get_job_id(schedule.id) + + if not isinstance(schedule.trigger_config, dict): + logger.error( + f"任务 {schedule.id} 的 trigger_config 不是字典类型: " + f"{type(schedule.trigger_config)}" + ) + return + + job = scheduler.get_job(job_id) + if job: + scheduler.reschedule_job( + job_id, trigger=schedule.trigger_type, **schedule.trigger_config + ) + logger.debug(f"已更新APScheduler任务: {job_id}") + else: + scheduler.add_job( + _execute_job, + trigger=schedule.trigger_type, + id=job_id, + misfire_grace_time=300, + args=[schedule.id], + **schedule.trigger_config, + ) + logger.debug(f"已添加新的APScheduler任务: {job_id}") + + @staticmethod + def remove_job(schedule_id: int): + """移除一个 APScheduler 任务""" + job_id = APSchedulerAdapter._get_job_id(schedule_id) + try: + scheduler.remove_job(job_id) + logger.debug(f"已从APScheduler中移除任务: {job_id}") + except Exception: + pass + + @staticmethod + def pause_job(schedule_id: int): + """暂停一个 APScheduler 任务""" + job_id = APSchedulerAdapter._get_job_id(schedule_id) + try: + scheduler.pause_job(job_id) + except Exception: + pass + + @staticmethod + def resume_job(schedule_id: int): + """恢复一个 APScheduler 任务""" + job_id = APSchedulerAdapter._get_job_id(schedule_id) + try: + scheduler.resume_job(job_id) + except Exception: + import asyncio + + from .repository import ScheduleRepository + + async def _re_add_job(): + schedule = await ScheduleRepository.get_by_id(schedule_id) + if schedule: + APSchedulerAdapter.add_or_reschedule_job(schedule) + + asyncio.create_task(_re_add_job()) # noqa: RUF006 + + @staticmethod + def get_job_status(schedule_id: int) -> dict: + """获取 APScheduler Job 的状态""" + job_id = APSchedulerAdapter._get_job_id(schedule_id) + job = scheduler.get_job(job_id) + return { + "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") + if job and job.next_run_time + else "N/A", + "is_paused_in_scheduler": not bool(job.next_run_time) if job else "N/A", + } diff --git a/zhenxun/services/scheduler/job.py b/zhenxun/services/scheduler/job.py new file mode 100644 index 00000000..dd7abdfc --- /dev/null +++ b/zhenxun/services/scheduler/job.py @@ -0,0 +1,192 @@ +""" +定时任务的执行逻辑 + +包含被 APScheduler 实际调度的函数,以及处理不同目标(单个、所有群组)的执行策略。 +""" + +import asyncio +import copy +import inspect +import random + +import nonebot + +from zhenxun.configs.config import Config +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.log import logger +from zhenxun.utils.common_utils import CommonUtils +from zhenxun.utils.decorator.retry import Retry +from zhenxun.utils.platform import PlatformUtils + +SCHEDULE_CONCURRENCY_KEY = "all_groups_concurrency_limit" + + +async def _execute_job(schedule_id: int): + """ + APScheduler 调度的入口函数。 + 根据 schedule_id 处理特定任务、所有群组任务或全局任务。 + """ + from .repository import ScheduleRepository + from .service import scheduler_manager + + scheduler_manager._running_tasks.add(schedule_id) + try: + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule or not schedule.is_enabled: + logger.warning(f"定时任务 {schedule_id} 不存在或已禁用,跳过执行。") + return + + plugin_name = schedule.plugin_name + + task_meta = scheduler_manager._registered_tasks.get(plugin_name) + if not task_meta: + logger.error( + f"无法执行定时任务:插件 '{plugin_name}' 未注册或已卸载。将禁用该任务。" + ) + schedule.is_enabled = False + await ScheduleRepository.save(schedule, update_fields=["is_enabled"]) + from .adapter import APSchedulerAdapter + + APSchedulerAdapter.remove_job(schedule.id) + return + + try: + if schedule.bot_id: + bot = nonebot.get_bot(schedule.bot_id) + else: + bot = nonebot.get_bot() + logger.debug( + f"任务 {schedule_id} 未关联特定Bot,使用默认Bot {bot.self_id}" + ) + except KeyError: + logger.warning( + f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} " + f"不在线,本次执行跳过。" + ) + return + except ValueError: + logger.warning(f"当前没有Bot在线,定时任务 {schedule_id} 跳过。") + return + + if schedule.group_id == scheduler_manager.ALL_GROUPS: + await _execute_for_all_groups(schedule, task_meta, bot) + else: + await _execute_for_single_target(schedule, task_meta, bot) + finally: + scheduler_manager._running_tasks.discard(schedule_id) + + +async def _execute_for_all_groups(schedule: ScheduleInfo, task_meta: dict, bot): + """为所有群组执行任务,并处理优先级覆盖。""" + plugin_name = schedule.plugin_name + + concurrency_limit = Config.get_config( + "SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5 + ) + if not isinstance(concurrency_limit, int) or concurrency_limit <= 0: + logger.warning( + f"无效的定时任务并发限制配置 '{concurrency_limit}',将使用默认值 5。" + ) + concurrency_limit = 5 + + logger.info( + f"开始执行针对 [所有群组] 的任务 " + f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})," + f"并发限制: {concurrency_limit}" + ) + + all_gids = set() + try: + group_list, _ = await PlatformUtils.get_group_list(bot) + all_gids.update( + g.group_id for g in group_list if g.group_id and not g.channel_id + ) + except Exception as e: + logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) + return + + specific_tasks_gids = set( + await ScheduleInfo.filter( + plugin_name=plugin_name, group_id__in=list(all_gids) + ).values_list("group_id", flat=True) + ) + + semaphore = asyncio.Semaphore(concurrency_limit) + + async def worker(gid: str): + """使用 Semaphore 包装单个群组的任务执行""" + await asyncio.sleep(random.uniform(0, 59)) + async with semaphore: + temp_schedule = copy.deepcopy(schedule) + temp_schedule.group_id = gid + await _execute_for_single_target(temp_schedule, task_meta, bot) + await asyncio.sleep(random.uniform(0.1, 0.5)) + + tasks_to_run = [] + for gid in all_gids: + if gid in specific_tasks_gids: + logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") + continue + tasks_to_run.append(worker(gid)) + + if tasks_to_run: + await asyncio.gather(*tasks_to_run) + + +async def _execute_for_single_target(schedule: ScheduleInfo, task_meta: dict, bot): + """为单个目标(具体群组或全局)执行任务。""" + + plugin_name = schedule.plugin_name + group_id = schedule.group_id + + try: + is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) + if is_blocked: + target_desc = f"群 {group_id}" if group_id else "全局" + logger.info( + f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]" + "因功能被禁用而跳过执行。" + ) + return + + max_retries = Config.get_config("SchedulerManager", "JOB_MAX_RETRIES", 2) + retry_delay = Config.get_config("SchedulerManager", "JOB_RETRY_DELAY", 10) + + @Retry.simple( + stop_max_attempt=max_retries + 1, + wait_fixed_seconds=retry_delay, + log_name=f"定时任务执行:{schedule.plugin_name}", + ) + async def _execute_task_with_retry(): + task_func = task_meta["func"] + job_kwargs = schedule.job_kwargs + if not isinstance(job_kwargs, dict): + logger.error( + f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" + ) + return + + sig = inspect.signature(task_func) + if "bot" in sig.parameters: + job_kwargs["bot"] = bot + + await task_func(group_id, **job_kwargs) + + try: + logger.info( + f"插件 '{schedule.plugin_name}' 开始为目标 " + f"[{schedule.group_id or '全局'}] 执行定时任务 (ID: {schedule.id})。" + ) + await _execute_task_with_retry() + except Exception as e: + logger.error( + f"执行定时任务 (ID: {schedule.id}, 插件: {schedule.plugin_name}, " + f"目标: {schedule.group_id or '全局'}) 在所有重试后最终失败", + e=e, + ) + except Exception as e: + logger.error( + f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, " + f"目标: {group_id or '全局'}) 时发生异常", + e=e, + ) diff --git a/zhenxun/services/scheduler/lifecycle.py b/zhenxun/services/scheduler/lifecycle.py new file mode 100644 index 00000000..19f6c627 --- /dev/null +++ b/zhenxun/services/scheduler/lifecycle.py @@ -0,0 +1,62 @@ +""" +定时任务的生命周期管理 + +包含在机器人启动时加载和调度数据库中保存的任务的逻辑。 +""" + +from zhenxun.services.log import logger +from zhenxun.utils.manager.priority_manager import PriorityLifecycle + +from .adapter import APSchedulerAdapter +from .repository import ScheduleRepository +from .service import scheduler_manager + + +@PriorityLifecycle.on_startup(priority=90) +async def _load_schedules_from_db(): + """在服务启动时从数据库加载并调度所有任务。""" + logger.info("正在从数据库加载并调度所有定时任务...") + schedules = await ScheduleRepository.get_all_enabled() + count = 0 + for schedule in schedules: + if schedule.plugin_name in scheduler_manager._registered_tasks: + APSchedulerAdapter.add_or_reschedule_job(schedule) + count += 1 + else: + logger.warning(f"跳过加载定时任务:插件 '{schedule.plugin_name}' 未注册。") + logger.info(f"数据库定时任务加载完成,共成功加载 {count} 个任务。") + + logger.info("正在检查并注册声明式默认任务...") + declared_count = 0 + for task_info in scheduler_manager._declared_tasks: + plugin_name = task_info["plugin_name"] + group_id = task_info["group_id"] + bot_id = task_info["bot_id"] + + query_kwargs = { + "plugin_name": plugin_name, + "group_id": group_id, + "bot_id": bot_id, + } + exists = await ScheduleRepository.exists(**query_kwargs) + + if not exists: + logger.info(f"为插件 '{plugin_name}' 注册新的默认定时任务...") + schedule = await scheduler_manager.add_schedule( + plugin_name=plugin_name, + group_id=group_id, + trigger_type=task_info["trigger_type"], + trigger_config=task_info["trigger_config"], + job_kwargs=task_info["job_kwargs"], + bot_id=bot_id, + ) + if schedule: + declared_count += 1 + logger.debug(f"默认任务 '{plugin_name}' 注册成功 (ID: {schedule.id})") + else: + logger.error(f"默认任务 '{plugin_name}' 注册失败") + else: + logger.debug(f"插件 '{plugin_name}' 的默认任务已存在于数据库中,跳过注册。") + + if declared_count > 0: + logger.info(f"声明式任务检查完成,新注册了 {declared_count} 个默认任务。") diff --git a/zhenxun/services/scheduler/repository.py b/zhenxun/services/scheduler/repository.py new file mode 100644 index 00000000..7e168db9 --- /dev/null +++ b/zhenxun/services/scheduler/repository.py @@ -0,0 +1,79 @@ +""" +数据持久层 (Repository) + +封装所有对 ScheduleInfo 模型的数据库操作,将数据访问逻辑与业务逻辑分离。 +""" + +from typing import Any + +from tortoise.queryset import QuerySet + +from zhenxun.models.schedule_info import ScheduleInfo + + +class ScheduleRepository: + """封装 ScheduleInfo 模型的数据库操作""" + + @staticmethod + async def get_by_id(schedule_id: int) -> ScheduleInfo | None: + """通过ID获取任务""" + return await ScheduleInfo.get_or_none(id=schedule_id) + + @staticmethod + async def get_all_enabled() -> list[ScheduleInfo]: + """获取所有启用的任务""" + return await ScheduleInfo.filter(is_enabled=True).all() + + @staticmethod + async def get_all(plugin_name: str | None = None) -> list[ScheduleInfo]: + """获取所有任务,可按插件名过滤""" + if plugin_name: + return await ScheduleInfo.filter(plugin_name=plugin_name).all() + return await ScheduleInfo.all() + + @staticmethod + async def save(schedule: ScheduleInfo, update_fields: list[str] | None = None): + """保存任务""" + await schedule.save(update_fields=update_fields) + + @staticmethod + async def exists(**kwargs: Any) -> bool: + """检查任务是否存在""" + return await ScheduleInfo.exists(**kwargs) + + @staticmethod + async def get_by_plugin_and_group( + plugin_name: str, group_ids: list[str] + ) -> list[ScheduleInfo]: + """根据插件和群组ID列表获取任务""" + return await ScheduleInfo.filter( + plugin_name=plugin_name, group_id__in=group_ids + ).all() + + @staticmethod + async def update_or_create( + defaults: dict, **kwargs: Any + ) -> tuple[ScheduleInfo, bool]: + """更新或创建任务""" + return await ScheduleInfo.update_or_create(defaults=defaults, **kwargs) + + @staticmethod + async def query_schedules(**filters: Any) -> list[ScheduleInfo]: + """ + 根据任意条件查询任务列表 + + 参数: + **filters: 过滤条件,如 group_id="123", plugin_name="abc" + + 返回: + list[ScheduleInfo]: 任务列表 + """ + cleaned_filters = {k: v for k, v in filters.items() if v is not None} + if not cleaned_filters: + return await ScheduleInfo.all() + return await ScheduleInfo.filter(**cleaned_filters).all() + + @staticmethod + def filter(**kwargs: Any) -> QuerySet[ScheduleInfo]: + """提供一个通用的过滤查询接口,供Targeter使用""" + return ScheduleInfo.filter(**kwargs) diff --git a/zhenxun/services/scheduler/service.py b/zhenxun/services/scheduler/service.py new file mode 100644 index 00000000..4ed98c12 --- /dev/null +++ b/zhenxun/services/scheduler/service.py @@ -0,0 +1,448 @@ +""" +服务层 (Service) + +定义 SchedulerManager 类作为定时任务服务的公共 API 入口。 +它负责编排业务逻辑,并调用 Repository 和 Adapter 层来完成具体工作。 +""" + +from collections.abc import Callable, Coroutine +from datetime import datetime +from typing import Any, ClassVar + +import nonebot +from pydantic import BaseModel + +from zhenxun.configs.config import Config +from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.services.log import logger + +from .adapter import APSchedulerAdapter +from .job import _execute_job +from .repository import ScheduleRepository +from .targeter import ScheduleTargeter + + +class SchedulerManager: + ALL_GROUPS: ClassVar[str] = "__ALL_GROUPS__" + _registered_tasks: ClassVar[ + dict[str, dict[str, Callable | type[BaseModel] | None]] + ] = {} + _declared_tasks: ClassVar[list[dict[str, Any]]] = [] + _running_tasks: ClassVar[set] = set() + + def target(self, **filters: Any) -> ScheduleTargeter: + """ + 创建目标选择器以执行批量操作 + + 参数: + **filters: 过滤条件,支持plugin_name、group_id、bot_id等字段。 + + 返回: + ScheduleTargeter: 目标选择器对象,可用于批量操作。 + """ + return ScheduleTargeter(self, **filters) + + def task( + self, + trigger: str, + group_id: str | None = None, + bot_id: str | None = None, + **trigger_kwargs, + ): + """ + 声明式定时任务装饰器 + + 参数: + trigger: 触发器类型,如'cron'、'interval'等。 + group_id: 目标群组ID,None表示全局任务。 + bot_id: 目标Bot ID,None表示使用默认Bot。 + **trigger_kwargs: 触发器配置参数。 + + 返回: + Callable: 装饰器函数。 + """ + + def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: + try: + plugin = nonebot.get_plugin_by_module_name(func.__module__) + if not plugin: + raise ValueError(f"函数 {func.__name__} 不在任何已加载的插件中。") + plugin_name = plugin.name + + task_declaration = { + "plugin_name": plugin_name, + "func": func, + "group_id": group_id, + "bot_id": bot_id, + "trigger_type": trigger, + "trigger_config": trigger_kwargs, + "job_kwargs": {}, + } + self._declared_tasks.append(task_declaration) + logger.debug( + f"发现声明式定时任务 '{plugin_name}',将在启动时进行注册。" + ) + except Exception as e: + logger.error(f"注册声明式定时任务失败: {func.__name__}, 错误: {e}") + + return func + + return decorator + + def register( + self, plugin_name: str, params_model: type[BaseModel] | None = None + ) -> Callable: + """ + 注册可调度的任务函数 + + 参数: + plugin_name: 插件名称,用于标识任务。 + params_model: 参数验证模型,继承自BaseModel的类。 + + 返回: + Callable: 装饰器函数。 + """ + + def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: + if plugin_name in self._registered_tasks: + logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。") + self._registered_tasks[plugin_name] = { + "func": func, + "model": params_model, + } + model_name = params_model.__name__ if params_model else "无" + logger.debug( + f"插件 '{plugin_name}' 的定时任务已注册,参数模型: {model_name}" + ) + return func + + return decorator + + def get_registered_plugins(self) -> list[str]: + """ + 获取已注册插件列表 + + 返回: + list[str]: 已注册的插件名称列表。 + """ + return list(self._registered_tasks.keys()) + + async def add_daily_task( + self, + plugin_name: str, + group_id: str | None, + hour: int, + minute: int, + second: int = 0, + job_kwargs: dict | None = None, + bot_id: str | None = None, + ) -> "ScheduleInfo | None": + """ + 添加每日定时任务 + + 参数: + plugin_name: 插件名称。 + group_id: 目标群组ID,None表示全局任务。 + hour: 执行小时(0-23)。 + minute: 执行分钟(0-59)。 + second: 执行秒数(0-59),默认为0。 + job_kwargs: 任务参数字典。 + bot_id: 目标Bot ID,None表示使用默认Bot。 + + 返回: + ScheduleInfo | None: 创建的任务信息,失败时返回None。 + """ + trigger_config = { + "hour": hour, + "minute": minute, + "second": second, + "timezone": Config.get_config("SchedulerManager", "SCHEDULER_TIMEZONE"), + } + return await self.add_schedule( + plugin_name, + group_id, + "cron", + trigger_config, + job_kwargs=job_kwargs, + bot_id=bot_id, + ) + + async def add_interval_task( + self, + plugin_name: str, + group_id: str | None, + *, + weeks: int = 0, + days: int = 0, + hours: int = 0, + minutes: int = 0, + seconds: int = 0, + start_date: str | datetime | None = None, + job_kwargs: dict | None = None, + bot_id: str | None = None, + ) -> "ScheduleInfo | None": + """添加间隔性定时任务""" + trigger_config = { + "weeks": weeks, + "days": days, + "hours": hours, + "minutes": minutes, + "seconds": seconds, + "start_date": start_date, + } + trigger_config = {k: v for k, v in trigger_config.items() if v} + return await self.add_schedule( + plugin_name, + group_id, + "interval", + trigger_config, + job_kwargs=job_kwargs, + bot_id=bot_id, + ) + + def _validate_and_prepare_kwargs( + self, plugin_name: str, job_kwargs: dict | None + ) -> tuple[bool, str | dict]: + """验证并准备任务参数,应用默认值""" + from pydantic import ValidationError + + task_meta = self._registered_tasks.get(plugin_name) + if not task_meta: + return False, f"插件 '{plugin_name}' 未注册。" + + params_model = task_meta.get("model") + job_kwargs = job_kwargs if job_kwargs is not None else {} + + if not params_model: + if job_kwargs: + logger.warning( + f"插件 '{plugin_name}' 未定义参数模型,但收到了参数: {job_kwargs}" + ) + return True, job_kwargs + + if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): + logger.error(f"插件 '{plugin_name}' 的参数模型不是有效的 BaseModel 类") + return False, f"插件 '{plugin_name}' 的参数模型配置错误" + + try: + model_validate = getattr(params_model, "model_validate", None) + if not model_validate: + return False, f"插件 '{plugin_name}' 的参数模型不支持验证" + + validated_model = model_validate(job_kwargs) + + model_dump = getattr(validated_model, "model_dump", None) + if not model_dump: + return False, f"插件 '{plugin_name}' 的参数模型不支持导出" + + return True, model_dump() + except ValidationError as e: + errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] + error_str = "\n".join(errors) + msg = f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}" + return False, msg + + async def add_schedule( + self, + plugin_name: str, + group_id: str | None, + trigger_type: str, + trigger_config: dict, + job_kwargs: dict | None = None, + bot_id: str | None = None, + ) -> "ScheduleInfo | None": + """ + 添加定时任务(通用方法) + + 参数: + plugin_name: 插件名称。 + group_id: 目标群组ID,None表示全局任务。 + trigger_type: 触发器类型,如'cron'、'interval'等。 + trigger_config: 触发器配置字典。 + job_kwargs: 任务参数字典。 + bot_id: 目标Bot ID,None表示使用默认Bot。 + + 返回: + ScheduleInfo | None: 创建的任务信息,失败时返回None。 + """ + if plugin_name not in self._registered_tasks: + logger.error(f"插件 '{plugin_name}' 没有注册可用的定时任务。") + return None + + is_valid, result = self._validate_and_prepare_kwargs(plugin_name, job_kwargs) + if not is_valid: + logger.error(f"任务参数校验失败: {result}") + return None + + search_kwargs = {"plugin_name": plugin_name, "group_id": group_id} + if bot_id and group_id == self.ALL_GROUPS: + search_kwargs["bot_id"] = bot_id + else: + search_kwargs["bot_id__isnull"] = True + + defaults = { + "trigger_type": trigger_type, + "trigger_config": trigger_config, + "job_kwargs": result, + "is_enabled": True, + } + + schedule, created = await ScheduleRepository.update_or_create( + defaults, **search_kwargs + ) + APSchedulerAdapter.add_or_reschedule_job(schedule) + + action = "设置" if created else "更新" + logger.info( + f"已成功{action}插件 '{plugin_name}' 的定时任务 (ID: {schedule.id})。" + ) + return schedule + + async def get_all_schedules(self) -> list[ScheduleInfo]: + """ + 获取所有定时任务信息 + """ + return await self.get_schedules() + + async def get_schedules( + self, + plugin_name: str | None = None, + group_id: str | None = None, + bot_id: str | None = None, + ) -> list[ScheduleInfo]: + """ + 根据条件获取定时任务列表 + + 参数: + plugin_name: 插件名称,None表示不限制。 + group_id: 群组ID,None表示不限制。 + bot_id: Bot ID,None表示不限制。 + + 返回: + list[ScheduleInfo]: 符合条件的任务信息列表。 + """ + return await ScheduleRepository.query_schedules( + plugin_name=plugin_name, group_id=group_id, bot_id=bot_id + ) + + async def update_schedule( + self, + schedule_id: int, + trigger_type: str | None = None, + trigger_config: dict | None = None, + job_kwargs: dict | None = None, + ) -> tuple[bool, str]: + """ + 更新定时任务配置 + + 参数: + schedule_id: 任务ID。 + trigger_type: 新的触发器类型,None表示不更新。 + trigger_config: 新的触发器配置,None表示不更新。 + job_kwargs: 新的任务参数,None表示不更新。 + + 返回: + tuple[bool, str]: (是否成功, 结果消息)。 + """ + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule: + return False, f"未找到 ID 为 {schedule_id} 的任务。" + + updated_fields = [] + if trigger_config is not None: + schedule.trigger_config = trigger_config + updated_fields.append("trigger_config") + if trigger_type is not None and schedule.trigger_type != trigger_type: + schedule.trigger_type = trigger_type + updated_fields.append("trigger_type") + + if job_kwargs is not None: + existing_kwargs = ( + schedule.job_kwargs.copy() + if isinstance(schedule.job_kwargs, dict) + else {} + ) + existing_kwargs.update(job_kwargs) + + is_valid, result = self._validate_and_prepare_kwargs( + schedule.plugin_name, existing_kwargs + ) + if not is_valid: + return False, str(result) + + assert isinstance(result, dict), "验证成功时 result 应该是字典类型" + schedule.job_kwargs = result + updated_fields.append("job_kwargs") + + if not updated_fields: + return True, "没有任何需要更新的配置。" + + await ScheduleRepository.save(schedule, update_fields=updated_fields) + APSchedulerAdapter.add_or_reschedule_job(schedule) + return True, f"成功更新了任务 ID: {schedule_id} 的配置。" + + async def get_schedule_status(self, schedule_id: int) -> dict | None: + """获取定时任务的详细状态信息""" + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule: + return None + + status_from_scheduler = APSchedulerAdapter.get_job_status(schedule.id) + + status_text = ( + "运行中" + if schedule_id in self._running_tasks + else ("启用" if schedule.is_enabled else "暂停") + ) + + return { + "id": schedule.id, + "bot_id": schedule.bot_id, + "plugin_name": schedule.plugin_name, + "group_id": schedule.group_id, + "is_enabled": status_text, + "trigger_type": schedule.trigger_type, + "trigger_config": schedule.trigger_config, + "job_kwargs": schedule.job_kwargs, + **status_from_scheduler, + } + + async def pause_schedule(self, schedule_id: int) -> tuple[bool, str]: + """暂停指定的定时任务""" + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule or not schedule.is_enabled: + return False, "任务不存在或已暂停。" + + schedule.is_enabled = False + await ScheduleRepository.save(schedule, update_fields=["is_enabled"]) + APSchedulerAdapter.pause_job(schedule_id) + return True, f"已暂停任务 (ID: {schedule.id})。" + + async def resume_schedule(self, schedule_id: int) -> tuple[bool, str]: + """恢复指定的定时任务""" + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule or schedule.is_enabled: + return False, "任务不存在或已启用。" + + schedule.is_enabled = True + await ScheduleRepository.save(schedule, update_fields=["is_enabled"]) + APSchedulerAdapter.resume_job(schedule_id) + return True, f"已恢复任务 (ID: {schedule.id})。" + + async def trigger_now(self, schedule_id: int) -> tuple[bool, str]: + """立即手动触发指定的定时任务""" + schedule = await ScheduleRepository.get_by_id(schedule_id) + if not schedule: + return False, f"未找到 ID 为 {schedule_id} 的定时任务。" + if schedule.plugin_name not in self._registered_tasks: + return False, f"插件 '{schedule.plugin_name}' 没有注册可用的定时任务。" + + try: + await _execute_job(schedule.id) + return True, f"已手动触发任务 (ID: {schedule.id})。" + except Exception as e: + logger.error(f"手动触发任务失败: {e}") + return False, f"手动触发任务失败: {e}" + + +scheduler_manager = SchedulerManager() diff --git a/zhenxun/services/scheduler/targeter.py b/zhenxun/services/scheduler/targeter.py new file mode 100644 index 00000000..a5b3277f --- /dev/null +++ b/zhenxun/services/scheduler/targeter.py @@ -0,0 +1,109 @@ +""" +目标选择器 (Targeter) + +提供链式API,用于构建和执行对多个定时任务的批量操作。 +""" + +from collections.abc import Callable, Coroutine +from typing import Any + +from .adapter import APSchedulerAdapter +from .repository import ScheduleRepository + + +class ScheduleTargeter: + """ + 一个用于构建和执行定时任务批量操作的目标选择器。 + """ + + def __init__(self, manager: Any, **filters: Any): + """初始化目标选择器""" + self._manager = manager + self._filters = {k: v for k, v in filters.items() if v is not None} + + async def _get_schedules(self): + """根据过滤器获取任务""" + query = ScheduleRepository.filter(**self._filters) + return await query.all() + + def _generate_target_description(self) -> str: + """根据过滤条件生成友好的目标描述""" + if "id" in self._filters: + return f"任务 ID {self._filters['id']} 的" + + parts = [] + if "group_id" in self._filters: + group_id = self._filters["group_id"] + if group_id == self._manager.ALL_GROUPS: + parts.append("所有群组中") + else: + parts.append(f"群 {group_id} 中") + + if "plugin_name" in self._filters: + parts.append(f"插件 '{self._filters['plugin_name']}' 的") + + if not parts: + return "所有" + + return "".join(parts) + + async def _apply_operation( + self, + operation_func: Callable[[int], Coroutine[Any, Any, tuple[bool, str]]], + operation_name: str, + ) -> tuple[int, str]: + """通用的操作应用模板""" + schedules = await self._get_schedules() + if not schedules: + target_desc = self._generate_target_description() + return 0, f"没有找到{target_desc}可供{operation_name}的任务。" + + success_count = 0 + for schedule in schedules: + success, _ = await operation_func(schedule.id) + if success: + success_count += 1 + + target_desc = self._generate_target_description() + return ( + success_count, + f"成功{operation_name}了{target_desc} {success_count} 个任务。", + ) + + async def pause(self) -> tuple[int, str]: + """ + 暂停匹配的定时任务 + + 返回: + tuple[int, str]: (成功暂停的任务数量, 操作结果消息)。 + """ + return await self._apply_operation(self._manager.pause_schedule, "暂停") + + async def resume(self) -> tuple[int, str]: + """ + 恢复匹配的定时任务 + + 返回: + tuple[int, str]: (成功恢复的任务数量, 操作结果消息)。 + """ + return await self._apply_operation(self._manager.resume_schedule, "恢复") + + async def remove(self) -> tuple[int, str]: + """ + 移除匹配的定时任务 + + 返回: + tuple[int, str]: (成功移除的任务数量, 操作结果消息)。 + """ + schedules = await self._get_schedules() + if not schedules: + target_desc = self._generate_target_description() + return 0, f"没有找到{target_desc}可供移除的任务。" + + for schedule in schedules: + APSchedulerAdapter.remove_job(schedule.id) + + query = ScheduleRepository.filter(**self._filters) + count = await query.delete() + target_desc = self._generate_target_description() + return count, f"成功移除了{target_desc} {count} 个任务。" diff --git a/zhenxun/utils/manager/schedule_manager.py b/zhenxun/utils/manager/schedule_manager.py deleted file mode 100644 index a3b21272..00000000 --- a/zhenxun/utils/manager/schedule_manager.py +++ /dev/null @@ -1,810 +0,0 @@ -import asyncio -from collections.abc import Callable, Coroutine -import copy -import inspect -import random -from typing import ClassVar - -import nonebot -from nonebot import get_bots -from nonebot_plugin_apscheduler import scheduler -from pydantic import BaseModel, ValidationError - -from zhenxun.configs.config import Config -from zhenxun.models.schedule_info import ScheduleInfo -from zhenxun.services.log import logger -from zhenxun.utils.common_utils import CommonUtils -from zhenxun.utils.manager.priority_manager import PriorityLifecycle -from zhenxun.utils.platform import PlatformUtils - -SCHEDULE_CONCURRENCY_KEY = "all_groups_concurrency_limit" - - -class SchedulerManager: - """ - 一个通用的、持久化的定时任务管理器,供所有插件使用。 - """ - - _registered_tasks: ClassVar[ - dict[str, dict[str, Callable | type[BaseModel] | None]] - ] = {} - _JOB_PREFIX = "zhenxun_schedule_" - _running_tasks: ClassVar[set] = set() - - def register( - self, plugin_name: str, params_model: type[BaseModel] | None = None - ) -> Callable: - """ - 注册一个可调度的任务函数。 - 被装饰的函数签名应为 `async def func(group_id: str | None, **kwargs)` - - Args: - plugin_name (str): 插件的唯一名称 (通常是模块名)。 - params_model (type[BaseModel], optional): 一个 Pydantic BaseModel 类, - 用于定义和验证任务函数接受的额外参数。 - """ - - def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: - if plugin_name in self._registered_tasks: - logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。") - self._registered_tasks[plugin_name] = { - "func": func, - "model": params_model, - } - model_name = params_model.__name__ if params_model else "无" - logger.debug( - f"插件 '{plugin_name}' 的定时任务已注册,参数模型: {model_name}" - ) - return func - - return decorator - - def get_registered_plugins(self) -> list[str]: - """获取所有已注册定时任务的插件列表。""" - return list(self._registered_tasks.keys()) - - def _get_job_id(self, schedule_id: int) -> str: - """根据数据库ID生成唯一的 APScheduler Job ID。""" - return f"{self._JOB_PREFIX}{schedule_id}" - - async def _execute_job(self, schedule_id: int): - """ - APScheduler 调度的入口函数。 - 根据 schedule_id 处理特定任务、所有群组任务或全局任务。 - """ - schedule = await ScheduleInfo.get_or_none(id=schedule_id) - if not schedule or not schedule.is_enabled: - logger.warning(f"定时任务 {schedule_id} 不存在或已禁用,跳过执行。") - return - - plugin_name = schedule.plugin_name - - task_meta = self._registered_tasks.get(plugin_name) - if not task_meta: - logger.error( - f"无法执行定时任务:插件 '{plugin_name}' 未注册或已卸载。将禁用该任务。" - ) - schedule.is_enabled = False - await schedule.save(update_fields=["is_enabled"]) - self._remove_aps_job(schedule.id) - return - - try: - if schedule.bot_id: - bot = nonebot.get_bot(schedule.bot_id) - else: - bot = nonebot.get_bot() - logger.debug( - f"任务 {schedule_id} 未关联特定Bot,使用默认Bot {bot.self_id}" - ) - except KeyError: - logger.warning( - f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} " - f"不在线,本次执行跳过。" - ) - return - except ValueError: - logger.warning(f"当前没有Bot在线,定时任务 {schedule_id} 跳过。") - return - - if schedule.group_id == "__ALL_GROUPS__": - await self._execute_for_all_groups(schedule, task_meta, bot) - else: - await self._execute_for_single_target(schedule, task_meta, bot) - - async def _execute_for_all_groups( - self, schedule: ScheduleInfo, task_meta: dict, bot - ): - """为所有群组执行任务,并处理优先级覆盖。""" - plugin_name = schedule.plugin_name - - concurrency_limit = Config.get_config( - "SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5 - ) - if not isinstance(concurrency_limit, int) or concurrency_limit <= 0: - logger.warning( - f"无效的定时任务并发限制配置 '{concurrency_limit}',将使用默认值 5。" - ) - concurrency_limit = 5 - - logger.info( - f"开始执行针对 [所有群组] 的任务 " - f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})," - f"并发限制: {concurrency_limit}" - ) - - all_gids = set() - try: - group_list, _ = await PlatformUtils.get_group_list(bot) - all_gids.update( - g.group_id for g in group_list if g.group_id and not g.channel_id - ) - except Exception as e: - logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) - return - - specific_tasks_gids = set( - await ScheduleInfo.filter( - plugin_name=plugin_name, group_id__in=list(all_gids) - ).values_list("group_id", flat=True) - ) - - semaphore = asyncio.Semaphore(concurrency_limit) - - async def worker(gid: str): - """使用 Semaphore 包装单个群组的任务执行""" - async with semaphore: - temp_schedule = copy.deepcopy(schedule) - temp_schedule.group_id = gid - await self._execute_for_single_target(temp_schedule, task_meta, bot) - await asyncio.sleep(random.uniform(0.1, 0.5)) - - tasks_to_run = [] - for gid in all_gids: - if gid in specific_tasks_gids: - logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") - continue - tasks_to_run.append(worker(gid)) - - if tasks_to_run: - await asyncio.gather(*tasks_to_run) - - async def _execute_for_single_target( - self, schedule: ScheduleInfo, task_meta: dict, bot - ): - """为单个目标(具体群组或全局)执行任务。""" - plugin_name = schedule.plugin_name - group_id = schedule.group_id - - try: - is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) - if is_blocked: - target_desc = f"群 {group_id}" if group_id else "全局" - logger.info( - f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]" - "因功能被禁用而跳过执行。" - ) - return - - task_func = task_meta["func"] - job_kwargs = schedule.job_kwargs - if not isinstance(job_kwargs, dict): - logger.error( - f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" - ) - return - - sig = inspect.signature(task_func) - if "bot" in sig.parameters: - job_kwargs["bot"] = bot - - logger.info( - f"插件 '{plugin_name}' 开始为目标 [{group_id or '全局'}] " - f"执行定时任务 (ID: {schedule.id})。" - ) - task = asyncio.create_task(task_func(group_id, **job_kwargs)) - self._running_tasks.add(task) - task.add_done_callback(self._running_tasks.discard) - await task - except Exception as e: - logger.error( - f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, " - f"目标: {group_id or '全局'}) 时发生异常", - e=e, - ) - - def _validate_and_prepare_kwargs( - self, plugin_name: str, job_kwargs: dict | None - ) -> tuple[bool, str | dict]: - """验证并准备任务参数,应用默认值""" - task_meta = self._registered_tasks.get(plugin_name) - if not task_meta: - return False, f"插件 '{plugin_name}' 未注册。" - - params_model = task_meta.get("model") - job_kwargs = job_kwargs if job_kwargs is not None else {} - - if not params_model: - if job_kwargs: - logger.warning( - f"插件 '{plugin_name}' 未定义参数模型,但收到了参数: {job_kwargs}" - ) - return True, job_kwargs - - if not (isinstance(params_model, type) and issubclass(params_model, BaseModel)): - logger.error(f"插件 '{plugin_name}' 的参数模型不是有效的 BaseModel 类") - return False, f"插件 '{plugin_name}' 的参数模型配置错误" - - try: - model_validate = getattr(params_model, "model_validate", None) - if not model_validate: - return False, f"插件 '{plugin_name}' 的参数模型不支持验证" - - validated_model = model_validate(job_kwargs) - - model_dump = getattr(validated_model, "model_dump", None) - if not model_dump: - return False, f"插件 '{plugin_name}' 的参数模型不支持导出" - - return True, model_dump() - except ValidationError as e: - errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] - error_str = "\n".join(errors) - msg = f"插件 '{plugin_name}' 的任务参数验证失败:\n{error_str}" - return False, msg - - def _add_aps_job(self, schedule: ScheduleInfo): - """根据 ScheduleInfo 对象添加或更新一个 APScheduler 任务。""" - job_id = self._get_job_id(schedule.id) - try: - scheduler.remove_job(job_id) - except Exception: - pass - - if not isinstance(schedule.trigger_config, dict): - logger.error( - f"任务 {schedule.id} 的 trigger_config 不是字典类型: " - f"{type(schedule.trigger_config)}" - ) - return - - scheduler.add_job( - self._execute_job, - trigger=schedule.trigger_type, - id=job_id, - misfire_grace_time=300, - args=[schedule.id], - **schedule.trigger_config, - ) - logger.debug( - f"已在 APScheduler 中添加/更新任务: {job_id} " - f"with trigger: {schedule.trigger_config}" - ) - - def _remove_aps_job(self, schedule_id: int): - """移除一个 APScheduler 任务。""" - job_id = self._get_job_id(schedule_id) - try: - scheduler.remove_job(job_id) - logger.debug(f"已从 APScheduler 中移除任务: {job_id}") - except Exception: - pass - - async def add_schedule( - self, - plugin_name: str, - group_id: str | None, - trigger_type: str, - trigger_config: dict, - job_kwargs: dict | None = None, - bot_id: str | None = None, - ) -> tuple[bool, str]: - """ - 添加或更新一个定时任务。 - """ - if plugin_name not in self._registered_tasks: - return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。" - - is_valid, result = self._validate_and_prepare_kwargs(plugin_name, job_kwargs) - if not is_valid: - return False, str(result) - - validated_job_kwargs = result - - effective_bot_id = bot_id if group_id == "__ALL_GROUPS__" else None - - search_kwargs = { - "plugin_name": plugin_name, - "group_id": group_id, - } - if effective_bot_id: - search_kwargs["bot_id"] = effective_bot_id - else: - search_kwargs["bot_id__isnull"] = True - - defaults = { - "trigger_type": trigger_type, - "trigger_config": trigger_config, - "job_kwargs": validated_job_kwargs, - "is_enabled": True, - } - - schedule = await ScheduleInfo.filter(**search_kwargs).first() - created = False - - if schedule: - for key, value in defaults.items(): - setattr(schedule, key, value) - await schedule.save() - else: - creation_kwargs = { - "plugin_name": plugin_name, - "group_id": group_id, - "bot_id": effective_bot_id, - **defaults, - } - schedule = await ScheduleInfo.create(**creation_kwargs) - created = True - self._add_aps_job(schedule) - action = "设置" if created else "更新" - return True, f"已成功{action}插件 '{plugin_name}' 的定时任务。" - - async def add_schedule_for_all( - self, - plugin_name: str, - trigger_type: str, - trigger_config: dict, - job_kwargs: dict | None = None, - ) -> tuple[int, int]: - """为所有机器人所在的群组添加定时任务。""" - if plugin_name not in self._registered_tasks: - raise ValueError(f"插件 '{plugin_name}' 没有注册可用的定时任务。") - - groups = set() - for bot in get_bots().values(): - try: - group_list, _ = await PlatformUtils.get_group_list(bot) - groups.update( - g.group_id for g in group_list if g.group_id and not g.channel_id - ) - except Exception as e: - logger.error(f"获取 Bot {bot.self_id} 的群列表失败", e=e) - - success_count = 0 - fail_count = 0 - for gid in groups: - try: - success, _ = await self.add_schedule( - plugin_name, gid, trigger_type, trigger_config, job_kwargs - ) - if success: - success_count += 1 - else: - fail_count += 1 - except Exception as e: - logger.error(f"为群 {gid} 添加定时任务失败: {e}", e=e) - fail_count += 1 - await asyncio.sleep(0.05) - return success_count, fail_count - - async def update_schedule( - self, - schedule_id: int, - trigger_type: str | None = None, - trigger_config: dict | None = None, - job_kwargs: dict | None = None, - ) -> tuple[bool, str]: - """部分更新一个已存在的定时任务。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule: - return False, f"未找到 ID 为 {schedule_id} 的任务。" - - updated_fields = [] - if trigger_config is not None: - schedule.trigger_config = trigger_config - updated_fields.append("trigger_config") - - if trigger_type is not None and schedule.trigger_type != trigger_type: - schedule.trigger_type = trigger_type - updated_fields.append("trigger_type") - - if job_kwargs is not None: - if not isinstance(schedule.job_kwargs, dict): - return False, f"任务 {schedule_id} 的 job_kwargs 数据格式错误。" - - merged_kwargs = schedule.job_kwargs.copy() - merged_kwargs.update(job_kwargs) - - is_valid, result = self._validate_and_prepare_kwargs( - schedule.plugin_name, merged_kwargs - ) - if not is_valid: - return False, str(result) - - schedule.job_kwargs = result # type: ignore - updated_fields.append("job_kwargs") - - if not updated_fields: - return True, "没有任何需要更新的配置。" - - await schedule.save(update_fields=updated_fields) - self._add_aps_job(schedule) - return True, f"成功更新了任务 ID: {schedule_id} 的配置。" - - async def remove_schedule( - self, plugin_name: str, group_id: str | None, bot_id: str | None = None - ) -> tuple[bool, str]: - """移除指定插件和群组的定时任务。""" - query = {"plugin_name": plugin_name, "group_id": group_id} - if bot_id: - query["bot_id"] = bot_id - - schedules = await ScheduleInfo.filter(**query) - if not schedules: - msg = ( - f"未找到与 Bot {bot_id} 相关的群 {group_id} " - f"的插件 '{plugin_name}' 定时任务。" - ) - return (False, msg) - - for schedule in schedules: - self._remove_aps_job(schedule.id) - await schedule.delete() - - target_desc = f"群 {group_id}" if group_id else "全局" - msg = ( - f"已取消 Bot {bot_id} 在 [{target_desc}] " - f"的插件 '{plugin_name}' 所有定时任务。" - ) - return (True, msg) - - async def remove_schedule_for_all( - self, plugin_name: str, bot_id: str | None = None - ) -> int: - """移除指定插件在所有群组的定时任务。""" - query = {"plugin_name": plugin_name} - if bot_id: - query["bot_id"] = bot_id - - schedules_to_delete = await ScheduleInfo.filter(**query).all() - if not schedules_to_delete: - return 0 - - for schedule in schedules_to_delete: - self._remove_aps_job(schedule.id) - await schedule.delete() - await asyncio.sleep(0.01) - - return len(schedules_to_delete) - - async def remove_schedules_by_group(self, group_id: str) -> tuple[bool, str]: - """移除指定群组的所有定时任务。""" - schedules = await ScheduleInfo.filter(group_id=group_id) - if not schedules: - return False, f"群 {group_id} 没有任何定时任务。" - - count = 0 - for schedule in schedules: - self._remove_aps_job(schedule.id) - await schedule.delete() - count += 1 - await asyncio.sleep(0.01) - - return True, f"已成功移除群 {group_id} 的 {count} 个定时任务。" - - async def pause_schedules_by_group(self, group_id: str) -> tuple[int, str]: - """暂停指定群组的所有定时任务。""" - schedules = await ScheduleInfo.filter(group_id=group_id, is_enabled=True) - if not schedules: - return 0, f"群 {group_id} 没有正在运行的定时任务可暂停。" - - count = 0 - for schedule in schedules: - success, _ = await self.pause_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return count, f"已成功暂停群 {group_id} 的 {count} 个定时任务。" - - async def resume_schedules_by_group(self, group_id: str) -> tuple[int, str]: - """恢复指定群组的所有定时任务。""" - schedules = await ScheduleInfo.filter(group_id=group_id, is_enabled=False) - if not schedules: - return 0, f"群 {group_id} 没有已暂停的定时任务可恢复。" - - count = 0 - for schedule in schedules: - success, _ = await self.resume_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return count, f"已成功恢复群 {group_id} 的 {count} 个定时任务。" - - async def pause_schedules_by_plugin(self, plugin_name: str) -> tuple[int, str]: - """暂停指定插件在所有群组的定时任务。""" - schedules = await ScheduleInfo.filter(plugin_name=plugin_name, is_enabled=True) - if not schedules: - return 0, f"插件 '{plugin_name}' 没有正在运行的定时任务可暂停。" - - count = 0 - for schedule in schedules: - success, _ = await self.pause_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return ( - count, - f"已成功暂停插件 '{plugin_name}' 在所有群组的 {count} 个定时任务。", - ) - - async def resume_schedules_by_plugin(self, plugin_name: str) -> tuple[int, str]: - """恢复指定插件在所有群组的定时任务。""" - schedules = await ScheduleInfo.filter(plugin_name=plugin_name, is_enabled=False) - if not schedules: - return 0, f"插件 '{plugin_name}' 没有已暂停的定时任务可恢复。" - - count = 0 - for schedule in schedules: - success, _ = await self.resume_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return ( - count, - f"已成功恢复插件 '{plugin_name}' 在所有群组的 {count} 个定时任务。", - ) - - async def pause_schedule_by_plugin_group( - self, plugin_name: str, group_id: str | None, bot_id: str | None = None - ) -> tuple[bool, str]: - """暂停指定插件在指定群组的定时任务。""" - query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": True} - if bot_id: - query["bot_id"] = bot_id - - schedules = await ScheduleInfo.filter(**query) - if not schedules: - return ( - False, - f"群 {group_id} 未设置插件 '{plugin_name}' 的定时任务或任务已暂停。", - ) - - count = 0 - for schedule in schedules: - success, _ = await self.pause_schedule(schedule.id) - if success: - count += 1 - - return ( - True, - f"已成功暂停群 {group_id} 的插件 '{plugin_name}' 共 {count} 个定时任务。", - ) - - async def resume_schedule_by_plugin_group( - self, plugin_name: str, group_id: str | None, bot_id: str | None = None - ) -> tuple[bool, str]: - """恢复指定插件在指定群组的定时任务。""" - query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": False} - if bot_id: - query["bot_id"] = bot_id - - schedules = await ScheduleInfo.filter(**query) - if not schedules: - return ( - False, - f"群 {group_id} 未设置插件 '{plugin_name}' 的定时任务或任务已启用。", - ) - - count = 0 - for schedule in schedules: - success, _ = await self.resume_schedule(schedule.id) - if success: - count += 1 - - return ( - True, - f"已成功恢复群 {group_id} 的插件 '{plugin_name}' 共 {count} 个定时任务。", - ) - - async def remove_all_schedules(self) -> tuple[int, str]: - """移除所有群组的所有定时任务。""" - schedules = await ScheduleInfo.all() - if not schedules: - return 0, "当前没有任何定时任务。" - - count = 0 - for schedule in schedules: - self._remove_aps_job(schedule.id) - await schedule.delete() - count += 1 - await asyncio.sleep(0.01) - - return count, f"已成功移除所有群组的 {count} 个定时任务。" - - async def pause_all_schedules(self) -> tuple[int, str]: - """暂停所有群组的所有定时任务。""" - schedules = await ScheduleInfo.filter(is_enabled=True) - if not schedules: - return 0, "当前没有正在运行的定时任务可暂停。" - - count = 0 - for schedule in schedules: - success, _ = await self.pause_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return count, f"已成功暂停所有群组的 {count} 个定时任务。" - - async def resume_all_schedules(self) -> tuple[int, str]: - """恢复所有群组的所有定时任务。""" - schedules = await ScheduleInfo.filter(is_enabled=False) - if not schedules: - return 0, "当前没有已暂停的定时任务可恢复。" - - count = 0 - for schedule in schedules: - success, _ = await self.resume_schedule(schedule.id) - if success: - count += 1 - await asyncio.sleep(0.01) - - return count, f"已成功恢复所有群组的 {count} 个定时任务。" - - async def remove_schedule_by_id(self, schedule_id: int) -> tuple[bool, str]: - """通过ID移除指定的定时任务。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule: - return False, f"未找到 ID 为 {schedule_id} 的定时任务。" - - self._remove_aps_job(schedule.id) - await schedule.delete() - - return ( - True, - f"已删除插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " - f"的定时任务 (ID: {schedule.id})。", - ) - - async def get_schedule_by_id(self, schedule_id: int) -> ScheduleInfo | None: - """通过ID获取定时任务信息。""" - return await ScheduleInfo.get_or_none(id=schedule_id) - - async def get_schedules( - self, plugin_name: str, group_id: str | None - ) -> list[ScheduleInfo]: - """获取特定群组特定插件的所有定时任务。""" - return await ScheduleInfo.filter(plugin_name=plugin_name, group_id=group_id) - - async def get_schedule( - self, plugin_name: str, group_id: str | None - ) -> ScheduleInfo | None: - """获取特定群组的定时任务信息。""" - return await ScheduleInfo.get_or_none( - plugin_name=plugin_name, group_id=group_id - ) - - async def get_all_schedules( - self, plugin_name: str | None = None - ) -> list[ScheduleInfo]: - """获取所有定时任务信息,可按插件名过滤。""" - if plugin_name: - return await ScheduleInfo.filter(plugin_name=plugin_name).all() - return await ScheduleInfo.all() - - async def get_schedule_status(self, schedule_id: int) -> dict | None: - """获取任务的详细状态。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule: - return None - - job_id = self._get_job_id(schedule.id) - job = scheduler.get_job(job_id) - - status = { - "id": schedule.id, - "bot_id": schedule.bot_id, - "plugin_name": schedule.plugin_name, - "group_id": schedule.group_id, - "is_enabled": schedule.is_enabled, - "trigger_type": schedule.trigger_type, - "trigger_config": schedule.trigger_config, - "job_kwargs": schedule.job_kwargs, - "next_run_time": job.next_run_time.strftime("%Y-%m-%d %H:%M:%S") - if job and job.next_run_time - else "N/A", - "is_paused_in_scheduler": not bool(job.next_run_time) if job else "N/A", - } - return status - - async def pause_schedule(self, schedule_id: int) -> tuple[bool, str]: - """暂停一个定时任务。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule or not schedule.is_enabled: - return False, "任务不存在或已暂停。" - - schedule.is_enabled = False - await schedule.save(update_fields=["is_enabled"]) - - job_id = self._get_job_id(schedule.id) - try: - scheduler.pause_job(job_id) - except Exception: - pass - - return ( - True, - f"已暂停插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " - f"的定时任务 (ID: {schedule.id})。", - ) - - async def resume_schedule(self, schedule_id: int) -> tuple[bool, str]: - """恢复一个定时任务。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule or schedule.is_enabled: - return False, "任务不存在或已启用。" - - schedule.is_enabled = True - await schedule.save(update_fields=["is_enabled"]) - - job_id = self._get_job_id(schedule.id) - try: - scheduler.resume_job(job_id) - except Exception: - self._add_aps_job(schedule) - - return ( - True, - f"已恢复插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " - f"的定时任务 (ID: {schedule.id})。", - ) - - async def trigger_now(self, schedule_id: int) -> tuple[bool, str]: - """手动触发一个定时任务。""" - schedule = await self.get_schedule_by_id(schedule_id) - if not schedule: - return False, f"未找到 ID 为 {schedule_id} 的定时任务。" - - if schedule.plugin_name not in self._registered_tasks: - return False, f"插件 '{schedule.plugin_name}' 没有注册可用的定时任务。" - - try: - await self._execute_job(schedule.id) - return ( - True, - f"已手动触发插件 '{schedule.plugin_name}' 在群 {schedule.group_id} " - f"的定时任务 (ID: {schedule.id})。", - ) - except Exception as e: - logger.error(f"手动触发任务失败: {e}") - return False, f"手动触发任务失败: {e}" - - -scheduler_manager = SchedulerManager() - - -@PriorityLifecycle.on_startup(priority=90) -async def _load_schedules_from_db(): - """在服务启动时从数据库加载并调度所有任务。""" - Config.add_plugin_config( - "SchedulerManager", - SCHEDULE_CONCURRENCY_KEY, - 5, - help="“所有群组”类型定时任务的并发执行数量限制", - type=int, - ) - - logger.info("正在从数据库加载并调度所有定时任务...") - schedules = await ScheduleInfo.filter(is_enabled=True).all() - count = 0 - for schedule in schedules: - if schedule.plugin_name in scheduler_manager._registered_tasks: - scheduler_manager._add_aps_job(schedule) - count += 1 - else: - logger.warning(f"跳过加载定时任务:插件 '{schedule.plugin_name}' 未注册。") - logger.info(f"定时任务加载完成,共成功加载 {count} 个任务。")