♻️ 重构群欢迎消息插件,支持多条消息随机发送 (#1768) (#1774)

This commit is contained in:
HibiKier 2024-12-16 22:56:17 +08:00 committed by GitHub
parent cf29fbcf74
commit 5956ec1148
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
4 changed files with 447 additions and 175 deletions

View File

@ -1,145 +0,0 @@
from pathlib import Path
import shutil
from typing import Annotated
from nonebot import on_command
from nonebot.params import Command
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Image, Text, UniMsg
from nonebot_plugin_session import EventSession
import ujson as json
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.rules import admin_check, ensure_group
base_config = Config.get("admin_bot_manage")
__plugin_meta__ = PluginMetadata(
name="自定义群欢迎消息",
description="自定义群欢迎消息",
usage="""
设置群欢迎消息当消息中包含 -at 时会at入群用户
设置欢迎消息 欢迎新人[图片]
设置欢迎消息 欢迎你 -at
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
plugin_type=PluginType.ADMIN,
admin_level=base_config.get("SET_GROUP_WELCOME_MESSAGE_LEVEL", 2),
configs=[
RegisterConfig(
module="admin_bot_manage",
key="SET_GROUP_WELCOME_MESSAGE_LEVEL",
value=2,
help="设置群欢迎消息所需要的管理员权限等级",
default_value=2,
)
],
).dict(),
)
_matcher = on_command(
"设置欢迎消息",
rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL")
& ensure_group,
priority=5,
block=True,
)
BASE_PATH = DATA_PATH / "welcome_message"
BASE_PATH.mkdir(parents=True, exist_ok=True)
# 旧数据迁移
old_file = DATA_PATH / "custom_welcome_msg" / "custom_welcome_msg.json"
if old_file.exists():
try:
old_data: dict[str, str] = json.load(old_file.open(encoding="utf8"))
for group_id, message in old_data.items():
file = BASE_PATH / "qq" / f"{group_id}" / "text.json"
file.parent.mkdir(parents=True, exist_ok=True)
json.dump(
{"at": "[at]" in message, "message": message.replace("[at]", "")},
file.open("w", encoding="utf8"),
ensure_ascii=False,
indent=4,
)
logger.debug("群欢迎消息数据迁移", group_id=group_id)
shutil.rmtree(old_file.parent.absolute())
except Exception as e:
logger.error("群欢迎消息数据迁移失败...", e=e)
def get_path(session: EventSession) -> Path:
"""根据Session获取存储路径
参数:
session: EventSession:
返回:
Path: 存储路径
"""
path = BASE_PATH / f"{session.platform or session.bot_type}" / f"{session.id2}"
if session.id3:
path = (
BASE_PATH
/ f"{session.platform or session.bot_type}"
/ f"{session.id3}"
/ f"{session.id2}"
)
path.mkdir(parents=True, exist_ok=True)
for f in path.iterdir():
f.unlink()
return path
async def save(path: Path, message: UniMsg) -> str:
"""保存群欢迎消息
参数:
path: 存储路径
message: 消息内容
返回:
str: 消息内容文本格式
"""
idx = 0
text = ""
file = path / "text.json"
for msg in message:
if isinstance(msg, Text):
text += msg.text
elif isinstance(msg, Image):
if msg.url:
text += f"[image:{idx}]"
if await AsyncHttpx.download_file(msg.url, path / f"{idx}.png"):
idx += 1
else:
logger.warning("图片 URL 为空...", "设置欢迎消息")
json.dump(
{"at": "-at" in text, "message": text.replace("-at", "", 1)},
file.open("w", encoding="utf-8"),
ensure_ascii=False,
indent=4,
)
return text
@_matcher.handle()
async def _(
session: EventSession,
message: UniMsg,
command: Annotated[tuple[str, ...], Command()],
):
path = get_path(session)
message[0].text = message[0].text.replace(command[0], "").strip()
text = await save(path, message)
uni_msg = Text("设置欢迎消息成功: \n") + message
await uni_msg.send()
logger.info(f"设置群欢迎消息成功: {text}", command[0], session=session)

View File

@ -0,0 +1,132 @@
from typing import Annotated
from nonebot import on_command
from nonebot.params import Command
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
Alconna,
AlconnaMatcher,
Args,
Arparma,
Field,
Match,
Text,
UniMsg,
on_alconna,
)
from nonebot_plugin_uninfo import Uninfo
from zhenxun.configs.config import Config
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.rules import admin_check, ensure_group
from .data_source import Manager
base_config = Config.get("admin_bot_manage")
__plugin_meta__ = PluginMetadata(
name="自定义群欢迎消息",
description="自定义群欢迎消息",
usage="""
设置群欢迎消息当消息中包含 -at 时会at入群用户
可以设置多条欢迎消息包含多条欢迎消息时将随机发送
指令:
设置欢迎消息
查看欢迎消息 ?[id]: 存在id时查看指定欢迎消息内容
删除欢迎消息 [id]
示例:
设置欢迎消息 欢迎新人[图片]
设置欢迎消息 欢迎你 -at
查看欢迎消息
查看欢迎消息 2
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
plugin_type=PluginType.ADMIN,
admin_level=base_config.get("SET_GROUP_WELCOME_MESSAGE_LEVEL", 2),
configs=[
RegisterConfig(
module="admin_bot_manage",
key="SET_GROUP_WELCOME_MESSAGE_LEVEL",
value=2,
help="设置群欢迎消息所需要的管理员权限等级",
default_value=2,
)
],
).dict(),
)
_matcher = on_command(
"设置欢迎消息",
rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL")
& ensure_group,
priority=5,
block=True,
)
_show_matcher = on_alconna(
Alconna("查看欢迎消息", Args["idx?", int]),
rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL")
& ensure_group,
priority=5,
block=True,
)
_del_matcher: type[AlconnaMatcher] = on_alconna(
Alconna(
"删除欢迎消息",
Args[
"idx",
int,
Field(
missing_tips=lambda: "请在命令后跟随指定id",
unmatch_tips=lambda _: "删除指定id必须为数字",
),
],
),
skip_for_unmatch=False,
rule=admin_check("admin_bot_manage", "SET_GROUP_WELCOME_MESSAGE_LEVEL")
& ensure_group,
priority=5,
block=True,
)
@_matcher.handle()
async def _(
session: Uninfo,
message: UniMsg,
command: Annotated[tuple[str, ...], Command()],
):
path = Manager.get_path(session)
if not path:
await MessageUtils.build_message("群组不存在...").finish()
message[0].text = message[0].text.replace(command[0], "").strip()
await Manager.save(path, message)
uni_msg = Text("设置欢迎消息成功: \n") + message
await uni_msg.send()
logger.info(f"设置群欢迎消息成功: {message}", command[0], session=session)
@_show_matcher.handle()
async def _(session: Uninfo, arparma: Arparma, idx: Match[int]):
result = await Manager.get_group_message(
session, idx.result if idx.available else None
)
if not result:
await MessageUtils.build_message("当前还未设置群组欢迎消息哦...").finish()
await MessageUtils.build_message(result).send()
logger.info("查看群组欢迎信息", arparma.header_result, session=session)
@_del_matcher.handle()
async def _(session: Uninfo, arparma: Arparma, idx: int):
result = await Manager.delete_group_message(session, int(idx))
if not result:
await MessageUtils.build_message("未查找到指定id的群组欢迎消息...").finish()
await MessageUtils.build_message(result).send()
logger.info(f"删除群组欢迎信息: {result}", arparma.header_result, session=session)

View File

@ -0,0 +1,262 @@
import os
from pathlib import Path
import re
import shutil
import uuid
import nonebot
from nonebot_plugin_alconna import UniMessage, UniMsg
from nonebot_plugin_uninfo import Uninfo
import ujson as json
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils._image_template import ImageTemplate
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.platform import PlatformUtils
BASE_PATH = DATA_PATH / "welcome_message"
BASE_PATH.mkdir(parents=True, exist_ok=True)
driver = nonebot.get_driver()
old_file = DATA_PATH / "custom_welcome_msg" / "custom_welcome_msg.json"
if old_file.exists():
try:
old_data: dict[str, str] = json.load(old_file.open(encoding="utf8"))
for group_id, message in old_data.items():
file = BASE_PATH / "qq" / f"{group_id}" / "text.json"
file.parent.mkdir(parents=True, exist_ok=True)
json.dump(
{
uuid.uuid4(): {
"at": "[at]" in message,
"status": True,
"message": message.replace("[at]", ""),
}
},
file.open("w", encoding="utf8"),
ensure_ascii=False,
indent=4,
)
logger.debug("群欢迎消息数据迁移", group_id=group_id)
shutil.rmtree(old_file.parent.absolute())
except Exception as e:
logger.error("群欢迎消息数据迁移失败...", e=e)
def migrate(path: Path):
"""数据迁移
参数:
path: 路径
"""
text_file = path / "text.json"
with text_file.open(encoding="utf8") as f:
json_data = json.load(f)
new_data = {}
if "at" in json_data:
split_msg = re.split(r"\[image:\d\]", str(json_data["message"]))
data = []
for i in range(len(split_msg)):
msg = split_msg[i]
data.append(
{
"type": "text",
"text": msg,
}
)
image_file = path / f"{i}.png"
if image_file.exists():
data.append(
{
"type": "image",
"path": str(image_file),
}
)
new_data[uuid.uuid4()] = {
"at": json_data.get("at", False),
"status": json_data.get("status", True),
"message": data,
}
with text_file.open("w", encoding="utf8") as f:
json.dump(new_data, f, ensure_ascii=False, indent=4)
@driver.on_startup
def _():
"""数据迁移
参数:
path: 存储路径
json_data: 存储数据
"""
flag_file = BASE_PATH / "flag.txt"
if flag_file.exists():
return
logger.info("开始迁移群欢迎消息数据...")
base_path = BASE_PATH
path_list = []
for platform in os.listdir(BASE_PATH):
base_path = base_path / platform
for group_id in os.listdir(base_path):
group_path = base_path / group_id
is_channel = False
for file in os.listdir(group_path):
inner_file = group_path / file
if inner_file.is_dir():
path_list.append(inner_file)
is_channel = True
if not is_channel:
path_list.append(group_path)
if path_list:
for path in path_list:
migrate(path)
if not flag_file.exists():
flag_file.touch()
logger.success("迁移群欢迎消息数据完成!", "")
class Manager:
@classmethod
def __get_data(cls, session: Uninfo) -> dict | None:
"""获取存储数据
参数:
session: Uninfo
返回:
dict | None: 欢迎消息数据
"""
if not session.group:
return None
path = cls.get_path(session)
if not path:
return None
file = path / "text.json"
if not file.exists():
return None
with file.open(encoding="utf8") as f:
return json.load(f)
@classmethod
def get_path(cls, session: Uninfo) -> Path | None:
"""根据Session获取存储路径
参数:
session: Uninfo:
返回:
Path: 存储路径
"""
if not session.group:
return None
platform = PlatformUtils.get_platform(session)
path = BASE_PATH / f"{platform}" / f"{session.group.id}"
if session.group.parent:
path = (
BASE_PATH
/ f"{platform}"
/ f"{session.group.parent.id}"
/ f"{session.group.id}"
)
path.mkdir(parents=True, exist_ok=True)
return path
@classmethod
async def save(cls, path: Path, message: UniMsg):
"""保存群欢迎消息
参数:
path: 存储路径
message: 消息内容
"""
file = path / "text.json"
json_data = {}
if file.exists():
with file.open(encoding="utf8") as f:
json_data = json.load(f)
data = []
is_at = False
for msg in message.dump(True):
if msg["type"] == "image":
image_file = path / f"{uuid.uuid4()}.png"
await AsyncHttpx.download_file(msg["url"], image_file)
msg["path"] = str(image_file)
if not is_at and msg["type"] == "text" and "-at" in msg["text"]:
msg["text"] = msg["text"].replace("-at", "", 1).strip()
is_at = True
data.append(msg)
json_data[str(uuid.uuid4())] = {"at": is_at, "status": True, "message": data}
with file.open("w", encoding="utf8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
@classmethod
async def get_group_message(
cls, session: Uninfo, idx: int | None
) -> BuildImage | UniMessage | None:
"""获取群欢迎消息
参数:
session: Uninfo
idx: 指定id
返回:
list: 消息内容
"""
json_data = cls.__get_data(session)
if not json_data:
return None
if idx is not None:
key_list = list(json_data.keys())
if idx < 0 or idx > len(key_list):
return None
return UniMessage().load(json_data[key_list[idx]]["message"])
else:
msg_list = []
for i, uid in enumerate(json_data):
msg_data = json_data[uid]
msg_list.append(
[
i,
"开启" if msg_data["status"] else "关闭",
"" if msg_data["at"] else "",
str(UniMessage().load(msg_data["message"])),
]
)
if not msg_list:
return None
column_name = ["ID", "状态", "是否@", "消息"]
return await ImageTemplate.table_page(
"群欢迎消息", session.group.id, column_name, msg_list
)
@classmethod
async def delete_group_message(cls, session: Uninfo, idx: int) -> str | None:
"""获取群欢迎消息
参数:
session: EventSession:
id: 消息ID
返回:
list: 消息内容
"""
json_data = cls.__get_data(session)
if not json_data:
return None
key_list = list(json_data.keys())
if idx < 0 or idx >= len(key_list):
return None
old_msg = str(UniMessage().load(json_data[key_list[idx]]["message"]))
for msg in json_data[key_list[idx]]["message"]:
if msg["type"] == "image" and msg["path"]:
image_path = Path(msg["path"])
if image_path.exists():
image_path.unlink()
del json_data[key_list[idx]]
with file.open("w", encoding="utf8") as f:
json.dump(json_data, f, ensure_ascii=False, indent=4)
return f"删除群组欢迎消息成功!消息内容: {old_msg}"

View File

@ -2,10 +2,9 @@ from datetime import datetime
import os
from pathlib import Path
import random
import re
from nonebot.adapters import Bot
from nonebot_plugin_alconna import At
from nonebot_plugin_alconna import At, UniMessage
from nonebot_plugin_uninfo import Uninfo
import ujson as json
@ -28,7 +27,7 @@ base_config = Config.get("invite_manager")
limit_cd = base_config.get("welcome_msg_cd")
WELCOME_PATH = DATA_PATH / "welcome_message" / "qq"
WELCOME_PATH = DATA_PATH / "welcome_message"
DEFAULT_IMAGE_PATH = IMAGE_PATH / "qxz"
@ -153,44 +152,68 @@ class GroupManager:
await cls.__refresh_level(bot, group_id)
@classmethod
def __build_welcome_message(cls, user_id: str, path: Path) -> list[At | Path | str]:
"""构造群欢迎消息
def get_path(cls, session: Uninfo) -> Path | None:
"""根据Session获取存储路径
参数:
user_id: 用户id
path: 群欢迎消息存储路径
session: Uninfo:
返回:
list[At | Path | str]: 消息列表
Path: 存储路径
"""
file = path / "text.json"
data = json.load(file.open(encoding="utf-8"))
message = data["message"]
msg_split = re.split(r"\[image:\d+\]", message)
msg_list = []
if data["at"]:
msg_list.append(At(flag="user", target=user_id))
for i, text in enumerate(msg_split):
msg_list.append(text)
img_file = path / f"{i}.png"
if img_file.exists():
msg_list.append(img_file)
return msg_list
if not session.group:
return None
platform = PlatformUtils.get_platform(session)
path = WELCOME_PATH / f"{platform}" / f"{session.group.id}"
if session.group.parent:
path = (
WELCOME_PATH
/ f"{platform}"
/ f"{session.group.parent.id}"
/ f"{session.group.id}"
)
path.mkdir(parents=True, exist_ok=True)
return path
@classmethod
async def __send_welcome_message(cls, user_id: str, group_id: str):
def __get_welcome_data(cls, session: Uninfo) -> dict | None:
"""获取存储数据
参数:
session: Uninfo
返回:
dict | None: 欢迎消息数据
"""
if not session.group:
return None
path = cls.get_path(session)
if not path:
return None
file = path / "text.json"
if not file.exists():
return None
with file.open(encoding="utf8") as f:
return json.load(f)
@classmethod
async def __send_welcome_message(cls, session: Uninfo, user_id: str):
"""发送群欢迎消息
参数:
user_id: 用户id
group_id: 群组id
"""
cls._flmt.start_cd(group_id)
path = WELCOME_PATH / f"{group_id}"
file = path / "text.json"
if file.exists():
msg_list = cls.__build_welcome_message(user_id, path)
logger.info("发送群欢迎消息...", "入群检测", group_id=group_id)
if not session.group:
return
cls._flmt.start_cd(session.group.id)
if json_data := cls.__get_welcome_data(session):
key = random.choice([k for k in json_data.keys() if json_data[k]["status"]])
welcome_data = json_data[key]
msg_list = UniMessage().load(welcome_data["message"])
if welcome_data["at"]:
msg_list.insert(0, At("user", user_id))
logger.info("发送群欢迎消息...", "入群检测", session=session)
if msg_list:
await MessageUtils.build_message(msg_list).send() # type: ignore
else:
@ -199,7 +222,7 @@ class GroupManager:
)
await MessageUtils.build_message(
[
"新人快跑啊!!本群现状↓(快使用自定义",
"新人快跑啊!!本群现状↓(快使用自定义群欢迎消息",
image,
]
).send()
@ -224,7 +247,7 @@ class GroupManager:
if not await CommonUtils.task_is_block(
session, "group_welcome"
) and cls._flmt.check(group_id):
await cls.__send_welcome_message(user_id, group_id)
await cls.__send_welcome_message(session, user_id)
@classmethod
async def kick_bot(cls, bot: Bot, group_id: str, operator_id: str):