Merge remote-tracking branch 'origin/main'

# Conflicts:
#	README.md
#	plugins/bilibili_sub/__init__.py
This commit is contained in:
zyj2134 2022-08-25 23:00:49 +08:00
commit 4595bdcee4
48 changed files with 2160 additions and 1137 deletions

16
.gitignore vendored
View File

@ -138,10 +138,24 @@ dmypy.json
# Cython debug symbols # Cython debug symbols
cython_debug/ cython_debug/
demo.py
test.py test.py
server_ip.py server_ip.py
game_utils.py
member_activity_handle.py member_activity_handle.py
Yu-Gi-Oh/ Yu-Gi-Oh/
csgo/ csgo/
fantasy_card/ fantasy_card/
data/
log/
backup/
extensive_plugin/
test/
bot.py
data/
.env
.env.dev
resources/
!/configs/config.py
!/configs/config.yaml
!/.env
!/.env.dev

View File

@ -128,7 +128,7 @@
- [x] 移动图片 (同上) - [x] 移动图片 (同上)
- [x] 删除图片 (同上) - [x] 删除图片 (同上)
- [x] 群内B站订阅 - [x] 群内B站订阅
- [x] 词条 - [x] 词条设置
- [x] 休息吧/醒来 - [x] 休息吧/醒来
### 已实现的超级用户功能 ### 已实现的超级用户功能
@ -252,6 +252,10 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
### 感谢名单 ### 感谢名单
(可以告诉我你的 __github__ 地址我偷偷换掉0v|) (可以告诉我你的 __github__ 地址我偷偷换掉0v|)
[爱发电用户_4jrf](https://afdian.net/u/6b2cdcc817c611ed949152540025c377)
[爱发电用户_TBsd](https://afdian.net/u/db638b60217911ed9efd52540025c377)
[烟寒若雨](https://afdian.net/u/067bd2161eec11eda62b52540025c377)
[ln](https://afdian.net/u/b51914ba1c6611ed8a4e52540025c377)
[爱发电用户_b9S4](https://afdian.net/u/3d8f30581a2911edba6d52540025c377) [爱发电用户_b9S4](https://afdian.net/u/3d8f30581a2911edba6d52540025c377)
[爱发电用户_c58s](https://afdian.net/u/a6ad8dda195e11ed9a4152540025c377) [爱发电用户_c58s](https://afdian.net/u/a6ad8dda195e11ed9a4152540025c377)
[爱发电用户_eNr9](https://afdian.net/u/05fdb41c0c9a11ed814952540025c377) [爱发电用户_eNr9](https://afdian.net/u/05fdb41c0c9a11ed814952540025c377)
@ -273,6 +277,32 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
## 更新 ## 更新
### 2022/8/23
* 修了下模糊匹配 issue#1026 [@pull/1026](https://github.com/HibiKier/zhenxun_bot/pull/1026)
### 2022/8/22
* 修复首次安装时词条旧表出错(因为根本就没有这张表!)
* 取消配置替换定时任务,统一存储
* 对米游社cookie进行判断整合米游社签到信息 [@pull/1014](https://github.com/HibiKier/zhenxun_bot/pull/1014)
* 修正尘歌壶和质变仪图片获取地址 [@pull/1010](https://github.com/HibiKier/zhenxun_bot/pull/1010)
* 修复词库问答 **很多** 问题[@pull/1012](https://github.com/HibiKier/zhenxun_bot/pull/1012)
### 2022/8/21 \[v0.1.6.3]
* 重构群词条改为词库Plus增加 精准|模糊|正则 问题匹配问题与回答均支持atimageface超级用户额外提供 全局|私聊 词库设置,数据迁移目前只提供了问题和回答都是纯文本的词条
* 修复b站转发解析av号无法解析
* B站订阅直播订阅支持短号
* 开箱提供重置开箱命令,重置今日所有开箱数据(重置次数,并不会删除今日已开箱记录)
* 提供全局字典GDict通过from utils.manager import GDict导入
* 适配omega 13w张图的数据结构表建议删表重导
* 除首次启动外将配置替换加入单次定时任务,加快启动速度
* fix: WordBank.check() [@pull/1008](https://github.com/HibiKier/zhenxun_bot/pull/1008)
* 改进插件 `我有一个朋友`,避免触发过于频繁 [@pull/1001](https://github.com/HibiKier/zhenxun_bot/pull/1001)
* 原神便笺新增洞天宝钱和参量质变仪提示 [@pull/1005](https://github.com/HibiKier/zhenxun_bot/pull/1005)
* 新增米游社签到功能,自动领取(白嫖)米游币 [@pull/991](https://github.com/HibiKier/zhenxun_bot/pull/991)
### 2022/8/14 ### 2022/8/14
* 修复epic未获取到时间时出错 * 修复epic未获取到时间时出错
@ -633,7 +663,7 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
* 修复点歌无法正确发送 * 修复点歌无法正确发送
* 修复我有一个朋友有时文本会包含CQ码 * 修复我有一个朋友有时文本会包含CQ码
* 修复群欢消息被动控制文本未删除 [@pull/124](https://github.com/HibiKier/zhenxun_bot/pull/124) * 修复群欢消息被动控制文本未删除 [@pull/124](https://github.com/HibiKier/zhenxun_bot/pull/124)
* message_builder.image不再提供参数abspath * message_builder.image不再提供参数:abspath
### 2022/2/23 ### 2022/2/23
@ -771,7 +801,7 @@ PS: **ARM平台** 请使用全量版 同时 **如果你的机器 RAM < 1G 可能
__..... 更多更新信息请查看文档__ __..... 更多更新信息请查看文档__
## Todo ## Todo
- [ ] web管理 - [x] web管理
## 感谢 ## 感谢
[botuniverse / onebot](https://github.com/botuniverse/onebot) :超棒的机器人协议 [botuniverse / onebot](https://github.com/botuniverse/onebot) :超棒的机器人协议

View File

@ -1 +1 @@
__version__: v0.1.6.2 __version__: v0.1.6.3

View File

@ -14,7 +14,6 @@ from .check_plugin_status import check_plugin_status
from nonebot.adapters.onebot.v11 import Bot from nonebot.adapters.onebot.v11 import Bot
from configs.path_config import DATA_PATH from configs.path_config import DATA_PATH
from services.log import logger from services.log import logger
from pathlib import Path
from nonebot import Driver from nonebot import Driver
import nonebot import nonebot
@ -28,7 +27,7 @@ driver: Driver = nonebot.get_driver()
@driver.on_startup @driver.on_startup
def _(): async def _():
""" """
初始化数据 初始化数据
""" """

View File

@ -5,7 +5,6 @@ from utils.manager import (
plugins2settings_manager, plugins2settings_manager,
plugins2block_manager, plugins2block_manager,
plugins_manager, plugins_manager,
resources_manager
) )
from services.log import logger from services.log import logger
from utils.utils import get_matchers from utils.utils import get_matchers

View File

@ -1,3 +1,5 @@
import asyncio
from datetime import datetime, timedelta
from pathlib import Path from pathlib import Path
from ruamel.yaml import round_trip_load, round_trip_dump, YAML from ruamel.yaml import round_trip_load, round_trip_dump, YAML
from utils.manager import admin_manager, plugins_manager from utils.manager import admin_manager, plugins_manager
@ -5,8 +7,8 @@ from configs.config import Config
from services.log import logger from services.log import logger
from utils.text_utils import prompt2cn from utils.text_utils import prompt2cn
from utils.utils import get_matchers from utils.utils import get_matchers
from utils.utils import scheduler
from ruamel import yaml from ruamel import yaml
import nonebot
_yaml = YAML(typ="safe") _yaml = YAML(typ="safe")
@ -105,6 +107,24 @@ def init_plugins_config(data_path):
round_trip_dump( round_trip_dump(
_data, wf, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True _data, wf, indent=2, Dumper=yaml.RoundTripDumper, allow_unicode=True
) )
user_config_file = Path() / "configs" / "config.yaml"
# if not user_config_file.exists():
_replace_config()
# else:
# logger.info('五分钟后将进行配置数据替换,请注意...')
# scheduler.add_job(
# _replace_config,
# "date",
# run_date=datetime.now() + timedelta(minutes=5),
# id=f"_replace_config"
# )
def _replace_config():
"""
说明:
定时任务加载的配置读取替换
"""
# 再开始读取用户配置 # 再开始读取用户配置
user_config_file = Path() / "configs" / "config.yaml" user_config_file = Path() / "configs" / "config.yaml"
_data = {} _data = {}

View File

@ -26,18 +26,8 @@ def init_plugins_resources():
else: else:
path = Path(_module.__getattribute__("__file__")).parent path = Path(_module.__getattribute__("__file__")).parent
for resource in resources.keys(): for resource in resources.keys():
resources_manager.add_resource(matcher.plugin_name, path / resource, resources[resource]) resources_manager.add_resource(
matcher.plugin_name, path / resource, resources[resource]
)
resources_manager.save() resources_manager.save()
resources_manager.start_move() resources_manager.start_move()

View File

@ -24,7 +24,11 @@ def init_plugins_settings(data_path: str):
_plugin = nonebot.plugin.get_plugin(x) _plugin = nonebot.plugin.get_plugin(x)
_module = _plugin.module _module = _plugin.module
metadata = _plugin.metadata metadata = _plugin.metadata
plugin_name = metadata.name if metadata else _module.__getattribute__("__zx_plugin_name__") plugin_name = (
metadata.name
if metadata
else _module.__getattribute__("__zx_plugin_name__")
)
_tmp_module[x] = plugin_name _tmp_module[x] = plugin_name
except (KeyError, AttributeError) as e: except (KeyError, AttributeError) as e:
logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}") logger.warning(f"配置文件 模块:{x} 获取 plugin_name 失败...{e}")
@ -77,11 +81,9 @@ def init_plugins_settings(data_path: str):
"__plugin_settings__" "__plugin_settings__"
) )
except AttributeError: except AttributeError:
plugin_settings = { plugin_settings = {"cmd": [matcher.plugin_name, plugin_name]}
"cmd": [matcher.plugin_name, plugin_name] if not plugin_settings.get("cost_gold"):
} plugin_settings["cost_gold"] = 0
if not plugin_settings.get('cost_gold'):
plugin_settings['cost_gold'] = 0
if ( if (
plugin_settings.get("cmd") is not None plugin_settings.get("cmd") is not None
and plugin_name not in plugin_settings["cmd"] and plugin_name not in plugin_settings["cmd"]
@ -99,9 +101,7 @@ def init_plugins_settings(data_path: str):
) )
else: else:
try: try:
plugin_type = _module.__getattribute__( plugin_type = _module.__getattribute__("__plugin_type__")
"__plugin_type__"
)
except AttributeError: except AttributeError:
plugin_type = ("normal",) plugin_type = ("normal",)
if plugin_settings and matcher.plugin_name: if plugin_settings and matcher.plugin_name:

View File

@ -1,3 +1,5 @@
import random
from asyncpg.exceptions import ( from asyncpg.exceptions import (
DuplicateColumnError, DuplicateColumnError,
UndefinedColumnError, UndefinedColumnError,
@ -13,6 +15,7 @@ from configs.path_config import TEXT_PATH
from asyncio.exceptions import TimeoutError from asyncio.exceptions import TimeoutError
from typing import List from typing import List
from utils.http_utils import AsyncHttpx from utils.http_utils import AsyncHttpx
from utils.manager import GDict
from utils.utils import scheduler from utils.utils import scheduler
import nonebot import nonebot
@ -108,6 +111,18 @@ async def _():
"ALTER TABLE genshin ADD bind_group Integer;", "ALTER TABLE genshin ADD bind_group Integer;",
"genshin" "genshin"
), # 新增原神群号绑定字段 ), # 新增原神群号绑定字段
(
"ALTER TABLE genshin ADD login_ticket VARCHAR(255) DEFAULT '';",
"genshin"
), # 新增米游社login_ticket绑定字段
(
"ALTER TABLE genshin ADD stuid VARCHAR(255) DEFAULT '';",
"genshin"
), # 新增米游社stuid绑定字段
(
"ALTER TABLE genshin ADD stoken VARCHAR(255) DEFAULT '';",
"genshin"
), # 新增米游社stoken绑定字段
( (
"ALTER TABLE chat_history ADD plain_text Text;", "ALTER TABLE chat_history ADD plain_text Text;",
"chat_history" "chat_history"
@ -121,8 +136,11 @@ async def _():
"goods_info" "goods_info"
), # 新增纯文本 ), # 新增纯文本
] ]
for sql in sql_str: for sql in sql_str + GDict.get('run_sql', []):
try: try:
if isinstance(sql, str):
flag = f'{random.randint(1, 10000)}'
else:
flag = sql[1] flag = sql[1]
sql = sql[0] sql = sql[0]
query = db.text(sql) query = db.text(sql)
@ -136,29 +154,6 @@ async def _():
# bag_user 将文本转为字典格式 # bag_user 将文本转为字典格式
await __database_script(_flag) await __database_script(_flag)
# 完成后
end_sql_str = [
# "ALTER TABLE bag_users DROP COLUMN props;" # 删除 bag_users 的 props 字段(还不到时候)
]
for sql in end_sql_str:
try:
query = db.text(sql)
await db.first(query)
logger.info(f"完成执行sql操作{sql}")
except (DuplicateColumnError, UndefinedColumnError):
pass
except PostgresSyntaxError:
logger.error(f"语法错误执行sql失败{sql}")
# str2json_sql = ["alter table bag_users alter COLUMN props type json USING props::json;"] # 字段类型替换
# rename_sql = 'alter table {} rename {} to {};' # 字段更名
# for sql in str2json_sql:
# try:
# query = db.text(sql)
# await db.first(query)
# except DuplicateColumnError:
# pass
@driver.on_bot_connect @driver.on_bot_connect
async def _(bot: Bot): async def _(bot: Bot):

View File

@ -94,13 +94,14 @@ class ConfigsManager:
del self._data[module] del self._data[module]
self.save() self.save()
def set_config(self, module: str, key: str, value: Any , save_simple_data: bool = False): def set_config(self, module: str, key: str, value: Any, auto_save: bool = False, save_simple_data: bool = True):
""" """
设置配置值 设置配置值
:param module: 模块名 :param module: 模块名
:param key: 配置名称 :param key: 配置名称
:param value: :param value:
:param save_simple_data: 同时保存至config.yaml :param auto_save: 自动保存
:param save_simple_data: 保存至config.yaml
""" """
if module in self._data.keys(): if module in self._data.keys():
if ( if (
@ -109,7 +110,8 @@ class ConfigsManager:
): ):
self._data[module][key]["value"] = value self._data[module][key]["value"] = value
self._simple_data[module][key] = value self._simple_data[module][key] = value
self.save(save_simple_data = save_simple_data) if auto_save:
self.save(save_simple_data=save_simple_data)
def set_help(self, module: str, key: str, help_: str): def set_help(self, module: str, key: str, help_: str):
""" """
@ -200,13 +202,12 @@ class ConfigsManager:
重新加载配置文件 重新加载配置文件
""" """
_yaml = YAML() _yaml = YAML()
temp_file = Path() / "configs" / "config.yaml" if self._simple_file.exists():
if temp_file.exists(): with open(self._simple_file, "r", encoding="utf8") as f:
with open(temp_file, "r", encoding="utf8") as f: self._simple_data = _yaml.load(f)
temp = _yaml.load(f) for key in self._simple_data.keys():
for key in temp.keys(): for k in self._simple_data[key].keys():
for k in temp[key].keys(): self._data[key][k]["value"] = self._simple_data[key][k]
self._data[key][k]["value"] = temp[key][k]
self.save() self.save()
def get_admin_level_data(self): def get_admin_level_data(self):

BIN
docs_image/logo.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 694 KiB

BIN
logo.png

Binary file not shown.

Before

Width:  |  Height:  |  Size: 694 KiB

After

Width:  |  Height:  |  Size: 326 KiB

View File

@ -226,7 +226,7 @@ async def _():
bot = get_bot() bot = get_bot()
sub = None sub = None
if bot: if bot:
try: # try:
await sub_manager.reload_sub_data() await sub_manager.reload_sub_data()
sub = await sub_manager.random_sub_data() sub = await sub_manager.random_sub_data()
if sub: if sub:
@ -234,10 +234,10 @@ async def _():
rst = await get_sub_status(sub.sub_id, sub.sub_type) rst = await get_sub_status(sub.sub_id, sub.sub_type)
await send_sub_msg(rst, sub, bot) await send_sub_msg(rst, sub, bot)
if sub.sub_type == "live": if sub.sub_type == "live":
rst = await get_sub_status(sub.uid, "up") rst = await get_sub_status(sub.sub_id, "up")
await send_sub_msg(rst, sub, bot) await send_sub_msg(rst, sub, bot)
except Exception as e: # except Exception as e:
logger.error(f"B站订阅推送发生错误 sub_id{sub.sub_id if sub else 0} {type(e)}{e}") # logger.error(f"B站订阅推送发生错误 sub_id{sub.sub_id if sub else 0} {type(e)}{e}")
async def send_sub_msg(rst: str, sub: BilibiliSub, bot: Bot): async def send_sub_msg(rst: str, sub: BilibiliSub, bot: Bot):

View File

@ -39,7 +39,6 @@ async def add_live_sub(live_id: int, sub_user: str) -> str:
:return: :return:
""" """
try: try:
async with db.transaction():
try: try:
"""bilibili_api.live库的LiveRoom类中get_room_info改为bilireq.live库的get_room_info_by_id方法""" """bilibili_api.live库的LiveRoom类中get_room_info改为bilireq.live库的get_room_info_by_id方法"""
live_info = await get_room_info_by_id(live_id) live_info = await get_room_info_by_id(live_id)
@ -58,13 +57,13 @@ async def add_live_sub(live_id: int, sub_user: str) -> str:
live_short_id=short_id, live_short_id=short_id,
live_status=live_status, live_status=live_status,
): ):
await _get_up_status(live_id) await _get_up_status(room_id)
uname = (await BilibiliSub.get_sub(live_id)).uname uname = (await BilibiliSub.get_sub(room_id)).uname
return ( return (
"已成功订阅主播:\n" "已成功订阅主播:\n"
f"\ttitle{title}\n" f"\ttitle{title}\n"
f"\tname {uname}\n" f"\tname {uname}\n"
f"\tlive_id{live_id}\n" f"\tlive_id{room_id}\n"
f"\tuid{uid}" f"\tuid{uid}"
) )
else: else:
@ -243,7 +242,7 @@ async def _get_live_status(id_: int) -> Optional[str]:
async def _get_up_status(id_: int) -> Optional[str]: async def _get_up_status(id_: int) -> Optional[str]:
""" """
获取用户投稿状态 获取用户投稿状态
:param id_: 用户 id :param id_: 订阅 id
:return: :return:
""" """
_user = await BilibiliSub.get_sub(id_) _user = await BilibiliSub.get_sub(id_)
@ -251,13 +250,13 @@ async def _get_up_status(id_: int) -> Optional[str]:
user_info = await get_user_info(_user.uid) user_info = await get_user_info(_user.uid)
uname = user_info["name"] uname = user_info["name"]
"""bilibili_api.user库中User类的get_videos改为bilireq.user库的get_videos方法""" """bilibili_api.user库中User类的get_videos改为bilireq.user库的get_videos方法"""
video_info = await get_videos(id_) video_info = await get_videos(_user.uid)
latest_video_created = 0 latest_video_created = 0
video = None video = None
dividing_line = "\n-------------\n" dividing_line = "\n-------------\n"
if _user.uname != uname: if _user.uname != uname:
await BilibiliSub.update_sub_info(id_, uname=uname) await BilibiliSub.update_sub_info(id_, uname=uname)
dynamic_img, dynamic_upload_time, link = await get_user_dynamic(id_, _user) dynamic_img, dynamic_upload_time, link = await get_user_dynamic(_user.uid, _user)
if video_info["list"].get("vlist"): if video_info["list"].get("vlist"):
video = video_info["list"]["vlist"][0] video = video_info["list"]["vlist"][0]
latest_video_created = video["created"] latest_video_created = video["created"]

View File

@ -65,7 +65,6 @@ class BilibiliSub(db.Model):
:param season_update_time: 番剧更新时间 :param season_update_time: 番剧更新时间
""" """
try: try:
async with db.transaction():
query = ( query = (
await cls.query.where(cls.sub_id == sub_id) await cls.query.where(cls.sub_id == sub_id)
.with_for_update() .with_for_update()

View File

@ -36,7 +36,7 @@ __plugin_configs__ = {
}, },
} }
coser = on_regex(r"^(\d?)连?(cos|COS|coser|括丝)$", priority=5, block=True) coser = on_regex(r"^(\d)?连?(cos|COS|coser|括丝)$", priority=5, block=True)
# 纯cos较慢:https://picture.yinux.workers.dev # 纯cos较慢:https://picture.yinux.workers.dev
# 比较杂,有福利姬,较快:https://api.jrsgslb.cn/cos/url.php?return=img # 比较杂,有福利姬,较快:https://api.jrsgslb.cn/cos/url.php?return=img
@ -44,7 +44,7 @@ url = "https://picture.yinux.workers.dev/"
@coser.handle() @coser.handle()
async def _(bot: Bot, event: MessageEvent, reg_group: Tuple[Any, ...] = RegexGroup()): async def _(event: MessageEvent, reg_group: Tuple[Any, ...] = RegexGroup()):
num = reg_group[0] or 1 num = reg_group[0] or 1
for _ in range(int(num)): for _ in range(int(num)):
try: try:

View File

@ -19,6 +19,9 @@ class Genshin(db.Model):
resin_remind = db.Column(db.Boolean(), default=False) # 树脂提醒 resin_remind = db.Column(db.Boolean(), default=False) # 树脂提醒
resin_recovery_time = db.Column(db.DateTime(timezone=True)) # 满树脂提醒日期 resin_recovery_time = db.Column(db.DateTime(timezone=True)) # 满树脂提醒日期
bind_group = db.Column(db.BigInteger()) bind_group = db.Column(db.BigInteger())
login_ticket = db.Column(db.String(), default="")
stuid = db.Column(db.String(), default="")
stoken = db.Column(db.String(), default="")
_idx1 = db.Index("genshin_uid_idx1", "user_qq", "uid", unique=True) _idx1 = db.Index("genshin_uid_idx1", "user_qq", "uid", unique=True)
@ -386,3 +389,96 @@ class Genshin(db.Model):
for u in await cls.query.with_for_update().gino.all(): for u in await cls.query.with_for_update().gino.all():
if u.today_query_uid: if u.today_query_uid:
await u.update(today_query_uid="").apply() await u.update(today_query_uid="").apply()
@classmethod
async def set_stuid(cls, uid: int, stuid: str) -> bool:
"""
说明:
设置stuid
参数:
:param uid: 原神uid
:param stuid: stuid
"""
query = cls.query.where(cls.uid == uid).with_for_update()
user = await query.gino.first()
if user:
await user.update(stuid=stuid).apply()
return True
return False
@classmethod
async def set_stoken(cls, uid: int, stoken: str) -> bool:
"""
说明:
设置stoken
参数:
:param uid: 原神uid
:param stoken: stoken
"""
query = cls.query.where(cls.uid == uid).with_for_update()
user = await query.gino.first()
if user:
await user.update(stoken=stoken).apply()
return True
return False
@classmethod
async def set_login_ticket(cls, uid: int, login_ticket: str) -> bool:
"""
说明:
设置login_ticket
参数:
:param uid: 原神uid
:param login_ticket: login_ticket
"""
query = cls.query.where(cls.uid == uid).with_for_update()
user = await query.gino.first()
if user:
await user.update(login_ticket=login_ticket).apply()
return True
return False
# 获取login_ticket
@classmethod
async def get_login_ticket(cls, uid: int) -> Optional[str]:
"""
说明:
获取login_ticket
参数:
:param uid: 原神uid
"""
query = cls.query.where(cls.uid == uid)
user = await query.gino.first()
if user:
return user.login_ticket
return None
# 获取stuid
@classmethod
async def get_stuid(cls, uid: int) -> Optional[str]:
"""
说明:
获取stuid
参数:
:param uid: 原神uid
"""
query = cls.query.where(cls.uid == uid)
user = await query.gino.first()
if user:
return user.stuid
return None
# 获取stoken
@classmethod
async def get_stoken(cls, uid: int) -> Optional[str]:
"""
说明:
获取stoken
参数:
:param uid: 原神uid
"""
query = cls.query.where(cls.uid == uid)
user = await query.gino.first()
if user:
return user.stoken
return None

View File

@ -5,6 +5,8 @@ from .._models import Genshin
from services.log import logger from services.log import logger
from nonebot.params import CommandArg, Command from nonebot.params import CommandArg, Command
from typing import Tuple from typing import Tuple
from utils.http_utils import AsyncHttpx
import json
__zx_plugin_name__ = "原神绑定" __zx_plugin_name__ = "原神绑定"
@ -39,6 +41,10 @@ bind = on_command(
unbind = on_command("原神解绑", priority=5, block=True) unbind = on_command("原神解绑", priority=5, block=True)
web_Api = "https://api-takumi.mihoyo.com"
bbs_Cookie_url = "https://webapi.account.mihoyo.com/Api/cookie_accountinfo_by_loginticket?login_ticket={}"
bbs_Cookie_url2 = web_Api + "/auth/api/getMultiTokenByLoginTicket?login_ticket={}&token_types=3&uid={}"
@bind.handle() @bind.handle()
async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()): async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message = CommandArg()):
@ -81,6 +87,35 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command(), arg: Message
if msg.endswith('"') or msg.endswith("'"): if msg.endswith('"') or msg.endswith("'"):
msg = msg[:-1] msg = msg[:-1]
await Genshin.set_cookie(uid, msg) await Genshin.set_cookie(uid, msg)
cookie = msg
# 用: 代替=, ,代替;
cookie = '{"' + cookie.replace('=', '": "').replace("; ", '","') + '"}'
print(cookie)
cookie_json = json.loads(cookie)
print(cookie_json)
if 'login_ticket' not in cookie_json:
await bind.finish("请发送正确完整的cookie")
login_ticket = cookie_json['login_ticket']
# try:
res = await AsyncHttpx.get(url=bbs_Cookie_url.format(login_ticket))
res.encoding = "utf-8"
data = json.loads(res.text)
print(data)
if "成功" in data["data"]["msg"]:
stuid = str(data["data"]["cookie_info"]["account_id"])
res = await AsyncHttpx.get(url=bbs_Cookie_url2.format(
login_ticket, stuid))
res.encoding = "utf-8"
data = json.loads(res.text)
stoken = data["data"]["list"][0]["token"]
# await Genshin.set_cookie(uid, cookie)
await Genshin.set_stoken(uid, stoken)
await Genshin.set_stuid(uid, stuid)
await Genshin.set_login_ticket(uid, login_ticket)
# except Exception as e:
# await bind.finish("获取登陆信息失败请检查cookie是否正确或更新cookie")
elif data["data"]["msg"] == "登录信息已失效,请重新登录":
await bind.finish("登录信息失效请重新获取最新cookie进行绑定")
_x = f"已成功为uid{uid} 设置cookie" _x = f"已成功为uid{uid} 设置cookie"
if isinstance(event, GroupMessageEvent): if isinstance(event, GroupMessageEvent):
await Genshin.set_bind_group(uid, event.group_id) await Genshin.set_bind_group(uid, event.group_id)

View File

@ -1,4 +1,5 @@
from .data_source import get_sign_reward_list, genshin_sign from .data_source import get_sign_reward_list, genshin_sign
from ..mihoyobbs_sign import mihoyobbs_sign
from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent from nonebot.adapters.onebot.v11 import MessageEvent, GroupMessageEvent
from nonebot import on_command from nonebot import on_command
from services.log import logger from services.log import logger
@ -43,6 +44,8 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
uid = await Genshin.get_user_uid(event.user_id) uid = await Genshin.get_user_uid(event.user_id)
if cmd == "查看我的cookie": if cmd == "查看我的cookie":
my_cookie = await Genshin.get_user_cookie(uid, True) my_cookie = await Genshin.get_user_cookie(uid, True)
if isinstance(event, GroupMessageEvent):
await genshin_matcher.finish("请私聊查看您的cookie")
await genshin_matcher.finish("您的cookie为" + my_cookie) await genshin_matcher.finish("您的cookie为" + my_cookie)
if not uid or not await Genshin.get_user_cookie(uid, True): if not uid or not await Genshin.get_user_cookie(uid, True):
await genshin_matcher.finish("请先绑定uid和cookie") await genshin_matcher.finish("请先绑定uid和cookie")
@ -50,11 +53,14 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
# await genshin_matcher.finish("请更新cookie") # await genshin_matcher.finish("请更新cookie")
if cmd == "原神我硬签": if cmd == "原神我硬签":
try: try:
await genshin_matcher.send("正在进行签到...", at_sender=True)
msg = await genshin_sign(uid) msg = await genshin_sign(uid)
return_data = await mihoyobbs_sign(event.user_id)
logger.info( logger.info(
f"(USER {event.user_id}, " f"(USER {event.user_id}, "
f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) UID{uid} 原神签到" f"GROUP {event.group_id if isinstance(event, GroupMessageEvent) else 'private'}) UID{uid} 原神签到"
) )
logger.info(msg)
# 硬签,移除定时任务 # 硬签,移除定时任务
try: try:
for i in range(3): for i in range(3):
@ -66,7 +72,7 @@ async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
await u.clear_sign_time(uid) await u.clear_sign_time(uid)
next_date = await Genshin.random_sign_time(uid) next_date = await Genshin.random_sign_time(uid)
add_job(event.user_id, uid, next_date) add_job(event.user_id, uid, next_date)
msg += f"因开启自动签到\n下一次签到时间为:{next_date.replace(microsecond=0)}" msg += f"\n{return_data}\n因开启自动签到\n下一次签到时间为:{next_date.replace(microsecond=0)}"
except Exception as e: except Exception as e:
msg = "原神签到失败..请尝试检查cookie或报告至管理员" msg = "原神签到失败..请尝试检查cookie或报告至管理员"
logger.info( logger.info(

View File

@ -1,7 +1,7 @@
from utils.http_utils import AsyncHttpx from utils.http_utils import AsyncHttpx
from configs.config import Config from configs.config import Config
from services.log import logger from services.log import logger
from .._utils import random_hex, get_old_ds from ..mihoyobbs_sign.setting import *
from .._models import Genshin from .._models import Genshin
from typing import Optional, Dict from typing import Optional, Dict
import hashlib import hashlib
@ -21,6 +21,7 @@ async def genshin_sign(uid: int) -> Optional[str]:
return "签到失败..." return "签到失败..."
status = data["message"] status = data["message"]
if status == "OK": if status == "OK":
try:
sign_info = await _get_sign_info(uid) sign_info = await _get_sign_info(uid)
if sign_info: if sign_info:
sign_info = sign_info["data"] sign_info = sign_info["data"]
@ -32,19 +33,25 @@ async def genshin_sign(uid: int) -> Optional[str]:
int(sign_info["total_sign_day"]) - 1 int(sign_info["total_sign_day"]) - 1
]["cnt"] ]["cnt"]
get_im = f"本次签到获得:{get_reward}x{reward_num}" get_im = f"本次签到获得:{get_reward}x{reward_num}"
logger.info("get_im:" + get_im + "\nsign_info:" + str(sign_info))
if status == "OK" and sign_info["is_sign"]: if status == "OK" and sign_info["is_sign"]:
return f"\n原神签到成功!\n{get_im}\n本月漏签次数:{sign_info['sign_cnt_missed']}" return f"原神签到成功!\n{get_im}\n本月漏签次数:{sign_info['sign_cnt_missed']}"
except Exception as e:
logger.error(f"原神签到发生错误 UID{str(data)}")
return f"原神签到发生错误: {str(data)}"
else: else:
return status return status
return None if data["data"]["risk_code"] == 375:
return "原神签到失败\n账号可能被风控,请前往米游社手动签到!"
return str(data)
# 获取请求Header里的DS 当web为true则生成网页端的DS # 获取请求Header里的DS 当web为true则生成网页端的DS
def get_ds(web: bool) -> str: def get_ds(web: bool) -> str:
if web: if web:
n = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7" n = mihoyobbs_Salt_web
else: else:
n = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7" n = mihoyobbs_Salt
i = str(timestamp()) i = str(timestamp())
r = random_text(6) r = random_text(6)
c = md5("salt=" + n + "&t=" + i + "&r=" + r) c = md5("salt=" + n + "&t=" + i + "&r=" + r)
@ -82,24 +89,15 @@ async def _sign(uid: int, server_id: str = "cn_gf01") -> Optional[Dict[str, str]
server_id = "cn_qd01" server_id = "cn_qd01"
try: try:
cookie = await Genshin.get_user_cookie(uid, True) cookie = await Genshin.get_user_cookie(uid, True)
headers['DS'] = get_ds(web=True)
headers['Referer'] = 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true' \
f'&act_id={genshin_Act_id}&utm_source=bbs&utm_medium=mys&utm_campaign=icon'
headers['Cookie'] = cookie
headers['x-rpc-device_id'] = get_device_id(cookie)
req = await AsyncHttpx.post( req = await AsyncHttpx.post(
url="https://api-takumi.mihoyo.com/event/bbs_sign_reward/sign", url=genshin_Signurl,
headers={ headers=headers,
'Accept': 'application/json, text/plain, */*', json={"act_id": genshin_Act_id, "uid": uid, "region": server_id},
'DS': get_ds(web=True),
'Origin': 'https://webstatic.mihoyo.com',
'x-rpc-app_version': "2.34.1",
'User-Agent': 'Mozilla/5.0 (Linux; Android 9; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
'Version/4.0 Chrome/39.0.0.0 Mobile Safari/537.36 miHoYoBBS/2.3.0',
'x-rpc-client_type': "5",
'Referer': 'https://webstatic.mihoyo.com/bbs/event/signin-ys/index.html?bbs_auth_required=true&act_id=e202009291139501&utm_source=bbs&utm_medium=mys&utm_campaign=icon',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,en-US;q=0.8',
'X-Requested-With': 'com.mihoyo.hyperion',
"Cookie": cookie,
'x-rpc-device_id': get_device_id(cookie)
},
json={"act_id": "e202009291139501", "uid": uid, "region": server_id},
) )
return req.json() return req.json()
except Exception as e: except Exception as e:

View File

@ -1,4 +1,5 @@
from .data_source import genshin_sign from .data_source import genshin_sign
from ..mihoyobbs_sign import mihoyobbs_sign
from models.group_member_info import GroupInfoUser from models.group_member_info import GroupInfoUser
from utils.message_builder import at from utils.message_builder import at
from services.log import logger from services.log import logger
@ -57,6 +58,11 @@ async def _sign(user_id: int, uid: int, count: int):
:param uid: uid :param uid: uid
:param count: 执行次数 :param count: 执行次数
""" """
try:
return_data = await mihoyobbs_sign(user_id)
except Exception as e:
logger.error(f"mihoyobbs_sign error{e}")
return_data = "米游社签到失败,请尝试发送'米游社签到'进行手动签到"
if count < 3: if count < 3:
try: try:
msg = await genshin_sign(uid) msg = await genshin_sign(uid)
@ -101,6 +107,7 @@ async def _sign(user_id: int, uid: int, count: int):
bot = get_bot() bot = get_bot()
if bot: if bot:
if user_id in [x["user_id"] for x in await bot.get_friend_list()]: if user_id in [x["user_id"] for x in await bot.get_friend_list()]:
await bot.send_private_msg(user_id=user_id, message=return_data)
await bot.send_private_msg(user_id=user_id, message=msg) await bot.send_private_msg(user_id=user_id, message=msg)
else: else:
if not (group_id := await Genshin.get_bind_group(uid)): if not (group_id := await Genshin.get_bind_group(uid)):

View File

@ -0,0 +1,81 @@
from nonebot.adapters.onebot.v11 import MessageEvent
from nonebot import on_command
from services.log import logger
# from .init_task import add_job, scheduler, _sign
# from apscheduler.jobstores.base import JobLookupError
from .._models import Genshin
from nonebot.params import Command
from typing import Tuple
from .mihoyobbs import *
__zx_plugin_name__ = "米游社自动签到"
__plugin_usage__ = """
usage
发送'米游社签到'或绑定原神自动签到
即可手动/自动进行米游社签到
若启用了原神自动签到会在签到原神同时完成米游币领取
--> 每天白嫖90-110米游币不香吗
需要重新绑定原神cookie
遇到问题请提issue或@作者
""".strip()
__plugin_des__ = "米游社自动签到任务"
__plugin_cmd__ = ["米游社签到", "米游社我硬签"]
__plugin_type__ = ("原神相关",)
__plugin_version__ = 0.1
__plugin_author__ = "HDU_Nbsp"
__plugin_settings__ = {
"level": 5,
"default_status": True,
"limit_superuser": False,
"cmd": ["原神签到"],
}
mihoyobbs_matcher = on_command(
"米游社签到", aliases={"米游社我硬签"}, priority=5, block=True
)
@mihoyobbs_matcher.handle()
async def _(event: MessageEvent, cmd: Tuple[str, ...] = Command()):
await mihoyobbs_matcher.send("提交米游社签到申请", at_sender=True)
return_data = await mihoyobbs_sign(event.user_id)
if return_data:
await mihoyobbs_matcher.finish(return_data, at_sender=True)
else:
await mihoyobbs_matcher.finish("米游社签到失败,请查看控制台输出", at_sender=True)
async def mihoyobbs_sign(user_id):
uid = await Genshin.get_user_uid(user_id)
if not uid or not await Genshin.get_user_cookie(uid, True):
await mihoyobbs_matcher.finish("请先绑定uid和cookie", at_sender=True)
stuid = await Genshin.get_stuid(uid)
stoken = await Genshin.get_stoken(uid)
cookie = await Genshin.get_user_cookie(uid)
bbs = mihoyobbs.Mihoyobbs(stuid=stuid, stoken=stoken, cookie=cookie)
await bbs.init()
return_data = ""
if bbs.Task_do["bbs_Sign"] and bbs.Task_do["bbs_Read_posts"] and bbs.Task_do["bbs_Like_posts"] and \
bbs.Task_do["bbs_Share"]:
return_data += f"今天的米游社签到任务已经全部完成了!\n" \
f"一共获得{mihoyobbs.today_have_get_coins}个米游币\n目前有{mihoyobbs.Have_coins}个米游币"
logger.info(f"今天已经全部完成了!一共获得{mihoyobbs.today_have_get_coins}个米游币,目前有{mihoyobbs.Have_coins}个米游币")
else:
i = 0
print("开始签到")
print(mihoyobbs.today_have_get_coins)
while mihoyobbs.today_get_coins != 0 and i < 3:
# if i > 0:
await bbs.refresh_list()
await bbs.signing()
await bbs.read_posts()
await bbs.like_posts()
await bbs.share_post()
await bbs.get_tasks_list()
i += 1
return_data += "\n" + f"今天已经获得{mihoyobbs.today_have_get_coins}个米游币\n" \
f"还能获得{mihoyobbs.today_get_coins}个米游币\n目前有{mihoyobbs.Have_coins}个米游币"
logger.info(f"今天已经获得{mihoyobbs.today_have_get_coins}个米游币,"
f"还能获得{mihoyobbs.today_get_coins}个米游币,目前有{mihoyobbs.Have_coins}个米游币")
return return_data

View File

@ -0,0 +1,6 @@
class CookieError(Exception):
def __init__(self, info):
self.info = info
def __str__(self):
return repr(self.info)

View File

@ -0,0 +1,193 @@
from services.log import logger
from .error import CookieError
from utils.http_utils import AsyncHttpx
from .setting import *
from .tools import *
import json
today_get_coins = 0
today_have_get_coins = 0 # 这个变量以后可能会用上,先留着了
Have_coins = 0
class Mihoyobbs:
def __init__(self, stuid: str, stoken: str, cookie: str) -> None:
self.postsList = None
self.headers = {
"DS": get_ds(web=False),
"cookie": f'stuid={stuid};stoken={stoken}',
"x-rpc-client_type": mihoyobbs_Client_type,
"x-rpc-app_version": mihoyobbs_Version,
"x-rpc-sys_version": "6.0.1",
"x-rpc-channel": "miyousheluodi",
"x-rpc-device_id": get_device_id(cookie=cookie),
"x-rpc-device_name": random_text(random.randint(1, 10)),
"x-rpc-device_model": "Mi 10",
"Referer": "https://app.mihoyo.com",
"Host": "bbs-api.mihoyo.com",
"User-Agent": "okhttp/4.8.0"
}
self.Task_do = {
"bbs_Sign": False,
"bbs_Read_posts": False,
"bbs_Read_posts_num": 3,
"bbs_Like_posts": False,
"bbs_Like_posts_num": 5,
"bbs_Share": False
}
async def init(self):
await self.get_tasks_list()
# 如果这三个任务都做了就没必要获取帖子了
if self.Task_do["bbs_Read_posts"] and self.Task_do["bbs_Like_posts"] and self.Task_do["bbs_Share"]:
pass
else:
self.postsList = await self.get_list()
async def refresh_list(self) -> None:
self.postsList = await self.get_list()
# 获取任务列表,用来判断做了哪些任务
async def get_tasks_list(self):
global today_get_coins
global today_have_get_coins
global Have_coins
logger.info("正在获取任务列表")
req = await AsyncHttpx.get(url=bbs_Tasks_list, headers=self.headers)
data = req.json()
if "err" in data["message"] or data["retcode"] == -100:
logger.error("获取任务列表失败你的cookie可能已过期请重新设置cookie。")
raise CookieError('Cookie expires')
else:
today_get_coins = data["data"]["can_get_points"]
today_have_get_coins = data["data"]["already_received_points"]
Have_coins = data["data"]["total_points"]
# 如果当日可获取米游币数量为0直接判断全部任务都完成了
if today_get_coins == 0:
self.Task_do["bbs_Sign"] = True
self.Task_do["bbs_Read_posts"] = True
self.Task_do["bbs_Like_posts"] = True
self.Task_do["bbs_Share"] = True
else:
# 如果第0个大于或等于62则直接判定任务没做
if data["data"]["states"][0]["mission_id"] >= 62:
logger.info(f"今天可以获得{today_get_coins}个米游币")
pass
else:
logger.info(f"还有任务未完成,今天还能获得{today_get_coins}米游币")
for i in data["data"]["states"]:
# 58是讨论区签到
if i["mission_id"] == 58:
if i["is_get_award"]:
self.Task_do["bbs_Sign"] = True
# 59是看帖子
elif i["mission_id"] == 59:
if i["is_get_award"]:
self.Task_do["bbs_Read_posts"] = True
else:
self.Task_do["bbs_Read_posts_num"] -= i["happened_times"]
# 60是给帖子点赞
elif i["mission_id"] == 60:
if i["is_get_award"]:
self.Task_do["bbs_Like_posts"] = True
else:
self.Task_do["bbs_Like_posts_num"] -= i["happened_times"]
# 61是分享帖子
elif i["mission_id"] == 61:
if i["is_get_award"]:
self.Task_do["bbs_Share"] = True
# 分享帖子,是最后一个任务,到这里了下面都是一次性任务,直接跳出循环
break
# 获取要帖子列表
async def get_list(self) -> list:
temp_list = []
logger.info("正在获取帖子列表......")
req = await AsyncHttpx.get(url=bbs_List_url.format(mihoyobbs_List_Use[0]["forumId"]),
headers=self.headers)
data = req.json()["data"]["list"]
for n in range(5):
r_l = random.choice(data)
while r_l["post"]["subject"] in str(temp_list):
r_l = random.choice(data)
temp_list.append([r_l["post"]["post_id"], r_l["post"]["subject"]])
# temp_list.append([data["data"]["list"][n]["post"]["post_id"], data["data"]["list"][n]["post"]["subject"]])
logger.info("已获取{}个帖子".format(len(temp_list)))
return temp_list
# 进行签到操作
async def signing(self):
if self.Task_do["bbs_Sign"]:
logger.info("讨论区任务已经完成过了~")
else:
logger.info("正在签到......")
header = {}
header.update(self.headers)
for i in mihoyobbs_List_Use:
header["DS"] = get_ds2("", json.dumps({"gids": i["id"]}))
req = await AsyncHttpx.post(url=bbs_Sign_url, json={"gids": i["id"]}, headers=header)
data = req.json()
if "err" not in data["message"]:
logger.info(str(i["name"] + data["message"]))
time.sleep(random.randint(2, 8))
else:
logger.error("签到失败你的cookie可能已过期请重新设置cookie。")
raise CookieError('Cookie expires')
# 看帖子
async def read_posts(self):
if self.Task_do["bbs_Read_posts"]:
logger.info("看帖任务已经完成过了~")
else:
logger.info("正在看帖......")
for i in range(self.Task_do["bbs_Read_posts_num"]):
req = await AsyncHttpx.get(url=bbs_Detail_url.format(self.postsList[i][0]), headers=self.headers)
data = req.json()
if data["message"] == "OK":
logger.debug("看帖:{} 成功".format(self.postsList[i][1]))
time.sleep(random.randint(2, 8))
# 点赞
async def like_posts(self):
if self.Task_do["bbs_Like_posts"]:
logger.info("点赞任务已经完成过了~")
else:
logger.info("正在点赞......")
for i in range(self.Task_do["bbs_Like_posts_num"]):
req = await AsyncHttpx.post(url=bbs_Like_url, headers=self.headers,
json={"post_id": self.postsList[i][0], "is_cancel": False})
data = req.json()
if data["message"] == "OK":
logger.debug("点赞:{} 成功".format(self.postsList[i][1]))
# 判断取消点赞是否打开
# if config.config["mihoyobbs"]["un_like"] :
# time.sleep(random.randint(2, 8))
# req = httpx.post(url=bbs_Like_url, headers=self.headers,
# json={"post_id": self.postsList[i][0], "is_cancel": True})
# data = req.json()
# if data["message"] == "OK":
# logger.debug("取消点赞:{} 成功".format(self.postsList[i][1]))
time.sleep(random.randint(2, 8))
# 分享操作
async def share_post(self):
if self.Task_do["bbs_Share"]:
logger.info("分享任务已经完成过了~")
else:
logger.info("正在执行分享任务......")
for i in range(3):
req = await AsyncHttpx.get(url=bbs_Share_url.format(self.postsList[0][0]), headers=self.headers)
data = req.json()
if data["message"] == "OK":
logger.debug("分享:{} 成功".format(self.postsList[0][1]))
logger.info("分享任务执行成功......")
break
else:
logger.debug(f"分享任务执行失败,正在执行第{i + 2}共3次")
time.sleep(random.randint(2, 8))
time.sleep(random.randint(2, 8))

View File

@ -0,0 +1,124 @@
# 米游社的Salt
mihoyobbs_Salt = "z8DRIUjNDT7IT5IZXvrUAxyupA1peND9"
mihoyobbs_Salt2 = "t0qEgfub6cvueAPgR5m9aQWWVciEer7v"
mihoyobbs_Salt_web = "9nQiU3AV0rJSIBWgdynfoGMGKaklfbM7"
# 米游社的版本
mihoyobbs_Version = "2.34.1" # Slat和Version相互对应
# 米游社的客户端类型
mihoyobbs_Client_type = "2" # 1为ios 2为安卓
mihoyobbs_Client_type_web = "5" # 4为pc web 5为mobile web
# 米游社的分区列表
mihoyobbs_List = [{
"id": "1",
"forumId": "1",
"name": "崩坏3",
"url": "https://bbs.mihoyo.com/bh3/"
}, {
"id": "2",
"forumId": "26",
"name": "原神",
"url": "https://bbs.mihoyo.com/ys/"
}, {
"id": "3",
"forumId": "30",
"name": "崩坏2",
"url": "https://bbs.mihoyo.com/bh2/"
}, {
"id": "4",
"forumId": "37",
"name": "未定事件簿",
"url": "https://bbs.mihoyo.com/wd/"
}, {
"id": "5",
"forumId": "34",
"name": "大别野",
"url": "https://bbs.mihoyo.com/dby/"
}, {
"id": "6",
"forumId": "52",
"name": "崩坏:星穹铁道",
"url": "https://bbs.mihoyo.com/sr/"
}, {
"id": "8",
"forumId": "57",
"name": "绝区零",
"url": "https://bbs.mihoyo.com/zzz/"
}]
game_id2name = {
"bh2_cn": "崩坏2",
"bh3_cn": "崩坏3",
"nxx_cn": "未定事件簿",
"hk4e_cn": "原神",
}
# Config Load之后run里面进行列表的选择
mihoyobbs_List_Use = [{
"id": "2",
"forumId": "26",
"name": "原神",
"url": "https://bbs.mihoyo.com/ys/"
},
# 不玩原神可以把签到讨论区换为大别墅
# {
# "id": "5",
# "forumId": "34",
# "name": "大别野",
# "url": "https://bbs.mihoyo.com/dby/"
# }
]
# 游戏签到的请求头
headers = {
'Accept': 'application/json, text/plain, */*',
'DS': "",
'Origin': 'https://webstatic.mihoyo.com',
'x-rpc-app_version': mihoyobbs_Version,
'User-Agent': 'Mozilla/5.0 (Linux; Android 12; Unspecified Device) AppleWebKit/537.36 (KHTML, like Gecko) '
f'Version/4.0 Chrome/103.0.5060.129 Mobile Safari/537.36 miHoYoBBS/{mihoyobbs_Version}',
'x-rpc-client_type': mihoyobbs_Client_type_web,
'Referer': '',
'Accept-Encoding': 'gzip, deflate',
'Accept-Language': 'zh-CN,en-US;q=0.8',
'X-Requested-With': 'com.mihoyo.hyperion',
"Cookie": "",
'x-rpc-device_id': ""
}
# 通用设置
bbs_Api = "https://bbs-api.mihoyo.com"
web_Api = "https://api-takumi.mihoyo.com"
account_Info_url = web_Api + "/binding/api/getUserGameRolesByCookie?game_biz="
# 米游社的API列表
bbs_Cookie_url = "https://webapi.account.mihoyo.com/Api/cookie_accountinfo_by_loginticket?login_ticket={}"
bbs_Cookie_url2 = web_Api + "/auth/api/getMultiTokenByLoginTicket?login_ticket={}&token_types=3&uid={}"
bbs_Tasks_list = bbs_Api + "/apihub/sapi/getUserMissionsState" # 获取任务列表
bbs_Sign_url = bbs_Api + "/apihub/app/api/signIn" # post
bbs_List_url = bbs_Api + "/post/api/getForumPostList?forum_id={}&is_good=false&is_hot=false&page_size=20&sort_type=1"
bbs_Detail_url = bbs_Api + "/post/api/getPostFull?post_id={}"
bbs_Share_url = bbs_Api + "/apihub/api/getShareConf?entity_id={}&entity_type=1"
bbs_Like_url = bbs_Api + "/apihub/sapi/upvotePost" # post json
# 崩坏2自动签到相关的相关设置
honkai2_Act_id = "e202203291431091"
honkai2_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={honkai2_Act_id}'
honkai2_Is_signurl = web_Api + "/event/luna/info?lang=zh-cn&act_id={}&region={}&uid={}"
honkai2_Sign_url = web_Api + "/event/luna/sign"
# 崩坏3自动签到相关的设置
honkai3rd_Act_id = "e202207181446311"
honkai3rd_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={honkai3rd_Act_id}'
honkai3rd_Is_signurl = web_Api + "/event/luna/info?lang=zh-cn&act_id={}&region={}&uid={}"
honkai3rd_Sign_url = web_Api + "/event/luna/sign"
# 未定事件簿自动签到相关设置
tearsofthemis_Act_id = "e202202251749321"
tearsofthemis_checkin_rewards = f'{web_Api}/event/luna/home?lang=zh-cn&act_id={tearsofthemis_Act_id}'
tearsofthemis_Is_signurl = honkai2_Is_signurl
tearsofthemis_Sign_url = honkai2_Sign_url # 和二崩完全一致
# 原神自动签到相关的设置
genshin_Act_id = "e202009291139501"
genshin_checkin_rewards = f'{web_Api}/event/bbs_sign_reward/home?act_id={genshin_Act_id}'
genshin_Is_signurl = web_Api + "/event/bbs_sign_reward/info?act_id={}&region={}&uid={}"
genshin_Signurl = web_Api + "/event/bbs_sign_reward/sign"

View File

@ -0,0 +1,65 @@
import uuid
import time
import random
import string
import hashlib
from .setting import *
# md5计算
def md5(text: str) -> str:
md5 = hashlib.md5()
md5.update(text.encode())
return md5.hexdigest()
# 随机文本
def random_text(num: int) -> str:
return ''.join(random.sample(string.ascii_lowercase + string.digits, num))
# 时间戳
def timestamp() -> int:
return int(time.time())
# 获取请求Header里的DS 当web为true则生成网页端的DS
def get_ds(web: bool) -> str:
if web:
n = mihoyobbs_Salt_web
else:
n = mihoyobbs_Salt
i = str(timestamp())
r = random_text(6)
c = md5("salt=" + n + "&t=" + i + "&r=" + r)
return f"{i},{r},{c}"
# 获取请求Header里的DS(版本2) 这个版本ds之前见到都是查询接口里的
def get_ds2(q: str, b: str) -> str:
n = mihoyobbs_Salt2
i = str(timestamp())
r = str(random.randint(100001, 200000))
add = f'&b={b}&q={q}'
c = md5("salt=" + n + "&t=" + i + "&r=" + r + add)
return f"{i},{r},{c}"
# 生成一个device id
def get_device_id(cookie) -> str:
return str(uuid.uuid3(uuid.NAMESPACE_URL, cookie))
# 获取签到的奖励名称
def get_item(raw_data: dict) -> str:
temp_name = raw_data["name"]
temp_cnt = raw_data["cnt"]
return f"{temp_name}x{temp_cnt}"
# 获取明天早晨0点的时间戳
def next_day() -> int:
now_time = int(time.time())
next_day_time = now_time - now_time % 86400 + time.timezone + 86400
return next_day_time

View File

@ -26,12 +26,17 @@ memo_path.mkdir(exist_ok=True, parents=True)
@driver.on_startup @driver.on_startup
async def _(): async def _():
for name, url in zip( for name, url in zip(
["resin.png", "task.png", "resin_discount.png"], [
"resin.png", "task.png", "resin_discount.png", "chengehu.png",
"zhibian.png"
],
[ [
"https://upload-bbs.mihoyo.com/upload/2021/09/29/8819732/54266243c7d15ba31690c8f5d63cc3c6_71491376413333325" "https://upload-bbs.mihoyo.com/upload/2021/09/29/8819732/54266243c7d15ba31690c8f5d63cc3c6_71491376413333325"
"20.png?x-oss-process=image//resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,png", "20.png?x-oss-process=image//resize,s_600/quality,q_80/auto-orient,0/interlace,1/format,png",
"https://patchwiki.biligame.com/images/ys/thumb/c/cc/6k6kuj1kte6m1n7hexqfrn92z6h4yhh.png/60px-委托任务logo.png", "https://patchwiki.biligame.com/images/ys/thumb/c/cc/6k6kuj1kte6m1n7hexqfrn92z6h4yhh.png/60px-委托任务logo.png",
"https://patchwiki.biligame.com/images/ys/d/d9/t1hv6wpucbwucgkhjntmzroh90nmcdv.png", "https://patchwiki.biligame.com/images/ys/d/d9/t1hv6wpucbwucgkhjntmzroh90nmcdv.png",
"https://s3.bmp.ovh/imgs/2022/08/21/3a3b2e6c22e305ff.png",
"https://s3.bmp.ovh/imgs/2022/08/21/c2d7ace21e1d46cf.png",
], ],
): ):
file = memo_path / name file = memo_path / name
@ -40,7 +45,8 @@ async def _():
logger.info(f"已下载原神便签资源 -> {file}...") logger.info(f"已下载原神便签资源 -> {file}...")
async def get_user_memo(user_id: int, uid: int, uname: str) -> Optional[Union[str, MessageSegment]]: async def get_user_memo(user_id: int, uid: int,
uname: str) -> Optional[Union[str, MessageSegment]]:
uid = str(uid) uid = str(uid)
if uid[0] in ["1", "2"]: if uid[0] in ["1", "2"]:
server_id = "cn_gf01" server_id = "cn_gf01"
@ -54,12 +60,16 @@ async def get_user_memo(user_id: int, uid: int, uname: str) -> Optional[Union[st
async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int": async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int":
try: try:
req = await AsyncHttpx.get( req = await AsyncHttpx.get(
url=f"https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/dailyNote?server={server_id}&role_id={uid}", url=
f"https://api-takumi-record.mihoyo.com/game_record/app/genshin/api/dailyNote?server={server_id}&role_id={uid}",
headers={ headers={
"DS": get_ds(f"role_id={uid}&server={server_id}"), "DS": get_ds(f"role_id={uid}&server={server_id}"),
"x-rpc-app_version": Config.get_config("genshin", "mhyVersion"), "x-rpc-app_version": Config.get_config("genshin",
"User-Agent": "Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1", "mhyVersion"),
"x-rpc-client_type": Config.get_config("genshin", "client_type"), "User-Agent":
"Mozilla/5.0 (iPhone; CPU iPhone OS 13_2_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) miHoYoBBS/2.11.1",
"x-rpc-client_type": Config.get_config("genshin",
"client_type"),
"Referer": "https://webstatic.mihoyo.com/", "Referer": "https://webstatic.mihoyo.com/",
"Cookie": await Genshin.get_user_cookie(int(uid)) "Cookie": await Genshin.get_user_cookie(int(uid))
}, },
@ -75,11 +85,18 @@ async def get_memo(uid: str, server_id: str) -> "Union[str, dict], int":
return "发生了一些错误,请稍后再试", 998 return "发生了一些错误,请稍后再试", 998
def create_border( def create_border(image_name: str, content: str, notice_text: str,
image_name: str, content: str, notice_text: str, value: str value: str) -> BuildImage:
) -> BuildImage: border = BuildImage(500,
border = BuildImage(500, 100, color="#E0D9D1", font="HYWenHei-85W.ttf", font_size=20) 75,
text_bk = BuildImage(350, 96, color="#F5F1EB", font_size=23, font="HYWenHei-85W.ttf") color="#E0D9D1",
font="HYWenHei-85W.ttf",
font_size=20)
text_bk = BuildImage(350,
75,
color="#F5F1EB",
font_size=23,
font="HYWenHei-85W.ttf")
_x = 70 if image_name == "resin.png" else 50 _x = 70 if image_name == "resin.png" else 50
_px = 10 if image_name == "resin.png" else 20 _px = 10 if image_name == "resin.png" else 20
text_bk.paste( text_bk.paste(
@ -88,7 +105,7 @@ def create_border(
True, True,
center_type="by_height", center_type="by_height",
) )
text_bk.text((87, 20), content) text_bk.text((87, 15), content)
text_bk.paste( text_bk.paste(
BuildImage( BuildImage(
0, 0,
@ -98,18 +115,19 @@ def create_border(
font="HYWenHei-85W.ttf", font="HYWenHei-85W.ttf",
font_size=17, font_size=17,
), ),
(87, 50), (87, 45),
True, True,
) )
font_width, _ = border.getsize(value) font_width, _ = border.getsize(value)
border.text((350 + 76 - int(font_width / 2), 0), value, center_type="by_height") border.text((350 + 76 - int(font_width / 2), 0),
value,
center_type="by_height")
border.paste(text_bk, (2, 0), center_type="by_height") border.paste(text_bk, (2, 0), center_type="by_height")
return border return border
async def parse_data_and_draw( async def parse_data_and_draw(user_id: int, uid: str, server_id: str,
user_id: int, uid: str, server_id: str, uname: str uname: str) -> Union[str, MessageSegment]:
) -> Union[str, MessageSegment]:
data, code = await get_memo(uid, server_id) data, code = await get_memo(uid, server_id)
if code != 200: if code != 200:
return data return data
@ -120,13 +138,11 @@ async def parse_data_and_draw(
if not role_avatar.exists(): if not role_avatar.exists():
await AsyncHttpx.download_file(x["avatar_side_icon"], role_avatar) await AsyncHttpx.download_file(x["avatar_side_icon"], role_avatar)
return await asyncio.get_event_loop().run_in_executor( return await asyncio.get_event_loop().run_in_executor(
None, _parse_data_and_draw, data, user_avatar, uid, uname None, _parse_data_and_draw, data, user_avatar, uid, uname)
)
def _parse_data_and_draw( def _parse_data_and_draw(data: dict, user_avatar: BytesIO, uid: int,
data: dict, user_avatar: BytesIO, uid: int, uname: str uname: str) -> Union[str, MessageSegment]:
) -> Union[str, MessageSegment]:
current_resin = data["current_resin"] # 当前树脂 current_resin = data["current_resin"] # 当前树脂
max_resin = data["max_resin"] # 最大树脂 max_resin = data["max_resin"] # 最大树脂
resin_recovery_time = data["resin_recovery_time"] # 树脂全部回复时间 resin_recovery_time = data["resin_recovery_time"] # 树脂全部回复时间
@ -137,17 +153,37 @@ def _parse_data_and_draw(
current_expedition_num = data["current_expedition_num"] # 当前挖矿人数 current_expedition_num = data["current_expedition_num"] # 当前挖矿人数
max_expedition_num = data["max_expedition_num"] # 每日挖矿最大人数 max_expedition_num = data["max_expedition_num"] # 每日挖矿最大人数
expeditions = data["expeditions"] # 挖矿详情 expeditions = data["expeditions"] # 挖矿详情
current_coin = data["current_home_coin"] # 当前宝钱
max_coin = data["max_home_coin"] # 最大宝钱
coin_recovery_time = data["home_coin_recovery_time"] # 宝钱全部回复时间
transformer_available = data["transformer"]["obtained"] # 参量质变仪可获取
transformer_state = data["transformer"]["recovery_time"][
"reached"] # 参量质变仪状态
transformer_recovery_time = data["transformer"]["recovery_time"][
"Day"] # 参量质变仪回复时间
transformer_recovery_hour = data["transformer"]["recovery_time"][
"Hour"] # 参量质变仪回复时间
coin_minute, coin_second = divmod(int(coin_recovery_time), 60)
coin_hour, coin_minute = divmod(coin_minute, 60)
#print(data)
minute, second = divmod(int(resin_recovery_time), 60) minute, second = divmod(int(resin_recovery_time), 60)
hour, minute = divmod(minute, 60) hour, minute = divmod(minute, 60)
A = BuildImage(1030, 520, color="#f1e9e1", font_size=15, font="HYWenHei-85W.ttf") A = BuildImage(1030,
570,
color="#f1e9e1",
font_size=15,
font="HYWenHei-85W.ttf")
A.text((10, 15), "原神便笺 | Create By ZhenXun", (198, 186, 177)) A.text((10, 15), "原神便笺 | Create By ZhenXun", (198, 186, 177))
ava = BuildImage(100, 100, background=user_avatar) ava = BuildImage(100, 100, background=user_avatar)
ava.circle() ava.circle()
A.paste(ava, (40, 40), True) A.paste(ava, (40, 40), True)
A.paste( A.paste(
BuildImage(0, 0, plain_text=uname, font_size=20, font="HYWenHei-85W.ttf"), BuildImage(0,
0,
plain_text=uname,
font_size=20,
font="HYWenHei-85W.ttf"),
(160, 62), (160, 62),
True, True,
) )
@ -177,26 +213,52 @@ def _parse_data_and_draw(
"今日委托已全部完成" if finished_task_num == total_task_num else "今日委托完成数量不足", "今日委托已全部完成" if finished_task_num == total_task_num else "今日委托完成数量不足",
f"{finished_task_num}/{total_task_num}", f"{finished_task_num}/{total_task_num}",
) )
A.paste(border, (10, 265)) A.paste(border, (10, 235))
border = create_border( border = create_border(
"resin_discount.png", "resin_discount.png",
"值得铭记的强敌", "值得铭记的强敌",
"本周剩余消耗减半次数", "本周剩余消耗减半次数",
f"{remain_resin_discount_num}/{resin_discount_num_limit}", f"{remain_resin_discount_num}/{resin_discount_num_limit}",
) )
A.paste(border, (10, 375)) A.paste(border, (10, 315))
expeditions_border = BuildImage( border = create_border(
470, 430, color="#E0D9D1", font="HYWenHei-85W.ttf", font_size=20 "chengehu.png",
"洞天财翁-洞天宝钱",
"洞天财翁已达到存储上限"
if current_coin == max_coin else f"{coin_hour}小时{coin_minute}分钟后存满",
f"{current_coin}/{max_coin}",
) )
expeditions_text = BuildImage( A.paste(border, (10, 395))
466, 426, color="#F5F1EB", font_size=23, font="HYWenHei-85W.ttf" border = create_border(
"zhibian.png",
"参量质变仪",
"不存在" if not transformer_available else
"已准备完成 " if transformer_state else f"{transformer_recovery_hour}小时后可使用" if not transformer_recovery_time else f"{transformer_recovery_time}天后可使用",
"不存在" if not transformer_available else
"可使用" if transformer_state else "冷却中",
) )
A.paste(border, (10, 475))
expeditions_border = BuildImage(470,
510,
color="#E0D9D1",
font="HYWenHei-85W.ttf",
font_size=20)
expeditions_text = BuildImage(466,
506,
color="#F5F1EB",
font_size=23,
font="HYWenHei-85W.ttf")
expeditions_text.text( expeditions_text.text(
(5, 5), f"探索派遣限制{current_expedition_num}/{max_expedition_num}", (100, 100, 98) (5, 5), f"探索派遣限制{current_expedition_num}/{max_expedition_num}",
) (100, 100, 98))
h = 45 h = 45
for x in expeditions: for x in expeditions:
_bk = BuildImage(400, 66, color="#ECE3D8", font="HYWenHei-85W.ttf", font_size=21) _bk = BuildImage(400,
82,
color="#ECE3D8",
font="HYWenHei-85W.ttf",
font_size=21)
file_name = x["avatar_side_icon"].split("_")[-1] file_name = x["avatar_side_icon"].split("_")[-1]
role_avatar = memo_path / "role_avatar" / file_name role_avatar = memo_path / "role_avatar" / file_name
_ava_img = BuildImage(75, 75, background=role_avatar) _ava_img = BuildImage(75, 75, background=role_avatar)
@ -227,7 +289,7 @@ def _parse_data_and_draw(
_bk.circle_corner(20) _bk.circle_corner(20)
expeditions_text.paste(_bk, (25, h), True) expeditions_text.paste(_bk, (25, h), True)
h += 75 h += 75 + 16
expeditions_border.paste(expeditions_text, center_type="center") expeditions_border.paste(expeditions_text, center_type="center")

View File

@ -27,7 +27,7 @@ __plugin_settings__ = {
} }
one_friend = on_regex( one_friend = on_regex(
"^我.*?朋友.*?[想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问](.*)", "^我.{0,4}朋友.{0,2}(?:想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问)(.{0,30})$",
priority=4, priority=4,
block=True, block=True,
) )

View File

@ -43,6 +43,7 @@ usage
__plugin_superuser_usage__ = """ __plugin_superuser_usage__ = """
usage usage
更新皮肤指令 更新皮肤指令
重置开箱 重置今日开箱所有次数
指令 指令
更新开箱图片 ?[武器箱] 更新开箱图片 ?[武器箱]
更新开箱价格 ?[武器箱] 更新开箱价格 ?[武器箱]
@ -100,6 +101,13 @@ cases_matcher_group = MatcherGroup(priority=5, permission=GROUP, block=True)
k_open_case = cases_matcher_group.on_command("开箱") k_open_case = cases_matcher_group.on_command("开箱")
reload_count = cases_matcher_group.on_command("重置开箱")
@reload_count.handle()
async def _(event: GroupMessageEvent):
await update_count_daily()
@k_open_case.handle() @k_open_case.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()): async def _(event: GroupMessageEvent, arg: Message = CommandArg()):

View File

@ -114,7 +114,7 @@ async def _(event: GroupMessageEvent):
msg = msg[index + 2 : index + 11] msg = msg[index + 2 : index + 11]
if is_number(msg): if is_number(msg):
url = f"https://www.bilibili.com/video/{msg}" url = f"https://www.bilibili.com/video/{msg}"
vd_info = await video.get_video_base_info(msg) vd_info = await video.get_video_base_info('av' + msg)
elif "https://b23.tv" in msg: elif "https://b23.tv" in msg:
url = "https://" + msg[msg.find("b23.tv"): msg.find("b23.tv") + 14] url = "https://" + msg[msg.find("b23.tv"): msg.find("b23.tv") + 14]
async with aiohttp.ClientSession( async with aiohttp.ClientSession(

View File

@ -1,4 +1,5 @@
from configs.config import Config from configs.config import Config
from utils.manager import GDict
import nonebot import nonebot
@ -60,6 +61,8 @@ Config.add_plugin_config(
default_value=True default_value=True
) )
GDict['run_sql'].append("ALTER TABLE omega_pixiv_illusts ADD classified Integer;")
nonebot.load_plugins("plugins/pix_gallery") nonebot.load_plugins("plugins/pix_gallery")

View File

@ -1,5 +1,4 @@
from typing import Optional, List from typing import Optional, List, Tuple
from datetime import datetime
from services.db_context import db from services.db_context import db
@ -12,13 +11,12 @@ class OmegaPixivIllusts(db.Model):
uid = db.Column(db.BigInteger(), nullable=False) uid = db.Column(db.BigInteger(), nullable=False)
title = db.Column(db.String(), nullable=False) title = db.Column(db.String(), nullable=False)
uname = db.Column(db.String(), nullable=False) uname = db.Column(db.String(), nullable=False)
classified = db.Column(db.Integer(), nullable=False)
nsfw_tag = db.Column(db.Integer(), nullable=False) nsfw_tag = db.Column(db.Integer(), nullable=False)
width = db.Column(db.Integer(), nullable=False) width = db.Column(db.Integer(), nullable=False)
height = db.Column(db.Integer(), nullable=False) height = db.Column(db.Integer(), nullable=False)
tags = db.Column(db.String(), nullable=False) tags = db.Column(db.String(), nullable=False)
url = db.Column(db.String(), nullable=False) url = db.Column(db.String(), nullable=False)
created_at = db.Column(db.DateTime(timezone=True))
updated_at = db.Column(db.DateTime(timezone=True))
_idx1 = db.Index("omega_pixiv_illusts_idx1", "pid", "url", unique=True) _idx1 = db.Index("omega_pixiv_illusts_idx1", "pid", "url", unique=True)
@ -32,10 +30,9 @@ class OmegaPixivIllusts(db.Model):
url: str, url: str,
uid: int, uid: int,
uname: str, uname: str,
classified: int,
nsfw_tag: int, nsfw_tag: int,
tags: str, tags: str,
created_at: datetime,
updated_at: datetime,
): ):
""" """
说明: 说明:
@ -48,10 +45,9 @@ class OmegaPixivIllusts(db.Model):
:param url: url链接 :param url: url链接
:param uid: 作者uid :param uid: 作者uid
:param uname: 作者名称 :param uname: 作者名称
:param nsfw_tag: nsfw标签, 0=safe, 1=setu. 2=r18 :param classified: 标记标签, 0=未标记, 1=已人工标记或从可信已标记来源获取
:param nsfw_tag: nsfw标签,-1=未标记, 0=safe, 1=setu. 2=r18
:param tags: 相关tag :param tags: 相关tag
:param created_at: 创建日期
:param updated_at: 更新日期
""" """
if not await cls.check_exists(pid): if not await cls.check_exists(pid):
await cls.create( await cls.create(
@ -62,6 +58,7 @@ class OmegaPixivIllusts(db.Model):
url=url, url=url,
uid=uid, uid=uid,
uname=uname, uname=uname,
classified=classified,
nsfw_tag=nsfw_tag, nsfw_tag=nsfw_tag,
tags=tags, tags=tags,
) )
@ -113,7 +110,7 @@ class OmegaPixivIllusts(db.Model):
return bool(query) return bool(query)
@classmethod @classmethod
async def get_keyword_num(cls, tags: List[str] = None) -> "int, int, int": async def get_keyword_num(cls, tags: List[str] = None) -> Tuple[int, int, int]:
""" """
说明: 说明:
获取相关关键词(keyword, tag)在图库中的数量 获取相关关键词(keyword, tag)在图库中的数量

View File

@ -1,3 +1,5 @@
import re
from nonebot import on_command from nonebot import on_command
from utils.utils import is_number from utils.utils import is_number
from nonebot.permission import SUPERUSER from nonebot.permission import SUPERUSER
@ -11,7 +13,6 @@ import time
from services.log import logger from services.log import logger
from pathlib import Path from pathlib import Path
from typing import List from typing import List
from datetime import datetime
import asyncio import asyncio
import os import os
@ -144,38 +145,36 @@ async def _(arg: Message = CommandArg()):
@check_omega.handle() @check_omega.handle()
async def _(): async def _():
async def _tasks(line: str, all_pid: List[int], length: int, index: int): async def _tasks(line: str, all_pid: List[int], length: int, index: int):
data = line.split("VALUES", maxsplit=1)[-1].strip() data = line.split("VALUES", maxsplit=1)[-1].strip()[1:-2]
if data.startswith("("): num_list = re.findall(r'(\d+)', data)
data = data[1:] pid = int(num_list[1])
if data.endswith(");"): uid = int(num_list[2])
data = data[:-2] id_ = 3
x = data.split(maxsplit=3) while num_list[id_] not in ['0', '1']:
pid = int(x[1][:-1].strip()) id_ += 1
classified = int(num_list[id_])
nsfw_tag = int(num_list[id_ + 1])
width = int(num_list[id_ + 2])
height = int(num_list[id_ + 3])
str_list = re.findall(r"'(.*?)',", data)
title = str_list[0]
uname = str_list[1]
tags = str_list[2]
url = str_list[3]
if pid in all_pid: if pid in all_pid:
logger.info(f"添加OmegaPixivIllusts图库数据已存在 ---> pid{pid}") logger.info(f"添加OmegaPixivIllusts图库数据已存在 ---> pid{pid}")
return return
uid = int(x[2][:-1].strip())
x = x[3].split(", '")
title = x[0].strip()[1:-1]
tmp = x[1].split(", ")
author = tmp[0].strip()[:-1]
nsfw_tag = int(tmp[1])
width = int(tmp[2])
height = int(tmp[3])
tags = x[2][:-1]
url = x[3][:-1]
if await OmegaPixivIllusts.add_image_data( if await OmegaPixivIllusts.add_image_data(
pid, pid=pid,
title, title=title,
width, width=width,
height, height=height,
url, url=url,
uid, uid=uid,
author, nsfw_tag=nsfw_tag,
nsfw_tag, tags=tags,
tags, uname=uname,
datetime.min, classified=classified
datetime.min,
): ):
logger.info( logger.info(
f"成功添加OmegaPixivIllusts图库数据 pid{pid} 本次预计存储 {length} 张,已更新第 {index}" f"成功添加OmegaPixivIllusts图库数据 pid{pid} 本次预计存储 {length} 张,已更新第 {index}"
@ -197,6 +196,7 @@ async def _():
for line in lines: for line in lines:
if "INSERT INTO" in line.upper(): if "INSERT INTO" in line.upper():
index += 1 index += 1
logger.info(f'line: {line} 加入更新计划')
tasks.append( tasks.append(
asyncio.ensure_future(_tasks(line, all_pid, length, index)) asyncio.ensure_future(_tasks(line, all_pid, length, index))
) )

View File

@ -1,4 +1,4 @@
from nonebot import on_command from nonebot import on_regex
from services.log import logger from services.log import logger
from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent from nonebot.adapters.onebot.v11 import Bot, MessageEvent, GroupMessageEvent
from nonebot.typing import T_State from nonebot.typing import T_State
@ -24,7 +24,7 @@ __plugin_settings__ = {
} }
quotations = on_command("语录", aliases={"二次元", "二次元语录"}, priority=5, block=True) quotations = on_regex("^(语录|二次元)$", priority=5, block=True)
url = "https://international.v1.hitokoto.cn/?c=a" url = "https://international.v1.hitokoto.cn/?c=a"

View File

@ -24,8 +24,6 @@ from io import BytesIO
import asyncio import asyncio
import random import random
import nonebot import nonebot
import time
import locale
import os import os

View File

@ -10,25 +10,4 @@ Config.add_plugin_config(
default_value=5 default_value=5
) )
Config.add_plugin_config(
"word_bank",
"WORD_BANK_FUZZY",
False,
help_="模糊匹配",
default_value=False
)
Config.add_plugin_config(
"word_bank",
"WORD_BANK_KEY",
True,
help_="关键字匹配",
default_value=True
)
Config.add_plugin_config(
"word_bank",
"WORD_BANK_MIX",
25,
help_="查看词条时图片内最多显示条数",
default_value=25
)
nonebot.load_plugins("plugins/word_bank") nonebot.load_plugins("plugins/word_bank")

View File

@ -0,0 +1,23 @@
scope2int = {
"全局": 0,
"群聊": 1,
"私聊": 2,
}
type2int = {
"精准": 0,
"模糊": 1,
"正则": 2,
"图片": 3,
}
int2type = {
0: "精准",
1: "模糊",
2: "正则",
3: "图片",
}

View File

@ -1,54 +1,211 @@
from .model import WordBank from pathlib import Path
from typing import Union
from nonebot.adapters.onebot.v11 import Message, MessageSegment
from services import logger
from utils.image_utils import text2image
from utils.message_builder import image
from ._model import WordBank
from typing import Optional, Tuple, Union, List, Any
from utils.utils import is_number
import nonebot
driver = nonebot.get_driver()
class WordBankBuilder: async def get_problem_str(
id_: Union[str, int], group_id: Optional[int] = None, word_scope: int = 1
def __init__(self, user_id: int, group_id: int, problem: str): ) -> Tuple[str, int]:
self._data = {
"user_id": user_id,
"group_id": group_id}
self.problem = problem
def set_placeholder(self, id_: int, placeholder: Union[str, int]):
""" """
设置占位符 说明:
:param id_: 站位id 通过id获取问题字符串
:param placeholder: 占位符内容 参数:
:param id_: 下标
:param group_id: 群号
:param word_scope: 获取类型
""" """
if self._data.get("placeholder") is None: if word_scope in [0, 2]:
self._data["placeholder"] = [] all_problem = await WordBank.get_problem_by_scope(word_scope)
self._data["placeholder"].append((id_, placeholder)) else:
all_problem = await WordBank.get_group_all_problem(group_id)
if id_.startswith("id:"):
id_ = id_.split(":")[-1]
if not is_number(id_) or int(id_) < 0 or int(id_) > len(all_problem):
return "id必须为数字且在范围内", 999
return all_problem[int(id_)][0], 200
def set_answer(self, answer: str):
async def update_word(params: str, group_id: Optional[int] = None, word_scope: int = 1) -> str:
""" """
设置回答 说明:
:param answer: 回答 修改群词条
参数:
:param params: 参数
:param group_id: 群号
:param word_scope: 词条范围
""" """
self._data["answer"] = answer return await word_handle(params, group_id, "update", word_scope)
def set_problem(self, problem: str):
async def delete_word(params: str, group_id: Optional[int] = None, word_scope: int = 1) -> str:
""" """
设置问题 说明:
:param problem: 问题 删除群词条
参数:
:param params: 参数
:param group_id: 群号
:param word_scope: 词条范围
""" """
self._data["problem"] = problem return await word_handle(params, group_id, "delete", word_scope)
async def word_handle(params: str, group_id: Optional[int], type_: str, word_scope: int = 0) -> str:
"""
说明:
词条操作
参数:
:param params: 参数
:param group_id: 群号
:param type_: 类型
:param word_scope: 词条范围
"""
params = params.split()
problem = params[0]
if problem.startswith("id:"):
problem, code = await get_problem_str(problem, group_id, word_scope)
if code != 200:
return problem
if type_ == "delete":
index = params[1] if len(params) > 1 else None
if index:
answer_num = len(await WordBank.get_problem_all_answer(problem, group_id))
if not is_number(index) or int(index) < 0 or int(index) > answer_num:
return "指定回答下标id必须为数字且在范围内"
index = int(index)
await WordBank.delete_group_problem(problem, group_id, index, word_scope)
return "删除词条成功"
if type_ == "update":
replace_str = params[1]
await WordBank.update_group_problem(problem, replace_str, group_id, word_scope=word_scope)
return "修改词条成功"
async def show_word(
problem: str,
id_: Optional[int],
gid: Optional[int],
group_id: Optional[int] = None,
word_scope: Optional[int] = None,
) -> Union[str, List[Union[str, Message]]]:
if problem:
msg_list = []
if word_scope is not None:
problem = (await WordBank.get_problem_by_scope(word_scope))[id_][0]
id_ = None
_problem_list = await WordBank.get_problem_all_answer(
problem, id_ if id_ is not None else gid, group_id if gid is None else None, word_scope
)
for index, msg in enumerate(_problem_list):
if isinstance(msg, Message):
temp = ""
for seg in msg:
if seg.type == "text":
temp += seg
elif seg.type == "face":
temp += f"[face:{seg.data.id}]"
elif seg.type == "at":
temp += f'[at:{seg.data["qq"]}]'
elif seg.type == "image":
temp += f"[image]"
msg += temp
msg_list.append(f"{index}." + msg if isinstance(msg, str) else msg[1])
msg_list = [
f'词条:{problem or (f"id: {id_}" if id_ is not None else f"gid: {gid}")} 的回答'
] + msg_list
return msg_list
else:
if group_id:
_problem_list = await WordBank.get_group_all_problem(group_id)
else:
_problem_list = await WordBank.get_problem_by_scope(word_scope)
global_problem_list = await WordBank.get_problem_by_scope(0)
if not _problem_list and not global_problem_list:
return "未收录任何词条.."
msg_list = await build_message(_problem_list)
global_msg_list = await build_message(global_problem_list)
if global_msg_list:
msg_list.append("###以下为全局词条###")
msg_list = msg_list + global_msg_list
return msg_list
async def build_message(_problem_list: List[Tuple[Any, Union[MessageSegment, str]]]):
index = 0
str_temp_list = []
msg_list = []
temp_str = ""
for _, problem in _problem_list:
if len(temp_str.split("\n")) > 50:
img = await text2image(
temp_str,
padding=10,
color="#f9f6f2",
)
msg_list.append(image(b64=img.pic2bs4()))
temp_str = ""
if isinstance(problem, str):
if problem not in str_temp_list:
str_temp_list.append(problem)
temp_str += f"{index}. {problem}\n"
else:
if temp_str:
img = await text2image(
temp_str,
padding=10,
color="#f9f6f2",
)
msg_list.append(image(b64=img.pic2bs4()))
temp_str = ""
msg_list.append(f"{index}." + problem)
index += 1
if temp_str:
img = await text2image(
temp_str,
padding=10,
color="#f9f6f2",
)
msg_list.append(image(b64=img.pic2bs4()))
return msg_list
@driver.on_startup
async def _():
try:
from ._old_model import WordBank as OldWordBank
except ModuleNotFoundError:
return
if await WordBank.get_group_all_problem(0):
return
logger.info('开始迁移词条 纯文本 数据')
try:
word_list = await OldWordBank.get_all()
for word in word_list:
problem: str = word.problem
user_id = word.user_qq
group_id = word.group_id
format_ = word.format
answer = word.answer
# 仅对纯文本做处理
if '[CQ' not in problem and '[CQ' not in answer and '[_to_me' not in problem:
if not format_:
await WordBank.add_problem_answer(user_id, group_id, 1, 0, problem, answer)
await WordBank.add_problem_answer(0, 0, 999, 0, '_[OK', '_[OK')
logger.info('词条 纯文本 数据迁移完成')
(Path() / 'plugins' / 'word_bank' / '_old_model.py').unlink()
except Exception as e:
logger.warning(f'迁移词条发生错误,如果为首次安装请无视 {type(e)}{e}')
async def save(self, search_type):
user_id = self._data["user_id"]
group_id = self._data["group_id"]
problem = self._data["problem"]
answer = self._data["answer"]
placeholder = self._data.get("placeholder")
return await WordBank.add_problem_answer(user_id, group_id, search_type, problem, answer, placeholder)
async def update(self, index):
user_id = self._data["user_id"]
group_id = self._data["group_id"]
problem = self._data["problem"]
answer = self._data["answer"]
placeholder = self._data.get("placeholder")
return await WordBank.update_problem_answer(user_id, group_id, problem, answer, index, placeholder)
def __str__(self):
return str(self._data)

481
plugins/word_bank/_model.py Normal file
View File

@ -0,0 +1,481 @@
import time
from nonebot.internal.adapter.template import MessageTemplate
from nonebot.adapters.onebot.v11 import (
Message,
MessageEvent,
GroupMessageEvent,
MessageSegment,
)
from services.db_context import db
from typing import Optional, List, Union, Tuple, Any
from datetime import datetime
from configs.path_config import DATA_PATH
import random
from ._config import int2type
from utils.image_utils import get_img_hash
from utils.http_utils import AsyncHttpx
import re
from utils.message_builder import image, face, at
from utils.utils import get_message_img
path = DATA_PATH / "word_bank"
class WordBank(db.Model):
__tablename__ = "word_bank2"
id = db.Column(db.Integer(), primary_key=True)
user_qq = db.Column(db.BigInteger(), nullable=False)
group_id = db.Column(db.Integer())
word_scope = db.Column(
db.Integer(), nullable=False, default=0
) # 生效范围 0: 全局 1: 群聊 2: 私聊
word_type = db.Column(
db.Integer(), nullable=False, default=0
) # 词条类型 0: 完全匹配 1: 模糊 2: 正则 3: 图片
status = db.Column(db.Boolean(), nullable=False, default=True) # 词条状态
problem = db.Column(db.String(), nullable=False) # 问题为图片时使用图片hash
answer = db.Column(db.String(), nullable=False) # 回答
placeholder = db.Column(db.String()) # 占位符
image_path = db.Column(db.String()) # 使用图片作为问题时图片存储的路径
create_time = db.Column(db.DateTime(), nullable=False)
update_time = db.Column(db.DateTime(), nullable=False)
@classmethod
async def exists(
cls,
user_id: Optional[int],
group_id: Optional[int],
problem: str,
word_scope: Optional[int] = None,
word_type: Optional[int] = None,
) -> bool:
"""
说明:
检测问题是否存在
参数:
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
:param word_scope: 词条范围
:param word_type: 词条类型
"""
query = cls.query.where(cls.problem == problem)
if user_id:
query = query.where(cls.user_qq == user_id)
if group_id:
query = query.where(cls.group_id == group_id)
if word_type:
query = query.where(cls.word_type == word_type)
if word_scope:
query = query.where(cls.word_scope == word_scope)
return bool(await query.gino.first())
@classmethod
async def add_problem_answer(
cls,
user_id: int,
group_id: Optional[int],
word_scope: int,
word_type: int,
problem: Union[str, Message],
answer: Union[str, Message],
):
"""
说明:
添加或新增一个问答
参数:
:param user_id: 用户id
:param group_id: 群号
:param word_scope: 词条范围,
:param word_type: 词条类型,
:param problem: 问题
:param answer: 回答
"""
# 对图片做额外处理
image_path = None
if word_type == 3:
url = get_message_img(problem)[0]
_file = (
path / "problem" / f"{group_id}" / f"{user_id}_{int(time.time())}.jpg"
)
_file.parent.mkdir(exist_ok=True, parents=True)
await AsyncHttpx.download_file(url, _file)
problem = str(get_img_hash(_file))
image_path = f"problem/{group_id}/{user_id}_{int(time.time())}.jpg"
answer, _list = await cls._answer2format(answer, user_id, group_id)
await cls.create(
user_qq=user_id,
group_id=group_id,
word_scope=word_scope,
word_type=word_type,
status=True,
problem=problem,
answer=answer,
image_path=image_path,
placeholder=",".join(_list),
create_time=datetime.now().replace(microsecond=0),
update_time=datetime.now().replace(microsecond=0),
)
@classmethod
async def _answer2format(
cls, answer: Union[str, Message], user_id: int, group_id: int
) -> Tuple[str, List[Any]]:
"""
说明:
将CQ码转化为占位符
参数:
:param answer: 回答内容
:param user_id: 用户id
:param group_id: 群号
"""
if isinstance(answer, str):
return answer, []
_list = []
text = ""
index = 0
for seg in answer:
if isinstance(seg, str):
text += seg
elif seg.type == "text":
text += seg.data["text"]
elif seg.type == "face":
text += f"[face:placeholder_{index}]"
_list.append(seg.data.id)
elif seg.type == "at":
text += f"[at:placeholder_{index}]"
_list.append(seg.data["qq"])
else:
text += f"[image:placeholder_{index}]"
index += 1
t = int(time.time())
_file = path / "answer" / f"{group_id}" / f"{user_id}_{t}.jpg"
_file.parent.mkdir(exist_ok=True, parents=True)
await AsyncHttpx.download_file(seg.data["url"], _file)
_list.append(f"answer/{group_id}/{user_id}_{t}.jpg")
return text, _list
@classmethod
async def _format2answer(
cls,
problem: str,
answer: Union[str, Message],
user_id: int,
group_id: int,
query: Optional["WordBank"] = None,
) -> Union[str, Message]:
"""
说明:
将占位符转换为CQ码
参数:
:param problem: 问题内容
:param answer: 回答内容
:param user_id: 用户id
:param group_id: 群号
"""
if query:
answer = query.answer
else:
query = await cls.query.where(
(cls.problem == problem)
& (cls.user_qq == user_id)
& (cls.group_id == group_id)
& (cls.answer == answer)
).gino.first()
if query and query.placeholder:
type_list = re.findall(rf"\[(.*):placeholder_.*]", answer)
temp_answer = re.sub(rf"\[(.*):placeholder_.*]", "{}", answer)
seg_list = []
for t, p in zip(type_list, query.placeholder.split(",")):
if t == "image":
seg_list.append(image(path / p))
elif t == "face":
seg_list.append(face(p))
elif t == "at":
seg_list.append(at(p))
return MessageTemplate(temp_answer, Message).format(*seg_list)
return answer
@classmethod
async def check(
cls,
event: MessageEvent,
problem: str,
word_scope: Optional[int] = None,
word_type: Optional[int] = None,
) -> Optional[Any]:
"""
说明:
检测是否包含该问题并获取所有回答
参数:
:param event: event
:param problem: 问题内容
:param word_scope: 词条范围
:param word_type: 词条类型
"""
query = cls.query
sql_text = "SELECT * FROM public.word_bank2 where 1 = 1"
# 救命没找到gino的正则表达式方法暂时使用sql语句
if isinstance(event, GroupMessageEvent):
if word_scope:
query = query.where(cls.word_scope == word_scope)
sql_text += f" and word_scope = {word_scope}"
else:
query = query.where(
(cls.group_id == event.group_id) | (cls.word_scope == 0)
)
sql_text += f" and (group_id = {event.group_id} or word_scope = 0)"
else:
query = query.where((cls.word_scope == 2) | (cls.word_scope == 0))
sql_text += f" and (word_scope = 2 or word_scope = 0)"
if word_type:
query = query.where(cls.word_scope == word_type)
sql_text += f" and word_scope = {word_scope}"
# 完全匹配
if await query.where(
((cls.word_type == 0) | (cls.word_type == 3)) & (cls.problem == problem)
).gino.first():
return query.where(
((cls.word_type == 0) | (cls.word_type == 3)) & (cls.problem == problem)
)
# 模糊匹配
if await db.first(
db.text(
sql_text
+ f" and word_type = 1 and :problem like '%' || problem || '%';"
),
problem=problem,
):
return (
sql_text
+ f" and word_type = 1 and :problem like '%' || problem || '%';"
)
# 正则匹配
if await db.first(
db.text(
sql_text
+ f" and word_type = 2 and word_scope != 999 and :problem ~ problem;"
),
problem=problem,
):
return (
sql_text
+ f" and word_type = 2 and word_scope != 999 and :problem ~ problem;"
)
# if await db.first(
# db.text(sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;")
# ):
# return sql_text + f" and word_type = 1 and word_scope != 999 and '{problem}' ~ problem;"
# return None
@classmethod
async def get_answer(
cls,
event: MessageEvent,
problem: str,
word_scope: Optional[int] = None,
word_type: Optional[int] = None,
) -> Optional[Union[str, Message]]:
"""
说明:
根据问题内容获取随机回答
参数:
:param event: event
:param problem: 问题内容
:param word_scope: 词条范围
:param word_type: 词条类型
"""
query = await cls.check(event, problem, word_scope, word_type)
if query is not None:
if isinstance(query, str):
answer_list = await db.all(db.text(query), problem=problem)
answer = random.choice(answer_list)
return (
await cls._format2answer(problem, answer[7], answer[1], answer[2])
if answer.placeholder
else answer.answer
)
else:
answer_list = await query.gino.all()
answer = random.choice(answer_list)
return (
await cls._format2answer(
problem, answer.answer, answer.user_qq, answer.group_id
)
if answer.placeholder
else answer.answer
)
@classmethod
async def get_problem_all_answer(
cls,
problem: str,
index: Optional[int] = None,
group_id: Optional[int] = None,
word_scope: Optional[int] = 0,
) -> List[Union[str, Message]]:
"""
说明:
获取指定问题所有回答
参数:
:param problem: 问题
:param index: 下标
:param group_id: 群号
:param word_scope: 词条范围
"""
if index is not None:
if group_id:
problem = (await cls.query.where(cls.group_id == group_id).gino.all())[
index
]
else:
problem = (
await cls.query.where(
cls.word_scope == (word_scope or 0)
).gino.all()
)[index]
problem = problem.problem
answer = cls.query.where(cls.problem == problem)
if group_id:
answer = answer.where(cls.group_id == group_id)
return [
await cls._format2answer("", "", 0, 0, x) for x in (await answer.gino.all())
]
@classmethod
async def delete_group_problem(
cls,
problem: str,
group_id: int,
index: Optional[int] = None,
word_scope: int = 1,
):
"""
说明:
删除指定问题全部或指定回答
参数:
:param problem: 问题文本
:param group_id: 群号
:param index: 回答下标
:param word_scope: 词条范围
"""
if index is not None:
if group_id:
query = await cls.query.where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.all()
else:
query = await cls.query.where(
(cls.word_scope == 0) & (cls.problem == problem)
).gino.all()
await query[index].delete()
else:
if group_id:
await WordBank.delete.where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.status()
else:
await WordBank.delete.where(
(cls.word_scope == word_scope) & (cls.problem == problem)
).gino.status()
@classmethod
async def update_group_problem(
cls,
problem: str,
replace_str: str,
group_id: int,
index: Optional[int] = None,
word_scope: int = 1,
):
"""
说明:
修改词条问题
参数:
:param problem: 问题
:param replace_str: 替换问题
:param group_id: 群号
:param index: 下标
:param word_scope: 词条范围
"""
if index is not None:
if group_id:
query = await cls.query.where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.all()
else:
query = await cls.query.where(
(cls.word_scope == word_scope) & (cls.problem == problem)
).gino.all()
await query[index].update(problem=replace_str).apply()
else:
if group_id:
await WordBank.update.values(problem=replace_str).where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.status()
else:
await WordBank.update.values(problem=replace_str).where(
(cls.word_scope == word_scope) & (cls.problem == problem)
).gino.status()
@classmethod
async def get_group_all_problem(
cls, group_id: int
) -> List[Tuple[Any, Union[MessageSegment, str]]]:
"""
说明:
获取群聊所有词条
参数:
:param group_id: 群号
"""
return cls._handle_problem(
await cls.query.where(cls.group_id == group_id).gino.all()
)
@classmethod
async def get_problem_by_scope(cls, word_scope: int):
"""
说明:
通过词条范围获取词条
参数:
:param word_scope: 词条范围
"""
return cls._handle_problem(
await cls.query.where(cls.word_scope == word_scope).gino.all()
)
@classmethod
async def get_problem_by_type(cls, word_type: int):
"""
说明:
通过词条类型获取词条
参数:
:param word_type: 词条类型
"""
return cls._handle_problem(
await cls.query.where(cls.word_type == word_type).gino.all()
)
@classmethod
def _handle_problem(cls, msg_list: List[Union[str, MessageSegment]]):
"""
说明:
格式化处理问题
参数:
:param msg_list: 消息列表
"""
_tmp = []
problem_list = []
for q in msg_list:
if q.problem not in _tmp:
problem = (
q.problem,
image(path / q.image_path)
if q.image_path
else f"[{int2type[q.word_type]}] " + q.problem,
)
problem_list.append(problem)
_tmp.append(q.problem)
return problem_list

View File

@ -0,0 +1,20 @@
from services.db_context import db
from typing import List
class WordBank(db.Model):
__tablename__ = "word_bank"
user_qq = db.Column(db.BigInteger(), nullable=False)
group_id = db.Column(db.Integer())
search_type = db.Column(db.Integer(), nullable=False, default=0)
problem = db.Column(db.String(), nullable=False)
answer = db.Column(db.String(), nullable=False)
format = db.Column(db.String())
create_time = db.Column(db.DateTime(), nullable=False)
update_time = db.Column(db.DateTime(), nullable=False)
@classmethod
async def get_all(cls) -> List['WordBank']:
return await cls.query.gino.all()

View File

@ -1,18 +1,36 @@
import re import imagehash
from nonebot.adapters.onebot.v11 import GroupMessageEvent, Event from PIL import Image
from utils.utils import get_message_img_file from io import BytesIO
from .model import WordBank from httpx import TimeoutException
from nonebot.typing import T_State
from nonebot.adapters.onebot.v11 import MessageEvent
from utils.utils import get_message_text, get_message_img, get_message_at
from ._model import WordBank
from utils.http_utils import AsyncHttpx
async def check(event: Event) -> bool: async def check(event: MessageEvent, state: T_State) -> bool:
if isinstance(event, GroupMessageEvent): text = get_message_text(event.message)
msg = event.raw_message img = get_message_img(event.message)
list_img = get_message_img_file(event.json()) at = get_message_at(event.message)
if list_img: problem = text
for img_file in list_img: if not text and len(img) == 1:
strinfo = re.compile(f"{img_file},.*?]") try:
msg = strinfo.sub(f'{img_file}]', msg) r = await AsyncHttpx.get(img[0])
strinfo_face = re.compile(f",type=sticker]") problem = str(imagehash.average_hash(Image.open(BytesIO(r.content))))
msg = strinfo_face.sub(f']', msg) except TimeoutException:
return bool(await WordBank.check(event.group_id, msg,)) pass
if at:
temp = ''
for seg in event.message:
if seg.type == 'at':
temp += f"[at:{seg.data['qq']}]"
elif seg.type == 'text':
temp += seg.data["text"]
problem = temp
if problem and (await WordBank.check(event, problem) is not None):
state["problem"] = problem
return True
return False return False

View File

@ -1,14 +1,10 @@
from utils.message_builder import image, at, face from services import logger
from typing import Tuple
from ._rule import check from ._rule import check
from .model import WordBank from ._model import WordBank
from configs.path_config import DATA_PATH from configs.path_config import DATA_PATH
from nonebot.adapters.onebot.v11 import GroupMessageEvent from nonebot.adapters.onebot.v11 import GroupMessageEvent, MessageEvent
from utils.utils import get_message_at, get_message_img, change_img_md5
from nonebot import on_message from nonebot import on_message
from models.group_member_info import GroupInfoUser from nonebot.typing import T_State
from utils.utils import get_message_img_file, is_number
import re
__zx_plugin_name__ = "词库问答回复操作 [Hidden]" __zx_plugin_name__ = "词库问答回复操作 [Hidden]"
@ -19,129 +15,12 @@ message_handle = on_message(priority=6, block=True, rule=check)
@message_handle.handle() @message_handle.handle()
async def _(event: GroupMessageEvent): async def _(event: MessageEvent, state: T_State):
msg = event.raw_message if problem := state.get("problem"):
list_img = get_message_img_file(event.json()) if msg := await WordBank.get_answer(event, problem):
if list_img: await message_handle.send(msg)
for img_file in list_img: logger.info(
strinfo = re.compile(f"{img_file},.*?]") f"(USER {event.user_id}, GROUP "
msg = strinfo.sub(f'{img_file}]', msg) f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
strinfo_face = re.compile(f",type=sticker]") f" 触发词条 {problem}"
msg = strinfo_face.sub(f']', msg)
q = await WordBank.check(event.group_id, msg, )
await message_handle.send(await get_one_answer(event, q.format, q.answer))
# 处理单条回答
async def get_one_answer(event, format: str, _answer: str, all: bool = True) -> str:
path = data_dir / f"{event.group_id}"
placeholder_list = (
[
(x.split("<_s>")[0], x.split("<_s>")[1])
for x in format.split("<format>")[:-1]
]
if format
else []
) )
answer = ""
_a = _answer
if not placeholder_list:
answer = _a
return answer
else:
for idx, placeholder in placeholder_list:
if placeholder.endswith("jpg"):
change_img_md5(path / placeholder)
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + image(
path / placeholder
)
else:
if all:
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + at(int(placeholder))
else:
q = await GroupInfoUser.get_member_info(
int(placeholder), event.group_id)
answer += _a[: _a.find(f"[__placeholder_{idx}]")] + "@" + q.user_name
_a = _a[_a.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):]
return answer + _a
# 处理单条问题
async def get_one_problem(event, problem: str, ) -> Tuple[str, str]:
strinfo = re.compile(f",subType=\d")
problem = strinfo.sub('', problem)
_problem = problem
_p = problem
problem = ''
for img in get_message_img(event.json()):
_x = img.split("?")[0]
r = re.search(rf"\[CQ:image,file=(.*),url={_x}.*?]", _p)
if r:
_problem = _problem.replace(
rf",url={img}",
f"",
)
problem += _p[: _p.find(f"[CQ:image,file={r.group(1)},url={img}]")] + image(img)
_p = _p[
_p.find(f"[CQ:image,file={r.group(1)},url={img}]") + len(f"[CQ:image,file={r.group(1)},url={img}]"):]
for at_ in get_message_at(event.json()):
r = re.search(rf"\[CQ:at,qq={at_}]", problem)
if r:
q = await GroupInfoUser.get_member_info(
int(at_), event.group_id)
problem += _p[: _p.find(f"[CQ:at,qq={at_}]")] + "@" + q.user_name
_p = _p[_p.find(f"[CQ:at,qq={at_}]") + len(f"[CQ:at,qq={at_}]"):]
return _problem, problem + _p
# 显示单条数据库问题
async def get_one_image_problem(event, problem: str) -> str:
path = data_dir / f"{event.group_id}" / "problem"
placeholder_list = []
idx = 0
img_list = re.findall(rf"\[CQ:image,file=(.*?)]", problem)
at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", problem)
if img_list:
for img in img_list:
problem = problem.replace(f'[CQ:image,file={img}]', f'[__placeholder_{idx}]', 1)
placeholder_list.append([idx, img])
idx += 1
if at_list:
for ats in at_list:
problem = problem.replace(f'[CQ:at,qq={ats}]', f'[__placeholder_{idx}]', 1)
placeholder_list.append([idx, ats])
idx += 1
_p = problem
problem = ''
if not placeholder_list:
problem = _p
return problem
else:
for idx, placeholder in placeholder_list:
if is_number(placeholder):
q = await GroupInfoUser.get_member_info(
int(placeholder), event.group_id)
problem += _p[: _p.find(f"[__placeholder_{idx}]")] + "@" + q.user_name
else:
problem += _p[: _p.find(f"[__placeholder_{idx}]")] + image(
path / f"{placeholder}.jpg"
)
_p = _p[_p.find(f"[__placeholder_{idx}]") + len(f"[__placeholder_{idx}]"):]
return problem + _p
# 替换cq码
async def replace_cq(group_id, msg: str, is_face: bool = True) -> str:
strinfo_img = re.compile(f"\[CQ:image.*?]")
msg = strinfo_img.sub('[图片]', msg)
at_list = re.findall(rf"\[CQ:at,qq=(.*?)]", msg)
if at_list:
for ats in at_list:
q = await GroupInfoUser.get_member_info(
int(ats), group_id)
msg = msg.replace(f'[CQ:at,qq={ats}]', "@" + q.user_name)
if is_face:
strinfo_face = re.compile(f"\[CQ:face,id=.*?]")
msg = strinfo_face.sub('[表情]', msg)
return msg

View File

@ -1,269 +0,0 @@
from services.db_context import db
from typing import Optional, List, Union, Tuple
from datetime import datetime
from configs.path_config import DATA_PATH
import random
from configs.config import Config
class WordBank(db.Model):
__tablename__ = "word_bank"
user_qq = db.Column(db.BigInteger(), nullable=False)
group_id = db.Column(db.Integer())
search_type = db.Column(db.Integer(), nullable=False, default=0)
problem = db.Column(db.String(), nullable=False)
answer = db.Column(db.String(), nullable=False)
format = db.Column(db.String())
create_time = db.Column(db.DateTime(), nullable=False)
update_time = db.Column(db.DateTime(), nullable=False)
@classmethod
async def add_problem_answer(
cls,
user_id: int,
group_id: Optional[int],
search_type: [int],
problem: str,
answer: str,
format_: Optional[List[Tuple[int, Union[int, str]]]],
) -> bool:
"""
添加或新增一个问答
:param user_id: 用户id
:param group_id: 群号
:search_type: 问题类型,
:param problem: 问题
:param answer: 回答
:param format_: 格式化数据
"""
_str = None
if format_:
_str = ""
for x, y in format_:
_str += f"{x}<_s>{y}<format>"
return await cls._problem_answer_handle(
user_id, group_id, problem, "add", search_type=search_type, answer=answer, format_=_str
)
@classmethod
async def delete_problem_answer(
cls, user_id: int, group_id: Optional[int], problem: str, index: Optional[int]
) -> str:
"""
删除某问题一个或全部回答
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
:param index: 回答下标
"""
return await cls._problem_answer_handle(
user_id, group_id, problem, "delete", index=index
)
@classmethod
async def update_problem_answer(
cls,
user_id: int,
group_id: Optional[int],
problem: str,
answer: str,
index: Optional[int],
format_: Optional[List[Tuple[int, Union[int, str]]]],
) -> str:
"""
修改某问题一个或全部回答
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
:param index: 回答下标
"""
_str = None
if format_:
_str = ""
for x, y in format_:
_str += f"{x}<_s>{y}<format>"
return await cls._problem_answer_handle(
user_id, group_id, problem, "update", answer=answer, index=index, format_=_str
)
@classmethod
async def get_problem_answer(
cls, user_id: int, group_id: Optional[int], problem: str
) -> List[str]:
"""
获取问题的所有回答
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
"""
return await cls._problem_answer_handle(user_id, group_id, problem, "get")
@classmethod
async def get_group_all_answer(cls, group_id: int, problem: str) -> List[str]:
"""
获取群聊指定词条所有回答
:param group_id: 群号
:param problem: 问题
"""
q = await cls.query.where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.all()
return [(x.answer, x.format) for x in q] if q else None
@classmethod
async def get_group_all_problem(cls, group_id: int) -> List[str]:
"""
获取群聊所有词条
:param group_id: 群号
"""
q = await cls.query.where(cls.group_id == group_id).gino.all()
q = [x.problem for x in q]
q.sort()
_tmp = []
for problem in q:
_tmp.append(problem)
return list(set(_tmp))
@classmethod
async def check(cls, group_id: int, problem: str) -> Optional["WordBank"]:
"""
检测词条并随机返回
:param group_id: 群号
:param problem: 问题
"""
if problem:
FUZZY = Config.get_config("word_bank", "WORD_BANK_FUZZY")
KEY = Config.get_config("word_bank", "WORD_BANK_KEY")
q = await cls.query.where(
(cls.group_id == group_id) & (cls.problem == problem)
).gino.all()
if KEY and FUZZY:
q_fuzzy = await cls.query.where(
(cls.group_id == group_id) & (cls.search_type == 2) & (
cls.problem.contains(f'{problem}'))).gino.all()
q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all()
q_key = [x for x in q_key if str(x.problem) in (problem)]
q += q_fuzzy + q_key
elif FUZZY:
q_fuzzy = await cls.query.where(
(cls.group_id == group_id) & (cls.search_type == 2) & (
cls.problem.contains(f'{problem}'))).gino.all()
q += q_fuzzy
elif KEY:
q_key = await cls.query.where((cls.group_id == group_id) & (cls.search_type == 1)).gino.all()
q_key = [x for x in q_key if str(x.problem) in (problem)]
q += q_key
else:
return None
return random.choice(q) if q else None
@classmethod
async def _problem_answer_handle(
cls,
user_id: int,
group_id: Optional[int],
problem: str,
type_: str,
*,
search_type: [int] = 0,
answer: Optional[str] = None,
index: Optional[int] = None,
format_: Optional[str] = None,
) -> Union[List[Union[str, Tuple[str, str]]], bool, str]:
"""
添加或新增一个问答
:param user_id: 用户id
:param group_id: 群号
:param problem: 问题
:param type_: 操作类型
:param answer: 回答
:param format_: 格式化数据
"""
if problem.startswith("id:"):
problem_index = int(problem.split(":")[-1])
q = await cls.get_group_all_problem(group_id)
if not q:
return []
if len(q) > problem_index:
problem = q[problem_index]
if group_id:
q = cls.query.where((cls.group_id == group_id) & (cls.problem == problem))
else:
q = cls.query.where((cls.user_qq == user_id) & (cls.problem == problem))
if type_ == "add":
q = await q.where((cls.answer == answer) & (cls.search_type == search_type)).gino.all()
try:
if not q or ".jpg" in format_:
await cls.create(
user_qq=user_id,
group_id=group_id,
search_type=search_type,
problem=problem,
answer=answer,
format=format_,
create_time=datetime.now().replace(microsecond=0),
update_time=datetime.now().replace(microsecond=0),
)
except:
return False
return True
elif type_ == "delete":
q = await q.with_for_update().gino.all()
if q:
path = DATA_PATH / "word_bank" / f"{group_id}"
if index is not None:
q = [q[index]]
answer = "\n".join([x.answer for x in q])
for x in q:
format_ = x.format
if format_:
for sp in format_.split("<format>")[:-1]:
_, image_name = sp.split("<_s>")
if image_name.endswith("jpg"):
_path = path / image_name
if _path.exists():
_path.unlink()
await cls.delete.where(
(cls.update_time == x.update_time)
& (cls.problem == problem)
& (cls.answer == x.answer)
& (cls.group_id == group_id)
).gino.status()
return answer
elif type_ == "update":
new_format = format_
new_answer = answer
q = await q.with_for_update().gino.all()
if q:
path = DATA_PATH / "word_bank" / f"{group_id}"
if index is not None:
q = [q[index]]
else:
q = [q[0]]
for x in q:
format_ = x.format
if format_:
for sp in format_.split("<format>")[:-1]:
_, image_name = sp.split("<_s>")
if image_name.endswith("jpg"):
_path = path / image_name
if _path.exists():
_path.unlink()
await cls.update.values(answer=new_answer,
format=new_format,
update_time=datetime.now().replace(microsecond=0), ).where(
(cls.problem == problem)
& (cls.answer == x.answer)
& (cls.group_id == group_id)
& (cls.group_id == group_id)
& (cls.update_time == x.update_time)
).gino.status()
return True
elif type_ == "get":
q = await q.gino.all()
if q:
return [(x.answer, x.format.split("<format>")[:-1]) for x in q]
return False

View File

@ -0,0 +1,319 @@
import re
from typing import Tuple, Any, Optional
from nonebot.internal.params import Arg, ArgStr
from nonebot.typing import T_State
from utils.utils import get_message_at, is_number, get_message_img
from nonebot.params import CommandArg, RegexGroup, Command
from nonebot.exception import FinishedException
from services.log import logger
from configs.path_config import DATA_PATH
from utils.message_builder import custom_forward_msg
from ._model import WordBank
from nonebot.adapters.onebot.v11 import Bot, GroupMessageEvent, Message, MessageEvent, PrivateMessageEvent, unescape
from nonebot import on_command, on_regex
from configs.config import Config
from ._data_source import delete_word, update_word, show_word
from ._config import scope2int, type2int
__zx_plugin_name__ = "词库问答 [Admin]"
__plugin_usage__ = r"""
usage
对指定问题的随机回答对相同问题可以设置多个不同回答
删除词条后每个词条的id可能会变化请查看后再删除
更推荐使用id方式删除
问题回答支持的CQat, face, image
查看词条命令群聊时为 群词条+全局词条私聊时为 私聊词条+全局词条
添加词条正则添加词条(模糊|正则|图片)?\s*?(\S*\s?\S*)\s*?\s?(\S*)
指令
添加词条 ?[模糊|正则|图片]......添加问答词条可重复添加相同问题的不同回答
删除词条 [问题/下标] ?[下标]删除指定词条指定或全部回答
修改词条 [问题/下标] [新问题]修改词条问题
查看词条 ?[问题/下标]查看全部词条或对应词条回答
示例添加词条问图片答嗨嗨嗨
[图片]...
示例添加词条@萝莉 我来啦
示例添加词条问谁是萝莉答是我
示例删除词条 谁是萝莉
示例删除词条 谁是萝莉 0
示例删除词条 id:0 1
示例修改词条 谁是萝莉 是你
示例修改词条 id:0 是你
示例查看词条
示例查看词条 谁是萝莉
示例查看词条 id:0 (/私聊词条)
示例查看词条 gid:0 (全局词条)
""".strip()
__plugin_superuser_usage__ = r"""
usage:
在私聊中超级用户额外设置
指令
(全局|私聊)?添加词条\s*?(模糊|正则|图片)?\s*?(\S*\s?\S*)\s*?\s?(\S*)添加问答词条可重复添加相同问题的不同回答
全局添加词条
私聊添加词条
私聊情况下删除词条: 删除私聊词条
私聊情况下删除全局词条
私聊情况下修改词条: 修改词条私聊词条
私聊情况下修改全局词条
用法与普通用法相同
""".strip()
__plugin_des__ = "自定义词条内容随机回复"
__plugin_cmd__ = [
"添加词条 ?[模糊/关键字]问...答..",
"删除词条 [问题/下标] ?[下标]",
"修改词条 [问题/下标] ?[下标/新回答] [新回答]",
"查看词条 ?[问题/下标]",
]
__plugin_version__ = 0.3
__plugin_author__ = "HibiKier & yajiwa"
__plugin_settings__ = {
"admin_level": Config.get_config("word_bank", "WORD_BANK_LEVEL [LEVEL]"),
"cmd": ["词库问答", "添加词条", "删除词条", "修改词条", "查看词条"],
}
data_dir = DATA_PATH / "word_bank"
data_dir.mkdir(parents=True, exist_ok=True)
add_word = on_regex(
r"^(全局|私聊)?添加词条\s*?(模糊|正则|图片)?问\s*?(\S*\s?\S*)\s*?答\s?(\S*)", priority=5, block=True
)
delete_word_matcher = on_command("删除词条", aliases={'删除全局词条'}, priority=5, block=True)
update_word_matcher = on_command("修改词条", aliases={'修改全局词条'}, priority=5, block=True)
show_word_matcher = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
@add_word.handle()
async def _(
bot: Bot,
event: MessageEvent,
state: T_State,
reg_group: Tuple[Any, ...] = RegexGroup(),
):
if str(event.user_id) not in bot.config.superusers:
await add_word.finish('权限不足捏')
word_scope, word_type, problem, answer = reg_group
if (
word_scope
and word_scope in ["全局", "私聊"]
and str(event.user_id) not in bot.config.superusers
):
await add_word.finish("权限不足,无法添加该范围词条")
if (not problem or not problem.strip()) and word_type != "图片":
await add_word.finish("词条问题不能为空!")
if (not answer or not answer.strip()) and not len(get_message_img(event.message)):
await add_word.finish("词条回答不能为空!")
if word_type != "图片":
state["problem_image"] = "YES"
answer = event.message
# 对at问题对额外处理
if get_message_at(event.message):
for index, seg in enumerate(event.message):
if seg.type == 'text' and '' in str(seg):
_problem = event.message[:index]
answer = event.message[index:]
answer[0] = str(answer[0])[str(answer[0]).index('')+1:]
_problem[0] = str(_problem[0])[str(_problem[0]).index('')+1:]
if _problem[-1].type != 'at' or seg.data['text'][:seg.data['text'].index('')].lstrip():
_problem.append(seg.data['text'][:seg.data['text'].index('')])
temp = ''
for g in _problem:
if isinstance(g, str):
temp += g
elif g.type == 'text':
temp += g.data['text']
elif g.type == 'at':
temp += f"[at:{g.data['qq']}]"
problem = temp
break
problem = unescape(problem)
index = len((word_scope or "") + "添加词条" + (word_type or "") + problem) + 1
event.message[0] = event.message[0].data["text"][index + 1 :].strip()
state["word_scope"] = word_scope
state["word_type"] = word_type
state["problem"] = problem
state["answer"] = answer
@add_word.got("problem_image", prompt="请发送该回答设置的问题图片")
async def _(
event: MessageEvent,
word_scope: Optional[str] = ArgStr("word_scope"),
word_type: Optional[str] = ArgStr("word_type"),
problem: Optional[str] = ArgStr("problem"),
answer: Message = Arg("answer"),
problem_image: Message = Arg("problem_image"),
):
try:
if word_type == "正则":
try:
re.compile(problem)
except re.error:
await add_word.finish(f"添加词条失败,正则表达式 {problem} 非法!")
await WordBank.add_problem_answer(
event.user_id,
event.group_id if isinstance(event, GroupMessageEvent) and (not word_scope or word_scope == '1') else 0,
scope2int[word_scope] if word_scope else 1,
type2int[word_type] if word_type else 0,
problem or problem_image,
answer,
)
except Exception as e:
if isinstance(e, FinishedException):
await add_word.finish()
logger.error(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 添加词条 {problem} 发生错误 {type(e)}: {e} "
)
await add_word.finish(f"添加词条 {problem} 发生错误!")
await add_word.send("添加词条 " + (problem or problem_image) + " 成功!")
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 添加词条 {problem} 成功!"
)
@delete_word_matcher.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
if not (msg := arg.extract_plain_text().strip()):
await delete_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
result = await delete_word(msg, event.group_id)
await delete_word_matcher.send(result)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id})"
f" 删除词条:" + msg
)
@delete_word_matcher.handle()
async def _(bot: Bot, event: PrivateMessageEvent, arg: Message = CommandArg(), cmd: Tuple[str, ...] = Command()):
if str(event.user_id) not in bot.config.superusers:
await delete_word_matcher.finish("权限不足捏!")
if not (msg := arg.extract_plain_text().strip()):
await delete_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
result = await delete_word(msg, word_scope=2 if cmd[0] == '删除词条' else 0)
await delete_word_matcher.send(result)
logger.info(
f"(USER {event.user_id})"
f" 删除词条:" + msg
)
@update_word_matcher.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
if not (msg := arg.extract_plain_text().strip()):
await update_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
if len(msg.split()) < 2:
await update_word_matcher.finish("此命令需要两个参数,请查看帮助")
result = await update_word(msg, event.group_id)
await update_word_matcher.send(result)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id})"
f" 更新词条词条:" + msg
)
@update_word_matcher.handle()
async def _(bot: Bot, event: PrivateMessageEvent, arg: Message = CommandArg(), cmd: Tuple[str, ...] = Command()):
if str(event.user_id) not in bot.config.superusers:
await delete_word_matcher.finish("权限不足捏!")
if not (msg := arg.extract_plain_text().strip()):
await update_word_matcher.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
if len(msg.split()) < 2:
await update_word_matcher.finish("此命令需要两个参数,请查看帮助")
result = await update_word(msg, word_scope=2 if cmd[0] == '修改词条' else 0)
await update_word_matcher.send(result)
logger.info(
f"(USER {event.user_id})"
f" 更新词条词条:" + msg
)
@show_word_matcher.handle()
async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
if problem := arg.extract_plain_text().strip():
id_ = None
gid = None
if problem.startswith("id:"):
id_ = problem.split(":")[-1]
if (
not is_number(id_)
or int(id_) < 0
or int(id_)
>= len(await WordBank.get_group_all_problem(event.group_id))
):
await show_word_matcher.finish("id必须为数字且在范围内")
id_ = int(id_)
if problem.startswith("gid:"):
gid = problem.split(":")[-1]
if (
not is_number(gid)
or int(gid) < 0
or int(gid)
>= len(await WordBank.get_problem_by_scope(0))
):
await show_word_matcher.finish("gid必须为数字且在范围内")
gid = int(gid)
msg_list = await show_word(problem, id_, gid, None if gid else event.group_id)
else:
msg_list = await show_word(problem, None, None, event.group_id)
if isinstance(msg_list, str):
await show_word_matcher.send(msg_list)
else:
await bot.send_group_forward_msg(
group_id=event.group_id, messages=custom_forward_msg(msg_list, bot.self_id)
)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 发送查看词条回答:" + problem
)
@show_word_matcher.handle()
async def _(event: PrivateMessageEvent, arg: Message = CommandArg()):
if problem := arg.extract_plain_text().strip():
id_ = None
gid = None
if problem.startswith("id:"):
id_ = problem.split(":")[-1]
if (
not is_number(id_)
or int(id_) < 0
or int(id_)
> len(await WordBank.get_problem_by_scope(2))
):
await show_word_matcher.finish("id必须为数字且在范围内")
id_ = int(id_)
if problem.startswith("gid:"):
gid = problem.split(":")[-1]
if (
not is_number(gid)
or int(gid) < 0
or int(gid)
> len(await WordBank.get_problem_by_scope(0))
):
await show_word_matcher.finish("gid必须为数字且在范围内")
gid = int(gid)
msg_list = await show_word(problem, id_, gid, word_scope=2 if id_ is not None else None)
else:
msg_list = await show_word(problem, None, None, word_scope=2)
if isinstance(msg_list, str):
await show_word_matcher.send(msg_list)
else:
t = ""
for msg in msg_list:
t += msg + '\n'
await show_word_matcher.send(t[:-1])
logger.info(
f"(USER {event.user_id}, GROUP "
f"private)"
f" 发送查看词条回答:" + problem
)

View File

@ -1,311 +0,0 @@
from utils.utils import get_message_at, is_number, get_message_img
from nonebot.params import CommandArg
from services.log import logger
from configs.path_config import DATA_PATH
from utils.http_utils import AsyncHttpx
from ._data_source import WordBankBuilder
from utils.message_builder import image
from utils.image_utils import text2image
from .message_handle import get_one_answer, get_one_problem, get_one_image_problem, replace_cq
from .model import WordBank
from nonebot.adapters.onebot.v11 import (
Bot,
GroupMessageEvent,
Message
)
from nonebot import on_command
import random
import os
import re
from configs.config import NICKNAME, Config
from models.group_member_info import GroupInfoUser
__zx_plugin_name__ = "词库问答 [Admin]"
__plugin_usage__ = """
usage
对指定问题的随机回答对相同问题可以设置多个不同回答
删除词条后每个词条的id可能会变化请查看后再删除
指令
添加词条 ?[模糊/关键字|]......添加问答词条可重复添加相同问题的不同回答
删除词条 [问题/下标] ?[下标]删除指定词条指定或全部回答
修改词条 [问题/下标] ?[下标/新回答] [新回答]修改指定词条指定回答默认修改为第一条
查看词条 ?[问题/下标]查看全部词条或对应词条回答
示例添加词条问谁是萝莉答是我
示例删除词条 谁是萝莉
示例删除词条 谁是萝莉 0
示例删除词条 id:0
示例修改词条 谁是萝莉 是你
示例修改词条 谁是萝莉 0 是你
示例修改词条 id:0 是你
示例查看词条
示例查看词条 谁是萝莉
示例查看词条 id:0
""".strip()
__plugin_des__ = "自定义词条内容随机回复"
__plugin_cmd__ = [
"添加词条 ?[模糊/关键字]问...答..",
"删除词条 [问题/下标] ?[下标]",
"修改词条 [问题/下标] ?[下标/新回答] [新回答]",
"查看词条 ?[问题/下标]",
]
__plugin_version__ = 0.3
__plugin_author__ = "HibiKier & yajiwa"
__plugin_settings__ = {
"admin_level": Config.get_config("word_bank", "WORD_BANK_LEVEL [LEVEL]"),
"cmd": ["词库问答", "添加词条", "删除词条", "修改词条", "查看词条"],
}
data_dir = DATA_PATH / "word_bank"
data_dir.mkdir(parents=True, exist_ok=True)
add_word = on_command("添加词条", priority=5, block=True)
delete_word = on_command("删除词条", priority=5, block=True)
update_word = on_command("修改词条", priority=5, block=True)
show_word = on_command("显示词条", aliases={"查看词条"}, priority=5, block=True)
@add_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = str(arg)
r = re.search(r"问(.+)\s?答([\s\S]*)", msg)
if not r:
await add_word.finish("未检测到词条问题...")
problem = r.group(1).strip()
if not problem:
await add_word.finish("未检测到词条问题...")
answer = msg.split("", maxsplit=1)[-1]
if not answer:
await add_word.finish("未检测到词条回答...")
idx = 0
_problem = problem
search_type = 0
if re.search("^关键字|词(.*)", msg):
search_type = 1
elif re.search("^模糊(.*)", msg):
search_type = 2
_builder = await get__builder(event, _problem, answer, idx)
if await _builder.save(search_type):
logger.info(f"已保存词条 问:{_builder.problem} 答:{answer}")
await add_word.send("已保存词条:" + _builder.problem)
else:
await delete_word.send("保存失败,可能是回答重复")
@delete_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = str(arg)
if not msg:
await delete_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
index = None
_sp_msg = msg.split()
if len(_sp_msg) > 1:
if is_number(_sp_msg[-1]):
index = int(_sp_msg[-1])
msg = " ".join(_sp_msg[:-1])
problem = msg
if problem.startswith("id:"):
x = problem.split(":")[-1]
if not is_number(x) or int(x) < 0:
await delete_word.finish("id必须为数字且符合规范")
p = await WordBank.get_group_all_problem(event.group_id)
if p:
problem = p[int(x)]
try:
_problem, problem = await get_one_problem(event, problem)
if answer := await WordBank.delete_problem_answer(
event.user_id, event.group_id, _problem, index
):
await delete_word.send(Message(
"删除词条成功:\n" + await replace_cq(event.group_id, problem, False) + f"\n回答:\n" + await replace_cq(
event.group_id, answer, False) + "\n"))
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 删除词条: {problem}"
)
else:
await delete_word.send("删除词条:" + problem + "失败,可能该词条不存在")
except IndexError:
await delete_word.send("指定下标错误...请通过查看词条来确定..")
@update_word.handle()
async def _(event: GroupMessageEvent, arg: Message = CommandArg()):
msg = str(arg)
if not msg:
await update_word.finish("此命令之后需要跟随指定词条,通过“显示词条“查看")
index = None
new_answer = None
problem = None
_sp_msg = msg.split()
len_msg = len(_sp_msg)
if 1 < len_msg:
problem = "".join(_sp_msg[0])
if len_msg == 3:
if is_number(_sp_msg[1]):
index = int(_sp_msg[1])
new_answer = "".join(_sp_msg[2:])
else:
new_answer = "".join(_sp_msg[1:])
else:
await update_word.finish("此命令之后需要跟随修改内容")
idx = 0
_problem = problem
_builder = await get__builder(event, _problem, new_answer, idx)
try:
if await _builder.update(index):
await update_word.send(f"修改词条成功:" + _builder.problem)
logger.info(
f"(USER {event.user_id}, GROUP "
f"{event.group_id if isinstance(event, GroupMessageEvent) else 'private'})"
f" 修改词条: {problem}"
)
else:
await update_word.send(f"修改词条:" + _builder.problem + f"失败,可能该词条不存在")
except IndexError:
await update_word.send("指定下标错误...请通过查看词条来确定..")
@show_word.handle()
async def _(bot: Bot, event: GroupMessageEvent, arg: Message = CommandArg()):
msg = str(arg).strip()
if not msg:
_problem_list = await WordBank.get_group_all_problem(event.group_id)
if not _problem_list:
await show_word.finish("该群未收录任replace_cq何词条..")
_problem_list = [f"\t{i}. {await replace_cq(event.group_id, x)}" for i, x in enumerate(_problem_list)]
long_problem_list = len(_problem_list)
max_line = Config.get_config("word_bank", "WORD_BANK_MIX")
if long_problem_list > max_line:
pic_list = []
mes_list = []
img_nu = long_problem_list // max_line
one_msg = "该群已收录的词条:"
await show_word.send(one_msg)
for i in range(img_nu + 1):
if _problem_list:
one_img = image(
b64=(await text2image("\n".join(_problem_list[:max_line]),
padding=10,
color="#f9f6f2",
)).pic2bs4()
)
if img_nu > 2:
pic_list.append(one_img)
else:
await show_word.send(one_img)
del _problem_list[:max_line]
if pic_list:
for img in pic_list:
data = {
"type": "node",
"data": {"name": f"{NICKNAME}", "uin": f"{bot.self_id}", "content": img},
}
mes_list.append(data)
await bot.send_group_forward_msg(group_id=event.group_id, messages=mes_list)
else:
await show_word.send(
image(
b64=(await text2image(
"该群已收录的词条:\n\n" + "\n".join(_problem_list),
padding=10,
color="#f9f6f2",
)).pic2bs4()
)
)
else:
_answer_list = []
if msg.startswith("id:"):
x = msg.split(":")[-1]
if not is_number(x) or int(x) < 0:
return await delete_word.finish("id必须为数字且符合规范")
p = await WordBank.get_group_all_problem(event.group_id)
if p:
_problem = p[int(x)]
_answer_list = await WordBank.get_group_all_answer(event.group_id, _problem)
msg += '' + await get_one_image_problem(event, _problem)
else:
_problem, msg = await get_one_problem(event, msg)
_answer_list = await WordBank.get_group_all_answer(event.group_id, _problem)
if not _answer_list:
await show_word.send("未收录该词条...")
else:
# 解析图片和@
_answer_img_nu_list = [await get_one_answer(event, format, answer, False) for answer, format in
_answer_list]
word_nu = len(_answer_img_nu_list)
img_nu = 0
answer = "词条" + msg + "\n回答:"
for i, x, in enumerate(_answer_img_nu_list):
r = re.findall(rf"\[CQ:image,file=", str(x))
if r:
img_nu += len(r)
answer += "\n" + f"{i}." + x
if (img_nu > 2 and word_nu > 5) or word_nu > 10 or img_nu > 4:
data = {
"type": "node",
"data": {"name": f"{NICKNAME}", "uin": f"{bot.self_id}", "content": answer},
}
await bot.send_group_forward_msg(group_id=event.group_id, messages=data)
else:
await show_word.send(answer)
# await show_word.send(f"词条 {msg} 回答:\n" + "\n".join(_answer_list))
async def get__builder(event, _problem: str, answer: str, idx: int):
(data_dir / f"{event.group_id}").mkdir(exist_ok=True, parents=True)
(data_dir / f"{event.group_id}" / "problem").mkdir(exist_ok=True, parents=True)
_builder = WordBankBuilder(event.user_id, event.group_id, _problem)
problem = ''
_p = _problem
for at_ in get_message_at(event.json()):
r = re.search(rf"\[CQ:at,qq={at_}]", answer)
if r:
answer = answer.replace(f"[CQ:at,qq={at_}]", f"[__placeholder_{idx}]", 1)
_builder.set_placeholder(idx, at_)
idx += 1
r_problem = re.search(rf"\[CQ:at,qq={at_}]", _problem)
if r_problem:
q = await GroupInfoUser.get_member_info(
int(at_), event.group_id)
problem += _p[: _p.find(f"[CQ:at,qq={at_}]")] + "@" + q.user_name
_p = _p[_p.find(f"[CQ:at,qq={at_}]") + len(f"[CQ:at,qq={at_}]"):]
for img in get_message_img(event.json()):
_x = img.split("?")[0]
_x_list = img.split("?")
r = re.search(rf"\[CQ:image,file=(.*),url={_x}.*?]", answer)
if r:
rand = random.randint(1, 10000) + random.randint(1, 114514)
for _ in range(10):
if f"__placeholder_{rand}_{idx}.jpg" not in os.listdir(data_dir / f"{event.group_id}"):
break
rand = random.randint(1, 10000) + random.randint(1, 114514)
strinfo = re.compile(f"\[CQ:image,file={r.group(1)},.*url={_x_list[0]}\?{_x_list[1]}.*?]")
answer = strinfo.sub(f"[__placeholder_{idx}]", answer)
await AsyncHttpx.download_file(
img, data_dir / f"{event.group_id}" / f"__placeholder_{rand}_{idx}.jpg"
)
_builder.set_placeholder(idx, f"__placeholder_{rand}_{idx}.jpg")
idx += 1
r_problem = re.search(rf"\[CQ:image,file=(.*?)(,subType=\d)?,url={_x}.*?]", _p)
if r_problem:
strinfo = re.compile(f"(,subType=\d)?,url={_x_list[0]}\?{_x_list[1]}.*?]")
_problem = strinfo.sub(f"]", _problem)
_p = strinfo.sub(f"]", _p)
problem += _p[: _p.find(f"[CQ:image,file={r_problem.group(1)}]")] + image(img)
_p = _p[_p.find(f"[CQ:image,file={r_problem.group(1)}]") + len(f"[CQ:image,file={r_problem.group(1)}]"):]
problem_img = r_problem.group(1)
if f"{problem_img}.jpg" not in os.listdir(data_dir / f"{event.group_id}" / f"problem"):
await AsyncHttpx.download_file(
img, data_dir / f"{event.group_id}" / f"problem" / f"{problem_img}.jpg"
)
_builder.set_answer(answer)
_builder.set_problem(_problem)
_builder.problem = problem + _p
return _builder

Binary file not shown.

View File

@ -8,10 +8,7 @@
"configs/path_config.py", "configs/path_config.py",
"configs/utils", "configs/utils",
"poetry.lock", "poetry.lock",
"pyproject.toml", "pyproject.toml"
"resources/font",
"resources/image/zhenxun",
"resources/image/other"
], ],
"add_file": [], "add_file": [],
"delete_file": [] "delete_file": []

View File

@ -14,6 +14,11 @@ from .requests_manager import RequestManager
from configs.path_config import DATA_PATH from configs.path_config import DATA_PATH
# 全局字典
GDict = {
"run_sql": [] # 需要启动前运行的sql语句
}
# 群功能开关 | 群被动技能 | 群权限 管理 # 群功能开关 | 群被动技能 | 群权限 管理
group_manager: Optional[GroupManager] = GroupManager( group_manager: Optional[GroupManager] = GroupManager(
DATA_PATH / "manager" / "group_manager.json" DATA_PATH / "manager" / "group_manager.json"