diff --git a/zhenxun/builtin_plugins/scheduler_admin/__init__.py b/zhenxun/builtin_plugins/scheduler_admin/__init__.py index 292377ae..c4ffb468 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/__init__.py +++ b/zhenxun/builtin_plugins/scheduler_admin/__init__.py @@ -10,17 +10,21 @@ __plugin_meta__ = PluginMetadata( description="查看和管理由 SchedulerManager 控制的定时任务。", usage=""" 定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] : 查看定时任务 - 定时任务 设置 <插件> <时间> [-g <群号> | --all] [--kwargs <参数>] : - 设置/开启定时任务 (SUPERUSER) - 定时任务 删除 <任务ID> : 通过ID删除任务 (SUPERUSER) - 定时任务 删除 -p <插件> [-g <群号> | --all] : 通过插件+群组删除任务 (SUPERUSER) - 定时任务 删除 -all [-g <群号>] : 删除所有群组的所有任务,-g 指定群 (SUPERUSER) - 定时任务 暂停 <任务ID> | -all [-g <群号>] | -p <插件> [-g <群号> | -all] : - 暂停任务,-all 为所有群组 (SUPERUSER) - 定时任务 恢复 <任务ID> | -all [-g <群号>] | -p <插件> [-g <群号> | -all] : - 恢复任务,-all 为所有群组 (SUPERUSER) + 定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] : + 设置/开启任务 (SUPERUSER) + + 时间选项 (三选一): + --cron "<分> <时> <日> <月> <周>" : 设置 cron 表达式 + (例如: --cron "0 8 * * *") + --interval <时间间隔> : 设置时间间隔 + (例如: --interval 30m, --interval 2h, --interval 10s) + --date "" : 设置在特定时间执行一次 + + 定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all : 删除任务 (SUPERUSER) + 定时任务 暂停/恢复 <任务ID> | -p <插件> [-g <群号>] | -all : + 暂停/恢复任务 (SUPERUSER) 定时任务 执行 <任务ID> : 立即手动执行一次任务 (SUPERUSER) - 定时任务 更新 <任务ID> [--time <时间>] [--kwargs <参数>] : 更新任务配置 (SUPERUSER) + 定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] : 更新任务配置 (SUPERUSER) 定时任务 插件列表 : 查看所有可设置定时任务的插件 (SUPERUSER) 别名支持: @@ -35,7 +39,7 @@ __plugin_meta__ = PluginMetadata( """.strip(), extra=PluginExtraData( author="HibiKier", - version="0.1.0", + version="0.1.1", plugin_type=PluginType.SUPERUSER, is_show=False, ).to_dict(), diff --git a/zhenxun/builtin_plugins/scheduler_admin/command.py b/zhenxun/builtin_plugins/scheduler_admin/command.py index eb948f43..c9982440 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/command.py +++ b/zhenxun/builtin_plugins/scheduler_admin/command.py @@ -1,6 +1,9 @@ import asyncio +from datetime import datetime +import re from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent +from nonebot.params import Depends from nonebot.permission import SUPERUSER from nonebot_plugin_alconna import ( Alconna, @@ -57,6 +60,152 @@ def _format_params(schedule_status: dict) -> str: return "-" +def _parse_interval(interval_str: str) -> dict: + match = re.match(r"(\d+)([smh])", interval_str.lower()) + if not match: + raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '10s' 的格式。") + + value, unit = int(match.group(1)), match.group(2) + if unit == "s": + return {"seconds": value} + if unit == "m": + return {"minutes": value} + if unit == "h": + return {"hours": value} + return {} + + +async def GetBotId( + bot: Bot, + bot_id_match: Match[str] = AlconnaMatch("bot_id"), +) -> str: + """获取要操作的Bot ID""" + if bot_id_match.available: + return bot_id_match.result + return bot.self_id + + +class ScheduleTarget: + """定时任务操作目标的基类""" + + pass + + +class TargetByID(ScheduleTarget): + """按任务ID操作""" + + def __init__(self, id: int): + self.id = id + + +class TargetByPlugin(ScheduleTarget): + """按插件名操作""" + + def __init__( + self, plugin: str, group_id: str | None = None, all_groups: bool = False + ): + self.plugin = plugin + self.group_id = group_id + self.all_groups = all_groups + + +class TargetAll(ScheduleTarget): + """操作所有任务""" + + def __init__(self, for_group: str | None = None): + self.for_group = for_group + + +TargetScope = TargetByID | TargetByPlugin | TargetAll | None + + +async def ParseScheduleTargetForDelete( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("删除.all"), +) -> TargetScope: + """解析删除命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + +async def ParseScheduleTargetForPause( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("暂停.all"), +) -> TargetScope: + """解析暂停命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + +async def ParseScheduleTargetForResume( + event: GroupMessageEvent, + schedule_id: Match[int] = AlconnaMatch("schedule_id"), + plugin_name: Match[str] = AlconnaMatch("plugin_name"), + group_id: Match[str] = AlconnaMatch("group_id"), + all_enabled: Query[bool] = Query("恢复.all"), +) -> TargetScope: + """解析恢复命令的操作目标""" + if schedule_id.available: + return TargetByID(schedule_id.result) + + if plugin_name.available: + p_name = plugin_name.result + if all_enabled.available: + return TargetByPlugin(plugin=p_name, all_groups=True) + elif group_id.available: + gid = group_id.result + if gid.lower() == "all": + gid = "__ALL_GROUPS__" + return TargetByPlugin(plugin=p_name, group_id=gid) + else: + return TargetByPlugin(plugin=p_name, group_id=str(event.group_id)) + + if all_enabled.available: + return TargetAll(for_group=group_id.result if group_id.available else None) + + return None + + schedule_cmd = on_alconna( Alconna( "定时任务", @@ -71,10 +220,16 @@ schedule_cmd = on_alconna( ), Subcommand( "设置", - Args["plugin_name", str]["time", str], - Option("-g", Args["group_id", str], help_text="指定群组ID"), - Option("-all", help_text="对所有群生效"), + Args["plugin_name", str], + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), + Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"), + Option("-all", help_text="对所有群生效 (等同于 -g all)"), Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), alias=["add", "开启"], help_text="设置/开启一个定时任务", ), @@ -84,6 +239,9 @@ schedule_cmd = on_alconna( Option("-p", Args["plugin_name", str], help_text="指定插件名"), Option("-g", Args["group_id", str], help_text="指定群组ID"), Option("-all", help_text="对所有群生效"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), alias=["del", "rm", "remove", "关闭", "取消"], help_text="删除一个或多个定时任务", ), @@ -93,6 +251,9 @@ schedule_cmd = on_alconna( Option("-all", help_text="对当前群所有任务生效"), Option("-p", Args["plugin_name", str], help_text="指定插件名"), Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), alias=["pause"], help_text="暂停一个或多个定时任务", ), @@ -102,6 +263,9 @@ schedule_cmd = on_alconna( Option("-all", help_text="对当前群所有任务生效"), Option("-p", Args["plugin_name", str], help_text="指定插件名"), Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"), + Option( + "--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)" + ), alias=["resume"], help_text="恢复一个或多个定时任务", ), @@ -114,7 +278,9 @@ schedule_cmd = on_alconna( Subcommand( "更新", Args["schedule_id", int], - Option("--time", Args["time", str], help_text="更新时间 (HH:MM)"), + Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"), + Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"), + Option("--date", Args["date_expr", str], help_text="设置特定执行日期"), Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"), alias=["update", "modify", "修改"], help_text="更新任务配置", @@ -190,7 +356,8 @@ async def _( [ s["id"], s["plugin_name"], - s["group_id"], + s.get("bot_id") or "N/A", + s["group_id"] or "全局", s["next_run_time"], _format_trigger(s), _format_params(s), @@ -206,7 +373,16 @@ async def _( img = await ImageTemplate.table_page( head_text=title, tip_text=f"第 {current_page}/{total_pages} 页,共 {total_items} 条任务", - column_name=["ID", "插件", "群组/目标", "下次运行", "触发规则", "参数", "状态"], + column_name=[ + "ID", + "插件", + "Bot ID", + "群组/目标", + "下次运行", + "触发规则", + "参数", + "状态", + ], data_list=data_list, column_space=20, ) @@ -216,26 +392,43 @@ async def _( @schedule_cmd.assign("设置") async def _( plugin_name: str, - time: str, + cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), + date_expr: Match[str] = AlconnaMatch("date.date_expr"), group_id: Match[str] = AlconnaMatch("group_id"), kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), all_enabled: Query[bool] = Query("设置.all"), + bot_id_to_operate: str = Depends(GetBotId), ): if plugin_name not in scheduler_manager._registered_tasks: await schedule_cmd.finish( f"插件 '{plugin_name}' 没有注册可用的定时任务。\n" f"可用插件: {list(scheduler_manager._registered_tasks.keys())}" ) + + trigger_type = "" + trigger_config = {} + try: - time_parts = time.split(":") - if len(time_parts) != 2: - raise ValueError("时间格式应为 HH:MM") - hour, minute = map(int, time_parts) - if not (0 <= hour <= 23 and 0 <= minute <= 59): - raise ValueError("小时或分钟超出有效范围") - trigger_config = {"hour": hour, "minute": minute} + if cron_expr.available: + trigger_type = "cron" + parts = cron_expr.result.split() + if len(parts) != 5: + raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)") + cron_keys = ["minute", "hour", "day", "month", "day_of_week"] + trigger_config = dict(zip(cron_keys, parts)) + elif interval_expr.available: + trigger_type = "interval" + trigger_config = _parse_interval(interval_expr.result) + elif date_expr.available: + trigger_type = "date" + trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + else: + await schedule_cmd.finish( + "必须提供一种时间选项: --cron, --interval, 或 --date。" + ) except ValueError as e: - await schedule_cmd.finish(f"时间格式错误: {e}") + await schedule_cmd.finish(f"时间参数解析错误: {e}") job_kwargs = None if kwargs_str.available: @@ -258,69 +451,76 @@ async def _( f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" ) - if all_enabled.available: - success, fail = await scheduler_manager.add_schedule_for_all( - plugin_name, "cron", trigger_config, job_kwargs - ) - await schedule_cmd.finish(f"已为 {success} 个群组设置任务,{fail} 个失败。") + target_group_id: str | None = None + if group_id.available and group_id.result.lower() == "all": + target_group_id = "__ALL_GROUPS__" + elif all_enabled.available: + target_group_id = "__ALL_GROUPS__" elif group_id.available: - success, msg = await scheduler_manager.add_schedule( - plugin_name, group_id.result, "cron", trigger_config, job_kwargs - ) - await schedule_cmd.finish(msg) + target_group_id = group_id.result else: - success, msg = await scheduler_manager.add_schedule( - plugin_name, None, "cron", trigger_config, job_kwargs - ) - await schedule_cmd.finish(f"已设置全局任务: {msg}") + target_group_id = None + + success, msg = await scheduler_manager.add_schedule( + plugin_name, + target_group_id, + trigger_type, + trigger_config, + job_kwargs, + bot_id=bot_id_to_operate, + ) + + if target_group_id == "__ALL_GROUPS__": + target_desc = "所有群组" + elif target_group_id is None: + target_desc = "全局" + else: + target_desc = f"群组 {target_group_id}" + + if success: + await schedule_cmd.finish(f"已成功为 [{target_desc}] {msg}") + else: + await schedule_cmd.finish(f"为 [{target_desc}] 设置任务失败: {msg}") @schedule_cmd.assign("删除") async def _( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), - all_enabled: Query[bool] = Query("删除.all"), + target: TargetScope = Depends(ParseScheduleTargetForDelete), + bot_id_to_operate: str = Depends(GetBotId), ): - if schedule_id.available: - success, message = await scheduler_manager.remove_schedule_by_id( - schedule_id.result - ) + if isinstance(target, TargetByID): + _, message = await scheduler_manager.remove_schedule_by_id(target.id) await schedule_cmd.finish(message) - elif plugin_name.available: - p_name = plugin_name.result + elif isinstance(target, TargetByPlugin): + p_name = target.plugin if p_name not in scheduler_manager.get_registered_plugins(): await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - if all_enabled.available: - removed_count = await scheduler_manager.remove_schedule_for_all(p_name) + if target.all_groups: + removed_count = await scheduler_manager.remove_schedule_for_all( + p_name, bot_id=bot_id_to_operate + ) message = ( f"已取消了 {removed_count} 个群组的插件 '{p_name}' 定时任务。" if removed_count > 0 else f"没有找到插件 '{p_name}' 的定时任务。" ) await schedule_cmd.finish(message) - - elif group_id.available: - success, message = await scheduler_manager.remove_schedule( - p_name, group_id.result + else: + _, message = await scheduler_manager.remove_schedule( + p_name, target.group_id, bot_id=bot_id_to_operate ) await schedule_cmd.finish(message) - else: - gid = str(event.group_id) - success, message = await scheduler_manager.remove_schedule(p_name, gid) - await schedule_cmd.finish(message) - - elif all_enabled.available: - if group_id.available: - gid = group_id.result - success, message = await scheduler_manager.remove_schedules_by_group(gid) + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.remove_schedules_by_group( + target.for_group + ) await schedule_cmd.finish(message) else: - count, message = await scheduler_manager.remove_all_schedules() + _, message = await scheduler_manager.remove_all_schedules() await schedule_cmd.finish(message) else: @@ -331,44 +531,35 @@ async def _( @schedule_cmd.assign("暂停") async def _( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - all_enabled: Query[bool] = Query("暂停.all"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), + target: TargetScope = Depends(ParseScheduleTargetForPause), + bot_id_to_operate: str = Depends(GetBotId), ): - if schedule_id.available: - success, message = await scheduler_manager.pause_schedule(schedule_id.result) + if isinstance(target, TargetByID): + _, message = await scheduler_manager.pause_schedule(target.id) await schedule_cmd.finish(message) - elif plugin_name.available: - p_name = plugin_name.result + elif isinstance(target, TargetByPlugin): + p_name = target.plugin if p_name not in scheduler_manager.get_registered_plugins(): await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - if all_enabled.available: - count, message = await scheduler_manager.pause_schedules_by_plugin(p_name) - await schedule_cmd.finish(message) - elif group_id.available: - gid = group_id.result - success, message = await scheduler_manager.pause_schedule_by_plugin_group( - p_name, gid - ) + if target.all_groups: + _, message = await scheduler_manager.pause_schedules_by_plugin(p_name) await schedule_cmd.finish(message) else: - gid = str(event.group_id) - success, message = await scheduler_manager.pause_schedule_by_plugin_group( - p_name, gid + _, message = await scheduler_manager.pause_schedule_by_plugin_group( + p_name, target.group_id, bot_id=bot_id_to_operate ) await schedule_cmd.finish(message) - elif all_enabled.available: - if group_id.available: - gid = group_id.result - count, message = await scheduler_manager.pause_schedules_by_group(gid) + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.pause_schedules_by_group( + target.for_group + ) await schedule_cmd.finish(message) else: - count, message = await scheduler_manager.pause_all_schedules() + _, message = await scheduler_manager.pause_all_schedules() await schedule_cmd.finish(message) else: @@ -377,44 +568,35 @@ async def _( @schedule_cmd.assign("恢复") async def _( - event: GroupMessageEvent, - schedule_id: Match[int] = AlconnaMatch("schedule_id"), - all_enabled: Query[bool] = Query("恢复.all"), - plugin_name: Match[str] = AlconnaMatch("plugin_name"), - group_id: Match[str] = AlconnaMatch("group_id"), + target: TargetScope = Depends(ParseScheduleTargetForResume), + bot_id_to_operate: str = Depends(GetBotId), ): - if schedule_id.available: - success, message = await scheduler_manager.resume_schedule(schedule_id.result) + if isinstance(target, TargetByID): + _, message = await scheduler_manager.resume_schedule(target.id) await schedule_cmd.finish(message) - elif plugin_name.available: - p_name = plugin_name.result + elif isinstance(target, TargetByPlugin): + p_name = target.plugin if p_name not in scheduler_manager.get_registered_plugins(): await schedule_cmd.finish(f"未找到插件 '{p_name}'。") - if all_enabled.available: - count, message = await scheduler_manager.resume_schedules_by_plugin(p_name) - await schedule_cmd.finish(message) - elif group_id.available: - gid = group_id.result - success, message = await scheduler_manager.resume_schedule_by_plugin_group( - p_name, gid - ) + if target.all_groups: + _, message = await scheduler_manager.resume_schedules_by_plugin(p_name) await schedule_cmd.finish(message) else: - gid = str(event.group_id) - success, message = await scheduler_manager.resume_schedule_by_plugin_group( - p_name, gid + _, message = await scheduler_manager.resume_schedule_by_plugin_group( + p_name, target.group_id, bot_id=bot_id_to_operate ) await schedule_cmd.finish(message) - elif all_enabled.available: - if group_id.available: - gid = group_id.result - count, message = await scheduler_manager.resume_schedules_by_group(gid) + elif isinstance(target, TargetAll): + if target.for_group: + _, message = await scheduler_manager.resume_schedules_by_group( + target.for_group + ) await schedule_cmd.finish(message) else: - count, message = await scheduler_manager.resume_all_schedules() + _, message = await scheduler_manager.resume_all_schedules() await schedule_cmd.finish(message) else: @@ -423,27 +605,44 @@ async def _( @schedule_cmd.assign("执行") async def _(schedule_id: int): - success, message = await scheduler_manager.trigger_now(schedule_id) + _, message = await scheduler_manager.trigger_now(schedule_id) await schedule_cmd.finish(message) @schedule_cmd.assign("更新") -async def _(schedule_id: int, time: Match[str], kwargs_str: Match[str]): - if not time.available and not kwargs_str.available: - await schedule_cmd.finish("请提供需要更新的时间 (--time) 或参数 (--kwargs)。") +async def _( + schedule_id: int, + cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"), + interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"), + date_expr: Match[str] = AlconnaMatch("date.date_expr"), + kwargs_str: Match[str] = AlconnaMatch("kwargs_str"), +): + if not any( + [ + cron_expr.available, + interval_expr.available, + date_expr.available, + kwargs_str.available, + ] + ): + await schedule_cmd.finish( + "请提供需要更新的时间 (--cron/--interval/--date) 或参数 (--kwargs)。" + ) trigger_config = None - if time.available: - try: - time_parts = time.result.split(":") - if len(time_parts) != 2: - raise ValueError("时间格式应为 HH:MM") - hour, minute = map(int, time_parts) - if not (0 <= hour <= 23) or not (0 <= minute <= 59): - raise ValueError("小时应在 0-23 范围内,分钟应在 0-59 范围内") - trigger_config = {"hour": hour, "minute": minute} - except ValueError as e: - await schedule_cmd.finish(f"时间格式错误: {e}") + try: + if cron_expr.available: + parts = cron_expr.result.split() + if len(parts) != 5: + raise ValueError("Cron 表达式必须有5个部分") + cron_keys = ["minute", "hour", "day", "month", "day_of_week"] + trigger_config = dict(zip(cron_keys, parts)) + elif interval_expr.available: + trigger_config = _parse_interval(interval_expr.result) + elif date_expr.available: + trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)} + except ValueError as e: + await schedule_cmd.finish(f"时间参数解析错误: {e}") job_kwargs = None if kwargs_str.available: @@ -494,7 +693,7 @@ async def _(schedule_id: int, time: Match[str], kwargs_str: Match[str]): f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}" ) - success, message = await scheduler_manager.update_schedule( + _, message = await scheduler_manager.update_schedule( schedule_id, trigger_config, job_kwargs ) await schedule_cmd.finish(message) diff --git a/zhenxun/models/schedule_info.py b/zhenxun/models/schedule_info.py index 33b9303f..c7583078 100644 --- a/zhenxun/models/schedule_info.py +++ b/zhenxun/models/schedule_info.py @@ -6,9 +6,17 @@ from zhenxun.services.db_context import Model class ScheduleInfo(Model): id = fields.IntField(pk=True, generated=True, auto_increment=True) """自增id""" + bot_id = fields.CharField( + 255, null=True, default=None, description="任务关联的Bot ID" + ) + """任务关联的Bot ID""" plugin_name = fields.CharField(255, description="插件模块名") """插件模块名""" - group_id = fields.CharField(255, null=True, description="群组ID, 为空表示全局任务") + group_id = fields.CharField( + 255, + null=True, + description="群组ID, '__ALL_GROUPS__' 表示所有群, 为空表示全局任务", + ) """群组ID, 为空表示全局任务""" trigger_type = fields.CharField( max_length=20, default="cron", description="触发器类型 (cron, interval, date)" diff --git a/zhenxun/utils/manager/schedule_manager.py b/zhenxun/utils/manager/schedule_manager.py index c1f4f928..0066d3a7 100644 --- a/zhenxun/utils/manager/schedule_manager.py +++ b/zhenxun/utils/manager/schedule_manager.py @@ -1,5 +1,7 @@ import asyncio from collections.abc import Callable, Coroutine +import inspect +import random from typing import ClassVar import nonebot @@ -58,7 +60,7 @@ class SchedulerManager: async def _execute_job(self, schedule_id: int): """ APScheduler 调度的入口函数。 - 它会根据 group_id 执行单个群组的任务。 + 根据 schedule_id 处理特定任务、所有群组任务或全局任务。 """ schedule = await ScheduleInfo.get_or_none(id=schedule_id) if not schedule or not schedule.is_enabled: @@ -66,7 +68,6 @@ class SchedulerManager: return plugin_name = schedule.plugin_name - group_id = schedule.group_id task_meta = self._registered_tasks.get(plugin_name) if not task_meta: @@ -79,13 +80,78 @@ class SchedulerManager: return try: - bot = nonebot.get_bot() + if schedule.bot_id: + bot = nonebot.get_bot(schedule.bot_id) + else: + bot = nonebot.get_bot() + logger.debug( + f"任务 {schedule_id} 未关联特定Bot,使用默认Bot {bot.self_id}" + ) + except KeyError: + logger.warning( + f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} " + f"不在线,本次执行跳过。" + ) + return + except ValueError: + logger.warning(f"当前没有Bot在线,定时任务 {schedule_id} 跳过。") + return + if schedule.group_id == "__ALL_GROUPS__": + await self._execute_for_all_groups(schedule, task_meta, bot) + else: + await self._execute_for_single_target(schedule, task_meta, bot) + + async def _execute_for_all_groups( + self, schedule: ScheduleInfo, task_meta: dict, bot + ): + """为所有群组执行任务,并处理优先级覆盖。""" + plugin_name = schedule.plugin_name + logger.info( + f"开始执行针对 [所有群组] 的任务 " + f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})" + ) + + all_gids = set() + try: + group_list, _ = await PlatformUtils.get_group_list(bot) + all_gids.update( + g.group_id for g in group_list if g.group_id and not g.channel_id + ) + except Exception as e: + logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) + return + + specific_tasks_gids = set( + await ScheduleInfo.filter( + plugin_name=plugin_name, group_id__in=list(all_gids) + ).values_list("group_id", flat=True) + ) + + for gid in all_gids: + if gid in specific_tasks_gids: + logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") + continue + + temp_schedule = schedule + temp_schedule.group_id = gid + await self._execute_for_single_target(temp_schedule, task_meta, bot) + await asyncio.sleep(random.uniform(0.1, 0.5)) + + async def _execute_for_single_target( + self, schedule: ScheduleInfo, task_meta: dict, bot + ): + """为单个目标(具体群组或全局)执行任务。""" + plugin_name = schedule.plugin_name + group_id = schedule.group_id + + try: is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) if is_blocked: + target_desc = f"群 {group_id}" if group_id else "全局" logger.info( - f"插件 '{plugin_name}' 在群 {group_id} " - f"的定时任务因功能被禁用而跳过执行。" + f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]" + "因功能被禁用而跳过执行。" ) return @@ -93,13 +159,17 @@ class SchedulerManager: job_kwargs = schedule.job_kwargs if not isinstance(job_kwargs, dict): logger.error( - f"任务 {schedule_id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" + f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" ) return + sig = inspect.signature(task_func) + if "bot" in sig.parameters: + job_kwargs["bot"] = bot + logger.info( - f"插件 '{plugin_name}' 开始为群 {group_id} 执行定时任务 " - f"(ID: {schedule_id})。" + f"插件 '{plugin_name}' 开始为目标 [{group_id or '全局'}] " + f"执行定时任务 (ID: {schedule.id})。" ) task = asyncio.create_task(task_func(group_id, **job_kwargs)) self._running_tasks.add(task) @@ -107,7 +177,8 @@ class SchedulerManager: await task except Exception as e: logger.error( - f"执行定时任务 (ID: {schedule_id}, Plugin: {plugin_name}) 时发生异常", + f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, " + f"目标: {group_id or '全局'}) 时发生异常", e=e, ) @@ -155,30 +226,30 @@ class SchedulerManager: trigger_type: str, trigger_config: dict, job_kwargs: dict | None = None, + bot_id: str | None = None, ) -> tuple[bool, str]: """ 添加或更新一个定时任务。 - 如果已有相同 plugin_name 和 group_id 的任务,则会更新它。 - - Args: - plugin_name: 插件名。 - group_id: 群组ID,为None表示全局任务。 - trigger_type: "cron", "interval", "date"。 - trigger_config: 对应触发器的配置字典。 e.g., {"hour": 8, "minute": 30} - job_kwargs: 传递给任务函数的额外关键字参数。 """ if plugin_name not in self._registered_tasks: return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。" + search_kwargs = { + "plugin_name": plugin_name, + "group_id": group_id, + "bot_id": bot_id, + } + + defaults = { + "trigger_type": trigger_type, + "trigger_config": trigger_config, + "job_kwargs": job_kwargs if job_kwargs is not None else {}, + "is_enabled": True, + } + schedule, created = await ScheduleInfo.update_or_create( - plugin_name=plugin_name, - group_id=group_id, - defaults={ - "trigger_type": trigger_type, - "trigger_config": trigger_config, - "job_kwargs": job_kwargs if job_kwargs is not None else {}, - "is_enabled": True, - }, + **search_kwargs, + defaults=defaults, ) self._add_aps_job(schedule) action = "设置" if created else "更新" @@ -254,24 +325,41 @@ class SchedulerManager: return True, f"成功更新了任务 ID: {schedule_id} 的配置。" async def remove_schedule( - self, plugin_name: str, group_id: str | None + self, plugin_name: str, group_id: str | None, bot_id: str | None = None ) -> tuple[bool, str]: """移除指定插件和群组的定时任务。""" - schedules = await ScheduleInfo.filter( - plugin_name=plugin_name, group_id=group_id - ) + query = {"plugin_name": plugin_name, "group_id": group_id} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) if not schedules: - return False, f"群 {group_id} 未设置插件 '{plugin_name}' 的定时任务。" + msg = ( + f"未找到与 Bot {bot_id} 相关的群 {group_id} " + f"的插件 '{plugin_name}' 定时任务。" + ) + return (False, msg) for schedule in schedules: self._remove_aps_job(schedule.id) await schedule.delete() - return True, f"已取消群 {group_id} 的插件 '{plugin_name}' 所有定时任务。" + target_desc = f"群 {group_id}" if group_id else "全局" + msg = ( + f"已取消 Bot {bot_id} 在 [{target_desc}] " + f"的插件 '{plugin_name}' 所有定时任务。" + ) + return (True, msg) - async def remove_schedule_for_all(self, plugin_name: str) -> int: + async def remove_schedule_for_all( + self, plugin_name: str, bot_id: str | None = None + ) -> int: """移除指定插件在所有群组的定时任务。""" - schedules_to_delete = await ScheduleInfo.filter(plugin_name=plugin_name).all() + query = {"plugin_name": plugin_name} + if bot_id: + query["bot_id"] = bot_id + + schedules_to_delete = await ScheduleInfo.filter(**query).all() if not schedules_to_delete: return 0 @@ -364,12 +452,14 @@ class SchedulerManager: ) async def pause_schedule_by_plugin_group( - self, plugin_name: str, group_id: str | None + self, plugin_name: str, group_id: str | None, bot_id: str | None = None ) -> tuple[bool, str]: """暂停指定插件在指定群组的定时任务。""" - schedules = await ScheduleInfo.filter( - plugin_name=plugin_name, group_id=group_id, is_enabled=True - ) + query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": True} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) if not schedules: return ( False, @@ -388,12 +478,14 @@ class SchedulerManager: ) async def resume_schedule_by_plugin_group( - self, plugin_name: str, group_id: str | None + self, plugin_name: str, group_id: str | None, bot_id: str | None = None ) -> tuple[bool, str]: """恢复指定插件在指定群组的定时任务。""" - schedules = await ScheduleInfo.filter( - plugin_name=plugin_name, group_id=group_id, is_enabled=False - ) + query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": False} + if bot_id: + query["bot_id"] = bot_id + + schedules = await ScheduleInfo.filter(**query) if not schedules: return ( False, @@ -508,6 +600,7 @@ class SchedulerManager: status = { "id": schedule.id, + "bot_id": schedule.bot_id, "plugin_name": schedule.plugin_name, "group_id": schedule.group_id, "is_enabled": schedule.is_enabled,