feat(scheduler): 支持全局定时任务(__ALL_GROUPS__)和多Bot管理

This commit is contained in:
webjoin111 2025-06-24 07:39:47 +08:00
parent ab9d4d55c3
commit 2fb67c4173
4 changed files with 478 additions and 174 deletions

View File

@ -10,17 +10,21 @@ __plugin_meta__ = PluginMetadata(
description="查看和管理由 SchedulerManager 控制的定时任务。",
usage="""
定时任务 查看 [-all] [-g <群号>] [-p <插件>] [--page <页码>] : 查看定时任务
定时任务 设置 <插件> <时间> [-g <群号> | --all] [--kwargs <参数>] :
设置/开启定时任务 (SUPERUSER)
定时任务 删除 <任务ID> : 通过ID删除任务 (SUPERUSER)
定时任务 删除 -p <插件> [-g <群号> | --all] : 通过插件+群组删除任务 (SUPERUSER)
定时任务 删除 -all [-g <群号>] : 删除所有群组的所有任务-g 指定群 (SUPERUSER)
定时任务 暂停 <任务ID> | -all [-g <群号>] | -p <插件> [-g <群号> | -all] :
暂停任务-all 为所有群组 (SUPERUSER)
定时任务 恢复 <任务ID> | -all [-g <群号>] | -p <插件> [-g <群号> | -all] :
恢复任务-all 为所有群组 (SUPERUSER)
定时任务 设置 <插件> [时间选项] [-g <群号> | -g all] [--kwargs <参数>] :
设置/开启任务 (SUPERUSER)
时间选项 (三选一):
--cron "<分> <时> <日> <月> <周>" : 设置 cron 表达式
(例如: --cron "0 8 * * *")
--interval <时间间隔> : 设置时间间隔
(例如: --interval 30m, --interval 2h, --interval 10s)
--date "<YYYY-MM-DD HH:MM:SS>" : 设置在特定时间执行一次
定时任务 删除 <任务ID> | -p <插件> [-g <群号>] | -all : 删除任务 (SUPERUSER)
定时任务 暂停/恢复 <任务ID> | -p <插件> [-g <群号>] | -all :
暂停/恢复任务 (SUPERUSER)
定时任务 执行 <任务ID> : 立即手动执行一次任务 (SUPERUSER)
定时任务 更新 <任务ID> [--time <时间>] [--kwargs <参数>] : 更新任务配置 (SUPERUSER)
定时任务 更新 <任务ID> [时间选项] [--kwargs <参数>] : 更新任务配置 (SUPERUSER)
定时任务 插件列表 : 查看所有可设置定时任务的插件 (SUPERUSER)
别名支持:
@ -35,7 +39,7 @@ __plugin_meta__ = PluginMetadata(
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1.0",
version="0.1.1",
plugin_type=PluginType.SUPERUSER,
is_show=False,
).to_dict(),

View File

@ -1,6 +1,9 @@
import asyncio
from datetime import datetime
import re
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent
from nonebot.params import Depends
from nonebot.permission import SUPERUSER
from nonebot_plugin_alconna import (
Alconna,
@ -57,6 +60,152 @@ def _format_params(schedule_status: dict) -> str:
return "-"
def _parse_interval(interval_str: str) -> dict:
match = re.match(r"(\d+)([smh])", interval_str.lower())
if not match:
raise ValueError("时间间隔格式错误, 请使用如 '30m', '2h', '10s' 的格式。")
value, unit = int(match.group(1)), match.group(2)
if unit == "s":
return {"seconds": value}
if unit == "m":
return {"minutes": value}
if unit == "h":
return {"hours": value}
return {}
async def GetBotId(
bot: Bot,
bot_id_match: Match[str] = AlconnaMatch("bot_id"),
) -> str:
"""获取要操作的Bot ID"""
if bot_id_match.available:
return bot_id_match.result
return bot.self_id
class ScheduleTarget:
"""定时任务操作目标的基类"""
pass
class TargetByID(ScheduleTarget):
"""按任务ID操作"""
def __init__(self, id: int):
self.id = id
class TargetByPlugin(ScheduleTarget):
"""按插件名操作"""
def __init__(
self, plugin: str, group_id: str | None = None, all_groups: bool = False
):
self.plugin = plugin
self.group_id = group_id
self.all_groups = all_groups
class TargetAll(ScheduleTarget):
"""操作所有任务"""
def __init__(self, for_group: str | None = None):
self.for_group = for_group
TargetScope = TargetByID | TargetByPlugin | TargetAll | None
async def ParseScheduleTargetForDelete(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("删除.all"),
) -> TargetScope:
"""解析删除命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
async def ParseScheduleTargetForPause(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("暂停.all"),
) -> TargetScope:
"""解析暂停命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
async def ParseScheduleTargetForResume(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("恢复.all"),
) -> TargetScope:
"""解析恢复命令的操作目标"""
if schedule_id.available:
return TargetByID(schedule_id.result)
if plugin_name.available:
p_name = plugin_name.result
if all_enabled.available:
return TargetByPlugin(plugin=p_name, all_groups=True)
elif group_id.available:
gid = group_id.result
if gid.lower() == "all":
gid = "__ALL_GROUPS__"
return TargetByPlugin(plugin=p_name, group_id=gid)
else:
return TargetByPlugin(plugin=p_name, group_id=str(event.group_id))
if all_enabled.available:
return TargetAll(for_group=group_id.result if group_id.available else None)
return None
schedule_cmd = on_alconna(
Alconna(
"定时任务",
@ -71,10 +220,16 @@ schedule_cmd = on_alconna(
),
Subcommand(
"设置",
Args["plugin_name", str]["time", str],
Option("-g", Args["group_id", str], help_text="指定群组ID"),
Option("-all", help_text="对所有群生效"),
Args["plugin_name", str],
Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"),
Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"),
Option("--date", Args["date_expr", str], help_text="设置特定执行日期"),
Option("-g", Args["group_id", str], help_text="指定群组ID或'all'"),
Option("-all", help_text="对所有群生效 (等同于 -g all)"),
Option("--kwargs", Args["kwargs_str", str], help_text="设置任务参数"),
Option(
"--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)"
),
alias=["add", "开启"],
help_text="设置/开启一个定时任务",
),
@ -84,6 +239,9 @@ schedule_cmd = on_alconna(
Option("-p", Args["plugin_name", str], help_text="指定插件名"),
Option("-g", Args["group_id", str], help_text="指定群组ID"),
Option("-all", help_text="对所有群生效"),
Option(
"--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)"
),
alias=["del", "rm", "remove", "关闭", "取消"],
help_text="删除一个或多个定时任务",
),
@ -93,6 +251,9 @@ schedule_cmd = on_alconna(
Option("-all", help_text="对当前群所有任务生效"),
Option("-p", Args["plugin_name", str], help_text="指定插件名"),
Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"),
Option(
"--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)"
),
alias=["pause"],
help_text="暂停一个或多个定时任务",
),
@ -102,6 +263,9 @@ schedule_cmd = on_alconna(
Option("-all", help_text="对当前群所有任务生效"),
Option("-p", Args["plugin_name", str], help_text="指定插件名"),
Option("-g", Args["group_id", str], help_text="指定群组ID (SUPERUSER)"),
Option(
"--bot", Args["bot_id", str], help_text="指定操作的Bot ID (SUPERUSER)"
),
alias=["resume"],
help_text="恢复一个或多个定时任务",
),
@ -114,7 +278,9 @@ schedule_cmd = on_alconna(
Subcommand(
"更新",
Args["schedule_id", int],
Option("--time", Args["time", str], help_text="更新时间 (HH:MM)"),
Option("--cron", Args["cron_expr", str], help_text="设置 cron 表达式"),
Option("--interval", Args["interval_expr", str], help_text="设置时间间隔"),
Option("--date", Args["date_expr", str], help_text="设置特定执行日期"),
Option("--kwargs", Args["kwargs_str", str], help_text="更新参数"),
alias=["update", "modify", "修改"],
help_text="更新任务配置",
@ -190,7 +356,8 @@ async def _(
[
s["id"],
s["plugin_name"],
s["group_id"],
s.get("bot_id") or "N/A",
s["group_id"] or "全局",
s["next_run_time"],
_format_trigger(s),
_format_params(s),
@ -206,7 +373,16 @@ async def _(
img = await ImageTemplate.table_page(
head_text=title,
tip_text=f"{current_page}/{total_pages} 页,共 {total_items} 条任务",
column_name=["ID", "插件", "群组/目标", "下次运行", "触发规则", "参数", "状态"],
column_name=[
"ID",
"插件",
"Bot ID",
"群组/目标",
"下次运行",
"触发规则",
"参数",
"状态",
],
data_list=data_list,
column_space=20,
)
@ -216,26 +392,43 @@ async def _(
@schedule_cmd.assign("设置")
async def _(
plugin_name: str,
time: str,
cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"),
interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"),
date_expr: Match[str] = AlconnaMatch("date.date_expr"),
group_id: Match[str] = AlconnaMatch("group_id"),
kwargs_str: Match[str] = AlconnaMatch("kwargs_str"),
all_enabled: Query[bool] = Query("设置.all"),
bot_id_to_operate: str = Depends(GetBotId),
):
if plugin_name not in scheduler_manager._registered_tasks:
await schedule_cmd.finish(
f"插件 '{plugin_name}' 没有注册可用的定时任务。\n"
f"可用插件: {list(scheduler_manager._registered_tasks.keys())}"
)
trigger_type = ""
trigger_config = {}
try:
time_parts = time.split(":")
if len(time_parts) != 2:
raise ValueError("时间格式应为 HH:MM")
hour, minute = map(int, time_parts)
if not (0 <= hour <= 23 and 0 <= minute <= 59):
raise ValueError("小时或分钟超出有效范围")
trigger_config = {"hour": hour, "minute": minute}
if cron_expr.available:
trigger_type = "cron"
parts = cron_expr.result.split()
if len(parts) != 5:
raise ValueError("Cron 表达式必须有5个部分 (分 时 日 月 周)")
cron_keys = ["minute", "hour", "day", "month", "day_of_week"]
trigger_config = dict(zip(cron_keys, parts))
elif interval_expr.available:
trigger_type = "interval"
trigger_config = _parse_interval(interval_expr.result)
elif date_expr.available:
trigger_type = "date"
trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)}
else:
await schedule_cmd.finish(
"必须提供一种时间选项: --cron, --interval, 或 --date。"
)
except ValueError as e:
await schedule_cmd.finish(f"时间格式错误: {e}")
await schedule_cmd.finish(f"时间参数解析错误: {e}")
job_kwargs = None
if kwargs_str.available:
@ -258,69 +451,76 @@ async def _(
f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}"
)
if all_enabled.available:
success, fail = await scheduler_manager.add_schedule_for_all(
plugin_name, "cron", trigger_config, job_kwargs
)
await schedule_cmd.finish(f"已为 {success} 个群组设置任务,{fail} 个失败。")
target_group_id: str | None = None
if group_id.available and group_id.result.lower() == "all":
target_group_id = "__ALL_GROUPS__"
elif all_enabled.available:
target_group_id = "__ALL_GROUPS__"
elif group_id.available:
success, msg = await scheduler_manager.add_schedule(
plugin_name, group_id.result, "cron", trigger_config, job_kwargs
)
await schedule_cmd.finish(msg)
target_group_id = group_id.result
else:
success, msg = await scheduler_manager.add_schedule(
plugin_name, None, "cron", trigger_config, job_kwargs
)
await schedule_cmd.finish(f"已设置全局任务: {msg}")
target_group_id = None
success, msg = await scheduler_manager.add_schedule(
plugin_name,
target_group_id,
trigger_type,
trigger_config,
job_kwargs,
bot_id=bot_id_to_operate,
)
if target_group_id == "__ALL_GROUPS__":
target_desc = "所有群组"
elif target_group_id is None:
target_desc = "全局"
else:
target_desc = f"群组 {target_group_id}"
if success:
await schedule_cmd.finish(f"已成功为 [{target_desc}] {msg}")
else:
await schedule_cmd.finish(f"为 [{target_desc}] 设置任务失败: {msg}")
@schedule_cmd.assign("删除")
async def _(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
all_enabled: Query[bool] = Query("删除.all"),
target: TargetScope = Depends(ParseScheduleTargetForDelete),
bot_id_to_operate: str = Depends(GetBotId),
):
if schedule_id.available:
success, message = await scheduler_manager.remove_schedule_by_id(
schedule_id.result
)
if isinstance(target, TargetByID):
_, message = await scheduler_manager.remove_schedule_by_id(target.id)
await schedule_cmd.finish(message)
elif plugin_name.available:
p_name = plugin_name.result
elif isinstance(target, TargetByPlugin):
p_name = target.plugin
if p_name not in scheduler_manager.get_registered_plugins():
await schedule_cmd.finish(f"未找到插件 '{p_name}'")
if all_enabled.available:
removed_count = await scheduler_manager.remove_schedule_for_all(p_name)
if target.all_groups:
removed_count = await scheduler_manager.remove_schedule_for_all(
p_name, bot_id=bot_id_to_operate
)
message = (
f"已取消了 {removed_count} 个群组的插件 '{p_name}' 定时任务。"
if removed_count > 0
else f"没有找到插件 '{p_name}' 的定时任务。"
)
await schedule_cmd.finish(message)
elif group_id.available:
success, message = await scheduler_manager.remove_schedule(
p_name, group_id.result
else:
_, message = await scheduler_manager.remove_schedule(
p_name, target.group_id, bot_id=bot_id_to_operate
)
await schedule_cmd.finish(message)
else:
gid = str(event.group_id)
success, message = await scheduler_manager.remove_schedule(p_name, gid)
await schedule_cmd.finish(message)
elif all_enabled.available:
if group_id.available:
gid = group_id.result
success, message = await scheduler_manager.remove_schedules_by_group(gid)
elif isinstance(target, TargetAll):
if target.for_group:
_, message = await scheduler_manager.remove_schedules_by_group(
target.for_group
)
await schedule_cmd.finish(message)
else:
count, message = await scheduler_manager.remove_all_schedules()
_, message = await scheduler_manager.remove_all_schedules()
await schedule_cmd.finish(message)
else:
@ -331,44 +531,35 @@ async def _(
@schedule_cmd.assign("暂停")
async def _(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
all_enabled: Query[bool] = Query("暂停.all"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
target: TargetScope = Depends(ParseScheduleTargetForPause),
bot_id_to_operate: str = Depends(GetBotId),
):
if schedule_id.available:
success, message = await scheduler_manager.pause_schedule(schedule_id.result)
if isinstance(target, TargetByID):
_, message = await scheduler_manager.pause_schedule(target.id)
await schedule_cmd.finish(message)
elif plugin_name.available:
p_name = plugin_name.result
elif isinstance(target, TargetByPlugin):
p_name = target.plugin
if p_name not in scheduler_manager.get_registered_plugins():
await schedule_cmd.finish(f"未找到插件 '{p_name}'")
if all_enabled.available:
count, message = await scheduler_manager.pause_schedules_by_plugin(p_name)
await schedule_cmd.finish(message)
elif group_id.available:
gid = group_id.result
success, message = await scheduler_manager.pause_schedule_by_plugin_group(
p_name, gid
)
if target.all_groups:
_, message = await scheduler_manager.pause_schedules_by_plugin(p_name)
await schedule_cmd.finish(message)
else:
gid = str(event.group_id)
success, message = await scheduler_manager.pause_schedule_by_plugin_group(
p_name, gid
_, message = await scheduler_manager.pause_schedule_by_plugin_group(
p_name, target.group_id, bot_id=bot_id_to_operate
)
await schedule_cmd.finish(message)
elif all_enabled.available:
if group_id.available:
gid = group_id.result
count, message = await scheduler_manager.pause_schedules_by_group(gid)
elif isinstance(target, TargetAll):
if target.for_group:
_, message = await scheduler_manager.pause_schedules_by_group(
target.for_group
)
await schedule_cmd.finish(message)
else:
count, message = await scheduler_manager.pause_all_schedules()
_, message = await scheduler_manager.pause_all_schedules()
await schedule_cmd.finish(message)
else:
@ -377,44 +568,35 @@ async def _(
@schedule_cmd.assign("恢复")
async def _(
event: GroupMessageEvent,
schedule_id: Match[int] = AlconnaMatch("schedule_id"),
all_enabled: Query[bool] = Query("恢复.all"),
plugin_name: Match[str] = AlconnaMatch("plugin_name"),
group_id: Match[str] = AlconnaMatch("group_id"),
target: TargetScope = Depends(ParseScheduleTargetForResume),
bot_id_to_operate: str = Depends(GetBotId),
):
if schedule_id.available:
success, message = await scheduler_manager.resume_schedule(schedule_id.result)
if isinstance(target, TargetByID):
_, message = await scheduler_manager.resume_schedule(target.id)
await schedule_cmd.finish(message)
elif plugin_name.available:
p_name = plugin_name.result
elif isinstance(target, TargetByPlugin):
p_name = target.plugin
if p_name not in scheduler_manager.get_registered_plugins():
await schedule_cmd.finish(f"未找到插件 '{p_name}'")
if all_enabled.available:
count, message = await scheduler_manager.resume_schedules_by_plugin(p_name)
await schedule_cmd.finish(message)
elif group_id.available:
gid = group_id.result
success, message = await scheduler_manager.resume_schedule_by_plugin_group(
p_name, gid
)
if target.all_groups:
_, message = await scheduler_manager.resume_schedules_by_plugin(p_name)
await schedule_cmd.finish(message)
else:
gid = str(event.group_id)
success, message = await scheduler_manager.resume_schedule_by_plugin_group(
p_name, gid
_, message = await scheduler_manager.resume_schedule_by_plugin_group(
p_name, target.group_id, bot_id=bot_id_to_operate
)
await schedule_cmd.finish(message)
elif all_enabled.available:
if group_id.available:
gid = group_id.result
count, message = await scheduler_manager.resume_schedules_by_group(gid)
elif isinstance(target, TargetAll):
if target.for_group:
_, message = await scheduler_manager.resume_schedules_by_group(
target.for_group
)
await schedule_cmd.finish(message)
else:
count, message = await scheduler_manager.resume_all_schedules()
_, message = await scheduler_manager.resume_all_schedules()
await schedule_cmd.finish(message)
else:
@ -423,27 +605,44 @@ async def _(
@schedule_cmd.assign("执行")
async def _(schedule_id: int):
success, message = await scheduler_manager.trigger_now(schedule_id)
_, message = await scheduler_manager.trigger_now(schedule_id)
await schedule_cmd.finish(message)
@schedule_cmd.assign("更新")
async def _(schedule_id: int, time: Match[str], kwargs_str: Match[str]):
if not time.available and not kwargs_str.available:
await schedule_cmd.finish("请提供需要更新的时间 (--time) 或参数 (--kwargs)。")
async def _(
schedule_id: int,
cron_expr: Match[str] = AlconnaMatch("cron.cron_expr"),
interval_expr: Match[str] = AlconnaMatch("interval.interval_expr"),
date_expr: Match[str] = AlconnaMatch("date.date_expr"),
kwargs_str: Match[str] = AlconnaMatch("kwargs_str"),
):
if not any(
[
cron_expr.available,
interval_expr.available,
date_expr.available,
kwargs_str.available,
]
):
await schedule_cmd.finish(
"请提供需要更新的时间 (--cron/--interval/--date) 或参数 (--kwargs)。"
)
trigger_config = None
if time.available:
try:
time_parts = time.result.split(":")
if len(time_parts) != 2:
raise ValueError("时间格式应为 HH:MM")
hour, minute = map(int, time_parts)
if not (0 <= hour <= 23) or not (0 <= minute <= 59):
raise ValueError("小时应在 0-23 范围内,分钟应在 0-59 范围内")
trigger_config = {"hour": hour, "minute": minute}
except ValueError as e:
await schedule_cmd.finish(f"时间格式错误: {e}")
try:
if cron_expr.available:
parts = cron_expr.result.split()
if len(parts) != 5:
raise ValueError("Cron 表达式必须有5个部分")
cron_keys = ["minute", "hour", "day", "month", "day_of_week"]
trigger_config = dict(zip(cron_keys, parts))
elif interval_expr.available:
trigger_config = _parse_interval(interval_expr.result)
elif date_expr.available:
trigger_config = {"run_date": datetime.fromisoformat(date_expr.result)}
except ValueError as e:
await schedule_cmd.finish(f"时间参数解析错误: {e}")
job_kwargs = None
if kwargs_str.available:
@ -494,7 +693,7 @@ async def _(schedule_id: int, time: Match[str], kwargs_str: Match[str]):
f"参数格式错误,请使用 'key=value,key2=value2' 格式。错误: {e}"
)
success, message = await scheduler_manager.update_schedule(
_, message = await scheduler_manager.update_schedule(
schedule_id, trigger_config, job_kwargs
)
await schedule_cmd.finish(message)

View File

@ -6,9 +6,17 @@ from zhenxun.services.db_context import Model
class ScheduleInfo(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
bot_id = fields.CharField(
255, null=True, default=None, description="任务关联的Bot ID"
)
"""任务关联的Bot ID"""
plugin_name = fields.CharField(255, description="插件模块名")
"""插件模块名"""
group_id = fields.CharField(255, null=True, description="群组ID, 为空表示全局任务")
group_id = fields.CharField(
255,
null=True,
description="群组ID, '__ALL_GROUPS__' 表示所有群, 为空表示全局任务",
)
"""群组ID, 为空表示全局任务"""
trigger_type = fields.CharField(
max_length=20, default="cron", description="触发器类型 (cron, interval, date)"

View File

@ -1,5 +1,7 @@
import asyncio
from collections.abc import Callable, Coroutine
import inspect
import random
from typing import ClassVar
import nonebot
@ -58,7 +60,7 @@ class SchedulerManager:
async def _execute_job(self, schedule_id: int):
"""
APScheduler 调度的入口函数
它会根据 group_id 执行单个群组的任务
根据 schedule_id 处理特定任务所有群组任务或全局任务
"""
schedule = await ScheduleInfo.get_or_none(id=schedule_id)
if not schedule or not schedule.is_enabled:
@ -66,7 +68,6 @@ class SchedulerManager:
return
plugin_name = schedule.plugin_name
group_id = schedule.group_id
task_meta = self._registered_tasks.get(plugin_name)
if not task_meta:
@ -79,13 +80,78 @@ class SchedulerManager:
return
try:
bot = nonebot.get_bot()
if schedule.bot_id:
bot = nonebot.get_bot(schedule.bot_id)
else:
bot = nonebot.get_bot()
logger.debug(
f"任务 {schedule_id} 未关联特定Bot使用默认Bot {bot.self_id}"
)
except KeyError:
logger.warning(
f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} "
f"不在线,本次执行跳过。"
)
return
except ValueError:
logger.warning(f"当前没有Bot在线定时任务 {schedule_id} 跳过。")
return
if schedule.group_id == "__ALL_GROUPS__":
await self._execute_for_all_groups(schedule, task_meta, bot)
else:
await self._execute_for_single_target(schedule, task_meta, bot)
async def _execute_for_all_groups(
self, schedule: ScheduleInfo, task_meta: dict, bot
):
"""为所有群组执行任务,并处理优先级覆盖。"""
plugin_name = schedule.plugin_name
logger.info(
f"开始执行针对 [所有群组] 的任务 "
f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})"
)
all_gids = set()
try:
group_list, _ = await PlatformUtils.get_group_list(bot)
all_gids.update(
g.group_id for g in group_list if g.group_id and not g.channel_id
)
except Exception as e:
logger.error(f"'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e)
return
specific_tasks_gids = set(
await ScheduleInfo.filter(
plugin_name=plugin_name, group_id__in=list(all_gids)
).values_list("group_id", flat=True)
)
for gid in all_gids:
if gid in specific_tasks_gids:
logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。")
continue
temp_schedule = schedule
temp_schedule.group_id = gid
await self._execute_for_single_target(temp_schedule, task_meta, bot)
await asyncio.sleep(random.uniform(0.1, 0.5))
async def _execute_for_single_target(
self, schedule: ScheduleInfo, task_meta: dict, bot
):
"""为单个目标(具体群组或全局)执行任务。"""
plugin_name = schedule.plugin_name
group_id = schedule.group_id
try:
is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id)
if is_blocked:
target_desc = f"{group_id}" if group_id else "全局"
logger.info(
f"插件 '{plugin_name}' 在群 {group_id} "
f"的定时任务因功能被禁用而跳过执行。"
f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]"
"因功能被禁用而跳过执行。"
)
return
@ -93,13 +159,17 @@ class SchedulerManager:
job_kwargs = schedule.job_kwargs
if not isinstance(job_kwargs, dict):
logger.error(
f"任务 {schedule_id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}"
f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}"
)
return
sig = inspect.signature(task_func)
if "bot" in sig.parameters:
job_kwargs["bot"] = bot
logger.info(
f"插件 '{plugin_name}' 开始为群 {group_id} 执行定时任务 "
f"(ID: {schedule_id})。"
f"插件 '{plugin_name}' 开始为目标 [{group_id or '全局'}] "
f"执行定时任务 (ID: {schedule.id})。"
)
task = asyncio.create_task(task_func(group_id, **job_kwargs))
self._running_tasks.add(task)
@ -107,7 +177,8 @@ class SchedulerManager:
await task
except Exception as e:
logger.error(
f"执行定时任务 (ID: {schedule_id}, Plugin: {plugin_name}) 时发生异常",
f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, "
f"目标: {group_id or '全局'}) 时发生异常",
e=e,
)
@ -155,30 +226,30 @@ class SchedulerManager:
trigger_type: str,
trigger_config: dict,
job_kwargs: dict | None = None,
bot_id: str | None = None,
) -> tuple[bool, str]:
"""
添加或更新一个定时任务
如果已有相同 plugin_name group_id 的任务则会更新它
Args:
plugin_name: 插件名
group_id: 群组ID为None表示全局任务
trigger_type: "cron", "interval", "date"
trigger_config: 对应触发器的配置字典 e.g., {"hour": 8, "minute": 30}
job_kwargs: 传递给任务函数的额外关键字参数
"""
if plugin_name not in self._registered_tasks:
return False, f"插件 '{plugin_name}' 没有注册可用的定时任务。"
search_kwargs = {
"plugin_name": plugin_name,
"group_id": group_id,
"bot_id": bot_id,
}
defaults = {
"trigger_type": trigger_type,
"trigger_config": trigger_config,
"job_kwargs": job_kwargs if job_kwargs is not None else {},
"is_enabled": True,
}
schedule, created = await ScheduleInfo.update_or_create(
plugin_name=plugin_name,
group_id=group_id,
defaults={
"trigger_type": trigger_type,
"trigger_config": trigger_config,
"job_kwargs": job_kwargs if job_kwargs is not None else {},
"is_enabled": True,
},
**search_kwargs,
defaults=defaults,
)
self._add_aps_job(schedule)
action = "设置" if created else "更新"
@ -254,24 +325,41 @@ class SchedulerManager:
return True, f"成功更新了任务 ID: {schedule_id} 的配置。"
async def remove_schedule(
self, plugin_name: str, group_id: str | None
self, plugin_name: str, group_id: str | None, bot_id: str | None = None
) -> tuple[bool, str]:
"""移除指定插件和群组的定时任务。"""
schedules = await ScheduleInfo.filter(
plugin_name=plugin_name, group_id=group_id
)
query = {"plugin_name": plugin_name, "group_id": group_id}
if bot_id:
query["bot_id"] = bot_id
schedules = await ScheduleInfo.filter(**query)
if not schedules:
return False, f"{group_id} 未设置插件 '{plugin_name}' 的定时任务。"
msg = (
f"未找到与 Bot {bot_id} 相关的群 {group_id} "
f"的插件 '{plugin_name}' 定时任务。"
)
return (False, msg)
for schedule in schedules:
self._remove_aps_job(schedule.id)
await schedule.delete()
return True, f"已取消群 {group_id} 的插件 '{plugin_name}' 所有定时任务。"
target_desc = f"{group_id}" if group_id else "全局"
msg = (
f"已取消 Bot {bot_id} 在 [{target_desc}] "
f"的插件 '{plugin_name}' 所有定时任务。"
)
return (True, msg)
async def remove_schedule_for_all(self, plugin_name: str) -> int:
async def remove_schedule_for_all(
self, plugin_name: str, bot_id: str | None = None
) -> int:
"""移除指定插件在所有群组的定时任务。"""
schedules_to_delete = await ScheduleInfo.filter(plugin_name=plugin_name).all()
query = {"plugin_name": plugin_name}
if bot_id:
query["bot_id"] = bot_id
schedules_to_delete = await ScheduleInfo.filter(**query).all()
if not schedules_to_delete:
return 0
@ -364,12 +452,14 @@ class SchedulerManager:
)
async def pause_schedule_by_plugin_group(
self, plugin_name: str, group_id: str | None
self, plugin_name: str, group_id: str | None, bot_id: str | None = None
) -> tuple[bool, str]:
"""暂停指定插件在指定群组的定时任务。"""
schedules = await ScheduleInfo.filter(
plugin_name=plugin_name, group_id=group_id, is_enabled=True
)
query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": True}
if bot_id:
query["bot_id"] = bot_id
schedules = await ScheduleInfo.filter(**query)
if not schedules:
return (
False,
@ -388,12 +478,14 @@ class SchedulerManager:
)
async def resume_schedule_by_plugin_group(
self, plugin_name: str, group_id: str | None
self, plugin_name: str, group_id: str | None, bot_id: str | None = None
) -> tuple[bool, str]:
"""恢复指定插件在指定群组的定时任务。"""
schedules = await ScheduleInfo.filter(
plugin_name=plugin_name, group_id=group_id, is_enabled=False
)
query = {"plugin_name": plugin_name, "group_id": group_id, "is_enabled": False}
if bot_id:
query["bot_id"] = bot_id
schedules = await ScheduleInfo.filter(**query)
if not schedules:
return (
False,
@ -508,6 +600,7 @@ class SchedulerManager:
status = {
"id": schedule.id,
"bot_id": schedule.bot_id,
"plugin_name": schedule.plugin_name,
"group_id": schedule.group_id,
"is_enabled": schedule.is_enabled,