mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-14 21:52:56 +08:00
将配置替换加入单次定时任务,提高启动速度
This commit is contained in:
parent
708e912069
commit
5c695899a1
@ -281,6 +281,8 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
|
||||
* 开箱提供重置开箱命令,重置今日所有开箱数据(重置次数,并不会删除今日已开箱记录)
|
||||
* 提供全局字典GDict,通过from utils.manager import GDict导入
|
||||
* 适配omega 13w张图的数据结构表(建议删表重导)
|
||||
* 除首次启动外将配置替换加入单次定时任务,提高启动速度
|
||||
* fix: WordBank.check() [@pull/1008](https://github.com/HibiKier/zhenxun_bot/pull/1008)
|
||||
* 改进插件 `我有一个朋友`,避免触发过于频繁 [@pull/1001](https://github.com/HibiKier/zhenxun_bot/pull/1001)
|
||||
* 原神便笺新增洞天宝钱和参量质变仪提示 [@pull/1005](https://github.com/HibiKier/zhenxun_bot/pull/1005)
|
||||
* 新增米游社签到功能,自动领取(白嫖)米游币 [@pull/991](https://github.com/HibiKier/zhenxun_bot/pull/991)
|
||||
|
||||
@ -14,7 +14,6 @@ from .check_plugin_status import check_plugin_status
|
||||
from nonebot.adapters.onebot.v11 import Bot
|
||||
from configs.path_config import DATA_PATH
|
||||
from services.log import logger
|
||||
from pathlib import Path
|
||||
from nonebot import Driver
|
||||
import nonebot
|
||||
|
||||
@ -28,7 +27,7 @@ driver: Driver = nonebot.get_driver()
|
||||
|
||||
|
||||
@driver.on_startup
|
||||
def _():
|
||||
async def _():
|
||||
"""
|
||||
初始化数据
|
||||
"""
|
||||
|
||||
@ -5,7 +5,6 @@ from utils.manager import (
|
||||
plugins2settings_manager,
|
||||
plugins2block_manager,
|
||||
plugins_manager,
|
||||
resources_manager
|
||||
)
|
||||
from services.log import logger
|
||||
from utils.utils import get_matchers
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
from ruamel.yaml import round_trip_load, round_trip_dump, YAML
|
||||
from utils.manager import admin_manager, plugins_manager
|
||||
@ -5,6 +7,7 @@ from configs.config import Config
|
||||
from services.log import logger
|
||||
from utils.text_utils import prompt2cn
|
||||
from utils.utils import get_matchers
|
||||
from utils.utils import scheduler
|
||||
from ruamel import yaml
|
||||
|
||||
|
||||
@ -104,78 +107,96 @@ def init_plugins_config(data_path):
|
||||
round_trip_dump(
|
||||
_data, wf, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True
|
||||
)
|
||||
# 再开始读取用户配置
|
||||
user_config_file = Path() / "configs" / "config.yaml"
|
||||
_data = {}
|
||||
_tmp_data = {}
|
||||
if user_config_file.exists():
|
||||
with open(user_config_file, "r", encoding="utf8") as f:
|
||||
_data = _yaml.load(f)
|
||||
# 数据替换
|
||||
for plugin in Config.keys():
|
||||
_tmp_data[plugin] = {}
|
||||
for k in Config[plugin].keys():
|
||||
try:
|
||||
if _data.get(plugin) and k in _data[plugin].keys():
|
||||
Config.set_config(plugin, k, _data[plugin][k])
|
||||
if level2module := Config.get_level2module(plugin, k):
|
||||
try:
|
||||
admin_manager.set_admin_level(
|
||||
level2module, _data[plugin][k]
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
f"{level2module} 设置权限等级失败:{_data[plugin][k]}"
|
||||
)
|
||||
_tmp_data[plugin][k] = Config.get_config(plugin, k)
|
||||
except AttributeError as e:
|
||||
raise AttributeError(
|
||||
f"{e}\n" + prompt2cn("可能为config.yaml配置文件填写不规范", 46)
|
||||
)
|
||||
Config.save()
|
||||
temp_file = Path() / "configs" / "temp_config.yaml"
|
||||
try:
|
||||
with open(temp_file, "w", encoding="utf8") as wf:
|
||||
yaml.dump(
|
||||
_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
|
||||
)
|
||||
with open(temp_file, "r", encoding="utf8") as rf:
|
||||
_data = round_trip_load(rf)
|
||||
# 添加注释
|
||||
for plugin in _data.keys():
|
||||
rst = ""
|
||||
plugin_name = None
|
||||
try:
|
||||
plugin_data = Config.get(plugin)
|
||||
for x in list(Config.get(plugin).keys()):
|
||||
user_config_file = Path() / "configs" / "config.yaml"
|
||||
if not user_config_file.exists():
|
||||
_replace_config()
|
||||
else:
|
||||
logger.info('五分钟后将进行配置数据替换,请注意...')
|
||||
scheduler.add_job(
|
||||
_replace_config,
|
||||
"date",
|
||||
run_date=datetime.now() + timedelta(minutes=5),
|
||||
id=f"_replace_config"
|
||||
)
|
||||
|
||||
|
||||
def _replace_config():
|
||||
"""
|
||||
说明:
|
||||
定时任务加载的配置读取替换
|
||||
"""
|
||||
# 再开始读取用户配置
|
||||
user_config_file = Path() / "configs" / "config.yaml"
|
||||
_data = {}
|
||||
_tmp_data = {}
|
||||
if user_config_file.exists():
|
||||
with open(user_config_file, "r", encoding="utf8") as f:
|
||||
_data = _yaml.load(f)
|
||||
# 数据替换
|
||||
for plugin in Config.keys():
|
||||
_tmp_data[plugin] = {}
|
||||
for k in Config[plugin].keys():
|
||||
try:
|
||||
if _data.get(plugin) and k in _data[plugin].keys():
|
||||
Config.set_config(plugin, k, _data[plugin][k])
|
||||
if level2module := Config.get_level2module(plugin, k):
|
||||
try:
|
||||
_x = plugin_data[x].get("name")
|
||||
if _x:
|
||||
plugin_name = _x
|
||||
except AttributeError:
|
||||
pass
|
||||
except (KeyError, AttributeError):
|
||||
plugin_name = None
|
||||
if not plugin_name:
|
||||
admin_manager.set_admin_level(
|
||||
level2module, _data[plugin][k]
|
||||
)
|
||||
except KeyError:
|
||||
logger.warning(
|
||||
f"{level2module} 设置权限等级失败:{_data[plugin][k]}"
|
||||
)
|
||||
_tmp_data[plugin][k] = Config.get_config(plugin, k)
|
||||
except AttributeError as e:
|
||||
raise AttributeError(
|
||||
f"{e}\n" + prompt2cn("可能为config.yaml配置文件填写不规范", 46)
|
||||
)
|
||||
Config.save()
|
||||
temp_file = Path() / "configs" / "temp_config.yaml"
|
||||
try:
|
||||
with open(temp_file, "w", encoding="utf8") as wf:
|
||||
yaml.dump(
|
||||
_tmp_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
|
||||
)
|
||||
with open(temp_file, "r", encoding="utf8") as rf:
|
||||
_data = round_trip_load(rf)
|
||||
# 添加注释
|
||||
for plugin in _data.keys():
|
||||
rst = ""
|
||||
plugin_name = None
|
||||
try:
|
||||
plugin_data = Config.get(plugin)
|
||||
for x in list(Config.get(plugin).keys()):
|
||||
try:
|
||||
plugin_name = plugins_manager.get(plugin)["plugin_name"]
|
||||
except (AttributeError, TypeError):
|
||||
plugin_name = plugin
|
||||
plugin_name = (
|
||||
plugin_name.replace("[Hidden]", "")
|
||||
.replace("[Superuser]", "")
|
||||
.replace("[Admin]", "")
|
||||
.strip()
|
||||
)
|
||||
rst += plugin_name + "\n"
|
||||
for k in _data[plugin].keys():
|
||||
rst += f'{k}: {Config[plugin][k]["help"]}' + "\n"
|
||||
_data[plugin].yaml_set_start_comment(rst[:-1], indent=2)
|
||||
with open(Path() / "configs" / "config.yaml", "w", encoding="utf8") as wf:
|
||||
round_trip_dump(
|
||||
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"生成简易配置注释错误 {type(e)}:{e}")
|
||||
if temp_file.exists():
|
||||
temp_file.unlink()
|
||||
_x = plugin_data[x].get("name")
|
||||
if _x:
|
||||
plugin_name = _x
|
||||
except AttributeError:
|
||||
pass
|
||||
except (KeyError, AttributeError):
|
||||
plugin_name = None
|
||||
if not plugin_name:
|
||||
try:
|
||||
plugin_name = plugins_manager.get(plugin)["plugin_name"]
|
||||
except (AttributeError, TypeError):
|
||||
plugin_name = plugin
|
||||
plugin_name = (
|
||||
plugin_name.replace("[Hidden]", "")
|
||||
.replace("[Superuser]", "")
|
||||
.replace("[Admin]", "")
|
||||
.strip()
|
||||
)
|
||||
rst += plugin_name + "\n"
|
||||
for k in _data[plugin].keys():
|
||||
rst += f'{k}: {Config[plugin][k]["help"]}' + "\n"
|
||||
_data[plugin].yaml_set_start_comment(rst[:-1], indent=2)
|
||||
with open(Path() / "configs" / "config.yaml", "w", encoding="utf8") as wf:
|
||||
round_trip_dump(
|
||||
_data, wf, Dumper=yaml.RoundTripDumper, allow_unicode=True
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"生成简易配置注释错误 {type(e)}:{e}")
|
||||
if temp_file.exists():
|
||||
temp_file.unlink()
|
||||
|
||||
@ -26,18 +26,8 @@ def init_plugins_resources():
|
||||
else:
|
||||
path = Path(_module.__getattribute__("__file__")).parent
|
||||
for resource in resources.keys():
|
||||
resources_manager.add_resource(matcher.plugin_name, path / resource, resources[resource])
|
||||
resources_manager.add_resource(
|
||||
matcher.plugin_name, path / resource, resources[resource]
|
||||
)
|
||||
resources_manager.save()
|
||||
resources_manager.start_move()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@ -24,7 +24,11 @@ def init_plugins_settings(data_path: str):
|
||||
_plugin = nonebot.plugin.get_plugin(x)
|
||||
_module = _plugin.module
|
||||
metadata = _plugin.metadata
|
||||
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__")
|
||||
plugin_name = (
|
||||
metadata.name
|
||||
if metadata
|
||||
else _module.__getattribute__("__zx_plugin_name__")
|
||||
)
|
||||
_tmp_module[x] = plugin_name
|
||||
except (KeyError, AttributeError) as e:
|
||||
logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
|
||||
@ -77,11 +81,9 @@ def init_plugins_settings(data_path: str):
|
||||
"__plugin_settings__"
|
||||
)
|
||||
except AttributeError:
|
||||
plugin_settings = {
|
||||
"cmd": [matcher.plugin_name, plugin_name]
|
||||
}
|
||||
if not plugin_settings.get('cost_gold'):
|
||||
plugin_settings['cost_gold'] = 0
|
||||
plugin_settings = {"cmd": [matcher.plugin_name, plugin_name]}
|
||||
if not plugin_settings.get("cost_gold"):
|
||||
plugin_settings["cost_gold"] = 0
|
||||
if (
|
||||
plugin_settings.get("cmd") is not None
|
||||
and plugin_name not in plugin_settings["cmd"]
|
||||
@ -99,9 +101,7 @@ def init_plugins_settings(data_path: str):
|
||||
)
|
||||
else:
|
||||
try:
|
||||
plugin_type = _module.__getattribute__(
|
||||
"__plugin_type__"
|
||||
)
|
||||
plugin_type = _module.__getattribute__("__plugin_type__")
|
||||
except AttributeError:
|
||||
plugin_type = ("normal",)
|
||||
if plugin_settings and matcher.plugin_name:
|
||||
|
||||
@ -94,7 +94,7 @@ class ConfigsManager:
|
||||
del self._data[module]
|
||||
self.save()
|
||||
|
||||
def set_config(self, module: str, key: str, value: Any , save_simple_data: bool = False):
|
||||
def set_config(self, module: str, key: str, value: Any, save_simple_data: bool = False):
|
||||
"""
|
||||
设置配置值
|
||||
:param module: 模块名
|
||||
@ -109,7 +109,7 @@ class ConfigsManager:
|
||||
):
|
||||
self._data[module][key]["value"] = value
|
||||
self._simple_data[module][key] = value
|
||||
self.save(save_simple_data = save_simple_data)
|
||||
self.save(save_simple_data=save_simple_data)
|
||||
|
||||
def set_help(self, module: str, key: str, help_: str):
|
||||
"""
|
||||
@ -145,12 +145,18 @@ class ConfigsManager:
|
||||
:param default: 没有key值内容的默认返回值
|
||||
"""
|
||||
key = key.upper()
|
||||
if module in self._data.keys():
|
||||
# 优先使用simple_data
|
||||
if module in self._simple_data.keys():
|
||||
for key in [key, f"{key} [LEVEL]"]:
|
||||
if self._data[module].get(key) is not None:
|
||||
if self._data[module][key]["value"] is None:
|
||||
return self._data[module][key]["default_value"]
|
||||
return self._data[module][key]["value"]
|
||||
if self._simple_data[module].get(key) is not None:
|
||||
return self._simple_data[module][key]
|
||||
# if module in self._data.keys():
|
||||
# if module in self._data.keys():
|
||||
# for key in [key, f"{key} [LEVEL]"]:
|
||||
# if self._data[module].get(key) is not None:
|
||||
# if self._data[module][key]["value"] is None:
|
||||
# return self._data[module][key]["value"]
|
||||
# return self._data[module][key]["default_value"]
|
||||
if default is not None:
|
||||
return default
|
||||
return None
|
||||
@ -200,13 +206,12 @@ class ConfigsManager:
|
||||
重新加载配置文件
|
||||
"""
|
||||
_yaml = YAML()
|
||||
temp_file = Path() / "configs" / "config.yaml"
|
||||
if temp_file.exists():
|
||||
with open(temp_file, "r", encoding="utf8") as f:
|
||||
temp = _yaml.load(f)
|
||||
for key in temp.keys():
|
||||
for k in temp[key].keys():
|
||||
self._data[key][k]["value"] = temp[key][k]
|
||||
if self._simple_file.exists():
|
||||
with open(self._simple_file, "r", encoding="utf8") as f:
|
||||
self._simple_data = _yaml.load(f)
|
||||
for key in self._simple_data.keys():
|
||||
for k in self._simple_data[key].keys():
|
||||
self._data[key][k]["value"] = self._simple_data[key][k]
|
||||
self.save()
|
||||
|
||||
def get_admin_level_data(self):
|
||||
|
||||
@ -238,14 +238,14 @@ class WordBank(db.Model):
|
||||
return query.where(cls.problem == problem)
|
||||
# 正则匹配
|
||||
if await db.first(
|
||||
db.text(sql_text + f" and word_type = 2 and '{problem}' ~ problem;")
|
||||
db.text(sql_text + f" and word_type = 2 and word_scope != 999 and '{problem}' ~ problem;")
|
||||
):
|
||||
return sql_text + f" and word_type = 2 and '{problem}' ~ problem;"
|
||||
return sql_text + f" and word_type = 2 and word_scope != 999 and '{problem}' ~ problem;"
|
||||
# 模糊匹配
|
||||
if await db.first(
|
||||
db.text(sql_text + f" and word_type = 1 and '{problem}' ~ problem;")
|
||||
db.text(sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;")
|
||||
):
|
||||
return sql_text + f" and word_type = 1 and '{problem}' ~ problem;"
|
||||
return sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;"
|
||||
return None
|
||||
|
||||
@classmethod
|
||||
|
||||
Loading…
Reference in New Issue
Block a user