update check_update

This commit is contained in:
HibiKier 2022-05-22 19:04:58 +08:00
parent 35c9585b22
commit b241a58e84
2 changed files with 340 additions and 340 deletions

View File

@ -1,124 +1,124 @@
# from nonebot.adapters.onebot.v11 import Bot from nonebot.adapters.onebot.v11 import Bot
# from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
# from nonebot import on_command from nonebot import on_command
# from .data_source import check_update, get_latest_version_data from .data_source import check_update, get_latest_version_data
# from services.log import logger from services.log import logger
# from utils.utils import scheduler, get_bot from utils.utils import scheduler, get_bot
# from pathlib import Path from pathlib import Path
# from configs.config import Config from configs.config import Config
# from nonebot.rule import to_me from nonebot.rule import to_me
# from nonebot.params import ArgStr from nonebot.params import ArgStr
# import platform import platform
# import os import os
#
#
# __zx_plugin_name__ = "自动更新 [Superuser]" __zx_plugin_name__ = "自动更新 [Superuser]"
# __plugin_usage__ = """ __plugin_usage__ = """
# usage usage
# 检查更新真寻最新版本,包括了自动更新 检查更新真寻最新版本包括了自动更新
# 指令: 指令
# 检查更新真寻 检查更新真寻
# 重启 重启
# """.strip() """.strip()
# __plugin_des__ = "就算是真寻也会成长的" __plugin_des__ = "就算是真寻也会成长的"
# __plugin_cmd__ = ["检查更新真寻", "重启"] __plugin_cmd__ = ["检查更新真寻", "重启"]
# __plugin_version__ = 0.1 __plugin_version__ = 0.1
# __plugin_author__ = "HibiKier" __plugin_author__ = "HibiKier"
# __plugin_configs__ = { __plugin_configs__ = {
# "AUTO_UPDATE_ZHENXUN": { "AUTO_UPDATE_ZHENXUN": {
# "value": False, "value": False,
# "help": "真寻是否自动检查更新", "help": "真寻是否自动检查更新",
# "default": False, "default": False,
# } }
# } }
#
# update_zhenxun = on_command("检查更新真寻", permission=SUPERUSER, priority=1, block=True) update_zhenxun = on_command("检查更新真寻", permission=SUPERUSER, priority=1, block=True)
#
# restart = on_command( restart = on_command(
# "重启", "重启",
# aliases={"restart"}, aliases={"restart"},
# permission=SUPERUSER, permission=SUPERUSER,
# rule=to_me(), rule=to_me(),
# priority=1, priority=1,
# block=True, block=True,
# ) )
#
#
# @update_zhenxun.handle() @update_zhenxun.handle()
# async def _(bot: Bot): async def _(bot: Bot):
# try: try:
# code, error = await check_update(bot) code, error = await check_update(bot)
# if error: if error:
# logger.error(f"更新真寻未知错误 {error}") logger.error(f"更新真寻未知错误 {error}")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), message=f"更新真寻未知错误 {error}" user_id=int(list(bot.config.superusers)[0]), message=f"更新真寻未知错误 {error}"
# ) )
# except Exception as e: except Exception as e:
# logger.error(f"更新真寻未知错误 {type(e)}{e}") logger.error(f"更新真寻未知错误 {type(e)}{e}")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"更新真寻未知错误 {type(e)}{e}", message=f"更新真寻未知错误 {type(e)}{e}",
# ) )
# else: else:
# if code == 200: if code == 200:
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), message=f"更新完毕,请重启真寻...." user_id=int(list(bot.config.superusers)[0]), message=f"更新完毕,请重启真寻...."
# ) )
#
#
# @restart.handle() @restart.handle()
# async def _(): async def _():
# if str(platform.system()).lower() == "windows": if str(platform.system()).lower() == "windows":
# await restart.finish("暂无windows重启脚本...") await restart.finish("暂无windows重启脚本...")
#
#
# @restart.got("flag", prompt="确定是否重启真寻?(重启失败咱们将失去联系,请谨慎!)") @restart.got("flag", prompt="确定是否重启真寻?(重启失败咱们将失去联系,请谨慎!)")
# async def _(flag: str = ArgStr("flag")): async def _(flag: str = ArgStr("flag")):
# if flag.lower() in ["true", "是", "好", "确定", "确定是"]: if flag.lower() in ["true", "", "", "确定", "确定是"]:
# await restart.send("开始重启真寻..请稍等...") await restart.send("开始重启真寻..请稍等...")
# open("is_restart", "w") open("is_restart", "w")
# os.system("./restart.sh") os.system("./restart.sh")
# else: else:
# await restart.send("已取消操作...") await restart.send("已取消操作...")
#
#
# @scheduler.scheduled_job( @scheduler.scheduled_job(
# "cron", "cron",
# hour=12, hour=12,
# minute=0, minute=0,
# ) )
# async def _(): async def _():
# if Config.get_config("check_zhenxun_update", "AUTO_UPDATE_ZHENXUN"): if Config.get_config("check_zhenxun_update", "AUTO_UPDATE_ZHENXUN"):
# _version = "v0.0.0" _version = "v0.0.0"
# _version_file = Path() / "__version__" _version_file = Path() / "__version__"
# if _version_file.exists(): if _version_file.exists():
# _version = ( _version = (
# open(_version_file, "r", encoding="utf8") open(_version_file, "r", encoding="utf8")
# .readline() .readline()
# .split(":")[-1] .split(":")[-1]
# .strip() .strip()
# ) )
# data = await get_latest_version_data() data = await get_latest_version_data()
# if data: if data:
# latest_version = data["name"] latest_version = data["name"]
# if _version != latest_version: if _version != latest_version:
# bot = get_bot() bot = get_bot()
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"检测到真寻版本更新\n" message=f"检测到真寻版本更新\n"
# f"当前版本:{_version},最新版本:{latest_version}", f"当前版本:{_version},最新版本:{latest_version}",
# ) )
# # try: # try:
# # code = await check_update(bot) # code = await check_update(bot)
# # except Exception as e: # except Exception as e:
# # logger.error(f"更新真寻未知错误 {type(e)}{e}") # logger.error(f"更新真寻未知错误 {type(e)}{e}")
# # await bot.send_private_msg( # await bot.send_private_msg(
# # user_id=int(list(bot.config.superusers)[0]), # user_id=int(list(bot.config.superusers)[0]),
# # message=f"更新真寻未知错误 {type(e)}{e}\n", # message=f"更新真寻未知错误 {type(e)}{e}\n",
# # ) # )
# # else: # else:
# # if code == 200: # if code == 200:
# # await bot.send_private_msg( # await bot.send_private_msg(
# # user_id=int(list(bot.config.superusers)[0]), # user_id=int(list(bot.config.superusers)[0]),
# # message=f"更新完毕,请重启真寻....", # message=f"更新完毕,请重启真寻....",
# # ) # )

View File

@ -1,216 +1,216 @@
# from nonebot.adapters.onebot.v11 import Bot, Message from nonebot.adapters.onebot.v11 import Bot, Message
# from utils.image_utils import BuildImage from utils.image_utils import BuildImage
# from configs.path_config import IMAGE_PATH from configs.path_config import IMAGE_PATH
# from utils.message_builder import image from utils.message_builder import image
# from utils.http_utils import AsyncHttpx from utils.http_utils import AsyncHttpx
# from typing import List from typing import List
# from services.log import logger from services.log import logger
# from pathlib import Path from pathlib import Path
# import ujson as json import ujson as json
# import nonebot import nonebot
# import asyncio import asyncio
# import platform import platform
# import tarfile import tarfile
# import shutil import shutil
# import os import os
#
# # if str(platform.system()).lower() == "windows": # if str(platform.system()).lower() == "windows":
# # policy = asyncio.WindowsSelectorEventLoopPolicy() # policy = asyncio.WindowsSelectorEventLoopPolicy()
# # asyncio.set_event_loop_policy(policy) # asyncio.set_event_loop_policy(policy)
#
#
# driver = nonebot.get_driver() driver = nonebot.get_driver()
#
# release_url = "https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest" release_url = "https://api.github.com/repos/HibiKier/zhenxun_bot/releases/latest"
#
# _version_file = Path() / "__version__" _version_file = Path() / "__version__"
# zhenxun_latest_tar_gz = Path() / "zhenxun_latest_file.tar.gz" zhenxun_latest_tar_gz = Path() / "zhenxun_latest_file.tar.gz"
# temp_dir = Path() / "temp" temp_dir = Path() / "temp"
# backup_dir = Path() / "backup" backup_dir = Path() / "backup"
#
#
# @driver.on_bot_connect @driver.on_bot_connect
# async def remind(bot: Bot): async def remind(bot: Bot):
# if str(platform.system()).lower() != "windows": if str(platform.system()).lower() != "windows":
# restart = Path() / "restart.sh" restart = Path() / "restart.sh"
# if not restart.exists(): if not restart.exists():
# with open(restart, "w", encoding="utf8") as f: with open(restart, "w", encoding="utf8") as f:
# f.write( f.write(
# f"pid=$(netstat -tunlp | grep " f"pid=$(netstat -tunlp | grep "
# + str(bot.config.port) + str(bot.config.port)
# + " | awk '{print $7}')\n" + " | awk '{print $7}')\n"
# "pid=${pid%/*}\n" "pid=${pid%/*}\n"
# "kill -9 $pid\n" "kill -9 $pid\n"
# "sleep 3\n" "sleep 3\n"
# "python3 bot.py" "python3 bot.py"
# ) )
# os.system("chmod +x ./restart.sh") os.system("chmod +x ./restart.sh")
# logger.info("已自动生成 restart.sh(重启) 文件,请检查脚本是否与本地指令符合...") logger.info("已自动生成 restart.sh(重启) 文件,请检查脚本是否与本地指令符合...")
# is_restart_file = Path() / "is_restart" is_restart_file = Path() / "is_restart"
# if is_restart_file.exists(): if is_restart_file.exists():
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"真寻重启完毕...", message=f"真寻重启完毕...",
# ) )
# is_restart_file.unlink() is_restart_file.unlink()
#
#
# async def check_update(bot: Bot) -> 'int, str': async def check_update(bot: Bot) -> 'int, str':
# logger.info("开始检查更新真寻酱....") logger.info("开始检查更新真寻酱....")
# _version = "v0.0.0" _version = "v0.0.0"
# if _version_file.exists(): if _version_file.exists():
# _version = ( _version = (
# open(_version_file, "r", encoding="utf8").readline().split(":")[-1].strip() open(_version_file, "r", encoding="utf8").readline().split(":")[-1].strip()
# ) )
# data = await get_latest_version_data() data = await get_latest_version_data()
# if data: if data:
# latest_version = data["name"] latest_version = data["name"]
# if _version != latest_version: if _version != latest_version:
# tar_gz_url = data["tarball_url"] tar_gz_url = data["tarball_url"]
# logger.info(f"检测真寻已更新,当前版本:{_version},最新版本:{latest_version}") logger.info(f"检测真寻已更新,当前版本:{_version},最新版本:{latest_version}")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"检测真寻已更新,当前版本:{_version},最新版本:{latest_version}\n" f"开始更新.....", message=f"检测真寻已更新,当前版本:{_version},最新版本:{latest_version}\n" f"开始更新.....",
# ) )
# logger.info(f"开始下载真寻最新版文件....") logger.info(f"开始下载真寻最新版文件....")
# tar_gz_url = (await AsyncHttpx.get(tar_gz_url)).headers.get('Location') tar_gz_url = (await AsyncHttpx.get(tar_gz_url)).headers.get('Location')
# if await AsyncHttpx.download_file(tar_gz_url, zhenxun_latest_tar_gz): if await AsyncHttpx.download_file(tar_gz_url, zhenxun_latest_tar_gz):
# logger.info("下载真寻最新版文件完成....") logger.info("下载真寻最新版文件完成....")
# error = await asyncio.get_event_loop().run_in_executor( error = await asyncio.get_event_loop().run_in_executor(
# None, _file_handle, latest_version None, _file_handle, latest_version
# ) )
# if error: if error:
# return 998, error return 998, error
# logger.info("真寻更新完毕,清理文件完成....") logger.info("真寻更新完毕,清理文件完成....")
# logger.info("开始获取真寻更新日志.....") logger.info("开始获取真寻更新日志.....")
# update_info = data["body"] update_info = data["body"]
# width = 0 width = 0
# height = len(update_info.split('\n')) * 24 height = len(update_info.split('\n')) * 24
# A = BuildImage(width, height, font_size=20) A = BuildImage(width, height, font_size=20)
# for m in update_info.split('\n'): for m in update_info.split('\n'):
# w, h = A.getsize(m) w, h = A.getsize(m)
# if w > width: if w > width:
# width = w width = w
# A = BuildImage(width + 50, height, font_size=20) A = BuildImage(width + 50, height, font_size=20)
# A.text((10, 10), update_info) A.text((10, 10), update_info)
# A.save(f'{IMAGE_PATH}/update_info.png') A.save(f'{IMAGE_PATH}/update_info.png')
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=Message(f"真寻更新完成,版本:{_version} -> {latest_version}\n" message=Message(f"真寻更新完成,版本:{_version} -> {latest_version}\n"
# f"更新日期:{data['created_at']}\n" f"更新日期:{data['created_at']}\n"
# f"更新日志:\n" f"更新日志:\n"
# f"{image('update_info.png')}"), f"{image('update_info.png')}"),
# ) )
# return 200, '' return 200, ''
# else: else:
# logger.warning(f"下载真寻最新版本失败...版本号:{latest_version}") logger.warning(f"下载真寻最新版本失败...版本号:{latest_version}")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"下载真寻最新版本失败...版本号:{latest_version}.", message=f"下载真寻最新版本失败...版本号:{latest_version}.",
# ) )
# else: else:
# logger.info(f"自动获取真寻版本成功:{latest_version},当前版本为最新版,无需更新...") logger.info(f"自动获取真寻版本成功:{latest_version},当前版本为最新版,无需更新...")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), user_id=int(list(bot.config.superusers)[0]),
# message=f"自动获取真寻版本成功:{latest_version},当前版本为最新版,无需更新...", message=f"自动获取真寻版本成功:{latest_version},当前版本为最新版,无需更新...",
# ) )
# else: else:
# logger.warning("自动获取真寻版本失败....") logger.warning("自动获取真寻版本失败....")
# await bot.send_private_msg( await bot.send_private_msg(
# user_id=int(list(bot.config.superusers)[0]), message=f"自动获取真寻版本失败...." user_id=int(list(bot.config.superusers)[0]), message=f"自动获取真寻版本失败...."
# ) )
# return 999, '' return 999, ''
#
#
# def _file_handle(latest_version: str) -> str: def _file_handle(latest_version: str) -> str:
# if not temp_dir.exists(): if not temp_dir.exists():
# temp_dir.mkdir(exist_ok=True, parents=True) temp_dir.mkdir(exist_ok=True, parents=True)
# if backup_dir.exists(): if backup_dir.exists():
# shutil.rmtree(backup_dir) shutil.rmtree(backup_dir)
# tf = None tf = None
# error = '' error = ''
# # try: # try:
# backup_dir.mkdir(exist_ok=True, parents=True) backup_dir.mkdir(exist_ok=True, parents=True)
# logger.info("开始解压真寻文件压缩包....") logger.info("开始解压真寻文件压缩包....")
# tf = tarfile.open(zhenxun_latest_tar_gz) tf = tarfile.open(zhenxun_latest_tar_gz)
# tf.extractall(temp_dir) tf.extractall(temp_dir)
# logger.info("解压真寻文件压缩包完成....") logger.info("解压真寻文件压缩包完成....")
# zhenxun_latest_file = Path(temp_dir) / os.listdir(temp_dir)[0] zhenxun_latest_file = Path(temp_dir) / os.listdir(temp_dir)[0]
# update_info_file = Path(zhenxun_latest_file) / "update_info.json" update_info_file = Path(zhenxun_latest_file) / "update_info.json"
# update_info = json.load(open(update_info_file, "r", encoding="utf8")) update_info = json.load(open(update_info_file, "r", encoding="utf8"))
# update_file = update_info["update_file"] update_file = update_info["update_file"]
# add_file = update_info["add_file"] add_file = update_info["add_file"]
# delete_file = update_info["delete_file"] delete_file = update_info["delete_file"]
# config_file = Path() / "configs" / "config.py" config_file = Path() / "configs" / "config.py"
# config_path_file = Path() / "configs" / "path_config.py" config_path_file = Path() / "configs" / "path_config.py"
# for file in [config_file.name]: for file in [config_file.name]:
# tmp = "" tmp = ""
# new_file = Path(zhenxun_latest_file) / "configs" / file new_file = Path(zhenxun_latest_file) / "configs" / file
# old_file = Path() / "configs" / file old_file = Path() / "configs" / file
# new_lines = open(new_file, "r", encoding="utf8").readlines() new_lines = open(new_file, "r", encoding="utf8").readlines()
# old_lines = open(old_file, "r", encoding="utf8").readlines() old_lines = open(old_file, "r", encoding="utf8").readlines()
# for nl in new_lines: for nl in new_lines:
# tmp += check_old_lines(old_lines, nl) tmp += check_old_lines(old_lines, nl)
# with open(old_file, "w", encoding="utf8") as f: with open(old_file, "w", encoding="utf8") as f:
# f.write(tmp) f.write(tmp)
# for file in delete_file + update_file: for file in delete_file + update_file:
# if file != "configs": if file != "configs":
# file = Path() / file file = Path() / file
# backup_file = Path(backup_dir) / file backup_file = Path(backup_dir) / file
# if file.exists(): if file.exists():
# backup_file.parent.mkdir(parents=True, exist_ok=True) backup_file.parent.mkdir(parents=True, exist_ok=True)
# if backup_file.exists(): if backup_file.exists():
# backup_file.unlink() backup_file.unlink()
# if file not in [config_file, config_path_file]: if file not in [config_file, config_path_file]:
# os.rename(file.absolute(), backup_file.absolute()) os.rename(file.absolute(), backup_file.absolute())
# else: else:
# with open(file, "r", encoding="utf8") as rf: with open(file, "r", encoding="utf8") as rf:
# data = rf.read() data = rf.read()
# with open(backup_file, "w", encoding="utf8") as wf: with open(backup_file, "w", encoding="utf8") as wf:
# wf.write(data) wf.write(data)
# logger.info(f"已备份文件:{file}") logger.info(f"已备份文件:{file}")
# for file in add_file + update_file: for file in add_file + update_file:
# new_file = Path(zhenxun_latest_file) / file new_file = Path(zhenxun_latest_file) / file
# old_file = Path() / file old_file = Path() / file
# if old_file not in [config_file, config_path_file] and file != "configs": if old_file not in [config_file, config_path_file] and file != "configs":
# if not old_file.exists() and new_file.exists(): if not old_file.exists() and new_file.exists():
# os.rename(new_file.absolute(), old_file.absolute()) os.rename(new_file.absolute(), old_file.absolute())
# logger.info(f"已更新文件:{file}") logger.info(f"已更新文件:{file}")
# # except Exception as e: # except Exception as e:
# # error = f'{type(e)}{e}' # error = f'{type(e)}{e}'
# if tf: if tf:
# tf.close() tf.close()
# if temp_dir.exists(): if temp_dir.exists():
# shutil.rmtree(temp_dir) shutil.rmtree(temp_dir)
# if zhenxun_latest_tar_gz.exists(): if zhenxun_latest_tar_gz.exists():
# zhenxun_latest_tar_gz.unlink() zhenxun_latest_tar_gz.unlink()
# local_update_info_file = Path() / "update_info.json" local_update_info_file = Path() / "update_info.json"
# if local_update_info_file.exists(): if local_update_info_file.exists():
# local_update_info_file.unlink() local_update_info_file.unlink()
# with open(_version_file, "w", encoding="utf8") as f: with open(_version_file, "w", encoding="utf8") as f:
# f.write(f"__version__: {latest_version}") f.write(f"__version__: {latest_version}")
# return error return error
#
#
# # 获取最新版本号 # 获取最新版本号
# async def get_latest_version_data() -> dict: async def get_latest_version_data() -> dict:
# for _ in range(3): for _ in range(3):
# try: try:
# res = await AsyncHttpx.get(release_url) res = await AsyncHttpx.get(release_url)
# if res.status_code == 200: if res.status_code == 200:
# return res.json() return res.json()
# except TimeoutError: except TimeoutError:
# pass pass
# except Exception as e: except Exception as e:
# logger.error(f"检查更新真寻获取版本失败 {type(e)}{e}") logger.error(f"检查更新真寻获取版本失败 {type(e)}{e}")
# return {} return {}
#
#
# # 逐行检测 # 逐行检测
# def check_old_lines(lines: List[str], line: str) -> str: def check_old_lines(lines: List[str], line: str) -> str:
# if "=" not in line: if "=" not in line:
# return line return line
# for l in lines: for l in lines:
# if "=" in l and l.split("=")[0].strip() == line.split("=")[0].strip(): if "=" in l and l.split("=")[0].strip() == line.split("=")[0].strip():
# return l return l
# return line return line