update v0.1

This commit is contained in:
HibiKier 2022-02-09 20:05:49 +08:00
parent 4cb91b47e6
commit 0efb2bbc60
33 changed files with 875 additions and 84 deletions

View File

@ -25,6 +25,9 @@
## 真寻的帮助
请对真寻说: '真寻帮助' or '管理员帮助' or '超级用户帮助' or '真寻帮助 指令'
## 普通帮助图片
![x](https://github.com/HibiKier/zhenxun_bot/blob/0.0.8.2/docs_image/3238573864-836268675-E2FFBB2AC143EAF4DDDF150438508721.png)
## 提供符合真寻标准的插件仓库
[AkashiCoin/nonebot_plugins_zhenxun_bot](https://github.com/AkashiCoin/nonebot_plugins_zhenxun_bot)
@ -32,9 +35,6 @@
## 来点优点?
一.作为bot
* 实现了许多功能,且提供了大量功能管理命令
* __..... 更多详细请通过`传送门`查看文档__
二.作为框架?:
* 通过Config配置项将所有插件配置统计保存至config.yaml利于统一用户修改
* 方便增删插件原生nonebot2 matcher不需要额外修改仅仅通过简单的配置属性就可以生成`帮助图片`和`帮助信息`
* 提供了cd阻塞每日次数等限制仅仅通过简单的属性就可以生成一个限制例如`__plugin_cd_limit__`
@ -67,6 +67,8 @@
- [x] 原神资源查询 (借鉴[Genshin_Impact_bot](https://github.com/H-K-Y/Genshin_Impact_bot)插件)
- [x] 原神便笺查询
- [x] 原神玩家查询
- [x] 原神树脂提醒
- [x] 原神签到/自动签到
- [x] 金币红包
- [x] 微博热搜
- [x] B站主播/UP/番剧订阅
@ -147,6 +149,7 @@
- [x] 清理临时数据
- [x] 增删群认证
- [x] 同意/拒绝好友/群聊请求
- [x] 配置重载
#### 超级用户的被动技能
- [x] 邀请入群提醒(别人邀请真寻入群)
@ -171,6 +174,7 @@
- [x] 群管理员监控,自动为新晋管理员增加权限,为失去群管理员的用户删除权限
- [x] 群权限系统
- [x] 定时更新权限
- [x] 自动配置重载
</details>
## 详细配置请前往文档以下为最简部署和配置如果你有基础并学习过nonebot2的话
@ -217,10 +221,25 @@ python bot.py
```
## 使用Docker
__Docker 最新版本由 [Sakuracio](https://github.com/Sakuracio) 提供__
#### GitHub[Sakuracio/zxenv](https://github.com/Sakuracio/zxenv)
#### DockerHub[hibikier/zhenxun_bot](https://hub.docker.com/r/hibikier/zhenxun_bot)
## 更新
### 2021/2/9 \[v0.1]
* 新增原神自动签到和手动签到
* 新增原神树脂提醒
* 新增手动重载Config.yaml命令以及重载配置定时任务极少部分帮助或配置可能需要重启
* 修改了发送本地图库的matcher改为on_message
* register_use可以通过返回值发送消息
* 修复修改商品时限制时间出错
* 修复超时商品依旧可以被购买
### 2021/1/16 \[v0.0.9.0]
* Ai提供文本敏感词过滤器
@ -240,7 +259,7 @@ python bot.py
* “send_success_msg”(发送成功的交互信息->即:使用道具 {name} {num} 次成功)
* “_max_num_limit”(该道具单次使用的最多个数默认1)
### 2021/1/5 \[v0.0.8.3]
### 2021/1/5 \[v0.0.8.2]
* 提供金币消费hook可在plugins2settings.yaml中配置该功能需要消费的金币
* 商店插件将作为内置插件移动至basic_plugins
@ -311,5 +330,5 @@ __..... 更多更新信息请查看文档__
[H-K-Y / Genshin_Impact_bot](https://github.com/H-K-Y/Genshin_Impact_bot) 原神bot这是一个基于nonebot和HoshinoBot的原神娱乐及信息查询插件
[NothAmor / nonebot2_luxun_says](https://github.com/NothAmor/nonebot2_luxun_says) 基于nonebot2机器人框架的鲁迅说插件
[Kyomotoi / AnimeThesaurus](https://github.com/Kyomotoi/AnimeThesaurus) :一个~~特二刺螈~~文爱的适用于任何bot的词库
[Ailitonia / omega-miya](https://github.com/Ailitonia/omega-miya) 基于nonebot2的qq机器人
[Ailitonia / omega-miya](https://github.com/Ailitonia/omega-miya) 基于nonebot2的qq机器人
[KimigaiiWuyi / GenshinUID]("https://github.com/KimigaiiWuyi/GenshinUID") 一个基于HoshinoBot/NoneBot2的原神UID查询插件

View File

@ -11,7 +11,6 @@ from .init_plugins_limit import (
)
from .init import init
from .check_plugin_status import check_plugin_status
from utils.manager import admin_manager
from nonebot.adapters.cqhttp import Bot
from configs.path_config import DATA_PATH
from services.log import logger

View File

@ -93,6 +93,18 @@ async def _():
"ALTER TABLE bag_users ADD property json NOT NULL DEFAULT '{}';",
"bag_users",
), # bag_users 新增字段 property 替代 props
(
"ALTER TABLE genshin ADD auto_sign_time timestamp with time zone;",
"genshin"
), # 新增原神自动签到字段
(
"ALTER TABLE genshin ADD resin_remind boolean DEFAULT False;",
"genshin"
), # 新增原神自动签到字段
(
"ALTER TABLE genshin ADD resin_recovery_time timestamp with time zone;",
"genshin"
), # 新增原神自动签到字段
]
for sql in sql_str:
try:

View File

@ -7,6 +7,7 @@ from models.bag_user import BagUser
from services.db_context import db
from nonebot.adapters.cqhttp.permission import GROUP
from models.goods_info import GoodsInfo
import time
__zx_plugin_name__ = "商店 - 购买道具"
@ -20,7 +21,7 @@ usage
""".strip()
__plugin_des__ = "商店 - 购买道具"
__plugin_cmd__ = ["购买 [序号或名称] ?[数量=1]"]
__plugin_type__ = ('商店',)
__plugin_type__ = ("商店",)
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
@ -29,9 +30,7 @@ __plugin_settings__ = {
"limit_superuser": False,
"cmd": ["商店", "购买道具"],
}
__plugin_cd_limit__ = {
"cd": 3
}
__plugin_cd_limit__ = {"cd": 3}
buy = on_command("购买", aliases={"购买道具"}, priority=5, block=True, permission=GROUP)
@ -42,8 +41,11 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
goods = None
if get_message_text(event.json()) in ["神秘药水"]:
await buy.finish("你们看看就好啦,这是不可能卖给你们的~", at_sender=True)
goods_lst = await GoodsInfo.get_all_goods()
goods_name_lst = [x.goods_name for x in goods_lst]
goods_name_lst = [
x.goods_name
for x in await GoodsInfo.get_all_goods()
if x.goods_limit_time > time.time() or x.goods_limit_time == 0
]
msg = get_message_text(event.json()).split()
num = 1
if len(msg) > 1:
@ -51,17 +53,16 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
num = int(msg[1])
else:
await buy.finish("购买的数量要是数字且大于0", at_sender=True)
# print(msg, num)
if is_number(msg[0]):
msg = int(msg[0])
if msg > len(goods_lst) or msg < 1:
if msg > len(goods_name_lst) or msg < 1:
await buy.finish("请输入正确的商品id", at_sender=True)
goods = goods_lst[msg - 1]
goods = goods_name_lst[msg - 1]
else:
if msg[0] in goods_name_lst:
for i in range(len(goods_name_lst)):
if msg[0] == goods_name_lst[i]:
goods = goods_lst[i]
goods = goods_name_lst[i]
break
else:
await buy.finish("请输入正确的商品名称!")

View File

@ -217,11 +217,12 @@ async def update_goods(**kwargs) -> "str, str, int":
tmp += f'折扣:{discount} --> {kwargs["discount"]}\n'
discount = kwargs["discount"]
if kwargs.get("limit_time"):
kwargs["limit_time"] = float(kwargs["limit_time"])
new_time = time.strftime(
"%Y-%m-%d %H:%M:%S",
time.localtime(time.time() + int(kwargs["limit_time"] * 60 * 60)),
time.localtime(time.time() + kwargs["limit_time"] * 60 * 60),
)
tmp += f"折扣至: {new_time}\n"
tmp += f"限时至: {new_time}\n"
limit_time = kwargs["limit_time"]
return (
await GoodsInfo.update_goods(
@ -249,7 +250,6 @@ def parse_goods_info(msg: str) -> Union[dict, str]:
sp = x.split(":", maxsplit=1)
if str(sp[1]).strip():
sp[1] = sp[1].strip()
print(sp)
if sp[0] == "name":
data["name"] = sp[1]
elif sp[0] == "price":

View File

@ -66,9 +66,11 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
await use_props.finish(f"该道具单次只能使用 {n} 个!")
if await BagUser.delete_property(
event.user_id, event.group_id, name, num
) and await effect(bot, event, name, num):
):
if func_manager.check_send_success_message(name):
await use_props.send(f"使用道具 {name} {num} 次成功!", at_sender=True)
if msg := await effect(bot, event, name, num):
await use_props.send(msg, at_sender=True)
logger.info(
f"USER {event.user_id} GROUP {event.group_id} 使用道具 {name} {num} 次成功"
)

View File

@ -1,6 +1,7 @@
from nonebot.adapters.cqhttp import GroupMessageEvent
from nonebot.adapters.cqhttp import GroupMessageEvent, MessageSegment
from services.log import logger
from nonebot.adapters.cqhttp import Bot
from typing import Optional, Union
import asyncio
@ -32,7 +33,7 @@ class GoodsUseFuncManager:
return self._data[goods_name]["kwargs"]["_max_num_limit"]
return 1
async def use(self, **kwargs):
async def use(self, **kwargs) -> Optional[Union[str, MessageSegment]]:
"""
使用道具
:param kwargs: kwargs
@ -40,11 +41,11 @@ class GoodsUseFuncManager:
goods_name = kwargs.get("goods_name")
if self.exists(goods_name):
if asyncio.iscoroutinefunction(self._data[goods_name]["func"]):
await self._data[goods_name]["func"](
return await self._data[goods_name]["func"](
**kwargs,
)
else:
self._data[goods_name]["func"](
return self._data[goods_name]["func"](
**kwargs,
)
@ -70,7 +71,9 @@ class GoodsUseFuncManager:
func_manager = GoodsUseFuncManager()
async def effect(bot: Bot, event: GroupMessageEvent, goods_name: str, num: int) -> bool:
async def effect(
bot: Bot, event: GroupMessageEvent, goods_name: str, num: int
) -> Optional[Union[str, MessageSegment]]:
"""
商品生效
:param bot: Bot
@ -83,7 +86,7 @@ async def effect(bot: Bot, event: GroupMessageEvent, goods_name: str, num: int)
try:
if func_manager.exists(goods_name):
_kwargs = func_manager.get_kwargs(goods_name)
await func_manager.use(
return await func_manager.use(
**{
**_kwargs,
"_bot": bot,
@ -94,10 +97,9 @@ async def effect(bot: Bot, event: GroupMessageEvent, goods_name: str, num: int)
"goods_name": goods_name,
}
)
return True
except Exception as e:
logger.error(f"use 商品生效函数effect 发生错误 {type(e)}{e}")
return False
return None
def register_use(goods_name: str, func, **kwargs):

View File

@ -8,7 +8,10 @@ from utils.manager import (
group_manager,
)
from nonebot.typing import T_State
from configs.config import Config
from services.log import logger
from nonebot.adapters.cqhttp import Bot, MessageEvent
from utils.utils import scheduler
__zx_plugin_name__ = "重载插件配置 [Superuser]"
@ -28,6 +31,18 @@ __plugin_cmd__ = [
]
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_configs__ = {
"AUTO_RELOAD": {
"value": False,
"help": "自动重载配置文件",
"default_value": False
},
"AUTO_RELOAD_TIME": {
"value": 180,
"help": "控制自动重载配置文件时长",
"default_value": 180
}
}
reload_plugins_manager = on_command(
@ -41,4 +56,19 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
plugins2cd_manager.reload()
plugins2block_manager.reload()
group_manager.reload()
Config.reload()
await reload_plugins_manager.send("重载完成...")
@scheduler.scheduled_job(
'interval',
seconds=Config.get_config("reload_setting", "AUTO_RELOAD_TIME", 180),
)
async def _():
if Config.get_config("reload_setting", "AUTO_RELOAD"):
plugins2settings_manager.reload()
plugins2cd_manager.reload()
plugins2block_manager.reload()
group_manager.reload()
Config.reload()
logger.debug("已自动重载所有配置文件...")

View File

@ -131,8 +131,10 @@ class GroupInfoUser(db.Model):
@classmethod
async def get_user_all_group(cls, user_qq: int) -> List[int]:
"""
获取该用户所在的所有群聊
:param user_qq: 用户qq
说明
获取该用户所在的所有群聊
参数
:param user_qq: 用户qq
"""
query = await cls.query.where(cls.user_qq == user_qq).gino.all()
if query:

View File

@ -193,7 +193,6 @@ async def get_sub_status(id_: int, sub_type: str) -> Optional[str]:
获取订阅状态
:param id_: 订阅 id
:param sub_type: 订阅类型
:return:
"""
try:
if sub_type == "live":
@ -203,7 +202,7 @@ async def get_sub_status(id_: int, sub_type: str) -> Optional[str]:
elif sub_type == "season":
return await _get_season_status(id_)
except ResponseCodeException:
return "获取信息失败...请检查订阅Id是否存在或稍后再试..."
return f"Id{id_} 获取信息失败...请检查订阅Id是否存在或稍后再试..."
# except Exception as e:
# logger.error(f"获取订阅状态发生预料之外的错误 id_{id_} {type(e)}{e}")
# return "发生了预料之外的错误..请稍后再试或联系管理员....."

View File

@ -133,7 +133,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
user_id = 0
if is_number(msg[1]):
group_id = int(msg[1])
text = msg[2]
text = " ".join(msg[2:])
else:
await reply.finish("群号错误...", at_sender=True)
else:

View File

@ -109,18 +109,6 @@ async def update_genshin_info():
data, code = await update_info(
url,
"genshin_arms",
[
"头像",
"名称",
"类型",
"稀有度.alt",
"获取途径",
"初始基础属性1",
"初始基础属性2",
"攻击力MAX",
"副属性MAX",
"技能",
],
)
if code == 200:
ALL_ARMS = init_game_pool("genshin_arms", data, GenshinChar)
@ -177,7 +165,6 @@ def _get_genshin_card(mode: int = 1, pool_name: str = "", add: float = 0.0):
if flag and star > 3 and pool_name:
# 获取up角色列表
up_char_lst = [x.operators for x in data_lst if x.star == star][0]
print(up_char_lst)
# 成功获取up角色
if random.random() < 0.5:
up_char_name = random.choice(up_char_lst)

View File

@ -179,6 +179,9 @@ async def _last_check(data: dict, game_name: str):
# 对抓取每行数据是否需要额外处理?
def intermediate_check(member_dict: dict, key: str, game_name: str, td: bs4.element.Tag):
if game_name == "genshin_arms":
if key == "稀有度":
member_dict["稀有度"] = td.find("img")["alt"].split('.')[0]
if game_name == 'prts':
if key == '获取途径':
msg = re.search('<td.*?>([\\s\\S]*)</td>', str(td)).group(1).strip()

View File

@ -131,8 +131,9 @@ async def retrieve_char_data(char: bs4.element.Tag, game_name: str, data: dict,
'名称': remove_prohibited_str(char.find('a')['title']),
'星级': 3 - index}
if game_name == 'azur':
char = char.find('div').find('div').find('div').find('div')
char = char.find('div').find('div').find('div')
avatar_img = char.find('a').find('img')
char = char.find('div')
try:
member_dict['名称'] = remove_prohibited_str(char.find('a')['title'])
except TypeError:

View File

@ -90,12 +90,12 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if get_message_text(event.json()):
if get_message_text(event.json()).find("@可爱的小真寻") != -1:
await fudu.finish("复制粘贴的虚空艾特?", at_sender=True)
imgs = get_message_img(event.json())
img = get_message_img(event.json())
msg = get_message_text(event.json())
if not imgs and not msg:
if not img and not msg:
return
if imgs:
img_hash = await get_fudu_img_hash(imgs[0], event.group_id)
if img:
img_hash = await get_fudu_img_hash(img[0], event.group_id)
else:
img_hash = ""
add_msg = msg + "|-|" + img_hash
@ -113,15 +113,17 @@ async def _(bot: Bot, event: GroupMessageEvent, state: T_State):
if random.random() < 0.2:
await fudu.finish("打断施法!")
_fudu_list.set_repeater(event.group_id)
if imgs and msg:
if img and msg:
rst = msg + image(f"compare_{event.group_id}_img.jpg", "temp")
elif imgs:
elif img:
rst = image(f"compare_{event.group_id}_img.jpg", "temp")
elif msg:
rst = msg
else:
rst = ""
if rst:
if rst.endswith("打断施法!"):
rst = "打断" + rst
await fudu.send(rst)

View File

@ -40,6 +40,9 @@ __plugin_settings__ = {
"limit_superuser": False,
"cmd": ["原神资源查询", "原神资源列表"],
}
__plugin_block_limit__ = {
"rst": "您有资源正在查询!"
}
qr = on_command("原神资源查询", aliases={"原神资源查找"}, priority=5, block=True)
qr_lst = on_command("原神资源列表", priority=5, block=True)

View File

@ -14,6 +14,12 @@ Config.add_plugin_config(
"xV8v4Qu54lUKrEYFZkJhB8cuOh9Asafs"
)
Config.add_plugin_config(
"genshin",
"n",
"h8w582wxwgqvahcdkpvdhbh2w9casgfl"
)
Config.add_plugin_config(
"genshin",
"client_type",
@ -24,3 +30,4 @@ nonebot.load_plugins("plugins/genshin/query_user")

View File

@ -70,9 +70,9 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
uid = await Genshin.get_user_uid(event.user_id)
if not uid:
await bind.finish("请先绑定原神uid..")
if msg.startswith('"'):
if msg.startswith('"') or msg.startswith("'"):
msg = msg[1:]
if msg.endswith('"'):
if msg.endswith('"') or msg.endswith("'"):
msg = msg[:-1]
await Genshin.set_cookie(uid, msg)
_x = f"已成功为uid{uid} 设置cookie"

View File

@ -0,0 +1,101 @@
from .data_source import get_sign_reward_list, genshin_sign
from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
from nonebot import on_command
from nonebot.typing import T_State
from services.log import logger
from .init_task import add_job, scheduler, _sign
from apscheduler.jobstores.base import JobLookupError
from ..models import Genshin
__zx_plugin_name__ = "原神自动签到"
__plugin_usage__ = """
usage
米游社原神签到需要uid以及cookie
且在第二天自动排序签到时间
# 不听,就要手动签到!(使用命令 “原神我硬签 or 米游社我硬签”
指令
/关原神自动签到
原神我硬签
""".strip()
__plugin_des__ = "原神懒人签到"
__plugin_cmd__ = ["开启/关闭原神自动签到", "原神我硬签"]
__plugin_type__ = ("原神相关",)
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["原神签到"],
}
genshin_matcher = on_command(
"开原神自动签到", aliases={"关原神自动签到", "原神我硬签"}, priority=5, block=True
)
@genshin_matcher.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
uid = await Genshin.get_user_uid(event.user_id)
if not uid or not await Genshin.get_user_cookie(uid, True):
await genshin_matcher.finish("请先绑定uid和cookie")
if "account_id" not in await Genshin.get_user_cookie(uid, True):
await genshin_matcher.finish("请更新cookie")
if state["_prefix"]["raw_command"] == "原神我硬签":
try:
msg = await genshin_sign(uid)
logger.info(
f"(USER {event.user_id}, "
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) UID{uid} 原神签到"
)
# 硬签,移除定时任务
try:
for i in range(3):
scheduler.remove_job(f"genshin_auto_sign_{uid}_{event.user_id}_{i}",)
except JobLookupError:
pass
u = await Genshin.get_user_by_uid(uid)
if u and u.auto_sign:
await u.clear_sign_time(uid)
next_date = await Genshin.random_sign_time(uid)
add_job(event.user_id, uid, next_date)
msg += f"因开启自动签到\n下一次签到时间为:{next_date.replace(microsecond=0)}"
except Exception as e:
msg = "原神签到失败..请尝试检查cookie或报告至管理员"
logger.info(
f"(USER {event.user_id}, "
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) UID{uid} 原神签到发生错误 "
f"{type(e)}{e}"
)
msg = msg or "请检查cookie是否更新"
await genshin_matcher.send(msg, at_sender=True)
else:
for i in range(3):
try:
scheduler.remove_job(f"genshin_auto_sign_{uid}_{event.user_id}_{i}")
except JobLookupError:
pass
if state["_prefix"]["raw_command"][0] == "":
await Genshin.set_auto_sign(uid, True)
next_date = await Genshin.random_sign_time(uid)
add_job(event.user_id, uid, next_date)
await genshin_matcher.send(
f"已开启原神自动签到!\n下一次签到时间为:{next_date.replace(microsecond=0)}",
at_sender=True,
)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 开启原神自动签到"
)
else:
await Genshin.set_auto_sign(uid, False)
await Genshin.clear_sign_time(uid)
await genshin_matcher.send(f"已关闭原神自动签到!", at_sender=True)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 关闭原神自动签到"
)

View File

@ -0,0 +1,104 @@
from utils.http_utils import AsyncHttpx
from configs.config import Config
from services.log import logger
from ..utils import random_hex, get_old_ds
from ..models import Genshin
from typing import Optional, Dict
async def genshin_sign(uid: int) -> Optional[str]:
"""
原神签到信息
:param uid: uid
"""
data = await _sign(uid)
if not data:
return "签到失败..."
status = data["message"]
if status == "OK":
sign_info = await _get_sign_info(uid)
if sign_info:
sign_info = sign_info["data"]
sign_list = await get_sign_reward_list()
get_reward = sign_list["data"]["awards"][
int(sign_info["total_sign_day"]) - 1
]["name"]
reward_num = sign_list["data"]["awards"][
int(sign_info["total_sign_day"]) - 1
]["cnt"]
get_im = f"本次签到获得:{get_reward}x{reward_num}"
if status == "OK" and sign_info["is_sign"]:
return f"\n原神签到成功!\n{get_im}\n本月漏签次数:{sign_info['sign_cnt_missed']}"
else:
return status
return None
async def _sign(uid: int, server_id: str = "cn_gf01") -> Optional[Dict[str, str]]:
"""
米游社签到
:param uid: uid
:param server_id: 服务器id
"""
if str(uid)[0] == "5":
server_id = "cn_qd01"
try:
req = await AsyncHttpx.post(
url="https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign",
headers={
"User_Agent": "Mozilla/5.0 (Linux; Android 10; MIX 2 Build/QKQ1.190825.002; wv) AppleWebKit/537.36 (KHTML, like Gecko) Version/4.0 Chrome/83.0.4103.101 Mobile Safari/537.36 miHoYoBBS/2.3.0",
"Cookie": await Genshin.get_user_cookie(int(uid), True),
"x-rpc-device_id": random_hex(32),
"Origin": "https://webstatic.mihoyo.com",
"X_Requested_With": "com.mihoyo.hyperion",
"DS": get_old_ds(),
"x-rpc-client_type": "5",
"Referer": "https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon",
"x-rpc-app_version": "2.3.0",
},
json={"act_id": "e202009291139501", "uid": uid, "region": server_id},
)
return req.json()
except Exception as e:
logger.error(f"米游社签到发生错误 UID{uid} {type(e)}{e}")
return None
async def get_sign_reward_list():
"""
获取签到奖励列表
"""
try:
req = await AsyncHttpx.get(
url="https://api-takumi.mihoyo.com/event/bbs_sign_reward/home?act_id=e202009291139501",
headers={
"x-rpc-app_version": str(Config.get_config("genshin", "mhyVersion")),
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1",
"x-rpc-client_type": str(Config.get_config("genshin", "client_type")),
"Referer": "https://webstatic.mihoyo.com/",
},
)
return req.json()
except Exception as e:
logger.error(f"获取签到奖励列表发生错误 {type(e)}{e}")
return None
async def _get_sign_info(uid: int, server_id: str = "cn_gf01"):
if str(uid)[0] == "5":
server_id = "cn_qd01"
try:
req = await AsyncHttpx.get(
url=f"https://api-takumi.mihoyo.com/event/bbs_sign_reward/info?act_id=e202009291139501&region={server_id}&uid={uid}",
headers={
"x-rpc-app_version": str(Config.get_config("genshin", "mhyVersion")),
"Cookie": await Genshin.get_user_cookie(int(uid), True),
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1",
"x-rpc-client_type": str(Config.get_config("genshin", "client_type")),
"Referer": "https://webstatic.mihoyo.com/",
},
)
return req.json()
except Exception as e:
logger.error(f"获取签到信息发生错误 UID{uid} {type(e)}{e}")
return None

View File

@ -0,0 +1,110 @@
from .data_source import genshin_sign
from models.group_member_info import GroupInfoUser
from utils.message_builder import at
from services.log import logger
from utils.utils import scheduler, get_bot
from apscheduler.jobstores.base import ConflictingIdError
from ..models import Genshin
from datetime import datetime, timedelta
from nonebot import Driver
import nonebot
import random
import pytz
driver: Driver = nonebot.get_driver()
@driver.on_startup
async def _():
"""
启动时分配定时任务
"""
g_list = await Genshin.get_all_auto_sign_user()
for u in g_list:
if u.auto_sign_time:
date = await Genshin.random_sign_time(u.uid)
scheduler.add_job(
_sign,
"date",
run_date=date.replace(microsecond=0),
id=f"genshin_auto_sign_{u.uid}_{u.user_qq}_0",
args=[u.user_qq, u.uid, 0],
)
logger.info(
f"genshin_sign add_jobUSER{u.user_qq} UID{u.uid} " f"{date} 原神自动签到"
)
def add_job(user_id: int, uid: int, date: datetime):
try:
scheduler.add_job(
_sign,
"date",
run_date=date.replace(microsecond=0),
id=f"genshin_auto_sign_{uid}_{user_id}_0",
args=[user_id, uid, 0],
)
logger.debug(f"genshin_sign add_job{date.replace(microsecond=0)} 原神自动签到")
except ConflictingIdError:
pass
async def _sign(user_id: int, uid: int, count: int):
"""
执行签到任务
:param user_id: 用户id
:param uid: uid
:param count: 执行次数
"""
if count < 3:
try:
msg = await genshin_sign(uid)
next_time = await Genshin.random_sign_time(uid)
msg += f"\n下一次签到时间为:{next_time.replace(microsecond=0)}"
logger.info(f"USER{user_id} UID{uid} 原神自动签到任务发生成功...")
try:
scheduler.add_job(
_sign,
"date",
run_date=next_time.replace(microsecond=0),
id=f"genshin_auto_sign_{uid}_{user_id}_0",
args=[user_id, uid, 0],
)
except ConflictingIdError:
msg += "\n定时任务设定失败..."
except Exception as e:
logger.error(f"USER{user_id} UID{uid} 原神自动签到任务发生错误 {type(e)}{e}")
msg = None
if not msg:
now = datetime.now(pytz.timezone("Asia/Shanghai"))
if now.hour < 23:
random_hours = random.randint(1, 23 - now.hour)
next_time = now + timedelta(hours=random_hours)
scheduler.add_job(
_sign,
"date",
run_date=next_time.replace(microsecond=0),
id=f"genshin_auto_sign_{uid}_{user_id}_{count}",
args=[user_id, uid, count + 1],
)
msg = (
f"{now.replace(microsecond=0)} 原神"
f"签到失败,将在 {next_time.replace(microsecond=0)} 时重试!"
)
else:
msg = "今日原神签到失败,请手动签到..."
logger.debug(f"USER{user_id} UID{uid} 原神今日签到失败...")
else:
msg = "今日原神自动签到重试次数已达到3次请手动签到。"
logger.debug(f"USER{user_id} UID{uid} 原神今日签到失败次数打到 3 次...")
bot = get_bot()
if bot:
if user_id in [x["user_id"] for x in await bot.get_friend_list()]:
await bot.send_private_msg(user_id=user_id, message=msg)
else:
group_list = await GroupInfoUser.get_user_all_group(user_id)
if group_list:
await bot.send_group_msg(
group_id=group_list[0], message=at(user_id) + msg
)

View File

@ -1,5 +1,8 @@
from services.db_context import db
from typing import Optional, Union
from typing import Optional, Union, List
from datetime import datetime, timedelta
import random
import pytz
class Genshin(db.Model):
@ -11,7 +14,10 @@ class Genshin(db.Model):
mys_id = db.Column(db.BigInteger())
cookie = db.Column(db.String(), default="")
today_query_uid = db.Column(db.String(), default="") # 该cookie今日查询的uid
auto_sign = db.Column(db.Boolean(), default=False) # 自动签到
auto_sign = db.Column(db.Boolean(), default=False)
auto_sign_time = db.Column(db.DateTime(timezone=True))
resin_remind = db.Column(db.Boolean(), default=False) # 树脂提醒
resin_recovery_time = db.Column(db.DateTime(timezone=True)) # 满树脂提醒日期
_idx1 = db.Index("genshin_uid_idx1", "user_qq", "uid", unique=True)
@ -66,11 +72,75 @@ class Genshin(db.Model):
return True
return False
@classmethod
async def set_resin_remind(cls, uid: int, flag: bool) -> bool:
"""
说明
设置体力提醒
参数
:param uid: 原神uid
:param flag: 开关状态
"""
query = cls.query.where(cls.uid == uid).with_for_update()
user = await query.gino.first()
if user:
await user.update(resin_remind=flag).apply()
return True
return False
@classmethod
async def set_user_resin_recovery_time(cls, uid: int, date: datetime):
"""
说明
设置体力完成时间
参数
:param uid: uid
:param date: 提醒日期
"""
u = await cls.query.where(cls.uid == uid).gino.first()
if u:
await u.update(resin_recovery_time=date).apply()
@classmethod
async def get_user_resin_recovery_time(cls, uid: int) -> Optional[datetime]:
"""
说明
获取体力完成时间
参数
:param uid: uid
"""
u = await cls.query.where(cls.uid == uid).gino.first()
if u:
return u.resin_recovery_time.astimezone(pytz.timezone("Asia/Shanghai"))
return None
@classmethod
async def get_all_resin_remind_user(cls) -> List["Genshin"]:
"""
说明
获取所有开启体力提醒的用户
"""
return await cls.query.where(cls.resin_remind == True).gino.all()
@classmethod
async def clear_resin_remind_time(cls, uid: int) -> bool:
"""
说明
清空提醒日期
参数
:param uid: uid
"""
user = await cls.query.where(cls.uid == uid).gino.first()
if user:
await user.update(resin_recovery_time=None).apply()
return True
return False
@classmethod
async def set_auto_sign(cls, uid: int, flag: bool) -> bool:
"""
说明
设置米游社自动签到
设置米游社/原神自动签到
参数
:param uid: 原神uid
:param flag: 开关状态
@ -82,6 +152,66 @@ class Genshin(db.Model):
return True
return False
@classmethod
async def get_all_auto_sign_user(cls) -> List["Genshin"]:
"""
说明
获取所有开启自动签到的用户
"""
return await cls.query.where(cls.auto_sign == True).gino.all()
@classmethod
async def get_all_sign_user(cls) -> List["Genshin"]:
"""
说明
获取 原神 所有今日签到用户
"""
return await cls.query.where(cls.auto_sign_time != None).gino.all()
@classmethod
async def clear_sign_time(cls, uid: int) -> bool:
"""
说明
清空签到日期
参数
:param uid: uid
"""
user = await cls.query.where(cls.uid == uid).gino.first()
if user:
await user.update(auto_sign_time=None).apply()
return True
return False
@classmethod
async def random_sign_time(cls, uid: int) -> Optional[datetime]:
"""
说明
随机签到时间
说明
:param uid: uid
"""
query = cls.query.where(cls.uid == uid).with_for_update()
user = await query.gino.first()
if user and user.cookie:
if user.auto_sign_time and user.auto_sign_time.astimezone(
pytz.timezone("Asia/Shanghai")
) - timedelta(seconds=2) >= datetime.now(pytz.timezone("Asia/Shanghai")):
return user.auto_sign_time.astimezone(pytz.timezone("Asia/Shanghai"))
hours = int(str(datetime.now()).split()[1].split(":")[0])
minutes = int(str(datetime.now()).split()[1].split(":")[1])
date = (
datetime.now()
+ timedelta(days=1)
- timedelta(hours=hours)
- timedelta(minutes=minutes - 1)
)
random_hours = random.randint(0, 22)
random_minutes = random.randint(1, 59)
date += timedelta(hours=random_hours) + timedelta(minutes=random_minutes)
await user.update(auto_sign_time=date).apply()
return date
return None
@classmethod
async def get_query_cookie(cls, uid: int) -> Optional[str]:
"""
@ -95,7 +225,9 @@ class Genshin(db.Model):
x = await query.gino.first()
if x:
return x.cookie
for u in [x for x in await cls.query.order_by(db.func.random()).gino.all() if x.cookie]:
for u in [
x for x in await cls.query.order_by(db.func.random()).gino.all() if x.cookie
]:
if not u.today_query_uid or len(u.today_query_uid[:-1].split()) < 30:
await cls._add_query_uid(uid, u.uid)
return u.cookie
@ -115,6 +247,26 @@ class Genshin(db.Model):
cookie = await cls.get_query_cookie(uid)
return cookie
@classmethod
async def get_user_by_qq(cls, user_qq: int) -> Optional["Genshin"]:
"""
说明
通过qq获取用户对象
参数
:param user_qq: qq
"""
return await cls.query.where(cls.user_qq == user_qq).gino.first()
@classmethod
async def get_user_by_uid(cls, uid: int) -> Optional["Genshin"]:
"""
说明
通过uid获取用户对象
参数
:param uid: qq
"""
return await cls.query.where(cls.uid == uid).gino.first()
@classmethod
async def get_user_uid(cls, user_qq: int) -> Optional[int]:
"""
@ -178,7 +330,7 @@ class Genshin(db.Model):
@classmethod
async def _get_user_data(
cls, user_qq: Optional[int], uid: Optional[int], type_: str
cls, user_qq: Optional[int], uid: Optional[int], type_: str
) -> Optional[Union[int, str]]:
"""
说明
@ -198,11 +350,10 @@ class Genshin(db.Model):
return user.mys_id
elif type_ == "cookie":
return user.cookie
return None
@classmethod
async def reset_today_query_uid(cls):
for u in await cls.query.with_for_update().gino.all():
if u.today_query_uid:
await u.update(
today_query_uid=""
).apply()
await u.update(today_query_uid="").apply()

View File

@ -2,8 +2,9 @@ from nonebot import on_command
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
from services.log import logger
from .data_source import get_user_memo
from .data_source import get_user_memo, get_memo
from ..models import Genshin
from nonebot.plugin import export
__zx_plugin_name__ = "原神便笺查询"
@ -25,6 +26,12 @@ __plugin_settings__ = {
"limit_superuser": False,
"cmd": ["原神便笺查询"],
}
__plugin_block_limit__ = {}
export = export()
export.get_memo = get_memo
query_memo_matcher = on_command("原神便签查询", aliases={"原神便笺查询", "yss"}, priority=5, block=True)
@ -43,7 +50,8 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
await query_memo_matcher.send(data)
logger.info(
f"(USER {event.user_id}, "
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) 使用原神便笺查询 uid{uid}"
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) "
f"使用原神便笺查询 uid{uid}"
)
else:
await query_memo_matcher.send("未查询到数据...")
@ -51,3 +59,4 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):

View File

@ -43,7 +43,7 @@ async def _():
async def get_user_memo(user_id: int, uid: int, uname: str) -> Optional[Union[str, MessageSegment]]:
uid = str(uid)
if uid[0] == "1" or uid[0] == "2":
if uid[0] in ["1", "2"]:
server_id = "cn_gf01"
elif uid[0] == "5":
server_id = "cn_qd01"
@ -72,7 +72,7 @@ async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int":
except TimeoutError:
return "访问超时,请稍后再试", 997
except Exception as e:
logger.info(f"便签查询获取失败未知错误 {e}{e}")
logger.error(f"便签查询获取失败未知错误 {e}{e}")
return "发生了一些错误,请稍后再试", 998

View File

@ -26,6 +26,7 @@ __plugin_settings__ = {
"limit_superuser": False,
"cmd": ["原神玩家查询"],
}
__plugin_block_limit__ = {}
query_role_info_matcher = on_command("原神玩家查询", aliases={"原神玩家查找", "ys"}, priority=5, block=True)

View File

@ -238,7 +238,7 @@ def get_home_data_image(home_data_list: List[Dict]) -> BuildImage:
(0, 30), f'尘歌壶 Lv.{home_data_list[0]["level"]}', center_type="by_width"
)
region.text(
(0, 980), f'仙力: {home_data_list[0]["comfort_num"]}', center_type="by_width"
(0, region.h - 70), f'仙力: {home_data_list[0]["comfort_num"]}', center_type="by_width"
)
except (IndexError, KeyError):
region.text((0, 30), f"尘歌壶 Lv.0", center_type="by_width")

View File

@ -0,0 +1,83 @@
from nonebot import on_command
from nonebot.typing import T_State
from nonebot.adapters.cqhttp import Bot, MessageEvent, GroupMessageEvent
from apscheduler.jobstores.base import JobLookupError
from services.log import logger
from .init_task import scheduler, add_job
from ..models import Genshin
from datetime import datetime
import random
import asyncio
import pytz
__zx_plugin_name__ = "原神树脂提醒"
__plugin_usage__ = """
usage
即将满树脂的提醒
指令
开原神树脂提醒
关原神树脂提醒
""".strip()
__plugin_des__ = "时时刻刻警醒你!"
__plugin_cmd__ = ["开原神树脂提醒", "关原神树脂提醒"]
__plugin_type__ = ("原神相关",)
__plugin_version__ = 0.1
__plugin_author__ = "HibiKier"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["原神树脂提醒", "关原神树脂提醒", "开原神树脂提醒"],
}
resin_remind = on_command("开原神树脂提醒", aliases={"关原神树脂提醒"}, priority=5, block=True)
@resin_remind.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
uid = await Genshin.get_user_uid(event.user_id)
if not uid or not await Genshin.get_user_cookie(uid, True):
await resin_remind.finish("请先绑定uid和cookie")
try:
scheduler.remove_job(f"genshin_resin_remind_{uid}_{event.user_id}")
except JobLookupError:
pass
if state["_prefix"]["raw_command"][0] == "":
await Genshin.set_resin_remind(uid, True)
add_job(event.user_id, uid)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 开启原神体力提醒"
)
await resin_remind.send("开启原神树脂提醒成功!", at_sender=True)
else:
await Genshin.set_resin_remind(uid, False)
await Genshin.clear_resin_remind_time(uid)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 关闭原神体力提醒"
)
await resin_remind.send("已关闭原神树脂提醒..", at_sender=True)
@scheduler.scheduled_job(
"interval",
minutes=30,
)
async def _():
for u in await Genshin.get_all_resin_remind_user():
if u.resin_recovery_time:
if await Genshin.get_user_resin_recovery_time(u.uid) < datetime.now(
pytz.timezone("Asia/Shanghai")
):
await Genshin.clear_resin_remind_time(u.uid)
elif (
await Genshin.get_user_resin_recovery_time(u.uid)
- datetime.now(pytz.timezone("Asia/Shanghai"))
).seconds > 360:
continue
add_job(u.user_qq, u.uid)
await asyncio.sleep(random.randint(10, 30))

View File

@ -0,0 +1,123 @@
from utils.utils import get_bot, scheduler
from utils.message_builder import at
from models.group_member_info import GroupInfoUser
from apscheduler.jobstores.base import ConflictingIdError
from nonebot import Driver
from ..models import Genshin
from datetime import datetime, timedelta
from services.log import logger
from nonebot.plugin import require
import time
import nonebot
import pytz
driver: Driver = nonebot.get_driver()
get_memo = require('query_memo').get_memo
class UserManager:
def __init__(self):
self._data = []
def append(self, o: str):
if o not in self._data:
self._data.append(o)
def remove(self, o: str):
if o in self._data:
self._data.remove(o)
def exists(self, o: str):
return o in self._data
user_manager = UserManager()
@driver.on_startup
async def _():
"""
启动时分配定时任务
"""
g_list = await Genshin.get_all_resin_remind_user()
for u in g_list:
if u.resin_recovery_time and await Genshin.get_user_resin_recovery_time(
u.uid
) > datetime.now(pytz.timezone("Asia/Shanghai")):
date = await Genshin.get_user_resin_recovery_time(u.uid)
scheduler.add_job(
_remind,
"date",
run_date=date.replace(microsecond=0),
id=f"genshin_resin_remind_{u.uid}_{u.user_qq}",
args=[u.user_qq, u.uid],
)
logger.info(
f"genshin_resin_remind add_jobUSER{u.user_qq} UID{u.uid} "
f"{date} 原神树脂提醒"
)
def add_job(user_id: int, uid: int):
date = datetime.now(pytz.timezone("Asia/Shanghai")) + timedelta(seconds=30)
try:
scheduler.add_job(
_remind,
"date",
run_date=date.replace(microsecond=0),
id=f"genshin_resin_remind_{uid}_{user_id}",
args=[user_id, uid],
)
except ConflictingIdError:
pass
async def _remind(user_id: int, uid: str):
uid = str(uid)
if uid[0] in ["1", "2"]:
server_id = "cn_gf01"
elif uid[0] == "5":
server_id = "cn_qd01"
else:
return
data, code = await get_memo(uid, server_id)
if code == 200:
current_resin = data["current_resin"] # 当前树脂
max_resin = data["max_resin"] # 最大树脂
resin_recovery_time = data["resin_recovery_time"] # 树脂全部回复时间
if max_resin - current_resin > 5:
user_manager.remove(uid)
next_time = datetime.strptime(time.strftime(
"%Y-%m-%d %H:%M:%S", time.localtime(time.time() + float(resin_recovery_time))
), "%Y-%m-%d %H:%M:%S")
await Genshin.set_user_resin_recovery_time(int(uid), next_time)
scheduler.add_job(
_remind,
"date",
run_date=next_time,
id=f"genshin_resin_remind_{uid}_{user_id}",
args=[user_id, uid],
)
logger.info(f"genshin_resin_remind add_job{next_time.replace(microsecond=0)} 原神树脂提醒")
else:
if not user_manager.exists(uid):
user_manager.append(uid)
bot = get_bot()
if bot:
if user_id in [x["user_id"] for x in await bot.get_friend_list()]:
await bot.send_private_msg(
user_id=user_id,
message=f"树脂已经 {current_resin} 个啦" f",马上就要溢出了!快快刷掉刷掉!",
)
else:
group_list = await GroupInfoUser.get_user_all_group(user_id)
if group_list:
await bot.send_group_msg(
group_id=group_list[0],
message=at(user_id) + f"树脂已经 {current_resin} 个啦"
f",马上就要溢出了!快快刷掉刷掉!",
)

View File

@ -3,6 +3,7 @@ import json
import time
import random
import hashlib
import string
def _md5(text):
@ -11,7 +12,15 @@ def _md5(text):
return md5.hexdigest()
def get_ds(q: str = "", b: dict = None):
def get_old_ds() -> str:
n = Config.get_config("genshin", "n")
i = str(int(time.time()))
r = ''.join(random.sample(string.ascii_lowercase + string.digits, 6))
c = _md5("salt=" + n + "&t=" + i + "&r=" + r)
return i + "," + r + "," + c
def get_ds(q: str = "", b: dict = None) -> str:
if b:
br = json.dumps(b)
else:
@ -23,6 +32,13 @@ def get_ds(q: str = "", b: dict = None):
return t + "," + r + "," + c
def random_hex(length: int) -> str:
result = hex(random.randint(0, 16 ** length)).replace("0x", "").upper()
if len(result) < length:
result = "0" * (length - len(result)) + result
return result
element_mastery = {
"anemo": "",
"pyro": "",

View File

@ -323,7 +323,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
scheduler.add_job(
end_festive_redbag,
"date",
run_date=str(datetime.now() + timedelta(hours=24)).split(".")[0],
run_date=(datetime.now() + timedelta(hours=24)).replace(microsecond=0),
id=f"festive_redbag_{g}",
args=[bot, g],
)

View File

@ -1,4 +1,4 @@
from nonebot import on_command, on_keyword, on_regex
from nonebot import on_message, on_keyword, on_regex
from configs.path_config import IMAGE_PATH
from utils.message_builder import image
from utils.utils import get_message_text, is_number
@ -9,6 +9,7 @@ from utils.utils import FreqLimiter, cn2py
from pathlib import Path
from configs.config import Config
from utils.manager import group_manager, withdraw_message_manager
from .rule import rule
import random
import os
@ -50,9 +51,8 @@ Config.add_plugin_config(
_flmt = FreqLimiter(1)
cmd = set(Config.get_config("image_management", "IMAGE_DIR_LIST"))
send_img = on_command("img", aliases=cmd, priority=5, block=True)
send_img = on_message(priority=5, rule=rule, block=True)
pa = on_keyword({"丢人爬", "爪巴"}, priority=5, block=True)
pa_reg = on_regex("^爬$", priority=5, block=True)
@ -62,18 +62,24 @@ _path = Path(IMAGE_PATH) / "image_management"
@send_img.handle()
async def _(bot: Bot, event: MessageEvent, state: T_State):
img_id = get_message_text(event.json())
path = _path / cn2py(state["_prefix"]["raw_command"])
if state["_prefix"]["raw_command"] in Config.get_config(
msg = get_message_text(event.json()).split()
gallery = msg[0]
if gallery not in Config.get_config("image_management", "IMAGE_DIR_LIST"):
return
img_id = None
if len(msg) > 1:
img_id = msg[1]
path = _path / cn2py(gallery)
if gallery in Config.get_config(
"image_management", "IMAGE_DIR_LIST"
):
if not path.exists() and (path.parent.parent / cn2py(state["_prefix"]["raw_command"])).exists():
path = Path(IMAGE_PATH) / cn2py(state["_prefix"]["raw_command"])
if not path.exists() and (path.parent.parent / cn2py(gallery)).exists():
path = Path(IMAGE_PATH) / cn2py(gallery)
else:
path.mkdir(parents=True, exist_ok=True)
length = len(os.listdir(path))
if length == 0:
logger.warning(f'图库 {cn2py(state["_prefix"]["raw_command"])} 为空,调用取消!')
logger.warning(f'图库 {cn2py(gallery)} 为空,调用取消!')
await send_img.finish("该图库中没有图片噢")
index = img_id if img_id else str(random.randint(0, length - 1))
if not is_number(index):
@ -85,7 +91,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) "
f"发送{cn2py(state['_prefix']['raw_command'])}:"
f"发送{cn2py(gallery)}:"
+ result
)
msg_id = await send_img.send(
@ -102,7 +108,7 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) "
f"发送 {cn2py(state['_prefix']['raw_command'])} 失败"
f"发送 {cn2py(gallery)} 失败"
)
await send_img.finish(f"不想给你看Ov|")
@ -135,4 +141,4 @@ async def _(bot: Bot, event: MessageEvent, state: T_State):
return
if _flmt.check(event.user_id):
_flmt.start_cd(event.user_id)
await pa.finish(image(random.choice(os.listdir(IMAGE_PATH + "pa")), "pa"))
await pa_reg.finish(image(random.choice(os.listdir(IMAGE_PATH + "pa")), "pa"))

View File

@ -0,0 +1,18 @@
from nonebot.adapters.cqhttp import Bot, Event
from nonebot.typing import T_State
from utils.utils import get_message_text
from configs.config import Config
def rule(bot: Bot, event: Event, state: T_State) -> bool:
"""
检测文本是否是关闭功能命令
:param bot: pass
:param event: pass
:param state: pass
"""
msg = get_message_text(event.json())
for x in Config.get_config("image_management", "IMAGE_DIR_LIST"):
if msg.startswith(x):
return True
return False

View File

@ -248,13 +248,13 @@ async def text2image(
for e in placeholder[2].split():
if e.startswith("font="):
_font = e.split("=")[-1]
if e.startswith("font_size="):
if e.startswith("font_size=") or e.startswith("fs="):
_font_size = int(e.split("=")[-1])
if _font_size > 1000:
_font_size = 1000
if _font_size < 1:
_font_size = 1
if e.startswith("font_color"):
if e.startswith("font_color") or e.startswith("fc="):
_font_color = e.split("=")[-1]
text_img = BuildImage(
0,