From be86e0bb7f811a196f6f1def0d3a10078374020f Mon Sep 17 00:00:00 2001 From: Rumio <32546670+webjoin111@users.noreply.github.com> Date: Wed, 6 Aug 2025 09:02:23 +0800 Subject: [PATCH] =?UTF-8?q?=E2=99=BB=EF=B8=8F=20refactor(scheduler):=20?= =?UTF-8?q?=E9=87=8D=E6=9E=84=E5=AE=9A=E6=97=B6=E4=BB=BB=E5=8A=A1=E7=B3=BB?= =?UTF-8?q?=E7=BB=9F=E5=B9=B6=E5=A2=9E=E5=BC=BA=E5=8A=9F=E8=83=BD=20(#2009?= =?UTF-8?q?)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * ♻️ refactor(scheduler): 重构定时任务系统并增强功能 - 【模型重命名】将 `ScheduleInfo` 模型及其数据库表重命名为 `ScheduledJob`,以提高语义清晰度。 - 【触发器抽象】引入 `Trigger` 工厂类,提供类型安全的 Cron、Interval 和 Date 触发器配置。 - 【执行策略】新增 `ExecutionPolicy` 模型,允许为定时任务定义重试策略、延迟、异常类型以及成功/失败回调。 - 【任务执行】重构任务执行逻辑,支持 NoneBot 的依赖注入,并根据 `ExecutionPolicy` 处理任务的重试和回调。 - 【临时任务】增加声明式和编程式的临时任务调度能力,支持非持久化任务在运行时动态创建和执行。 - 【管理命令】更新定时任务管理命令 (`schedule_admin`),使其适配新的 `ScheduledJob` 模型和参数验证逻辑。 - 【展示优化】改进定时任务列表和状态展示,使用新的触发器格式化逻辑和参数模型信息。 - 【重试装饰器】为 `Retry.api` 装饰器添加 `on_success` 回调,允许在任务成功执行后触发额外操作。 * :rotating_light: auto fix by pre-commit hooks --------- Co-authored-by: webjoin111 <455457521@qq.com> Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- .../scheduler_admin/handlers.py | 20 +- .../scheduler_admin/presenters.py | 94 ++---- .../{schedule_info.py => scheduled_job.py} | 4 +- zhenxun/services/scheduler/__init__.py | 6 +- zhenxun/services/scheduler/adapter.py | 90 ++++- zhenxun/services/scheduler/job.py | 311 ++++++++++-------- zhenxun/services/scheduler/lifecycle.py | 51 ++- zhenxun/services/scheduler/repository.py | 69 ++-- zhenxun/services/scheduler/service.py | 272 ++++++++++++--- zhenxun/services/scheduler/targeter.py | 22 +- zhenxun/services/scheduler/triggers.py | 80 +++++ zhenxun/utils/decorator/retry.py | 54 ++- 12 files changed, 772 insertions(+), 301 deletions(-) rename zhenxun/models/{schedule_info.py => scheduled_job.py} (95%) create mode 100644 zhenxun/services/scheduler/triggers.py diff --git a/zhenxun/builtin_plugins/scheduler_admin/handlers.py b/zhenxun/builtin_plugins/scheduler_admin/handlers.py index 839ece12..060aeab3 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/handlers.py +++ b/zhenxun/builtin_plugins/scheduler_admin/handlers.py @@ -7,10 +7,11 @@ from nonebot.permission import SUPERUSER from nonebot_plugin_alconna import AlconnaMatch, Arparma, Match, Query from pydantic import BaseModel, ValidationError -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob from zhenxun.services.scheduler import scheduler_manager from zhenxun.services.scheduler.targeter import ScheduleTargeter from zhenxun.utils.message import MessageUtils +from zhenxun.utils.pydantic_compat import model_dump from . import presenters from .commands import ( @@ -149,7 +150,10 @@ async def handle_set( job_kwargs = {} if kwargs_str.available: - task_meta = scheduler_manager._registered_tasks[p_name] + task_meta = scheduler_manager._registered_tasks.get(p_name) + if not task_meta: + await schedule_cmd.finish(f"插件 '{p_name}' 未注册。") + params_model = task_meta.get("model") if not ( params_model @@ -168,11 +172,7 @@ async def handle_set( validated_model = model_validate(raw_kwargs) - model_dump = getattr(validated_model, "model_dump", None) - if not model_dump: - await schedule_cmd.finish(f"插件 '{p_name}' 的参数模型不支持导出") - - job_kwargs = model_dump() + job_kwargs = model_dump(validated_model) except ValidationError as e: errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] await schedule_cmd.finish( @@ -220,7 +220,7 @@ async def handle_set( @schedule_cmd.assign("删除") async def handle_delete(targeter: ScheduleTargeter = GetTargeter("删除")): - schedules_to_remove: list[ScheduleInfo] = await targeter._get_schedules() + schedules_to_remove: list[ScheduledJob] = await targeter._get_schedules() if not schedules_to_remove: await schedule_cmd.finish("没有找到可删除的任务。") @@ -239,7 +239,7 @@ async def handle_delete(targeter: ScheduleTargeter = GetTargeter("删除")): @schedule_cmd.assign("暂停") async def handle_pause(targeter: ScheduleTargeter = GetTargeter("暂停")): - schedules_to_pause: list[ScheduleInfo] = await targeter._get_schedules() + schedules_to_pause: list[ScheduledJob] = await targeter._get_schedules() if not schedules_to_pause: await schedule_cmd.finish("没有找到可暂停的任务。") @@ -258,7 +258,7 @@ async def handle_pause(targeter: ScheduleTargeter = GetTargeter("暂停")): @schedule_cmd.assign("恢复") async def handle_resume(targeter: ScheduleTargeter = GetTargeter("恢复")): - schedules_to_resume: list[ScheduleInfo] = await targeter._get_schedules() + schedules_to_resume: list[ScheduledJob] = await targeter._get_schedules() if not schedules_to_resume: await schedule_cmd.finish("没有找到可恢复的任务。") diff --git a/zhenxun/builtin_plugins/scheduler_admin/presenters.py b/zhenxun/builtin_plugins/scheduler_admin/presenters.py index ef6785bd..e1a2715a 100644 --- a/zhenxun/builtin_plugins/scheduler_admin/presenters.py +++ b/zhenxun/builtin_plugins/scheduler_admin/presenters.py @@ -1,8 +1,10 @@ import asyncio +from typing import Any -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob from zhenxun.services.scheduler import scheduler_manager from zhenxun.utils._image_template import ImageTemplate, RowStyle +from zhenxun.utils.pydantic_compat import model_json_schema def _get_type_name(annotation) -> str: @@ -15,10 +17,17 @@ def _get_type_name(annotation) -> str: return str(annotation) -def _format_trigger(schedule: dict) -> str: - """格式化触发器信息为可读字符串""" - trigger_type = schedule.get("trigger_type") - config = schedule.get("trigger_config") +def _get_schedule_attr(schedule: ScheduledJob | dict, attr_name: str) -> Any: + """兼容地从字典或对象获取属性""" + if isinstance(schedule, dict): + return schedule.get(attr_name) + return getattr(schedule, attr_name, None) + + +def _format_trigger_info(schedule: ScheduledJob | dict) -> str: + """格式化触发器信息为可读字符串(兼容字典和对象)""" + trigger_type = _get_schedule_attr(schedule, "trigger_type") + config = _get_schedule_attr(schedule, "trigger_config") if not isinstance(config, dict): return f"配置错误: {config}" @@ -51,59 +60,15 @@ def _format_trigger(schedule: dict) -> str: return f"未知触发器类型: {trigger_type}" -def _format_trigger_for_card(schedule_info: ScheduleInfo | dict) -> str: - """为信息卡片格式化触发器规则""" - trigger_type = ( - schedule_info.get("trigger_type") - if isinstance(schedule_info, dict) - else schedule_info.trigger_type - ) - config = ( - schedule_info.get("trigger_config") - if isinstance(schedule_info, dict) - else schedule_info.trigger_config - ) - - if not isinstance(config, dict): - return f"配置错误: {config}" - - if trigger_type == "cron": - hour = config.get("hour", "??") - minute = config.get("minute", "??") - try: - hour_int = int(hour) - minute_int = int(minute) - return f"每天 {hour_int:02d}:{minute_int:02d}" - except (ValueError, TypeError): - return f"每天 {hour}:{minute}" - elif trigger_type == "interval": - units = { - "weeks": "周", - "days": "天", - "hours": "小时", - "minutes": "分钟", - "seconds": "秒", - } - for unit, unit_name in units.items(): - if value := config.get(unit): - return f"每 {value} {unit_name}" - return "未知间隔" - elif trigger_type == "date": - run_date = config.get("run_date", "N/A") - return f"特定时间 {run_date}" - else: - return f"未知规则: {trigger_type}" - - def _format_operation_result_card( - title: str, schedule_info: ScheduleInfo, extra_info: list[str] | None = None + title: str, schedule_info: ScheduledJob, extra_info: list[str] | None = None ) -> str: """ 生成一个标准的操作结果信息卡片。 参数: title: 卡片的标题 (例如 "✅ 成功暂停定时任务!") - schedule_info: 相关的 ScheduleInfo 对象 + schedule_info: 相关的 ScheduledJob 对象 extra_info: (可选) 额外的补充信息行 """ target_desc = ( @@ -120,7 +85,7 @@ def _format_operation_result_card( f"✓ 任务 ID: {schedule_info.id}", f"🖋 插件: {schedule_info.plugin_name}", f"🎯 目标: {target_desc}", - f"⏰ 时间: {_format_trigger_for_card(schedule_info)}", + f"⏰ 时间: {_format_trigger_info(schedule_info)}", ] if extra_info: info_lines.extend(extra_info) @@ -128,27 +93,27 @@ def _format_operation_result_card( return "\n".join(info_lines) -def format_pause_success(schedule_info: ScheduleInfo) -> str: +def format_pause_success(schedule_info: ScheduledJob) -> str: """格式化暂停成功的消息""" return _format_operation_result_card("✅ 成功暂停定时任务!", schedule_info) -def format_resume_success(schedule_info: ScheduleInfo) -> str: +def format_resume_success(schedule_info: ScheduledJob) -> str: """格式化恢复成功的消息""" return _format_operation_result_card("▶️ 成功恢复定时任务!", schedule_info) -def format_remove_success(schedule_info: ScheduleInfo) -> str: +def format_remove_success(schedule_info: ScheduledJob) -> str: """格式化删除成功的消息""" return _format_operation_result_card("❌ 成功删除定时任务!", schedule_info) -def format_trigger_success(schedule_info: ScheduleInfo) -> str: +def format_trigger_success(schedule_info: ScheduledJob) -> str: """格式化手动触发成功的消息""" return _format_operation_result_card("🚀 成功手动触发定时任务!", schedule_info) -def format_update_success(schedule_info: ScheduleInfo) -> str: +def format_update_success(schedule_info: ScheduledJob) -> str: """格式化更新成功的消息""" return _format_operation_result_card("🔄️ 成功更新定时任务配置!", schedule_info) @@ -174,7 +139,7 @@ def _format_params(schedule_status: dict) -> str: async def format_schedule_list_as_image( - schedules: list[ScheduleInfo], title: str, current_page: int + schedules: list[ScheduledJob], title: str, current_page: int ): """将任务列表格式化为图片""" page_size = 15 @@ -204,7 +169,7 @@ async def format_schedule_list_as_image( s.get("bot_id") or "N/A", s["group_id"] or "全局", s["next_run_time"], - _format_trigger(s), + _format_trigger_info(s), _format_params(s), get_status_text(s["is_enabled"]), ] @@ -235,7 +200,7 @@ def format_single_status_message(status: dict) -> str: f"▫️ 目标: {status['group_id'] or '全局'}", f"▫️ 状态: {'✔️ 已启用' if status['is_enabled'] else '⏸️ 已暂停'}", f"▫️ 下次运行: {status['next_run_time']}", - f"▫️ 触发规则: {_format_trigger(status)}", + f"▫️ 触发规则: {_format_trigger_info(status)}", f"▫️ 任务参数: {_format_params(status)}", ] return "\n".join(info_lines) @@ -260,11 +225,12 @@ async def format_plugins_list() -> str: and isinstance(params_model, type) and issubclass(params_model, BaseModel) ): - model_fields = getattr(params_model, "model_fields", None) - if model_fields: + schema = model_json_schema(params_model) + properties = schema.get("properties", {}) + if properties: param_info_str = "参数: " + ", ".join( - f"{field_name}({_get_type_name(field_info.annotation)})" - for field_name, field_info in model_fields.items() + f"{field_name}({prop.get('type', 'any')})" + for field_name, prop in properties.items() ) elif params_model: param_info_str = "⚠️ 参数模型配置错误" diff --git a/zhenxun/models/schedule_info.py b/zhenxun/models/scheduled_job.py similarity index 95% rename from zhenxun/models/schedule_info.py rename to zhenxun/models/scheduled_job.py index c7583078..80d61674 100644 --- a/zhenxun/models/schedule_info.py +++ b/zhenxun/models/scheduled_job.py @@ -3,7 +3,7 @@ from tortoise import fields from zhenxun.services.db_context import Model -class ScheduleInfo(Model): +class ScheduledJob(Model): id = fields.IntField(pk=True, generated=True, auto_increment=True) """自增id""" bot_id = fields.CharField( @@ -34,5 +34,5 @@ class ScheduleInfo(Model): """创建时间""" class Meta: # pyright: ignore [reportIncompatibleVariableOverride] - table = "schedule_info" + table = "scheduled_jobs" table_description = "通用定时任务表" diff --git a/zhenxun/services/scheduler/__init__.py b/zhenxun/services/scheduler/__init__.py index 603339fd..c4a8ebba 100644 --- a/zhenxun/services/scheduler/__init__.py +++ b/zhenxun/services/scheduler/__init__.py @@ -4,9 +4,11 @@ 提供一个统一的、持久化的定时任务管理器,供所有插件使用。 """ +from .job import ScheduleContext from .lifecycle import _load_schedules_from_db -from .service import scheduler_manager +from .service import ExecutionPolicy, scheduler_manager +from .triggers import Trigger _ = _load_schedules_from_db -__all__ = ["scheduler_manager"] +__all__ = ["ExecutionPolicy", "ScheduleContext", "Trigger", "scheduler_manager"] diff --git a/zhenxun/services/scheduler/adapter.py b/zhenxun/services/scheduler/adapter.py index 104a0457..7cbb166a 100644 --- a/zhenxun/services/scheduler/adapter.py +++ b/zhenxun/services/scheduler/adapter.py @@ -5,12 +5,14 @@ 使上层服务与调度器实现解耦。 """ +from collections.abc import Callable + from nonebot_plugin_apscheduler import scheduler -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob from zhenxun.services.log import logger -from .job import _execute_job +from .job import ScheduleContext, _execute_job JOB_PREFIX = "zhenxun_schedule_" @@ -20,12 +22,25 @@ class APSchedulerAdapter: @staticmethod def _get_job_id(schedule_id: int) -> str: - """生成 APScheduler 的 Job ID""" + """ + 生成 APScheduler 的 Job ID + + 参数: + schedule_id: 定时任务的ID。 + + 返回: + str: APScheduler 使用的 Job ID。 + """ return f"{JOB_PREFIX}{schedule_id}" @staticmethod - def add_or_reschedule_job(schedule: ScheduleInfo): - """根据 ScheduleInfo 添加或重新调度一个 APScheduler 任务""" + def add_or_reschedule_job(schedule: ScheduledJob): + """ + 根据 ScheduledJob 添加或重新调度一个 APScheduler 任务 + + 参数: + schedule: 定时任务对象,包含任务的所有配置信息。 + """ job_id = APSchedulerAdapter._get_job_id(schedule.id) if not isinstance(schedule.trigger_config, dict): @@ -54,7 +69,12 @@ class APSchedulerAdapter: @staticmethod def remove_job(schedule_id: int): - """移除一个 APScheduler 任务""" + """ + 移除一个 APScheduler 任务 + + 参数: + schedule_id: 要移除的定时任务ID。 + """ job_id = APSchedulerAdapter._get_job_id(schedule_id) try: scheduler.remove_job(job_id) @@ -64,7 +84,12 @@ class APSchedulerAdapter: @staticmethod def pause_job(schedule_id: int): - """暂停一个 APScheduler 任务""" + """ + 暂停一个 APScheduler 任务 + + 参数: + schedule_id: 要暂停的定时任务ID。 + """ job_id = APSchedulerAdapter._get_job_id(schedule_id) try: scheduler.pause_job(job_id) @@ -73,7 +98,12 @@ class APSchedulerAdapter: @staticmethod def resume_job(schedule_id: int): - """恢复一个 APScheduler 任务""" + """ + 恢复一个 APScheduler 任务 + + 参数: + schedule_id: 要恢复的定时任务ID。 + """ job_id = APSchedulerAdapter._get_job_id(schedule_id) try: scheduler.resume_job(job_id) @@ -91,7 +121,15 @@ class APSchedulerAdapter: @staticmethod def get_job_status(schedule_id: int) -> dict: - """获取 APScheduler Job 的状态""" + """ + 获取 APScheduler Job 的状态 + + 参数: + schedule_id: 定时任务的ID。 + + 返回: + dict: 包含任务状态信息的字典,包含next_run_time等字段。 + """ job_id = APSchedulerAdapter._get_job_id(schedule_id) job = scheduler.get_job(job_id) return { @@ -100,3 +138,37 @@ class APSchedulerAdapter: else "N/A", "is_paused_in_scheduler": not bool(job.next_run_time) if job else "N/A", } + + @staticmethod + def add_ephemeral_job( + job_id: str, + func: Callable, + trigger_type: str, + trigger_config: dict, + context: ScheduleContext, + ): + """ + 直接向 APScheduler 添加一个临时的、非持久化的任务 + + 参数: + job_id: 临时任务的唯一ID。 + func: 要执行的函数。 + trigger_type: 触发器类型。 + trigger_config: 触发器配置字典。 + context: 任务执行上下文。 + """ + job = scheduler.get_job(job_id) + if job: + logger.warning(f"尝试添加一个已存在的临时任务ID: {job_id},操作被忽略。") + return + + scheduler.add_job( + _execute_job, + trigger=trigger_type, + id=job_id, + misfire_grace_time=60, + args=[None], + kwargs={"context_override": context}, + **trigger_config, + ) + logger.debug(f"已添加新的临时APScheduler任务: {job_id}") diff --git a/zhenxun/services/scheduler/job.py b/zhenxun/services/scheduler/job.py index dd7abdfc..e7024dfc 100644 --- a/zhenxun/services/scheduler/job.py +++ b/zhenxun/services/scheduler/job.py @@ -6,25 +6,148 @@ import asyncio import copy -import inspect +from functools import partial import random import nonebot +from nonebot.adapters import Bot +from nonebot.dependencies import Dependent +from nonebot.exception import FinishedException, PausedException, SkippedException +from nonebot.matcher import Matcher +from nonebot.typing import T_State +from pydantic import BaseModel, Field from zhenxun.configs.config import Config -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob from zhenxun.services.log import logger from zhenxun.utils.common_utils import CommonUtils from zhenxun.utils.decorator.retry import Retry from zhenxun.utils.platform import PlatformUtils +from zhenxun.utils.pydantic_compat import parse_as SCHEDULE_CONCURRENCY_KEY = "all_groups_concurrency_limit" +class ScheduleContext(BaseModel): + """ + 定时任务执行上下文,可通过依赖注入获取。 + """ + + schedule_id: int = Field(..., description="数据库中的任务ID") + plugin_name: str = Field(..., description="任务所属的插件名称") + bot_id: str | None = Field(None, description="执行任务的Bot ID") + group_id: str | None = Field(None, description="任务目标群组ID") + job_kwargs: dict = Field(default_factory=dict, description="任务配置的参数") + + +async def _execute_single_job_instance(schedule: ScheduledJob, bot): + """ + 负责执行一个具体目标的任务实例。 + """ + plugin_name = schedule.plugin_name + group_id = schedule.group_id + + from .service import ExecutionPolicy, scheduler_manager + + task_meta = scheduler_manager._registered_tasks.get(plugin_name) + + if not task_meta: + logger.error(f"无法执行任务:插件 '{plugin_name}' 在执行期间变得不可用。") + return + + is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) + if is_blocked: + target_desc = f"群 {group_id}" if group_id else "全局" + logger.info( + f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}] " + f"因功能被禁用而跳过执行。" + ) + return + + context = ScheduleContext( + schedule_id=schedule.id, + plugin_name=schedule.plugin_name, + bot_id=bot.self_id, + group_id=schedule.group_id, + job_kwargs=schedule.job_kwargs if isinstance(schedule.job_kwargs, dict) else {}, + ) + state: T_State = {ScheduleContext: context} + + policy_data = context.job_kwargs.pop("execution_policy", {}) + policy = ExecutionPolicy(**policy_data) + + async def task_execution_coro(): + injected_params = {"context": context} + + params_model = task_meta.get("model") + if params_model and isinstance(context.job_kwargs, dict): + try: + if isinstance(params_model, type) and issubclass( + params_model, BaseModel + ): + params_instance = parse_as(params_model, context.job_kwargs) + injected_params["params"] = params_instance # type: ignore + except Exception as e: + logger.error( + f"任务 {schedule.id} (目标: {group_id}) 参数验证失败: {e}", e=e + ) + raise + + async def wrapper(bot: Bot): + return await task_meta["func"](bot=bot, **injected_params) # type: ignore + + dependent = Dependent.parse( + call=wrapper, + allow_types=Matcher.HANDLER_PARAM_TYPES, + ) + return await dependent(bot=bot, state=state) + + try: + if policy.retries > 0: + on_success_handler = None + if policy.on_success_callback: + on_success_handler = partial(policy.on_success_callback, context) + + on_failure_handler = None + if policy.on_failure_callback: + on_failure_handler = partial(policy.on_failure_callback, context) + + retry_exceptions = tuple(policy.retry_on_exceptions or []) + + retry_decorator = Retry.api( + stop_max_attempt=policy.retries + 1, + strategy="exponential" if policy.retry_backoff else "fixed", + wait_fixed_seconds=policy.retry_delay_seconds, + exception=retry_exceptions, + on_success=on_success_handler, + on_failure=on_failure_handler, + log_name=f"ScheduledJob-{schedule.id}-{schedule.group_id or 'global'}", + ) + + decorated_executor = retry_decorator(task_execution_coro) + await decorated_executor() + else: + logger.info( + f"插件 '{plugin_name}' 开始为目标 [{group_id or '全局'}] " + f"执行定时任务 (ID: {schedule.id})。" + ) + await task_execution_coro() + + except (PausedException, FinishedException, SkippedException) as e: + logger.warning( + f"定时任务 {schedule.id} (目标: {group_id}) 被中断: {type(e).__name__}" + ) + except Exception as e: + logger.error( + f"执行定时任务 {schedule.id} (目标: {group_id}) " + f"时发生未被策略处理的最终错误", + e=e, + ) + + async def _execute_job(schedule_id: int): """ - APScheduler 调度的入口函数。 - 根据 schedule_id 处理特定任务、所有群组任务或全局任务。 + APScheduler 调度的入口函数,现在作为分发器。 """ from .repository import ScheduleRepository from .service import scheduler_manager @@ -36,12 +159,10 @@ async def _execute_job(schedule_id: int): logger.warning(f"定时任务 {schedule_id} 不存在或已禁用,跳过执行。") return - plugin_name = schedule.plugin_name - - task_meta = scheduler_manager._registered_tasks.get(plugin_name) - if not task_meta: + if schedule.plugin_name not in scheduler_manager._registered_tasks: logger.error( - f"无法执行定时任务:插件 '{plugin_name}' 未注册或已卸载。将禁用该任务。" + f"无法执行定时任务:插件 '{schedule.plugin_name}' " + f"未注册或已卸载。将禁用该任务。" ) schedule.is_enabled = False await ScheduleRepository.save(schedule, update_fields=["is_enabled"]) @@ -51,142 +172,68 @@ async def _execute_job(schedule_id: int): return try: - if schedule.bot_id: - bot = nonebot.get_bot(schedule.bot_id) - else: - bot = nonebot.get_bot() - logger.debug( - f"任务 {schedule_id} 未关联特定Bot,使用默认Bot {bot.self_id}" - ) - except KeyError: + bot = ( + nonebot.get_bot(schedule.bot_id) + if schedule.bot_id + else nonebot.get_bot() + ) + except (KeyError, ValueError): logger.warning( f"定时任务 {schedule_id} 需要的 Bot {schedule.bot_id} " f"不在线,本次执行跳过。" ) return - except ValueError: - logger.warning(f"当前没有Bot在线,定时任务 {schedule_id} 跳过。") - return if schedule.group_id == scheduler_manager.ALL_GROUPS: - await _execute_for_all_groups(schedule, task_meta, bot) - else: - await _execute_for_single_target(schedule, task_meta, bot) - finally: - scheduler_manager._running_tasks.discard(schedule_id) - - -async def _execute_for_all_groups(schedule: ScheduleInfo, task_meta: dict, bot): - """为所有群组执行任务,并处理优先级覆盖。""" - plugin_name = schedule.plugin_name - - concurrency_limit = Config.get_config( - "SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5 - ) - if not isinstance(concurrency_limit, int) or concurrency_limit <= 0: - logger.warning( - f"无效的定时任务并发限制配置 '{concurrency_limit}',将使用默认值 5。" - ) - concurrency_limit = 5 - - logger.info( - f"开始执行针对 [所有群组] 的任务 " - f"(ID: {schedule.id}, 插件: {plugin_name}, Bot: {bot.self_id})," - f"并发限制: {concurrency_limit}" - ) - - all_gids = set() - try: - group_list, _ = await PlatformUtils.get_group_list(bot) - all_gids.update( - g.group_id for g in group_list if g.group_id and not g.channel_id - ) - except Exception as e: - logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) - return - - specific_tasks_gids = set( - await ScheduleInfo.filter( - plugin_name=plugin_name, group_id__in=list(all_gids) - ).values_list("group_id", flat=True) - ) - - semaphore = asyncio.Semaphore(concurrency_limit) - - async def worker(gid: str): - """使用 Semaphore 包装单个群组的任务执行""" - await asyncio.sleep(random.uniform(0, 59)) - async with semaphore: - temp_schedule = copy.deepcopy(schedule) - temp_schedule.group_id = gid - await _execute_for_single_target(temp_schedule, task_meta, bot) - await asyncio.sleep(random.uniform(0.1, 0.5)) - - tasks_to_run = [] - for gid in all_gids: - if gid in specific_tasks_gids: - logger.debug(f"群组 {gid} 已有特定任务,跳过 'all' 任务的执行。") - continue - tasks_to_run.append(worker(gid)) - - if tasks_to_run: - await asyncio.gather(*tasks_to_run) - - -async def _execute_for_single_target(schedule: ScheduleInfo, task_meta: dict, bot): - """为单个目标(具体群组或全局)执行任务。""" - - plugin_name = schedule.plugin_name - group_id = schedule.group_id - - try: - is_blocked = await CommonUtils.task_is_block(bot, plugin_name, group_id) - if is_blocked: - target_desc = f"群 {group_id}" if group_id else "全局" - logger.info( - f"插件 '{plugin_name}' 的定时任务在目标 [{target_desc}]" - "因功能被禁用而跳过执行。" + concurrency_limit = Config.get_config( + "SchedulerManager", SCHEDULE_CONCURRENCY_KEY, 5 ) - return + if not isinstance(concurrency_limit, int) or concurrency_limit <= 0: + concurrency_limit = 5 - max_retries = Config.get_config("SchedulerManager", "JOB_MAX_RETRIES", 2) - retry_delay = Config.get_config("SchedulerManager", "JOB_RETRY_DELAY", 10) + logger.info( + f"开始执行针对 [所有群组] 的任务 (ID: {schedule.id}, " + f"插件: {schedule.plugin_name}, Bot: {bot.self_id})," + f"并发限制: {concurrency_limit}" + ) - @Retry.simple( - stop_max_attempt=max_retries + 1, - wait_fixed_seconds=retry_delay, - log_name=f"定时任务执行:{schedule.plugin_name}", - ) - async def _execute_task_with_retry(): - task_func = task_meta["func"] - job_kwargs = schedule.job_kwargs - if not isinstance(job_kwargs, dict): - logger.error( - f"任务 {schedule.id} 的 job_kwargs 不是字典类型: {type(job_kwargs)}" - ) + try: + group_list, _ = await PlatformUtils.get_group_list(bot) + all_gids = { + g.group_id for g in group_list if g.group_id and not g.channel_id + } + except Exception as e: + logger.error(f"为 'all' 任务获取 Bot {bot.self_id} 的群列表失败", e=e) return - sig = inspect.signature(task_func) - if "bot" in sig.parameters: - job_kwargs["bot"] = bot + specific_tasks_gids = set( + await ScheduledJob.filter( + plugin_name=schedule.plugin_name, group_id__in=list(all_gids) + ).values_list("group_id", flat=True) + ) - await task_func(group_id, **job_kwargs) + semaphore = asyncio.Semaphore(concurrency_limit) - try: + async def worker(gid: str): + await asyncio.sleep(random.uniform(0.1, 1.0)) + async with semaphore: + temp_schedule = copy.deepcopy(schedule) + temp_schedule.group_id = gid + await _execute_single_job_instance(temp_schedule, bot) + + tasks_to_run = [ + worker(gid) for gid in all_gids if gid not in specific_tasks_gids + ] + + if tasks_to_run: + await asyncio.gather(*tasks_to_run) logger.info( - f"插件 '{schedule.plugin_name}' 开始为目标 " - f"[{schedule.group_id or '全局'}] 执行定时任务 (ID: {schedule.id})。" + f"针对 [所有群组] 的任务 (ID: {schedule.id}) 执行完毕," + f"共处理 {len(tasks_to_run)} 个群组。" ) - await _execute_task_with_retry() - except Exception as e: - logger.error( - f"执行定时任务 (ID: {schedule.id}, 插件: {schedule.plugin_name}, " - f"目标: {schedule.group_id or '全局'}) 在所有重试后最终失败", - e=e, - ) - except Exception as e: - logger.error( - f"执行定时任务 (ID: {schedule.id}, 插件: {plugin_name}, " - f"目标: {group_id or '全局'}) 时发生异常", - e=e, - ) + + else: + await _execute_single_job_instance(schedule, bot) + + finally: + scheduler_manager._running_tasks.discard(schedule_id) diff --git a/zhenxun/services/scheduler/lifecycle.py b/zhenxun/services/scheduler/lifecycle.py index 19f6c627..2822ccf8 100644 --- a/zhenxun/services/scheduler/lifecycle.py +++ b/zhenxun/services/scheduler/lifecycle.py @@ -6,8 +6,10 @@ from zhenxun.services.log import logger from zhenxun.utils.manager.priority_manager import PriorityLifecycle +from zhenxun.utils.pydantic_compat import model_dump from .adapter import APSchedulerAdapter +from .job import ScheduleContext from .repository import ScheduleRepository from .service import scheduler_manager @@ -29,9 +31,9 @@ async def _load_schedules_from_db(): logger.info("正在检查并注册声明式默认任务...") declared_count = 0 for task_info in scheduler_manager._declared_tasks: - plugin_name = task_info["plugin_name"] - group_id = task_info["group_id"] - bot_id = task_info["bot_id"] + plugin_name = task_info.plugin_name + group_id = task_info.group_id + bot_id = task_info.bot_id query_kwargs = { "plugin_name": plugin_name, @@ -42,12 +44,17 @@ async def _load_schedules_from_db(): if not exists: logger.info(f"为插件 '{plugin_name}' 注册新的默认定时任务...") + + trigger_config_dict = model_dump( + task_info.trigger, exclude={"trigger_type"} + ) + schedule = await scheduler_manager.add_schedule( plugin_name=plugin_name, group_id=group_id, - trigger_type=task_info["trigger_type"], - trigger_config=task_info["trigger_config"], - job_kwargs=task_info["job_kwargs"], + trigger_type=task_info.trigger.trigger_type, + trigger_config=trigger_config_dict, + job_kwargs=task_info.job_kwargs, bot_id=bot_id, ) if schedule: @@ -60,3 +67,35 @@ async def _load_schedules_from_db(): if declared_count > 0: logger.info(f"声明式任务检查完成,新注册了 {declared_count} 个默认任务。") + + logger.info("正在调度声明式临时任务...") + ephemeral_count = 0 + for declaration in scheduler_manager._ephemeral_declared_tasks: + try: + job_id = f"runtime::{declaration.plugin_name}::{declaration.func.__name__}" + + context = ScheduleContext( + schedule_id=0, + plugin_name=job_id, + bot_id=None, + group_id=None, + job_kwargs={}, + ) + + trigger_config_dict = model_dump( + declaration.trigger, exclude={"trigger_type"} + ) + + APSchedulerAdapter.add_ephemeral_job( + job_id=job_id, + func=declaration.func, + trigger_type=declaration.trigger.trigger_type, + trigger_config=trigger_config_dict, + context=context, + ) + ephemeral_count += 1 + except Exception as e: + logger.error(f"调度临时任务 '{declaration.plugin_name}' 失败", e=e) + + if ephemeral_count > 0: + logger.info(f"临时任务调度完成,共成功加载 {ephemeral_count} 个任务。") diff --git a/zhenxun/services/scheduler/repository.py b/zhenxun/services/scheduler/repository.py index 7e168db9..b9a00c5a 100644 --- a/zhenxun/services/scheduler/repository.py +++ b/zhenxun/services/scheduler/repository.py @@ -1,64 +1,83 @@ """ 数据持久层 (Repository) -封装所有对 ScheduleInfo 模型的数据库操作,将数据访问逻辑与业务逻辑分离。 +封装所有对 ScheduledJob 模型的数据库操作,将数据访问逻辑与业务逻辑分离。 """ from typing import Any from tortoise.queryset import QuerySet -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob class ScheduleRepository: - """封装 ScheduleInfo 模型的数据库操作""" + """封装 ScheduledJob 模型的数据库操作""" @staticmethod - async def get_by_id(schedule_id: int) -> ScheduleInfo | None: - """通过ID获取任务""" - return await ScheduleInfo.get_or_none(id=schedule_id) + async def get_by_id(schedule_id: int) -> ScheduledJob | None: + """ + 通过ID获取任务 + + 参数: + schedule_id: 任务ID。 + + 返回: + ScheduledJob | None: 任务对象,不存在时返回None。 + """ + return await ScheduledJob.get_or_none(id=schedule_id) @staticmethod - async def get_all_enabled() -> list[ScheduleInfo]: - """获取所有启用的任务""" - return await ScheduleInfo.filter(is_enabled=True).all() + async def get_all_enabled() -> list[ScheduledJob]: + """ + 获取所有启用的任务 + + 返回: + list[ScheduledJob]: 所有启用状态的任务列表。 + """ + return await ScheduledJob.filter(is_enabled=True).all() @staticmethod - async def get_all(plugin_name: str | None = None) -> list[ScheduleInfo]: + async def get_all(plugin_name: str | None = None) -> list[ScheduledJob]: """获取所有任务,可按插件名过滤""" if plugin_name: - return await ScheduleInfo.filter(plugin_name=plugin_name).all() - return await ScheduleInfo.all() + return await ScheduledJob.filter(plugin_name=plugin_name).all() + return await ScheduledJob.all() @staticmethod - async def save(schedule: ScheduleInfo, update_fields: list[str] | None = None): - """保存任务""" + async def save(schedule: ScheduledJob, update_fields: list[str] | None = None): + """ + 保存任务 + + 参数: + schedule: 要保存的任务对象。 + update_fields: 要更新的字段列表,None表示更新所有字段。 + """ await schedule.save(update_fields=update_fields) @staticmethod async def exists(**kwargs: Any) -> bool: """检查任务是否存在""" - return await ScheduleInfo.exists(**kwargs) + return await ScheduledJob.exists(**kwargs) @staticmethod async def get_by_plugin_and_group( plugin_name: str, group_ids: list[str] - ) -> list[ScheduleInfo]: + ) -> list[ScheduledJob]: """根据插件和群组ID列表获取任务""" - return await ScheduleInfo.filter( + return await ScheduledJob.filter( plugin_name=plugin_name, group_id__in=group_ids ).all() @staticmethod async def update_or_create( defaults: dict, **kwargs: Any - ) -> tuple[ScheduleInfo, bool]: + ) -> tuple[ScheduledJob, bool]: """更新或创建任务""" - return await ScheduleInfo.update_or_create(defaults=defaults, **kwargs) + return await ScheduledJob.update_or_create(defaults=defaults, **kwargs) @staticmethod - async def query_schedules(**filters: Any) -> list[ScheduleInfo]: + async def query_schedules(**filters: Any) -> list[ScheduledJob]: """ 根据任意条件查询任务列表 @@ -66,14 +85,14 @@ class ScheduleRepository: **filters: 过滤条件,如 group_id="123", plugin_name="abc" 返回: - list[ScheduleInfo]: 任务列表 + list[ScheduledJob]: 任务列表 """ cleaned_filters = {k: v for k, v in filters.items() if v is not None} if not cleaned_filters: - return await ScheduleInfo.all() - return await ScheduleInfo.filter(**cleaned_filters).all() + return await ScheduledJob.all() + return await ScheduledJob.filter(**cleaned_filters).all() @staticmethod - def filter(**kwargs: Any) -> QuerySet[ScheduleInfo]: + def filter(**kwargs: Any) -> QuerySet[ScheduledJob]: """提供一个通用的过滤查询接口,供Targeter使用""" - return ScheduleInfo.filter(**kwargs) + return ScheduledJob.filter(**kwargs) diff --git a/zhenxun/services/scheduler/service.py b/zhenxun/services/scheduler/service.py index 4ed98c12..4f99f1de 100644 --- a/zhenxun/services/scheduler/service.py +++ b/zhenxun/services/scheduler/service.py @@ -5,21 +5,67 @@ 它负责编排业务逻辑,并调用 Repository 和 Adapter 层来完成具体工作。 """ -from collections.abc import Callable, Coroutine +from collections.abc import Awaitable, Callable, Coroutine from datetime import datetime +import inspect from typing import Any, ClassVar +import uuid import nonebot from pydantic import BaseModel from zhenxun.configs.config import Config -from zhenxun.models.schedule_info import ScheduleInfo +from zhenxun.models.scheduled_job import ScheduledJob from zhenxun.services.log import logger +from zhenxun.utils.pydantic_compat import model_dump from .adapter import APSchedulerAdapter -from .job import _execute_job +from .job import ScheduleContext, _execute_job from .repository import ScheduleRepository from .targeter import ScheduleTargeter +from .triggers import BaseTrigger + + +class ExecutionPolicy(BaseModel): + """ + 封装定时任务的执行策略,包括重试和回调。 + """ + + retries: int = 0 + retry_delay_seconds: int = 30 + retry_backoff: bool = False + retry_on_exceptions: list[type[Exception]] | None = None + on_success_callback: Callable[[ScheduleContext, Any], Awaitable[None]] | None = None + on_failure_callback: ( + Callable[[ScheduleContext, Exception], Awaitable[None]] | None + ) = None + + class Config: + arbitrary_types_allowed = True + + +class ScheduledJobDeclaration(BaseModel): + """用于在启动时声明默认定时任务的内部数据模型""" + + plugin_name: str + group_id: str | None + bot_id: str | None + trigger: BaseTrigger + job_kwargs: dict[str, Any] + + class Config: + arbitrary_types_allowed = True + + +class EphemeralJobDeclaration(BaseModel): + """用于在启动时声明临时任务的内部数据模型""" + + plugin_name: str + func: Callable[..., Coroutine] + trigger: BaseTrigger + + class Config: + arbitrary_types_allowed = True class SchedulerManager: @@ -27,7 +73,8 @@ class SchedulerManager: _registered_tasks: ClassVar[ dict[str, dict[str, Callable | type[BaseModel] | None]] ] = {} - _declared_tasks: ClassVar[list[dict[str, Any]]] = [] + _declared_tasks: ClassVar[list[ScheduledJobDeclaration]] = [] + _ephemeral_declared_tasks: ClassVar[list[EphemeralJobDeclaration]] = [] _running_tasks: ClassVar[set] = set() def target(self, **filters: Any) -> ScheduleTargeter: @@ -42,24 +89,29 @@ class SchedulerManager: """ return ScheduleTargeter(self, **filters) - def task( + def job( self, - trigger: str, + trigger: BaseTrigger, group_id: str | None = None, bot_id: str | None = None, - **trigger_kwargs, + default_params: BaseModel | None = None, + policy: ExecutionPolicy | None = None, ): """ - 声明式定时任务装饰器 + 声明式定时任务的统一装饰器。 + + 此装饰器用于将一个异步函数注册为一个可调度的定时任务, + 并为其创建一个默认的调度计划。 参数: - trigger: 触发器类型,如'cron'、'interval'等。 - group_id: 目标群组ID,None表示全局任务。 - bot_id: 目标Bot ID,None表示使用默认Bot。 - **trigger_kwargs: 触发器配置参数。 - - 返回: - Callable: 装饰器函数。 + trigger: 一个由 `Trigger` 工厂类创建的触发器配置对象 + (例如 `Trigger.cron(hour=8)`)。 + group_id: 默认的目标群组ID。`None` 表示全局任务, + `SchedulerManager.ALL_GROUPS` 表示所有群组。 + bot_id: 默认的目标Bot ID,`None` 表示使用任意可用Bot。 + default_params: (可选) 一个Pydantic模型实例,为任务提供默认参数。 + 任务函数需要有对应的Pydantic模型类型注解。 + policy: (可选) 一个ExecutionPolicy实例,定义任务的执行策略。 """ def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: @@ -69,15 +121,36 @@ class SchedulerManager: raise ValueError(f"函数 {func.__name__} 不在任何已加载的插件中。") plugin_name = plugin.name - task_declaration = { - "plugin_name": plugin_name, + params_model = None + from .job import ScheduleContext + + for param in inspect.signature(func).parameters.values(): + if ( + isinstance(param.annotation, type) + and issubclass(param.annotation, BaseModel) + and param.annotation is not ScheduleContext + ): + params_model = param.annotation + break + + if plugin_name in self._registered_tasks: + logger.warning(f"插件 '{plugin_name}' 的定时任务已被重复注册。") + self._registered_tasks[plugin_name] = { "func": func, - "group_id": group_id, - "bot_id": bot_id, - "trigger_type": trigger, - "trigger_config": trigger_kwargs, - "job_kwargs": {}, + "model": params_model, } + + job_kwargs = model_dump(default_params) if default_params else {} + if policy: + job_kwargs["execution_policy"] = model_dump(policy) + + task_declaration = ScheduledJobDeclaration( + plugin_name=plugin_name, + group_id=group_id, + bot_id=bot_id, + trigger=trigger, + job_kwargs=job_kwargs, + ) self._declared_tasks.append(task_declaration) logger.debug( f"发现声明式定时任务 '{plugin_name}',将在启动时进行注册。" @@ -89,6 +162,46 @@ class SchedulerManager: return decorator + def runtime_job(self, trigger: BaseTrigger): + """ + 声明一个临时的、非持久化的定时任务。 + + 这个任务只存在于内存中,随程序重启而消失。 + 它非常适合用于插件内部的、固定的、无需用户配置的系统级定时任务。 + 被此装饰器修饰的函数依然可以享受完整的依赖注入功能。 + + 参数: + trigger: 一个由 `Trigger` 工厂类创建的触发器配置对象。 + """ + + def decorator(func: Callable[..., Coroutine]) -> Callable[..., Coroutine]: + try: + plugin = nonebot.get_plugin_by_module_name(func.__module__) + if not plugin: + raise ValueError(f"函数 {func.__name__} 不在任何已加载的插件中。") + plugin_name = plugin.name + + self._registered_tasks[f"ephemeral::{plugin_name}::{func.__name__}"] = { + "func": func, + "model": None, + } + + declaration = EphemeralJobDeclaration( + plugin_name=plugin_name, + func=func, + trigger=trigger, + ) + self._ephemeral_declared_tasks.append(declaration) + logger.debug( + f"发现临时定时任务 '{plugin_name}:{func.__name__}',将在启动时调度" + ) + except Exception as e: + logger.error(f"注册临时定时任务失败: {func.__name__}, 错误: {e}") + + return func + + return decorator + def register( self, plugin_name: str, params_model: type[BaseModel] | None = None ) -> Callable: @@ -127,6 +240,39 @@ class SchedulerManager: """ return list(self._registered_tasks.keys()) + async def run_at(self, func: Callable[..., Coroutine], trigger: BaseTrigger) -> str: + """ + 【新增】在未来的某个时间点,运行一个一次性的临时任务。 + + 这是一个编程式API,用于动态调度一个非持久化的任务。 + + 参数: + func: 要执行的异步函数。 + trigger: 一个由 `Trigger` 工廠類創建的觸發器配置對象。 + + 返回: + str: 临时任务的唯一ID,可用于未来的管理(如取消)。 + """ + job_id = f"ephemeral_runtime_{uuid.uuid4()}" + + context = ScheduleContext( + schedule_id=0, + plugin_name=f"runtime::{func.__module__}", + bot_id=None, + group_id=None, + job_kwargs={}, + ) + + APSchedulerAdapter.add_ephemeral_job( + job_id=job_id, + func=func, + trigger_type=trigger.trigger_type, + trigger_config=model_dump(trigger, exclude={"trigger_type"}), + context=context, + ) + logger.info(f"已动态调度一个临时任务 (ID: {job_id}),将在 {trigger} 触发。") + return job_id + async def add_daily_task( self, plugin_name: str, @@ -136,7 +282,7 @@ class SchedulerManager: second: int = 0, job_kwargs: dict | None = None, bot_id: str | None = None, - ) -> "ScheduleInfo | None": + ) -> "ScheduledJob | None": """ 添加每日定时任务 @@ -150,7 +296,7 @@ class SchedulerManager: bot_id: 目标Bot ID,None表示使用默认Bot。 返回: - ScheduleInfo | None: 创建的任务信息,失败时返回None。 + ScheduledJob | None: 创建的任务信息,失败时返回None。 """ trigger_config = { "hour": hour, @@ -180,8 +326,21 @@ class SchedulerManager: start_date: str | datetime | None = None, job_kwargs: dict | None = None, bot_id: str | None = None, - ) -> "ScheduleInfo | None": - """添加间隔性定时任务""" + ) -> "ScheduledJob | None": + """ + 添加间隔性定时任务 + + 参数: + plugin_name: 插件名称。 + group_id: 目标群组ID,None表示全局任务。 + weeks/days/hours/minutes/seconds: 间隔时间,至少指定一个。 + start_date: 开始时间,None表示立即开始。 + job_kwargs: 任务参数字典。 + bot_id: 目标Bot ID。 + + 返回: + ScheduledJob | None: 创建的任务信息,失败时返回None。 + """ trigger_config = { "weeks": weeks, "days": days, @@ -231,11 +390,7 @@ class SchedulerManager: validated_model = model_validate(job_kwargs) - model_dump = getattr(validated_model, "model_dump", None) - if not model_dump: - return False, f"插件 '{plugin_name}' 的参数模型不支持导出" - - return True, model_dump() + return True, model_dump(validated_model) except ValidationError as e: errors = [f" - {err['loc'][0]}: {err['msg']}" for err in e.errors()] error_str = "\n".join(errors) @@ -250,7 +405,7 @@ class SchedulerManager: trigger_config: dict, job_kwargs: dict | None = None, bot_id: str | None = None, - ) -> "ScheduleInfo | None": + ) -> "ScheduledJob | None": """ 添加定时任务(通用方法) @@ -263,7 +418,7 @@ class SchedulerManager: bot_id: 目标Bot ID,None表示使用默认Bot。 返回: - ScheduleInfo | None: 创建的任务信息,失败时返回None。 + ScheduledJob | None: 创建的任务信息,失败时返回None。 """ if plugin_name not in self._registered_tasks: logger.error(f"插件 '{plugin_name}' 没有注册可用的定时任务。") @@ -298,18 +453,12 @@ class SchedulerManager: ) return schedule - async def get_all_schedules(self) -> list[ScheduleInfo]: - """ - 获取所有定时任务信息 - """ - return await self.get_schedules() - async def get_schedules( self, plugin_name: str | None = None, group_id: str | None = None, bot_id: str | None = None, - ) -> list[ScheduleInfo]: + ) -> list[ScheduledJob]: """ 根据条件获取定时任务列表 @@ -319,7 +468,7 @@ class SchedulerManager: bot_id: Bot ID,None表示不限制。 返回: - list[ScheduleInfo]: 符合条件的任务信息列表。 + list[ScheduledJob]: 符合条件的任务信息列表。 """ return await ScheduleRepository.query_schedules( plugin_name=plugin_name, group_id=group_id, bot_id=bot_id @@ -382,7 +531,15 @@ class SchedulerManager: return True, f"成功更新了任务 ID: {schedule_id} 的配置。" async def get_schedule_status(self, schedule_id: int) -> dict | None: - """获取定时任务的详细状态信息""" + """ + 获取定时任务的详细状态信息 + + 参数: + schedule_id: 定时任务的ID。 + + 返回: + dict | None: 任务详细信息字典,不存在时返回None。 + """ schedule = await ScheduleRepository.get_by_id(schedule_id) if not schedule: return None @@ -408,7 +565,15 @@ class SchedulerManager: } async def pause_schedule(self, schedule_id: int) -> tuple[bool, str]: - """暂停指定的定时任务""" + """ + 暂停指定的定时任务 + + 参数: + schedule_id: 要暂停的定时任务ID。 + + 返回: + tuple[bool, str]: (是否成功, 操作结果消息)。 + """ schedule = await ScheduleRepository.get_by_id(schedule_id) if not schedule or not schedule.is_enabled: return False, "任务不存在或已暂停。" @@ -419,7 +584,15 @@ class SchedulerManager: return True, f"已暂停任务 (ID: {schedule.id})。" async def resume_schedule(self, schedule_id: int) -> tuple[bool, str]: - """恢复指定的定时任务""" + """ + 恢复指定的定时任务 + + 参数: + schedule_id: 要恢复的定时任务ID。 + + 返回: + tuple[bool, str]: (是否成功, 操作结果消息)。 + """ schedule = await ScheduleRepository.get_by_id(schedule_id) if not schedule or schedule.is_enabled: return False, "任务不存在或已启用。" @@ -430,7 +603,15 @@ class SchedulerManager: return True, f"已恢复任务 (ID: {schedule.id})。" async def trigger_now(self, schedule_id: int) -> tuple[bool, str]: - """立即手动触发指定的定时任务""" + """ + 立即手动触发指定的定时任务 + + 参数: + schedule_id: 要触发的定时任务ID。 + + 返回: + tuple[bool, str]: (是否成功, 操作结果消息)。 + """ schedule = await ScheduleRepository.get_by_id(schedule_id) if not schedule: return False, f"未找到 ID 为 {schedule_id} 的定时任务。" @@ -446,3 +627,4 @@ class SchedulerManager: scheduler_manager = SchedulerManager() +scheduler = scheduler_manager diff --git a/zhenxun/services/scheduler/targeter.py b/zhenxun/services/scheduler/targeter.py index a5b3277f..c35de8b4 100644 --- a/zhenxun/services/scheduler/targeter.py +++ b/zhenxun/services/scheduler/targeter.py @@ -17,17 +17,33 @@ class ScheduleTargeter: """ def __init__(self, manager: Any, **filters: Any): - """初始化目标选择器""" + """ + 初始化目标选择器 + + 参数: + manager: SchedulerManager 实例。 + **filters: 过滤条件,支持plugin_name、group_id、bot_id等字段。 + """ self._manager = manager self._filters = {k: v for k, v in filters.items() if v is not None} async def _get_schedules(self): - """根据过滤器获取任务""" + """ + 根据过滤器获取任务 + + 返回: + list[ScheduledJob]: 符合过滤条件的任务列表。 + """ query = ScheduleRepository.filter(**self._filters) return await query.all() def _generate_target_description(self) -> str: - """根据过滤条件生成友好的目标描述""" + """ + 根据过滤条件生成友好的目标描述 + + 返回: + str: 描述目标的友好字符串。 + """ if "id" in self._filters: return f"任务 ID {self._filters['id']} 的" diff --git a/zhenxun/services/scheduler/triggers.py b/zhenxun/services/scheduler/triggers.py new file mode 100644 index 00000000..60523f18 --- /dev/null +++ b/zhenxun/services/scheduler/triggers.py @@ -0,0 +1,80 @@ +from datetime import datetime +from typing import Literal + +from pydantic import BaseModel, Field + + +class BaseTrigger(BaseModel): + """触发器配置的基类""" + + trigger_type: str = Field(..., exclude=True) + + +class CronTrigger(BaseTrigger): + """Cron 触发器配置""" + + trigger_type: Literal["cron"] = "cron" # type: ignore + year: int | str | None = None + month: int | str | None = None + day: int | str | None = None + week: int | str | None = None + day_of_week: int | str | None = None + hour: int | str | None = None + minute: int | str | None = None + second: int | str | None = None + start_date: datetime | str | None = None + end_date: datetime | str | None = None + timezone: str | None = None + jitter: int | None = None + + +class IntervalTrigger(BaseTrigger): + """Interval 触发器配置""" + + trigger_type: Literal["interval"] = "interval" # type: ignore + weeks: int = 0 + days: int = 0 + hours: int = 0 + minutes: int = 0 + seconds: int = 0 + start_date: datetime | str | None = None + end_date: datetime | str | None = None + timezone: str | None = None + jitter: int | None = None + + +class DateTrigger(BaseTrigger): + """Date 触发器配置""" + + trigger_type: Literal["date"] = "date" # type: ignore + run_date: datetime | str + timezone: str | None = None + + +class Trigger: + """ + 一个用于创建类型安全触发器配置的工厂类。 + 提供了流畅的、具备IDE自动补全功能的API。 + + 使用示例: + from zhenxun.services.scheduler import Trigger + + @scheduler.job(trigger=Trigger.cron(hour=8)) + async def my_task(): + ... + """ + + @staticmethod + def cron(**kwargs) -> CronTrigger: + """创建一个 Cron 触发器配置。""" + return CronTrigger(**kwargs) + + @staticmethod + def interval(**kwargs) -> IntervalTrigger: + """创建一个 Interval 触发器配置。""" + return IntervalTrigger(**kwargs) + + @staticmethod + def date(**kwargs) -> DateTrigger: + """创建一个 Date 触发器配置。""" + return DateTrigger(**kwargs) diff --git a/zhenxun/utils/decorator/retry.py b/zhenxun/utils/decorator/retry.py index e81aa334..ad5d6580 100644 --- a/zhenxun/utils/decorator/retry.py +++ b/zhenxun/utils/decorator/retry.py @@ -130,6 +130,7 @@ class Retry: wait_exp_multiplier: int = 1, wait_exp_max: int = 10, log_name: str | None = None, + on_success: Callable[[Any], Any] | None = None, on_failure: Callable[[Exception], Any] | None = None, return_on_failure: Any = _SENTINEL, ): @@ -146,6 +147,8 @@ class Retry: wait_exp_multiplier: 指数退避的乘数。 wait_exp_max: 指数退避的最大等待时间。 log_name: 用于日志记录的操作名称,方便区分不同的重试场景。 + on_success: (可选) 当函数成功执行(且未触发重试)后, + 会调用此函数,并将函数的返回值作为参数传入。 on_failure: (可选) 当所有重试都失败后,在抛出异常或返回默认值之前, 会调用此函数,并将最终的异常实例作为参数传入。 return_on_failure: (可选) 如果设置了此参数,当所有重试失败后, @@ -186,14 +189,49 @@ class Retry: decorated_func = tenacity_retry_decorator(func) if return_on_failure is _SENTINEL: - return decorated_func + if is_coroutine_callable(func): + + @wraps(func) + async def async_success_wrapper(*args, **kwargs): + result = await decorated_func(*args, **kwargs) + if on_success: + if is_coroutine_callable(on_success): + await on_success(result) + else: + on_success(result) + return result + + return async_success_wrapper + else: + + @wraps(func) + def sync_success_wrapper(*args, **kwargs): + result = decorated_func(*args, **kwargs) + if on_success: + if is_coroutine_callable(on_success): + logger.error( + f"不能在同步函数 '{func.__name__}' 中调用异步的 " + f"on_success 回调。", + LOG_COMMAND, + ) + else: + on_success(result) + return result + + return sync_success_wrapper if is_coroutine_callable(func): @wraps(func) async def async_wrapper(*args, **kwargs): try: - return await decorated_func(*args, **kwargs) + result = await decorated_func(*args, **kwargs) + if on_success: + if is_coroutine_callable(on_success): + await on_success(result) + else: + on_success(result) + return result except Exception as e: if on_failure: if is_coroutine_callable(on_failure): @@ -208,7 +246,17 @@ class Retry: @wraps(func) def sync_wrapper(*args, **kwargs): try: - return decorated_func(*args, **kwargs) + result = decorated_func(*args, **kwargs) + if on_success: + if is_coroutine_callable(on_success): + logger.error( + f"不能在同步函数 '{func.__name__}' 中调用异步的 " + f"on_success 回调。", + LOG_COMMAND, + ) + else: + on_success(result) + return result except Exception as e: if on_failure: if is_coroutine_callable(on_failure):