mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 14:22:55 +08:00
⚡️ 代码优化
This commit is contained in:
parent
8470777f6c
commit
f02c276310
@ -10,6 +10,8 @@ from nonebot_plugin_session import EventSession
|
||||
|
||||
from zhenxun.configs.path_config import IMAGE_PATH
|
||||
from zhenxun.configs.utils import PluginExtraData
|
||||
from zhenxun.models.ban_console import BanConsole
|
||||
from zhenxun.models.group_console import GroupConsole
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.enum import PluginType
|
||||
@ -34,6 +36,15 @@ _path = IMAGE_PATH / "_base" / "laugh"
|
||||
|
||||
@_matcher.handle()
|
||||
async def _(matcher: Matcher, message: UniMsg, session: EventSession):
|
||||
gid = session.id3 or session.id2
|
||||
if await BanConsole.is_ban(session.id1, gid):
|
||||
return
|
||||
if gid:
|
||||
if await BanConsole.is_ban(None, gid):
|
||||
return
|
||||
if g := await GroupConsole.get_group(gid):
|
||||
if g.level < 0:
|
||||
return
|
||||
if text := message.extract_plain_text().strip():
|
||||
if plugin := await PluginInfo.get_or_none(
|
||||
name=text, load_status=True, plugin_type=PluginType.NORMAL
|
||||
|
||||
@ -1,4 +1,5 @@
|
||||
from tortoise import fields
|
||||
from typing_extensions import Self
|
||||
|
||||
from zhenxun.services.db_context import Model
|
||||
|
||||
@ -39,6 +40,21 @@ class GroupConsole(Model):
|
||||
table_description = "群组信息表"
|
||||
unique_together = ("group_id", "channel_id")
|
||||
|
||||
@classmethod
|
||||
async def get_group(cls, group_id: str, channel_id: str | None = None) -> Self:
|
||||
"""获取群组
|
||||
|
||||
参数:
|
||||
group_id: 群组id
|
||||
channel_id: 频道id.
|
||||
|
||||
返回:
|
||||
Self: GroupConsole
|
||||
"""
|
||||
if channel_id:
|
||||
return await cls.get(group_id=group_id, channel_id=channel_id)
|
||||
return await cls.get(group_id=group_id, channel_id__isnull=True)
|
||||
|
||||
@classmethod
|
||||
async def is_super_group(cls, group_id: str, channel_id: str | None = None) -> bool:
|
||||
"""是否超级用户指定群
|
||||
|
||||
@ -56,7 +56,7 @@ __plugin_meta__ = PluginMetadata(
|
||||
|
||||
|
||||
@_russian_matcher.handle()
|
||||
async def _(money: int, num: Match[int], at_user: Match[alcAt]):
|
||||
async def _(money: int, num: Match[str], at_user: Match[alcAt]):
|
||||
_russian_matcher.set_path_arg("money", money)
|
||||
if num.available:
|
||||
_russian_matcher.set_path_arg("num", num.result)
|
||||
@ -73,7 +73,7 @@ async def _(
|
||||
message: UniMsg,
|
||||
arparma: Arparma,
|
||||
money: int,
|
||||
num: int,
|
||||
num: str,
|
||||
at_user: Match[alcAt],
|
||||
uname: str = UserName(),
|
||||
):
|
||||
@ -86,16 +86,21 @@ async def _(
|
||||
await Text("群组id为空...").finish()
|
||||
if money <= 0:
|
||||
await Text("赌注金额必须大于0!").finish(reply=True)
|
||||
if num < 0 or num > 6:
|
||||
if num in ["取消", "算了"]:
|
||||
await Text("已取消装弹...").finish()
|
||||
if not num.isdigit():
|
||||
await Text("输入的子弹数必须是数字!").finish(reply=True)
|
||||
b_num = int(num)
|
||||
if b_num < 0 or b_num > 6:
|
||||
await Text("子弹数量必须在1-6之间!").finish(reply=True)
|
||||
_at_user = at_user.result.target if at_user.available else None
|
||||
rus = Russian(
|
||||
at_user=_at_user, player1=(session.id1, uname), money=money, bullet_num=num
|
||||
at_user=_at_user, player1=(session.id1, uname), money=money, bullet_num=b_num
|
||||
)
|
||||
result = await russian_manage.add_russian(bot, gid, rus)
|
||||
await result.send()
|
||||
logger.info(
|
||||
f"添加俄罗斯轮盘 装弹: {num}, 金额: {money}",
|
||||
f"添加俄罗斯轮盘 装弹: {b_num}, 金额: {money}",
|
||||
arparma.header_result,
|
||||
session=session,
|
||||
)
|
||||
|
||||
@ -7,7 +7,7 @@ from zhenxun.utils.rules import ensure_group
|
||||
_russian_matcher = on_alconna(
|
||||
Alconna(
|
||||
"俄罗斯轮盘",
|
||||
Args["money", int]["num?", int]["at_user?", alcAt],
|
||||
Args["money", int]["num?", str]["at_user?", alcAt],
|
||||
),
|
||||
aliases={"装弹", "俄罗斯转盘"},
|
||||
rule=ensure_group,
|
||||
|
||||
@ -1,3 +1,4 @@
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.plugin import PluginMetadata
|
||||
from nonebot_plugin_alconna import Alconna, Args, Arparma
|
||||
from nonebot_plugin_alconna import Image as alcImg
|
||||
@ -7,6 +8,8 @@ from nonebot_plugin_session import EventSession
|
||||
|
||||
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.platform import PlatformUtils
|
||||
from zhenxun.utils.utils import template2forward
|
||||
|
||||
from .saucenao import get_saucenao_image
|
||||
|
||||
@ -62,11 +65,13 @@ async def _(mode: Match[str], image: Match[alcImg]):
|
||||
|
||||
@_matcher.got_path("image", prompt="图来!")
|
||||
async def _(
|
||||
bot: Bot,
|
||||
session: EventSession,
|
||||
arparma: Arparma,
|
||||
mode: str,
|
||||
image: alcImg,
|
||||
):
|
||||
gid = session.id3 or session.id2
|
||||
if not image.url:
|
||||
await Text("图片url为空...").finish()
|
||||
await Text("开始处理图片...").send()
|
||||
@ -75,6 +80,14 @@ async def _(
|
||||
await Text(info_list).finish(at_sender=True)
|
||||
if not info_list:
|
||||
await Text("未查询到...").finish()
|
||||
platform = PlatformUtils.get_platform(bot)
|
||||
if "qq" == platform and gid:
|
||||
forward = template2forward(info_list, bot.self_id) # type: ignore
|
||||
await bot.send_group_forward_msg(
|
||||
group_id=int(gid),
|
||||
messages=forward, # type: ignore
|
||||
)
|
||||
else:
|
||||
for info in info_list[1:]:
|
||||
await info.send()
|
||||
logger.info(f" 识图: {image.url}", arparma.header_result, session=session)
|
||||
|
||||
@ -2,6 +2,7 @@ import random
|
||||
from typing import Tuple
|
||||
|
||||
from nonebot.adapters import Bot
|
||||
from nonebot.adapters.onebot.v11 import MessageSegment
|
||||
from nonebot.matcher import Matcher
|
||||
from nonebot.message import run_postprocessor
|
||||
from nonebot.plugin import PluginMetadata
|
||||
@ -22,9 +23,11 @@ from zhenxun.configs.utils import PluginCdBlock, PluginExtraData, RegisterConfig
|
||||
from zhenxun.models.sign_user import SignUser
|
||||
from zhenxun.models.user_console import UserConsole
|
||||
from zhenxun.services.log import logger
|
||||
from zhenxun.utils.platform import PlatformUtils
|
||||
from zhenxun.utils.utils import template2forward
|
||||
from zhenxun.utils.withdraw_manage import WithdrawManager
|
||||
|
||||
from ._data_source import SetuManage, base_config
|
||||
from ._data_source import Image, SetuManage, base_config
|
||||
|
||||
__plugin_meta__ = PluginMetadata(
|
||||
name="色图",
|
||||
@ -211,6 +214,21 @@ async def _(
|
||||
result_list = await SetuManage.get_setu(tags=_tags, num=_num, is_r18=is_r18)
|
||||
if isinstance(result_list, str):
|
||||
await Text(result_list).finish(reply=True)
|
||||
max_once_num2forward = base_config.get("MAX_ONCE_NUM2FORWARD")
|
||||
platform = PlatformUtils.get_platform(bot)
|
||||
if (
|
||||
"qq" == platform
|
||||
and gid
|
||||
and max_once_num2forward
|
||||
and len(result_list) >= max_once_num2forward
|
||||
):
|
||||
logger.debug("使用合并转发转发色图数据", arparma.header_result, session=session)
|
||||
forward = template2forward(result_list, bot.self_id) # type: ignore
|
||||
await bot.send_group_forward_msg(
|
||||
group_id=int(gid),
|
||||
messages=forward, # type: ignore
|
||||
)
|
||||
else:
|
||||
for result in result_list:
|
||||
logger.info(f"发送色图 {result}", arparma.header_result, session=session)
|
||||
receipt = await result.send()
|
||||
|
||||
@ -4,7 +4,6 @@ from pathlib import Path
|
||||
|
||||
from asyncpg import UniqueViolationError
|
||||
from nonebot_plugin_saa import Image, MessageFactory, Text
|
||||
from pydantic import BaseModel
|
||||
|
||||
from zhenxun.configs.config import NICKNAME, Config
|
||||
from zhenxun.configs.path_config import IMAGE_PATH, TEMP_PATH
|
||||
|
||||
@ -3,13 +3,16 @@ import time
|
||||
from collections import defaultdict
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from re import L
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
import pypinyin
|
||||
import pytz
|
||||
from nonebot.adapters.onebot.v11 import Message, MessageSegment
|
||||
from nonebot_plugin_saa import Image, MessageFactory, Text
|
||||
|
||||
from zhenxun.configs.config import Config
|
||||
from zhenxun.configs.config import NICKNAME, Config
|
||||
from zhenxun.services.log import logger
|
||||
|
||||
|
||||
@ -230,3 +233,61 @@ def is_valid_date(date_text: str, separator: str = "-") -> bool:
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
def custom_forward_msg(
|
||||
msg_list: list[str | Message],
|
||||
uin: str,
|
||||
name: str = f"这里是{NICKNAME}",
|
||||
) -> list[dict]:
|
||||
"""生成自定义合并消息
|
||||
|
||||
参数:
|
||||
msg_list: 消息列表
|
||||
uin: 发送者 QQ
|
||||
name: 自定义名称
|
||||
|
||||
返回:
|
||||
list[dict]: 转发消息
|
||||
"""
|
||||
mes_list = []
|
||||
for _message in msg_list:
|
||||
data = {
|
||||
"type": "node",
|
||||
"data": {
|
||||
"name": name,
|
||||
"uin": f"{uin}",
|
||||
"content": _message,
|
||||
},
|
||||
}
|
||||
mes_list.append(data)
|
||||
return mes_list
|
||||
|
||||
|
||||
def template2forward(
|
||||
msg_list: list[MessageFactory | Text | Image], uni: str
|
||||
) -> list[dict]:
|
||||
"""模板转转发消息
|
||||
|
||||
参数:
|
||||
msg_list: 消息列表
|
||||
uni: 发送者qq
|
||||
|
||||
返回:
|
||||
list[dict]: 转发消息
|
||||
"""
|
||||
forward_data = []
|
||||
for r_list in msg_list:
|
||||
s = ""
|
||||
if isinstance(r_list, MessageFactory):
|
||||
for r in r_list:
|
||||
if isinstance(r, Text):
|
||||
s += str(r)
|
||||
elif isinstance(r, Image):
|
||||
s += MessageSegment.image(r.data["image"])
|
||||
elif isinstance(r_list, Image):
|
||||
s = MessageSegment.image(r_list.data["image"])
|
||||
else:
|
||||
s = str(r_list)
|
||||
forward_data.append(s)
|
||||
return custom_forward_msg(forward_data, uni)
|
||||
|
||||
Loading…
Reference in New Issue
Block a user