Merge branch 'main' into feature/record-bot-sent

This commit is contained in:
HibiKier 2025-05-26 10:57:27 +08:00 committed by GitHub
commit be76ac78bd
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
34 changed files with 2799 additions and 217 deletions

View File

@ -26,6 +26,21 @@ __plugin_meta__ = PluginMetadata(
_matcher = on_alconna(Alconna("关于"), priority=5, block=True, rule=to_me())
QQ_INFO = """
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
""".strip()
INFO = """
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
项目地址https://github.com/zhenxun-org/zhenxun_bot
文档地址https://zhenxun-org.github.io/zhenxun_bot/
""".strip()
@_matcher.handle()
async def _(session: Uninfo, arparma: Arparma):
ver_file = Path() / "__version__"
@ -35,25 +50,11 @@ async def _(session: Uninfo, arparma: Arparma):
if text := await f.read():
version = text.split(":")[-1].strip()
if PlatformUtils.is_qbot(session):
info: list[str | Path] = [
f"""
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
""".strip()
]
result: list[str | Path] = [QQ_INFO.format(version=version)]
path = DATA_PATH / "about.png"
if path.exists():
info.append(path)
result.append(path)
await MessageUtils.build_message(result).send() # type: ignore
else:
info = [
f"""
绪山真寻Bot
版本{version}
简介基于Nonebot2开发支持多平台是一个非常可爱的Bot呀希望与大家要好好相处
项目地址https://github.com/HibiKier/zhenxun_bot
文档地址https://hibikier.github.io/zhenxun_bot/
""".strip()
]
await MessageUtils.build_message(info).send() # type: ignore
await MessageUtils.build_message(INFO.format(version=version)).send()
logger.info("查看关于", arparma.header_result, session=session)

View File

@ -1,4 +1,5 @@
from datetime import datetime, timedelta
from io import BytesIO
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import (
@ -14,35 +15,38 @@ from nonebot_plugin_alconna import (
from nonebot_plugin_session import EventSession
import pytz
from zhenxun.configs.utils import Command, PluginExtraData
from zhenxun.configs.config import Config
from zhenxun.configs.utils import Command, PluginExtraData, RegisterConfig
from zhenxun.models.chat_history import ChatHistory
from zhenxun.models.group_member_info import GroupInfoUser
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.image_utils import ImageTemplate
from zhenxun.utils.image_utils import BuildImage, ImageTemplate
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
__plugin_meta__ = PluginMetadata(
name="消息统计",
description="消息统计查询",
usage="""
格式:
消息排行 ?[type [,,,]] ?[--des]
消息排行 ?[type [,,,,]] ?[--des]
快捷:
[,,,]消息排行 ?[数量]
[,,,,]消息排行 ?[数量]
示例:
消息排行 : 所有记录排行
日消息排行 : 今日记录排行
周消息排行 : 今日记录排行
月消息排行 : 今日记录排行
年消息排行 : 今日记录排行
周消息排行 : 本周记录排行
月消息排行 : 本月记录排行
季消息排行 : 本季度记录排行
年消息排行 : 本年记录排行
消息排行 --des : 逆序周记录排行
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
version="0.2",
plugin_type=PluginType.NORMAL,
menu_type="数据统计",
commands=[
@ -50,8 +54,19 @@ __plugin_meta__ = PluginMetadata(
Command(command="日消息统计"),
Command(command="周消息排行"),
Command(command="月消息排行"),
Command(command="季消息排行"),
Command(command="年消息排行"),
],
configs=[
RegisterConfig(
module="chat_history",
key="SHOW_QUIT_MEMBER",
value=True,
help="是否在消息排行中显示已退群用户",
default_value=True,
type=bool,
)
],
).to_dict(),
)
@ -60,7 +75,7 @@ _matcher = on_alconna(
Alconna(
"消息排行",
Option("--des", action=store_true, help_text="逆序"),
Args["type?", ["", "", "", ""]]["count?", int, 10],
Args["type?", ["", "", "", "", ""]]["count?", int, 10],
),
aliases={"消息统计"},
priority=5,
@ -68,7 +83,7 @@ _matcher = on_alconna(
)
_matcher.shortcut(
r"(?P<type>['', '', '', ''])?消息(排行|统计)\s?(?P<cnt>\d+)?",
r"(?P<type>['', '', '', '', ''])?消息(排行|统计)\s?(?P<cnt>\d+)?",
command="消息排行",
arguments=["{type}", "{cnt}"],
prefix=True,
@ -96,20 +111,57 @@ async def _(
date_scope = (time_now - timedelta(days=7), time_now)
elif date in [""]:
date_scope = (time_now - timedelta(days=30), time_now)
column_name = ["名次", "昵称", "发言次数"]
elif date in [""]:
date_scope = (time_now - timedelta(days=90), time_now)
column_name = ["名次", "头像", "昵称", "发言次数"]
show_quit_member = Config.get_config("chat_history", "SHOW_QUIT_MEMBER", True)
fetch_count = count.result
if not show_quit_member:
fetch_count = count.result * 2
if rank_data := await ChatHistory.get_group_msg_rank(
group_id, count.result, "DES" if arparma.find("des") else "DESC", date_scope
group_id, fetch_count, "DES" if arparma.find("des") else "DESC", date_scope
):
idx = 1
data_list = []
for uid, num in rank_data:
if user := await GroupInfoUser.filter(
if len(data_list) >= count.result:
break
user_in_group = await GroupInfoUser.filter(
user_id=uid, group_id=group_id
).first():
user_name = user.user_name
).first()
if not user_in_group and not show_quit_member:
continue
if user_in_group:
user_name = user_in_group.user_name
else:
user_name = uid
data_list.append([idx, user_name, num])
user_name = f"{uid}(已退群)"
avatar_size = 40
try:
avatar_bytes = await PlatformUtils.get_user_avatar(str(uid), "qq")
if avatar_bytes:
avatar_img = BuildImage(
avatar_size, avatar_size, background=BytesIO(avatar_bytes)
)
await avatar_img.circle()
avatar_tuple = (avatar_img, avatar_size, avatar_size)
else:
avatar_img = BuildImage(avatar_size, avatar_size, color="#CCCCCC")
await avatar_img.circle()
avatar_tuple = (avatar_img, avatar_size, avatar_size)
except Exception as e:
logger.warning(f"获取用户头像失败: {e}", "chat_history")
avatar_img = BuildImage(avatar_size, avatar_size, color="#CCCCCC")
await avatar_img.circle()
avatar_tuple = (avatar_img, avatar_size, avatar_size)
data_list.append([idx, avatar_tuple, user_name, num])
idx += 1
if not date_scope:
if date_scope := await ChatHistory.get_group_first_msg_datetime(group_id):
@ -132,13 +184,3 @@ async def _(
)
await MessageUtils.build_message(A).finish(reply_to=True)
await MessageUtils.build_message("群组消息记录为空...").finish()
# # @test.handle()
# # async def _(event: MessageEvent):
# # print(await ChatHistory.get_user_msg(event.user_id, "private"))
# # print(await ChatHistory.get_user_msg_count(event.user_id, "private"))
# # print(await ChatHistory.get_user_msg(event.user_id, "group"))
# # print(await ChatHistory.get_user_msg_count(event.user_id, "group"))
# # print(await ChatHistory.get_group_msg(event.group_id))
# # print(await ChatHistory.get_group_msg_count(event.group_id))

View File

@ -40,7 +40,9 @@ async def create_help_img(
match help_type:
case "html":
result = BuildImage.open(await build_html_image(group_id, is_detail))
result = BuildImage.open(
await build_html_image(session, group_id, is_detail)
)
case "zhenxun":
result = BuildImage.open(
await build_zhenxun_image(session, group_id, is_detail)

View File

@ -1,5 +1,8 @@
from collections.abc import Callable
from nonebot_plugin_uninfo import Uninfo
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import PluginType
@ -27,13 +30,15 @@ async def sort_type() -> dict[str, list[PluginInfo]]:
async def classify_plugin(
group_id: str | None, is_detail: bool, handle: Callable
session: Uninfo, group_id: str | None, is_detail: bool, handle: Callable
) -> dict[str, list]:
"""对插件进行分类并判断状态
参数:
session: Uninfo对象
group_id: 群组id
is_detail: 是否详细帮助
handle: 回调方法
返回:
dict[str, list[Item]]: 分类插件数据
@ -41,9 +46,10 @@ async def classify_plugin(
sort_data = await sort_type()
classify: dict[str, list] = {}
group = await GroupConsole.get_or_none(group_id=group_id) if group_id else None
bot = await BotConsole.get_or_none(bot_id=session.self_id)
for menu, value in sort_data.items():
for plugin in value:
if not classify.get(menu):
classify[menu] = []
classify[menu].append(handle(plugin, group, is_detail))
classify[menu].append(handle(bot, plugin, group, is_detail))
return classify

View File

@ -2,9 +2,11 @@ import os
import random
from nonebot_plugin_htmlrender import template_to_pic
from nonebot_plugin_uninfo import Uninfo
from pydantic import BaseModel
from zhenxun.configs.path_config import TEMPLATE_PATH
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import BlockType
@ -48,11 +50,12 @@ ICON2STR = {
def __handle_item(
plugin: PluginInfo, group: GroupConsole | None, is_detail: bool
bot: BotConsole, plugin: PluginInfo, group: GroupConsole | None, is_detail: bool
) -> Item:
"""构造Item
参数:
bot: BotConsole
plugin: PluginInfo
group: 群组
is_detail: 是否详细
@ -73,10 +76,13 @@ def __handle_item(
]:
sta = 2
if group:
if f"{plugin.module}:super," in group.block_plugin:
if f"{plugin.module}," in group.superuser_block_plugin:
sta = 2
if f"{plugin.module}," in group.block_plugin:
sta = 1
if bot:
if f"{plugin.module}," in bot.block_plugins:
sta = 2
return Item(plugin_name=plugin.name, sta=sta)
@ -119,14 +125,17 @@ def build_plugin_data(classify: dict[str, list[Item]]) -> list[dict[str, str]]:
return plugin_list
async def build_html_image(group_id: str | None, is_detail: bool) -> bytes:
async def build_html_image(
session: Uninfo, group_id: str | None, is_detail: bool
) -> bytes:
"""构造HTML帮助图片
参数:
session: Uninfo
group_id: 群号
is_detail: 是否详细帮助
"""
classify = await classify_plugin(group_id, is_detail, __handle_item)
classify = await classify_plugin(session, group_id, is_detail, __handle_item)
plugin_list = build_plugin_data(classify)
return await template_to_pic(
template_path=str((TEMPLATE_PATH / "menu").absolute()),

View File

@ -6,6 +6,7 @@ from pydantic import BaseModel
from zhenxun.configs.config import BotConfig
from zhenxun.configs.path_config import TEMPLATE_PATH
from zhenxun.configs.utils import PluginExtraData
from zhenxun.models.bot_console import BotConsole
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.utils.enum import BlockType
@ -21,12 +22,19 @@ class Item(BaseModel):
"""插件命令"""
def __handle_item(plugin: PluginInfo, group: GroupConsole | None, is_detail: bool):
def __handle_item(
bot: BotConsole | None,
plugin: PluginInfo,
group: GroupConsole | None,
is_detail: bool,
):
"""构造Item
参数:
bot: BotConsole
plugin: PluginInfo
group: 群组
is_detail: 是否为详细
返回:
Item: Item
@ -40,6 +48,8 @@ def __handle_item(plugin: PluginInfo, group: GroupConsole | None, is_detail: boo
plugin.name = f"{plugin.name}(不可用)"
elif group and f"{plugin.module}," in group.block_plugin:
plugin.name = f"{plugin.name}(不可用)"
elif bot and f"{plugin.module}," in bot.block_plugins:
plugin.name = f"{plugin.name}(不可用)"
commands = []
nb_plugin = nonebot.get_plugin_by_module_name(plugin.module_path)
if is_detail and nb_plugin and nb_plugin.metadata and nb_plugin.metadata.extra:
@ -142,7 +152,7 @@ async def build_zhenxun_image(
group_id: 群号
is_detail: 是否详细帮助
"""
classify = await classify_plugin(group_id, is_detail, __handle_item)
classify = await classify_plugin(session, group_id, is_detail, __handle_item)
plugin_list = build_plugin_data(classify)
platform = PlatformUtils.get_platform(session)
bot_id = BotConfig.get_qbot_uid(session.self_id) or session.self_id

View File

@ -0,0 +1,252 @@
from datetime import datetime
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args, Arparma, Match, Subcommand, on_alconna
from nonebot_plugin_apscheduler import scheduler
from nonebot_plugin_uninfo import Uninfo
from nonebot_plugin_waiter import prompt_until
from zhenxun.configs.utils import PluginExtraData, RegisterConfig
from zhenxun.services.log import logger
from zhenxun.utils.depends import UserName
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.utils import is_number
from .data_source import BankManager
__plugin_meta__ = PluginMetadata(
name="小真寻银行",
description="""
小真寻银行提供高品质的存款当好感度等级达到指初识时小真寻会偷偷的帮助你哦
存款额度与好感度有关每日存款次数有限制
基础存款提供基础利息
每日存款提供高额利息
""".strip(),
usage="""
指令
存款 [金额]
取款 [金额]
银行信息
我的银行信息
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
menu_type="群内小游戏",
configs=[
RegisterConfig(
key="sign_max_deposit",
value=100,
help="好感度换算存款金额比例当值是100时最大存款金额=好感度*100存款的最低金额是100强制",
default_value=100,
type=int,
),
RegisterConfig(
key="max_daily_deposit_count",
value=3,
help="每日最大存款次数",
default_value=3,
type=int,
),
RegisterConfig(
key="rate_range",
value=[0.0005, 0.001],
help="小时利率范围",
default_value=[0.0005, 0.001],
type=list[float],
),
RegisterConfig(
key="impression_event",
value=25,
help="到达指定好感度时随机提高或降低利率",
default_value=25,
type=int,
),
RegisterConfig(
key="impression_event_range",
value=[0.00001, 0.0003],
help="到达指定好感度时随机提高或降低利率",
default_value=[0.00001, 0.0003],
type=list[float],
),
RegisterConfig(
key="impression_event_prop",
value=0.3,
help="到达指定好感度时随机提高或降低利率触发概率",
default_value=0.3,
type=float,
),
],
).to_dict(),
)
_matcher = on_alconna(
Alconna(
"mahiro-bank",
Subcommand("deposit", Args["amount?", int]),
Subcommand("withdraw", Args["amount?", int]),
Subcommand("user-info"),
Subcommand("bank-info"),
# Subcommand("loan", Args["amount?", int]),
# Subcommand("repayment", Args["amount?", int]),
),
priority=5,
block=True,
)
_matcher.shortcut(
r"存款\s*(?P<amount>\d+)?",
command="mahiro-bank",
arguments=["deposit", "{amount}"],
prefix=True,
)
_matcher.shortcut(
r"取款\s*(?P<withdraw>\d+)?",
command="mahiro-bank",
arguments=["withdraw", "{withdraw}"],
prefix=True,
)
_matcher.shortcut(
r"我的银行信息",
command="mahiro-bank",
arguments=["user-info"],
prefix=True,
)
_matcher.shortcut(
r"银行信息",
command="mahiro-bank",
arguments=["bank-info"],
prefix=True,
)
async def get_amount(handle_type: str) -> int:
amount_num = await prompt_until(
f"请输入{handle_type}金币数量",
lambda msg: is_number(msg.extract_plain_text()),
timeout=60,
retry=3,
retry_prompt="输入错误,请输入数字。剩余次数:{count}",
)
if not amount_num:
await MessageUtils.build_message(
"输入超时了哦,小真寻柜员以取消本次存款操作..."
).finish()
return int(amount_num.extract_plain_text())
@_matcher.assign("deposit")
async def _(session: Uninfo, arparma: Arparma, amount: Match[int]):
amount_num = amount.result if amount.available else await get_amount("存款")
if result := await BankManager.deposit_check(session.user.id, amount_num):
await MessageUtils.build_message(result).finish(reply_to=True)
_, rate, event_rate = await BankManager.deposit(session.user.id, amount_num)
result = (
f"存款成功!\n此次存款金额为: {amount.result}\n"
f"当前小时利率为: {rate * 100:.2f}%"
)
effective_hour = int(24 - datetime.now().hour)
if event_rate:
result += f"(小真寻偷偷将小时利率给你增加了 {event_rate:.2f}% 哦)"
result += (
f"\n预计总收益为: {int(amount.result * rate * effective_hour) or 1} 金币。"
)
logger.info(
f"小真寻银行存款:{amount_num},当前存款数:{amount.result},存款小时利率: {rate}",
arparma.header_result,
session=session,
)
await MessageUtils.build_message(result).finish(at_sender=True)
@_matcher.assign("withdraw")
async def _(session: Uninfo, arparma: Arparma, amount: Match[int]):
amount_num = amount.result if amount.available else await get_amount("取款")
if result := await BankManager.withdraw_check(session.user.id, amount_num):
await MessageUtils.build_message(result).finish(reply_to=True)
try:
user = await BankManager.withdraw(session.user.id, amount_num)
result = (
f"取款成功!\n当前取款金额为: {amount_num}\n当前存款金额为: {user.amount}"
)
logger.info(
f"小真寻银行取款:{amount_num}, 当前存款数:{user.amount},"
f" 存款小时利率:{user.rate}",
arparma.header_result,
session=session,
)
await MessageUtils.build_message(result).finish(reply_to=True)
except ValueError:
await MessageUtils.build_message("你的银行内的存款数量不足哦...").finish(
reply_to=True
)
@_matcher.assign("user-info")
async def _(session: Uninfo, arparma: Arparma, uname: str = UserName()):
result = await BankManager.get_user_info(session, uname)
await MessageUtils.build_message(result).send()
logger.info("查看银行个人信息", arparma.header_result, session=session)
@_matcher.assign("bank-info")
async def _(session: Uninfo, arparma: Arparma):
result = await BankManager.get_bank_info()
await MessageUtils.build_message(result).send()
logger.info("查看银行信息", arparma.header_result, session=session)
# @_matcher.assign("loan")
# async def _(session: Uninfo, arparma: Arparma, amount: Match[int]):
# amount_num = amount.result if amount.available else await get_amount("贷款")
# if amount_num <= 0:
# await MessageUtils.build_message("贷款数量必须大于 0 啊笨蛋!").finish()
# try:
# user, event_rate = await BankManager.loan(session.user.id, amount_num)
# result = (
# f"贷款成功!\n当前贷金额为: {user.loan_amount}"
# f"\n当前利率为: {user.loan_rate * 100}%"
# )
# if event_rate:
# result += f"(小真寻偷偷将利率给你降低了 {event_rate}% 哦)"
# result += f"\n预计每小时利息为:{int(user.loan_amount * user.loan_rate)}金币。"
# logger.info(
# f"小真寻银行贷款: {amount_num}, 当前贷款数: {user.loan_amount}, "
# f"贷款利率: {user.loan_rate}",
# arparma.header_result,
# session=session,
# )
# except ValueError:
# await MessageUtils.build_message(
# "贷款数量超过最大限制,请签到提升好感度获取更多额度吧..."
# ).finish(reply_to=True)
# @_matcher.assign("repayment")
# async def _(session: Uninfo, arparma: Arparma, amount: Match[int]):
# amount_num = amount.result if amount.available else await get_amount("还款")
# if amount_num <= 0:
# await MessageUtils.build_message("还款数量必须大于 0 啊笨蛋!").finish()
# user = await BankManager.repayment(session.user.id, amount_num)
# result = (f"还款成功!\n当前还款金额为: {amount_num}\n"
# f"当前贷款金额为: {user.loan_amount}")
# logger.info(
# f"小真寻银行还款:{amount_num},当前贷款数:{user.amount}, 贷款利率:{user.rate}",
# arparma.header_result,
# session=session,
# )
# await MessageUtils.build_message(result).finish(at_sender=True)
@scheduler.scheduled_job(
"cron",
hour=0,
minute=0,
)
async def _():
await BankManager.settlement()
logger.info("小真寻银行结算", "定时任务")

View File

@ -0,0 +1,450 @@
import asyncio
from datetime import datetime, timedelta
import random
from nonebot_plugin_htmlrender import template_to_pic
from nonebot_plugin_uninfo import Uninfo
from tortoise.expressions import RawSQL
from tortoise.functions import Count, Sum
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import TEMPLATE_PATH
from zhenxun.models.mahiro_bank import MahiroBank
from zhenxun.models.mahiro_bank_log import MahiroBankLog
from zhenxun.models.sign_user import SignUser
from zhenxun.models.user_console import UserConsole
from zhenxun.utils.enum import BankHandleType, GoldHandle
from zhenxun.utils.platform import PlatformUtils
base_config = Config.get("mahiro_bank")
class BankManager:
@classmethod
async def random_event(cls, impression: float):
"""随机事件"""
impression_event = base_config.get("impression_event")
impression_event_prop = base_config.get("impression_event_prop")
impression_event_range = base_config.get("impression_event_range")
if impression >= impression_event and random.random() < impression_event_prop:
"""触发好感度事件"""
return random.uniform(impression_event_range[0], impression_event_range[1])
return None
@classmethod
async def deposit_check(cls, user_id: str, amount: int) -> str | None:
"""检查存款是否合法
参数:
user_id: 用户id
amount: 存款金额
返回:
str | None: 存款信息
"""
if amount <= 0:
return "存款数量必须大于 0 啊笨蛋!"
user, sign_user, bank_user = await asyncio.gather(
*[
UserConsole.get_user(user_id),
SignUser.get_user(user_id),
cls.get_user(user_id),
]
)
sign_max_deposit: int = base_config.get("sign_max_deposit")
max_deposit = max(int(float(sign_user.impression) * sign_max_deposit), 100)
if user.gold < amount:
return f"金币数量不足,当前你的金币为:{user.gold}."
if bank_user.amount + amount > max_deposit:
return (
f"存款超过上限,存款上限为:{max_deposit}"
f"当前你的还可以存款金额:{max_deposit - bank_user.amount}"
)
max_daily_deposit_count: int = base_config.get("max_daily_deposit_count")
today_deposit_count = len(await cls.get_user_deposit(user_id))
if today_deposit_count >= max_daily_deposit_count:
return f"存款次数超过上限,每日存款次数上限为:{max_daily_deposit_count}"
return None
@classmethod
async def withdraw_check(cls, user_id: str, amount: int) -> str | None:
"""检查取款是否合法
参数:
user_id: 用户id
amount: 取款金额
返回:
str | None: 取款信息
"""
if amount <= 0:
return "取款数量必须大于 0 啊笨蛋!"
user = await cls.get_user(user_id)
data_list = await cls.get_user_deposit(user_id)
lock_amount = sum(data.amount for data in data_list)
if user.amount - lock_amount < amount:
return (
"取款金额不足,当前你的存款为:"
f"{user.amount}{lock_amount}已被锁定)!"
)
return None
@classmethod
async def get_user_deposit(
cls, user_id: str, is_completed: bool = False
) -> list[MahiroBankLog]:
"""获取用户今日存款次数
参数:
user_id: 用户id
返回:
list[MahiroBankLog]: 存款列表
"""
return await MahiroBankLog.filter(
user_id=user_id,
handle_type=BankHandleType.DEPOSIT,
is_completed=is_completed,
)
@classmethod
async def get_user(cls, user_id: str) -> MahiroBank:
"""查询余额
参数:
user_id: 用户id
返回:
MahiroBank
"""
user, _ = await MahiroBank.get_or_create(user_id=user_id)
return user
@classmethod
async def get_user_data(
cls,
user_id: str,
data_type: BankHandleType,
is_completed: bool = False,
count: int = 5,
) -> list[MahiroBankLog]:
return (
await MahiroBankLog.filter(
user_id=user_id, handle_type=data_type, is_completed=is_completed
)
.order_by("-id")
.limit(count)
.all()
)
@classmethod
async def complete_projected_revenue(cls, user_id: str) -> int:
"""预计收益
参数:
user_id: 用户id
返回:
int: 预计收益金额
"""
deposit_list = await cls.get_user_deposit(user_id)
if not deposit_list:
return 0
return int(
sum(
deposit.rate * deposit.amount * deposit.effective_hour
for deposit in deposit_list
)
)
@classmethod
async def get_user_info(cls, session: Uninfo, uname: str) -> bytes:
"""获取用户数据
参数:
session: Uninfo
uname: 用户id
返回:
bytes: 图片数据
"""
user_id = session.user.id
user = await cls.get_user(user_id=user_id)
(
rank,
deposit_count,
user_today_deposit,
projected_revenue,
sum_data,
) = await asyncio.gather(
*[
MahiroBank.filter(amount__gt=user.amount).count(),
MahiroBankLog.filter(user_id=user_id).count(),
cls.get_user_deposit(user_id),
cls.complete_projected_revenue(user_id),
MahiroBankLog.filter(
user_id=user_id, handle_type=BankHandleType.INTEREST
)
.annotate(sum=Sum("amount"))
.values("sum"),
]
)
now = datetime.now()
end_time = (
now
+ timedelta(days=1)
- timedelta(hours=now.hour, minutes=now.minute, seconds=now.second)
)
today_deposit_amount = sum(deposit.amount for deposit in user_today_deposit)
deposit_list = [
{
"id": deposit.id,
"date": now.date(),
"start_time": str(deposit.create_time).split(".")[0],
"end_time": end_time.replace(microsecond=0),
"amount": deposit.amount,
"rate": f"{deposit.rate * 100:.2f}",
"projected_revenue": int(
deposit.amount * deposit.rate * deposit.effective_hour
)
or 1,
}
for deposit in user_today_deposit
]
platform = PlatformUtils.get_platform(session)
data = {
"name": uname,
"rank": rank + 1,
"avatar_url": PlatformUtils.get_user_avatar_url(
user_id, platform, session.self_id
),
"amount": user.amount,
"deposit_count": deposit_count,
"today_deposit_count": len(user_today_deposit),
"cumulative_gain": sum_data[0]["sum"] or 0,
"projected_revenue": projected_revenue,
"today_deposit_amount": today_deposit_amount,
"deposit_list": deposit_list,
"create_time": now.replace(microsecond=0),
}
return await template_to_pic(
template_path=str((TEMPLATE_PATH / "mahiro_bank").absolute()),
template_name="user.html",
templates={"data": data},
pages={
"viewport": {"width": 386, "height": 700},
"base_url": f"file://{TEMPLATE_PATH}",
},
wait=2,
)
@classmethod
async def get_bank_info(cls) -> bytes:
now = datetime.now()
now_start = now - timedelta(
hours=now.hour, minutes=now.minute, seconds=now.second
)
(
bank_data,
today_count,
interest_amount,
active_user_count,
date_data,
) = await asyncio.gather(
*[
MahiroBank.annotate(
amount_sum=Sum("amount"), user_count=Count("id")
).values("amount_sum", "user_count"),
MahiroBankLog.filter(
create_time__gt=now_start, handle_type=BankHandleType.DEPOSIT
).count(),
MahiroBankLog.filter(handle_type=BankHandleType.INTEREST)
.annotate(amount_sum=Sum("amount"))
.values("amount_sum"),
MahiroBankLog.filter(
create_time__gte=now_start - timedelta(days=7),
handle_type=BankHandleType.DEPOSIT,
)
.annotate(count=Count("user_id", distinct=True))
.values("count"),
MahiroBankLog.filter(
create_time__gte=now_start - timedelta(days=7),
handle_type=BankHandleType.DEPOSIT,
)
.annotate(date=RawSQL("DATE(create_time)"), total_amount=Sum("amount"))
.group_by("date")
.values("date", "total_amount"),
]
)
date2cnt = {str(date["date"]): date["total_amount"] for date in date_data}
date = now.date()
e_date, e_amount = [], []
for _ in range(7):
if str(date) in date2cnt:
e_amount.append(date2cnt[str(date)])
else:
e_amount.append(0)
e_date.append(str(date)[5:])
date -= timedelta(days=1)
e_date.reverse()
e_amount.reverse()
date = 1
lasted_log = await MahiroBankLog.annotate().order_by("create_time").first()
if lasted_log:
date = now.date() - lasted_log.create_time.date()
date = (date.days or 1) + 1
data = {
"amount_sum": bank_data[0]["amount_sum"],
"user_count": bank_data[0]["user_count"],
"today_count": today_count,
"day_amount": int(bank_data[0]["amount_sum"] / date),
"interest_amount": interest_amount[0]["amount_sum"] or 0,
"active_user_count": active_user_count[0]["count"] or 0,
"e_data": e_date,
"e_amount": e_amount,
"create_time": now.replace(microsecond=0),
}
return await template_to_pic(
template_path=str((TEMPLATE_PATH / "mahiro_bank").absolute()),
template_name="bank.html",
templates={"data": data},
pages={
"viewport": {"width": 450, "height": 750},
"base_url": f"file://{TEMPLATE_PATH}",
},
wait=2,
)
@classmethod
async def deposit(
cls, user_id: str, amount: int
) -> tuple[MahiroBank, float, float | None]:
"""存款
参数:
user_id: 用户id
amount: 存款数量
返回:
tuple[MahiroBank, float, float]: MahiroBank利率增加的利率
"""
rate_range = base_config.get("rate_range")
rate = random.uniform(rate_range[0], rate_range[1])
sign_user = await SignUser.get_user(user_id)
random_add_rate = await cls.random_event(float(sign_user.impression))
if random_add_rate:
rate += random_add_rate
await UserConsole.reduce_gold(user_id, amount, GoldHandle.PLUGIN, "bank")
return await MahiroBank.deposit(user_id, amount, rate), rate, random_add_rate
@classmethod
async def withdraw(cls, user_id: str, amount: int) -> MahiroBank:
"""取款
参数:
user_id: 用户id
amount: 取款数量
返回:
MahiroBank
"""
await UserConsole.add_gold(user_id, amount, "bank")
return await MahiroBank.withdraw(user_id, amount)
@classmethod
async def loan(cls, user_id: str, amount: int) -> tuple[MahiroBank, float | None]:
"""贷款
参数:
user_id: 用户id
amount: 贷款数量
返回:
tuple[MahiroBank, float]: MahiroBank贷款利率
"""
rate_range = base_config.get("rate_range")
rate = random.uniform(rate_range[0], rate_range[1])
sign_user = await SignUser.get_user(user_id)
user, _ = await MahiroBank.get_or_create(user_id=user_id)
if user.loan_amount + amount > sign_user.impression * 150:
raise ValueError("贷款数量超过最大限制,请签到提升好感度获取更多额度吧...")
random_reduce_rate = await cls.random_event(float(sign_user.impression))
if random_reduce_rate:
rate -= random_reduce_rate
await UserConsole.add_gold(user_id, amount, "bank")
return await MahiroBank.loan(user_id, amount, rate), random_reduce_rate
@classmethod
async def repayment(cls, user_id: str, amount: int) -> MahiroBank:
"""还款
参数:
user_id: 用户id
amount: 还款数量
返回:
MahiroBank
"""
await UserConsole.reduce_gold(user_id, amount, GoldHandle.PLUGIN, "bank")
return await MahiroBank.repayment(user_id, amount)
@classmethod
async def settlement(cls):
"""结算每日利率"""
bank_user_list = await MahiroBank.filter(amount__gt=0).all()
log_list = await MahiroBankLog.filter(
is_completed=False, handle_type=BankHandleType.DEPOSIT
).all()
user_list = await UserConsole.filter(
user_id__in=[user.user_id for user in bank_user_list]
).all()
user_data = {user.user_id: user for user in user_list}
bank_data: dict[str, list[MahiroBankLog]] = {}
for log in log_list:
if log.user_id not in bank_data:
bank_data[log.user_id] = []
bank_data[log.user_id].append(log)
log_create_list = []
log_update_list = []
# 计算每日默认金币
for bank_user in bank_user_list:
if user := user_data.get(bank_user.user_id):
amount = bank_user.amount
if logs := bank_data.get(bank_user.user_id):
amount -= sum(log.amount for log in logs)
if not amount:
continue
# 计算每日默认金币
gold = int(amount * bank_user.rate)
user.gold += gold
log_create_list.append(
MahiroBankLog(
user_id=bank_user.user_id,
amount=gold,
rate=bank_user.rate,
handle_type=BankHandleType.INTEREST,
is_completed=True,
)
)
# 计算每日存款金币
for user_id, logs in bank_data.items():
if user := user_data.get(user_id):
for log in logs:
gold = int(log.amount * log.rate * log.effective_hour) or 1
user.gold += gold
log.is_completed = True
log_update_list.append(log)
log_create_list.append(
MahiroBankLog(
user_id=user_id,
amount=gold,
rate=log.rate,
handle_type=BankHandleType.INTEREST,
is_completed=True,
)
)
if log_create_list:
await MahiroBankLog.bulk_create(log_create_list, 10)
if log_update_list:
await MahiroBankLog.bulk_update(log_update_list, ["is_completed"], 10)
await UserConsole.bulk_update(user_list, ["gold"], 10)

View File

@ -141,7 +141,7 @@ async def _(
group_id = str(event.group_id)
if event.sub_type == "kick_me":
"""踢出Bot"""
await GroupManager.kick_bot(bot, user_id, group_id)
await GroupManager.kick_bot(bot, group_id, str(event.operator_id))
elif event.sub_type in ["leave", "kick"]:
result = await GroupManager.run_user(
bot, user_id, group_id, str(event.operator_id), event.sub_type

View File

@ -110,7 +110,7 @@ async def enable_plugin(
)
await BotConsole.enable_plugin(None, plugin.module)
await MessageUtils.build_message(
f"禁用全部 bot 的插件: {plugin_name.result}"
f"开启全部 bot 的插件: {plugin_name.result}"
).finish()
elif bot_id.available:
logger.info(

View File

@ -92,7 +92,7 @@ async def enable_task(
)
await BotConsole.enable_task(None, task.module)
await MessageUtils.build_message(
f"禁用全部 bot 的被动: {task_name.available}"
f"开启全部 bot 的被动: {task_name.available}"
).finish()
elif bot_id.available:
logger.info(

View File

@ -1,32 +1,77 @@
from typing import Annotated
from nonebot import on_command
from nonebot.adapters import Bot
from nonebot.params import Command
from arclet.alconna import AllParam
from nepattern import UnionPattern
from nonebot.adapters import Bot, Event
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.rule import to_me
from nonebot_plugin_alconna import Text as alcText
from nonebot_plugin_alconna import UniMsg
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import (
Alconna,
Args,
on_alconna,
)
from nonebot_plugin_alconna.uniseg.segment import (
At,
AtAll,
Audio,
Button,
Emoji,
File,
Hyper,
Image,
Keyboard,
Reference,
Reply,
Text,
Video,
Voice,
)
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData, RegisterConfig, Task
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
from zhenxun.utils.message import MessageUtils
from ._data_source import BroadcastManage
from .broadcast_manager import BroadcastManager
from .message_processor import (
_extract_broadcast_content,
get_broadcast_target_groups,
send_broadcast_and_notify,
)
BROADCAST_SEND_DELAY_RANGE = (1, 3)
__plugin_meta__ = PluginMetadata(
name="广播",
description="昭告天下!",
usage="""
广播 [消息] [图片]
示例广播 你们好
广播 [消息内容]
- 直接发送消息到除当前群组外的所有群组
- 支持文本图片@表情视频等多种消息类型
- 示例广播 你们好
- 示例广播 [图片] 新活动开始啦
广播 + 引用消息
- 将引用的消息作为广播内容发送
- 支持引用普通消息或合并转发消息
- 示例(引用一条消息) 广播
广播撤回
- 撤回最近一次由您触发的广播消息
- 仅能撤回短时间内的消息
- 示例广播撤回
特性
- 在群组中使用广播时不会将消息发送到当前群组
- 在私聊中使用广播时会发送到所有群组
别名
- bc (广播的简写)
- recall (广播撤回的别名)
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
version="1.2",
plugin_type=PluginType.SUPERUSER,
configs=[
RegisterConfig(
@ -42,26 +87,106 @@ __plugin_meta__ = PluginMetadata(
).to_dict(),
)
_matcher = on_command(
"广播", priority=1, permission=SUPERUSER, block=True, rule=to_me()
AnySeg = (
UnionPattern(
[
Text,
Image,
At,
AtAll,
Audio,
Video,
File,
Emoji,
Reply,
Reference,
Hyper,
Button,
Keyboard,
Voice,
]
)
@ "AnySeg"
)
_matcher = on_alconna(
Alconna(
"广播",
Args["content?", AllParam],
),
aliases={"bc"},
priority=1,
permission=SUPERUSER,
block=True,
rule=to_me(),
use_origin=False,
)
_recall_matcher = on_alconna(
Alconna("广播撤回"),
aliases={"recall"},
priority=1,
permission=SUPERUSER,
block=True,
rule=to_me(),
)
@_matcher.handle()
async def _(
async def handle_broadcast(
bot: Bot,
event: Event,
session: EventSession,
message: UniMsg,
command: Annotated[tuple[str, ...], Command()],
arp: alc.Arparma,
):
for msg in message:
if isinstance(msg, alcText) and msg.text.strip().startswith(command[0]):
msg.text = msg.text.replace(command[0], "", 1).strip()
break
await MessageUtils.build_message("正在发送..请等一下哦!").send()
count, error_count = await BroadcastManage.send(bot, message, session)
result = f"成功广播 {count} 个群组"
broadcast_content_msg = await _extract_broadcast_content(bot, event, arp, session)
if not broadcast_content_msg:
return
target_groups, enabled_groups = await get_broadcast_target_groups(bot, session)
if not target_groups or not enabled_groups:
return
try:
await send_broadcast_and_notify(
bot, event, broadcast_content_msg, enabled_groups, target_groups, session
)
except Exception as e:
error_msg = "发送广播失败"
BroadcastManager.log_error(error_msg, e, session)
await MessageUtils.build_message(f"{error_msg}").send(reply_to=True)
@_recall_matcher.handle()
async def handle_broadcast_recall(
bot: Bot,
event: Event,
session: EventSession,
):
"""处理广播撤回命令"""
await MessageUtils.build_message("正在尝试撤回最近一次广播...").send()
try:
success_count, error_count = await BroadcastManager.recall_last_broadcast(
bot, session
)
user_id = str(event.get_user_id())
if success_count == 0 and error_count == 0:
await bot.send_private_msg(
user_id=user_id,
message="没有找到最近的广播消息记录,可能已经撤回或超过可撤回时间。",
)
else:
result = f"广播撤回完成!\n成功撤回 {success_count} 条消息"
if error_count:
result += f"\n广播失败 {error_count} 个群组"
await MessageUtils.build_message(f"发送广播完成!\n{result}").send(reply_to=True)
logger.info(f"发送广播信息: {message}", "广播", session=session)
result += f"\n撤回失败 {error_count} 条消息 (可能已过期或无权限)"
await bot.send_private_msg(user_id=user_id, message=result)
BroadcastManager.log_info(
f"广播撤回完成: 成功 {success_count}, 失败 {error_count}", session
)
except Exception as e:
error_msg = "撤回广播消息失败"
BroadcastManager.log_error(error_msg, e, session)
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"{error_msg}")

View File

@ -1,72 +0,0 @@
import asyncio
import random
from nonebot.adapters import Bot
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import Image, UniMsg
from nonebot_plugin_session import EventSession
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.message import MessageUtils
from zhenxun.utils.platform import PlatformUtils
class BroadcastManage:
@classmethod
async def send(
cls, bot: Bot, message: UniMsg, session: EventSession
) -> tuple[int, int]:
"""发送广播消息
参数:
bot: Bot
message: 消息内容
session: Session
返回:
tuple[int, int]: 发送成功的群组数量, 发送失败的群组数量
"""
message_list = []
for msg in message:
if isinstance(msg, alc.Image) and msg.url:
message_list.append(Image(url=msg.url))
elif isinstance(msg, alc.Text):
message_list.append(msg.text)
group_list, _ = await PlatformUtils.get_group_list(bot)
if group_list:
error_count = 0
for group in group_list:
try:
if not await CommonUtils.task_is_block(
bot,
"broadcast", # group.channel_id
group.group_id,
):
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if target:
await MessageUtils.build_message(message_list).send(
target, bot
)
logger.debug(
"发送成功",
"广播",
session=session,
target=f"{group.group_id}:{group.channel_id}",
)
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning("target为空", "广播", session=session)
except Exception as e:
error_count += 1
logger.error(
"发送失败",
"广播",
session=session,
target=f"{group.group_id}:{group.channel_id}",
e=e,
)
return len(group_list) - error_count, error_count
return 0, 0

View File

@ -0,0 +1,490 @@
import asyncio
import random
import traceback
from typing import ClassVar
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import Bot as V11Bot
from nonebot.exception import ActionFailed
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg import Receipt, Reference
from nonebot_plugin_session import EventSession
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.platform import PlatformUtils
from .models import BroadcastDetailResult, BroadcastResult
from .utils import custom_nodes_to_v11_nodes, uni_message_to_v11_list_of_dicts
class BroadcastManager:
"""广播管理器"""
_last_broadcast_msg_ids: ClassVar[dict[str, int]] = {}
@staticmethod
def _get_session_info(session: EventSession | None) -> str:
"""获取会话信息字符串"""
if not session:
return ""
try:
platform = getattr(session, "platform", "unknown")
session_id = str(session)
return f"[{platform}:{session_id}]"
except Exception:
return "[session-info-error]"
@staticmethod
def log_error(
message: str, error: Exception, session: EventSession | None = None, **kwargs
):
"""记录错误日志"""
session_info = BroadcastManager._get_session_info(session)
error_type = type(error).__name__
stack_trace = traceback.format_exc()
error_details = f"\n类型: {error_type}\n信息: {error!s}\n堆栈: {stack_trace}"
logger.error(
f"{session_info} {message}{error_details}", "广播", e=error, **kwargs
)
@staticmethod
def log_warning(message: str, session: EventSession | None = None, **kwargs):
"""记录警告级别日志"""
session_info = BroadcastManager._get_session_info(session)
logger.warning(f"{session_info} {message}", "广播", **kwargs)
@staticmethod
def log_info(message: str, session: EventSession | None = None, **kwargs):
"""记录信息级别日志"""
session_info = BroadcastManager._get_session_info(session)
logger.info(f"{session_info} {message}", "广播", **kwargs)
@classmethod
def get_last_broadcast_msg_ids(cls) -> dict[str, int]:
"""获取最近广播消息ID"""
return cls._last_broadcast_msg_ids.copy()
@classmethod
def clear_last_broadcast_msg_ids(cls) -> None:
"""清空消息ID记录"""
cls._last_broadcast_msg_ids.clear()
@classmethod
async def get_all_groups(cls, bot: Bot) -> tuple[list[GroupConsole], str]:
"""获取群组列表"""
return await PlatformUtils.get_group_list(bot)
@classmethod
async def send(
cls, bot: Bot, message: UniMessage, session: EventSession
) -> BroadcastResult:
"""发送广播到所有群组"""
logger.debug(
f"开始广播(send - 广播到所有群组)Bot ID: {bot.self_id}",
"广播",
session=session,
)
logger.debug("清空上一次的广播消息ID记录", "广播", session=session)
cls.clear_last_broadcast_msg_ids()
all_groups, _ = await cls.get_all_groups(bot)
return await cls.send_to_specific_groups(bot, message, all_groups, session)
@classmethod
async def send_to_specific_groups(
cls,
bot: Bot,
message: UniMessage,
target_groups: list[GroupConsole],
session_info: EventSession | str | None = None,
) -> BroadcastResult:
"""发送广播到指定群组"""
log_session = session_info or bot.self_id
logger.debug(
f"开始广播,目标 {len(target_groups)} 个群组Bot ID: {bot.self_id}",
"广播",
session=log_session,
)
if not target_groups:
logger.debug("目标群组列表为空,广播结束", "广播", session=log_session)
return 0, 0
platform = PlatformUtils.get_platform(bot)
is_forward_broadcast = any(
isinstance(seg, Reference) and getattr(seg, "nodes", None)
for seg in message
)
if platform == "qq" and isinstance(bot, V11Bot) and is_forward_broadcast:
if (
len(message) == 1
and isinstance(message[0], Reference)
and getattr(message[0], "nodes", None)
):
nodes_list = getattr(message[0], "nodes", [])
v11_nodes = custom_nodes_to_v11_nodes(nodes_list)
node_count = len(v11_nodes)
logger.debug(
f"从 UniMessage<Reference> 构造转发节点数: {node_count}",
"广播",
session=log_session,
)
else:
logger.warning(
"广播消息包含合并转发段和其他段,将尝试打平成一个节点发送",
"广播",
session=log_session,
)
v11_content_list = uni_message_to_v11_list_of_dicts(message)
v11_nodes = (
[
{
"type": "node",
"data": {
"user_id": bot.self_id,
"nickname": "广播",
"content": v11_content_list,
},
}
]
if v11_content_list
else []
)
if not v11_nodes:
logger.warning(
"构造出的 V11 合并转发节点为空,无法发送",
"广播",
session=log_session,
)
return 0, len(target_groups)
success_count, error_count, skip_count = await cls._broadcast_forward(
bot, log_session, target_groups, v11_nodes
)
else:
if is_forward_broadcast:
logger.warning(
f"合并转发消息在适配器 ({platform}) 不支持,将作为普通消息发送",
"广播",
session=log_session,
)
success_count, error_count, skip_count = await cls._broadcast_normal(
bot, log_session, target_groups, message
)
total = len(target_groups)
stats = f"成功: {success_count}, 失败: {error_count}"
stats += f", 跳过: {skip_count}, 总计: {total}"
logger.debug(
f"广播统计 - {stats}",
"广播",
session=log_session,
)
msg_ids = cls.get_last_broadcast_msg_ids()
if msg_ids:
id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids.items()])
logger.debug(
f"广播结束,记录了 {len(msg_ids)} 条消息ID: {id_list_str}",
"广播",
session=log_session,
)
else:
logger.warning(
"广播结束但没有记录任何消息ID",
"广播",
session=log_session,
)
return success_count, error_count
@classmethod
async def _extract_message_id_from_result(
cls,
result: dict | Receipt,
group_key: str,
session_info: EventSession | str,
msg_type: str = "普通",
) -> None:
"""提取消息ID并记录"""
if isinstance(result, dict) and "message_id" in result:
msg_id = result["message_id"]
try:
msg_id_int = int(msg_id)
cls._last_broadcast_msg_ids[group_key] = msg_id_int
logger.debug(
f"记录群 {group_key}{msg_type}消息ID: {msg_id_int}",
"广播",
session=session_info,
)
except (ValueError, TypeError):
logger.warning(
f"{msg_type}结果中的 message_id 不是有效整数: {msg_id}",
"广播",
session=session_info,
)
elif isinstance(result, Receipt) and result.msg_ids:
try:
first_id_info = result.msg_ids[0]
msg_id = None
if isinstance(first_id_info, dict) and "message_id" in first_id_info:
msg_id = first_id_info["message_id"]
logger.debug(
f"从 Receipt.msg_ids[0] 提取到 ID: {msg_id}",
"广播",
session=session_info,
)
elif isinstance(first_id_info, int | str):
msg_id = first_id_info
logger.debug(
f"从 Receipt.msg_ids[0] 提取到原始ID: {msg_id}",
"广播",
session=session_info,
)
if msg_id is not None:
try:
msg_id_int = int(msg_id)
cls._last_broadcast_msg_ids[group_key] = msg_id_int
logger.debug(
f"记录群 {group_key} 的消息ID: {msg_id_int}",
"广播",
session=session_info,
)
except (ValueError, TypeError):
logger.warning(
f"提取的ID ({msg_id}) 不是有效整数",
"广播",
session=session_info,
)
else:
info_str = str(first_id_info)
logger.warning(
f"无法从 Receipt.msg_ids[0] 提取ID: {info_str}",
"广播",
session=session_info,
)
except IndexError:
logger.warning("Receipt.msg_ids 为空", "广播", session=session_info)
except Exception as e_extract:
logger.error(
f"从 Receipt 提取 msg_id 时出错: {e_extract}",
"广播",
session=session_info,
e=e_extract,
)
else:
logger.warning(
f"发送成功但无法从结果获取消息 ID. 结果: {result}",
"广播",
session=session_info,
)
@classmethod
async def _check_group_availability(cls, bot: Bot, group: GroupConsole) -> bool:
"""检查群组是否可用"""
if not group.group_id:
return False
if await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
return False
return True
@classmethod
async def _broadcast_forward(
cls,
bot: V11Bot,
session_info: EventSession | str,
group_list: list[GroupConsole],
v11_nodes: list[dict],
) -> BroadcastDetailResult:
"""发送合并转发"""
success_count = 0
error_count = 0
skip_count = 0
for _, group in enumerate(group_list):
group_key = group.group_id or group.channel_id
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
try:
result = await bot.send_group_forward_msg(
group_id=int(group.group_id), messages=v11_nodes
)
logger.debug(
f"合并转发消息发送结果: {result}, 类型: {type(result)}",
"广播",
session=session_info,
)
await cls._extract_message_id_from_result(
result, group_key, session_info, "合并转发"
)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
except ActionFailed as af_e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {af_e}",
"广播",
session=session_info,
e=af_e,
)
except Exception as e:
error_count += 1
logger.error(
f"发送失败(合并转发) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
return success_count, error_count, skip_count
@classmethod
async def _broadcast_normal(
cls,
bot: Bot,
session_info: EventSession | str,
group_list: list[GroupConsole],
message: UniMessage,
) -> BroadcastDetailResult:
"""发送普通消息"""
success_count = 0
error_count = 0
skip_count = 0
for _, group in enumerate(group_list):
group_key = (
f"{group.group_id}:{group.channel_id}"
if group.channel_id
else str(group.group_id)
)
if not await cls._check_group_availability(bot, group):
skip_count += 1
continue
try:
target = PlatformUtils.get_target(
group_id=group.group_id, channel_id=group.channel_id
)
if target:
receipt: Receipt = await message.send(target, bot=bot)
logger.debug(
f"广播消息发送结果: {receipt}, 类型: {type(receipt)}",
"广播",
session=session_info,
)
await cls._extract_message_id_from_result(
receipt, group_key, session_info
)
success_count += 1
await asyncio.sleep(random.randint(1, 3))
else:
logger.warning(
"target为空", "广播", session=session_info, target=group_key
)
skip_count += 1
except Exception as e:
error_count += 1
logger.error(
f"发送失败(普通) to {group_key}: {e}",
"广播",
session=session_info,
e=e,
)
return success_count, error_count, skip_count
@classmethod
async def recall_last_broadcast(
cls, bot: Bot, session_info: EventSession | str
) -> BroadcastResult:
"""撤回最近广播"""
msg_ids_to_recall = cls.get_last_broadcast_msg_ids()
if not msg_ids_to_recall:
logger.warning(
"没有找到最近的广播消息ID记录", "广播撤回", session=session_info
)
return 0, 0
id_list_str = ", ".join([f"{k}:{v}" for k, v in msg_ids_to_recall.items()])
logger.debug(
f"找到 {len(msg_ids_to_recall)} 条广播消息ID记录: {id_list_str}",
"广播撤回",
session=session_info,
)
success_count = 0
error_count = 0
logger.info(
f"准备撤回 {len(msg_ids_to_recall)} 条广播消息",
"广播撤回",
session=session_info,
)
for group_key, msg_id in msg_ids_to_recall.items():
try:
logger.debug(
f"尝试撤回消息 (ID: {msg_id}) in {group_key}",
"广播撤回",
session=session_info,
)
await bot.call_api("delete_msg", message_id=msg_id)
success_count += 1
except ActionFailed as af_e:
retcode = getattr(af_e, "retcode", None)
wording = getattr(af_e, "wording", "")
if retcode == 100 and "MESSAGE_NOT_FOUND" in wording.upper():
logger.warning(
f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}",
"广播撤回",
session=session_info,
)
elif retcode == 300 and "delete message" in wording.lower():
logger.warning(
f"消息 (ID: {msg_id}) 可能已被撤回或不存在于 {group_key}",
"广播撤回",
session=session_info,
)
else:
error_count += 1
logger.error(
f"撤回消息失败 (ID: {msg_id}) in {group_key}: {af_e}",
"广播撤回",
session=session_info,
e=af_e,
)
except Exception as e:
error_count += 1
logger.error(
f"撤回消息时发生未知错误 (ID: {msg_id}) in {group_key}: {e}",
"广播撤回",
session=session_info,
e=e,
)
await asyncio.sleep(0.2)
logger.debug("撤回操作完成清空消息ID记录", "广播撤回", session=session_info)
cls.clear_last_broadcast_msg_ids()
return success_count, error_count

View File

@ -0,0 +1,584 @@
import base64
import json
from typing import Any
from nonebot.adapters import Bot, Event
from nonebot.adapters.onebot.v11 import Message as V11Message
from nonebot.adapters.onebot.v11 import MessageSegment as V11MessageSegment
from nonebot.exception import ActionFailed
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg.segment import (
At,
AtAll,
CustomNode,
Image,
Reference,
Reply,
Text,
Video,
)
from nonebot_plugin_alconna.uniseg.tools import reply_fetch
from nonebot_plugin_session import EventSession
from zhenxun.services.log import logger
from zhenxun.utils.common_utils import CommonUtils
from zhenxun.utils.message import MessageUtils
from .broadcast_manager import BroadcastManager
MAX_FORWARD_DEPTH = 3
async def _process_forward_content(
forward_content: Any, forward_id: str | None, bot: Bot, depth: int
) -> list[CustomNode]:
"""处理转发消息内容"""
nodes_for_alc = []
content_parsed = False
if forward_content:
nodes_from_content = None
if isinstance(forward_content, list):
nodes_from_content = forward_content
elif isinstance(forward_content, str):
try:
parsed_content = json.loads(forward_content)
if isinstance(parsed_content, list):
nodes_from_content = parsed_content
except Exception as json_e:
logger.debug(
f"[Depth {depth}] JSON解析失败: {json_e}",
"广播",
)
if nodes_from_content is not None:
logger.debug(
f"[D{depth}] 节点数: {len(nodes_from_content)}",
"广播",
)
content_parsed = True
for node_data in nodes_from_content:
node = await _create_custom_node_from_data(node_data, bot, depth + 1)
if node:
nodes_for_alc.append(node)
if not content_parsed and forward_id:
logger.debug(
f"[D{depth}] 尝试API调用ID: {forward_id}",
"广播",
)
try:
forward_data = await bot.call_api("get_forward_msg", id=forward_id)
nodes_list = None
if isinstance(forward_data, dict) and "messages" in forward_data:
nodes_list = forward_data["messages"]
elif (
isinstance(forward_data, dict)
and "data" in forward_data
and isinstance(forward_data["data"], dict)
and "message" in forward_data["data"]
):
nodes_list = forward_data["data"]["message"]
elif isinstance(forward_data, list):
nodes_list = forward_data
if nodes_list:
node_count = len(nodes_list)
logger.debug(
f"[D{depth + 1}] 节点:{node_count}",
"广播",
)
for node_data in nodes_list:
node = await _create_custom_node_from_data(
node_data, bot, depth + 1
)
if node:
nodes_for_alc.append(node)
else:
logger.warning(
f"[D{depth + 1}] ID:{forward_id}无节点",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息获取失败]",
)
)
except ActionFailed as af_e:
logger.error(
f"[D{depth + 1}] API失败: {af_e}",
"广播",
e=af_e,
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息获取失败]",
)
)
except Exception as e:
logger.error(
f"[D{depth + 1}] 处理出错: {e}",
"广播",
e=e,
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[处理嵌套转发时出错]",
)
)
elif not content_parsed and not forward_id:
logger.warning(
f"[D{depth}] 转发段无内容也无ID",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="错误",
content="[嵌套转发消息无法解析]",
)
)
elif content_parsed and not nodes_for_alc:
logger.warning(
f"[D{depth}] 解析成功但无有效节点",
"广播",
)
nodes_for_alc.append(
CustomNode(
uid="0",
name="信息",
content="[嵌套转发内容为空]",
)
)
return nodes_for_alc
async def _create_custom_node_from_data(
node_data: dict, bot: Bot, depth: int
) -> CustomNode | None:
"""从节点数据创建CustomNode"""
node_content_raw = node_data.get("message") or node_data.get("content")
if not node_content_raw:
logger.warning(f"[D{depth}] 节点缺少消息内容", "广播")
return None
sender = node_data.get("sender", {})
uid = str(sender.get("user_id", "10000"))
name = sender.get("nickname", f"用户{uid[:4]}")
extracted_uni_msg = await _extract_content_from_message(
node_content_raw, bot, depth
)
if not extracted_uni_msg:
return None
return CustomNode(uid=uid, name=name, content=extracted_uni_msg)
async def _extract_broadcast_content(
bot: Bot,
event: Event,
arp: alc.Arparma,
session: EventSession,
) -> UniMessage | None:
"""从命令参数或引用消息中提取广播内容"""
broadcast_content_msg: UniMessage | None = None
command_content_list = arp.all_matched_args.get("content", [])
processed_command_list = []
has_command_content = False
if command_content_list:
for item in command_content_list:
if isinstance(item, alc.Segment):
processed_command_list.append(item)
if not (isinstance(item, Text) and not item.text.strip()):
has_command_content = True
elif isinstance(item, str):
if item.strip():
processed_command_list.append(Text(item.strip()))
has_command_content = True
else:
logger.warning(
f"Unexpected type in command content: {type(item)}", "广播"
)
if has_command_content:
logger.debug("检测到命令参数内容,优先使用参数内容", "广播", session=session)
broadcast_content_msg = UniMessage(processed_command_list)
if not broadcast_content_msg.filter(
lambda x: not (isinstance(x, Text) and not x.text.strip())
):
logger.warning(
"命令参数内容解析后为空或只包含空白", "广播", session=session
)
broadcast_content_msg = None
if not broadcast_content_msg:
reply_segment_obj: Reply | None = await reply_fetch(event, bot)
if (
reply_segment_obj
and hasattr(reply_segment_obj, "msg")
and reply_segment_obj.msg
):
logger.debug(
"未检测到有效命令参数,检测到引用消息", "广播", session=session
)
raw_quoted_content = reply_segment_obj.msg
is_forward = False
forward_id = None
if isinstance(raw_quoted_content, V11Message):
for seg in raw_quoted_content:
if isinstance(seg, V11MessageSegment):
if seg.type == "forward":
forward_id = seg.data.get("id")
is_forward = bool(forward_id)
break
elif seg.type == "json":
try:
json_data_str = seg.data.get("data", "{}")
if isinstance(json_data_str, str):
import json
json_data = json.loads(json_data_str)
if (
json_data.get("app") == "com.tencent.multimsg"
or json_data.get("view") == "Forward"
) and json_data.get("meta", {}).get(
"detail", {}
).get("resid"):
forward_id = json_data["meta"]["detail"][
"resid"
]
is_forward = True
break
except Exception:
pass
if is_forward and forward_id:
logger.info(
f"尝试获取并构造合并转发内容 (ID: {forward_id})",
"广播",
session=session,
)
nodes_to_forward: list[CustomNode] = []
try:
forward_data = await bot.call_api("get_forward_msg", id=forward_id)
nodes_list = None
if isinstance(forward_data, dict) and "messages" in forward_data:
nodes_list = forward_data["messages"]
elif (
isinstance(forward_data, dict)
and "data" in forward_data
and isinstance(forward_data["data"], dict)
and "message" in forward_data["data"]
):
nodes_list = forward_data["data"]["message"]
elif isinstance(forward_data, list):
nodes_list = forward_data
if nodes_list is not None:
for node_data in nodes_list:
node_sender = node_data.get("sender", {})
node_user_id = str(node_sender.get("user_id", "10000"))
node_nickname = node_sender.get(
"nickname", f"用户{node_user_id[:4]}"
)
node_content_raw = node_data.get(
"message"
) or node_data.get("content")
if node_content_raw:
extracted_node_uni_msg = (
await _extract_content_from_message(
node_content_raw, bot
)
)
if extracted_node_uni_msg:
nodes_to_forward.append(
CustomNode(
uid=node_user_id,
name=node_nickname,
content=extracted_node_uni_msg,
)
)
if nodes_to_forward:
broadcast_content_msg = UniMessage(
Reference(nodes=nodes_to_forward)
)
except ActionFailed:
await MessageUtils.build_message(
"获取合并转发消息失败,可能不支持此 API。"
).send(reply_to=True)
return None
except Exception as api_e:
logger.error(f"处理合并转发时出错: {api_e}", "广播", e=api_e)
await MessageUtils.build_message(
"处理合并转发消息时发生内部错误。"
).send(reply_to=True)
return None
else:
broadcast_content_msg = await _extract_content_from_message(
raw_quoted_content, bot
)
else:
logger.debug("未检测到命令参数和引用消息", "广播", session=session)
await MessageUtils.build_message("请提供广播内容或引用要广播的消息").send(
reply_to=True
)
return None
if not broadcast_content_msg:
logger.error(
"未能从命令参数或引用消息中获取有效的广播内容", "广播", session=session
)
await MessageUtils.build_message("错误:未能获取有效的广播内容。").send(
reply_to=True
)
return None
return broadcast_content_msg
async def _process_v11_segment(
seg_obj: V11MessageSegment | dict, depth: int, index: int, bot: Bot
) -> list[alc.Segment]:
"""处理V11消息段"""
result = []
seg_type = None
data_dict = None
if isinstance(seg_obj, V11MessageSegment):
seg_type = seg_obj.type
data_dict = seg_obj.data
elif isinstance(seg_obj, dict):
seg_type = seg_obj.get("type")
data_dict = seg_obj.get("data")
else:
return result
if not (seg_type and data_dict is not None):
logger.warning(f"[D{depth}] 跳过无效数据: {type(seg_obj)}", "广播")
return result
if seg_type == "text":
text_content = data_dict.get("text", "")
if isinstance(text_content, str) and text_content.strip():
result.append(Text(text_content))
elif seg_type == "image":
img_seg = None
if data_dict.get("url"):
img_seg = Image(url=data_dict["url"])
elif data_dict.get("file"):
file_val = data_dict["file"]
if isinstance(file_val, str) and file_val.startswith("base64://"):
b64_data = file_val[9:]
raw_bytes = base64.b64decode(b64_data)
img_seg = Image(raw=raw_bytes)
else:
img_seg = Image(path=file_val)
if img_seg:
result.append(img_seg)
else:
logger.warning(f"[Depth {depth}] V11 图片 {index} 缺少URL/文件", "广播")
elif seg_type == "at":
target_qq = data_dict.get("qq", "")
if target_qq.lower() == "all":
result.append(AtAll())
elif target_qq:
result.append(At(flag="user", target=target_qq))
elif seg_type == "video":
video_seg = None
if data_dict.get("url"):
video_seg = Video(url=data_dict["url"])
elif data_dict.get("file"):
file_val = data_dict["file"]
if isinstance(file_val, str) and file_val.startswith("base64://"):
b64_data = file_val[9:]
raw_bytes = base64.b64decode(b64_data)
video_seg = Video(raw=raw_bytes)
else:
video_seg = Video(path=file_val)
if video_seg:
result.append(video_seg)
logger.debug(f"[Depth {depth}] 处理视频消息成功", "广播")
else:
logger.warning(f"[Depth {depth}] V11 视频 {index} 缺少URL/文件", "广播")
elif seg_type == "forward":
nested_forward_id = data_dict.get("id") or data_dict.get("resid")
nested_forward_content = data_dict.get("content")
logger.debug(f"[D{depth}] 嵌套转发ID: {nested_forward_id}", "广播")
nested_nodes = await _process_forward_content(
nested_forward_content, nested_forward_id, bot, depth
)
if nested_nodes:
result.append(Reference(nodes=nested_nodes))
else:
logger.warning(f"[D{depth}] 跳过类型: {seg_type}", "广播")
return result
async def _extract_content_from_message(
message_content: Any, bot: Bot, depth: int = 0
) -> UniMessage:
"""提取消息内容到UniMessage"""
temp_msg = UniMessage()
input_type_str = str(type(message_content))
if depth >= MAX_FORWARD_DEPTH:
logger.warning(
f"[Depth {depth}] 达到最大递归深度 {MAX_FORWARD_DEPTH},停止解析嵌套转发。",
"广播",
)
temp_msg.append(Text("[嵌套转发层数过多,内容已省略]"))
return temp_msg
segments_to_process = []
if isinstance(message_content, UniMessage):
segments_to_process = list(message_content)
elif isinstance(message_content, V11Message):
segments_to_process = list(message_content)
elif isinstance(message_content, list):
segments_to_process = message_content
elif (
isinstance(message_content, dict)
and "type" in message_content
and "data" in message_content
):
segments_to_process = [message_content]
elif isinstance(message_content, str):
if message_content.strip():
temp_msg.append(Text(message_content))
return temp_msg
else:
logger.warning(f"[Depth {depth}] 无法处理的输入类型: {input_type_str}", "广播")
return temp_msg
if segments_to_process:
for index, seg_obj in enumerate(segments_to_process):
try:
if isinstance(seg_obj, Text):
text_content = getattr(seg_obj, "text", None)
if isinstance(text_content, str) and text_content.strip():
temp_msg.append(seg_obj)
elif isinstance(seg_obj, Image):
if (
getattr(seg_obj, "url", None)
or getattr(seg_obj, "path", None)
or getattr(seg_obj, "raw", None)
):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, At):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, AtAll):
temp_msg.append(seg_obj)
elif isinstance(seg_obj, Video):
if (
getattr(seg_obj, "url", None)
or getattr(seg_obj, "path", None)
or getattr(seg_obj, "raw", None)
):
temp_msg.append(seg_obj)
logger.debug(f"[D{depth}] 处理Video对象成功", "广播")
else:
processed_segments = await _process_v11_segment(
seg_obj, depth, index, bot
)
temp_msg.extend(processed_segments)
except Exception as e_conv_seg:
logger.warning(
f"[D{depth}] 处理段 {index} 出错: {e_conv_seg}",
"广播",
e=e_conv_seg,
)
if not temp_msg and message_content:
logger.warning(f"未能从类型 {input_type_str} 中提取内容", "广播")
return temp_msg
async def get_broadcast_target_groups(
bot: Bot, session: EventSession
) -> tuple[list, list]:
"""获取广播目标群组和启用了广播功能的群组"""
target_groups = []
all_groups, _ = await BroadcastManager.get_all_groups(bot)
current_group_id = None
if hasattr(session, "id2") and session.id2:
current_group_id = session.id2
if current_group_id:
target_groups = [
group for group in all_groups if group.group_id != current_group_id
]
logger.info(
f"向除当前群组({current_group_id})外的所有群组广播", "广播", session=session
)
else:
target_groups = all_groups
logger.info("向所有群组广播", "广播", session=session)
if not target_groups:
await MessageUtils.build_message("没有找到符合条件的广播目标群组。").send(
reply_to=True
)
return [], []
enabled_groups = []
for group in target_groups:
if not await CommonUtils.task_is_block(bot, "broadcast", group.group_id):
enabled_groups.append(group)
if not enabled_groups:
await MessageUtils.build_message(
"没有启用了广播功能的目标群组可供立即发送。"
).send(reply_to=True)
return target_groups, []
return target_groups, enabled_groups
async def send_broadcast_and_notify(
bot: Bot,
event: Event,
message: UniMessage,
enabled_groups: list,
target_groups: list,
session: EventSession,
) -> None:
"""发送广播并通知结果"""
BroadcastManager.clear_last_broadcast_msg_ids()
count, error_count = await BroadcastManager.send_to_specific_groups(
bot, message, enabled_groups, session
)
result = f"成功广播 {count} 个群组"
if error_count:
result += f"\n发送失败 {error_count} 个群组"
result += f"\n有效: {len(enabled_groups)} / 总计: {len(target_groups)}"
user_id = str(event.get_user_id())
await bot.send_private_msg(user_id=user_id, message=f"发送广播完成!\n{result}")
BroadcastManager.log_info(
f"广播完成,有效/总计: {len(enabled_groups)}/{len(target_groups)}",
session,
)

View File

@ -0,0 +1,64 @@
from datetime import datetime
from typing import Any
from nonebot_plugin_alconna import UniMessage
from zhenxun.models.group_console import GroupConsole
GroupKey = str
MessageID = int
BroadcastResult = tuple[int, int]
BroadcastDetailResult = tuple[int, int, int]
class BroadcastTarget:
"""广播目标"""
def __init__(self, group_id: str, channel_id: str | None = None):
self.group_id = group_id
self.channel_id = channel_id
def to_dict(self) -> dict[str, str | None]:
"""转换为字典格式"""
return {"group_id": self.group_id, "channel_id": self.channel_id}
@classmethod
def from_group_console(cls, group: GroupConsole) -> "BroadcastTarget":
"""从 GroupConsole 对象创建"""
return cls(group_id=group.group_id, channel_id=group.channel_id)
@property
def key(self) -> str:
"""获取群组的唯一标识"""
if self.channel_id:
return f"{self.group_id}:{self.channel_id}"
return str(self.group_id)
class BroadcastTask:
"""广播任务"""
def __init__(
self,
bot_id: str,
message: UniMessage,
targets: list[BroadcastTarget],
scheduled_time: datetime | None = None,
task_id: str | None = None,
):
self.bot_id = bot_id
self.message = message
self.targets = targets
self.scheduled_time = scheduled_time
self.task_id = task_id
def to_dict(self) -> dict[str, Any]:
"""转换为字典格式,用于序列化"""
return {
"bot_id": self.bot_id,
"targets": [t.to_dict() for t in self.targets],
"scheduled_time": self.scheduled_time.isoformat()
if self.scheduled_time
else None,
"task_id": self.task_id,
}

View File

@ -0,0 +1,175 @@
import base64
import nonebot_plugin_alconna as alc
from nonebot_plugin_alconna import UniMessage
from nonebot_plugin_alconna.uniseg import Reference
from nonebot_plugin_alconna.uniseg.segment import CustomNode, Video
from zhenxun.services.log import logger
def uni_segment_to_v11_segment_dict(
seg: alc.Segment, depth: int = 0
) -> dict | list[dict] | None:
"""UniSeg段转V11字典"""
if isinstance(seg, alc.Text):
return {"type": "text", "data": {"text": seg.text}}
elif isinstance(seg, alc.Image):
if getattr(seg, "url", None):
return {
"type": "image",
"data": {"file": seg.url},
}
elif getattr(seg, "raw", None):
raw_data = seg.raw
if isinstance(raw_data, str):
if len(raw_data) >= 9 and raw_data[:9] == "base64://":
return {"type": "image", "data": {"file": raw_data}}
elif isinstance(raw_data, bytes):
b64_str = base64.b64encode(raw_data).decode()
return {"type": "image", "data": {"file": f"base64://{b64_str}"}}
else:
logger.warning(f"无法处理 Image.raw 的类型: {type(raw_data)}", "广播")
elif getattr(seg, "path", None):
logger.warning(
f"在合并转发中使用了本地图片路径,可能无法显示: {seg.path}", "广播"
)
return {"type": "image", "data": {"file": f"file:///{seg.path}"}}
else:
logger.warning(f"alc.Image 缺少有效数据,无法转换为 V11 段: {seg}", "广播")
elif isinstance(seg, alc.At):
return {"type": "at", "data": {"qq": seg.target}}
elif isinstance(seg, alc.AtAll):
return {"type": "at", "data": {"qq": "all"}}
elif isinstance(seg, Video):
if getattr(seg, "url", None):
return {
"type": "video",
"data": {"file": seg.url},
}
elif getattr(seg, "raw", None):
raw_data = seg.raw
if isinstance(raw_data, str):
if len(raw_data) >= 9 and raw_data[:9] == "base64://":
return {"type": "video", "data": {"file": raw_data}}
elif isinstance(raw_data, bytes):
b64_str = base64.b64encode(raw_data).decode()
return {"type": "video", "data": {"file": f"base64://{b64_str}"}}
else:
logger.warning(f"无法处理 Video.raw 的类型: {type(raw_data)}", "广播")
elif getattr(seg, "path", None):
logger.warning(
f"在合并转发中使用了本地视频路径,可能无法显示: {seg.path}", "广播"
)
return {"type": "video", "data": {"file": f"file:///{seg.path}"}}
else:
logger.warning(f"Video 缺少有效数据,无法转换为 V11 段: {seg}", "广播")
elif isinstance(seg, Reference) and getattr(seg, "nodes", None):
if depth >= 3:
logger.warning(
f"嵌套转发深度超过限制 (depth={depth}),不再继续解析", "广播"
)
return {"type": "text", "data": {"text": "[嵌套转发层数过多,内容已省略]"}}
nested_v11_content_list = []
nodes_list = getattr(seg, "nodes", [])
for node in nodes_list:
if isinstance(node, CustomNode):
node_v11_content = []
if isinstance(node.content, UniMessage):
for nested_seg in node.content:
converted_dict = uni_segment_to_v11_segment_dict(
nested_seg, depth + 1
)
if isinstance(converted_dict, list):
node_v11_content.extend(converted_dict)
elif converted_dict:
node_v11_content.append(converted_dict)
elif isinstance(node.content, str):
node_v11_content.append(
{"type": "text", "data": {"text": node.content}}
)
if node_v11_content:
separator = {
"type": "text",
"data": {
"text": f"\n--- 来自 {node.name} ({node.uid}) 的消息 ---\n"
},
}
nested_v11_content_list.insert(0, separator)
nested_v11_content_list.extend(node_v11_content)
nested_v11_content_list.append(
{"type": "text", "data": {"text": "\n---\n"}}
)
return nested_v11_content_list
else:
logger.warning(f"广播时跳过不支持的 UniSeg 段类型: {type(seg)}", "广播")
return None
def uni_message_to_v11_list_of_dicts(uni_msg: UniMessage | str | list) -> list[dict]:
"""UniMessage转V11字典列表"""
try:
if isinstance(uni_msg, str):
return [{"type": "text", "data": {"text": uni_msg}}]
if isinstance(uni_msg, list):
if not uni_msg:
return []
if all(isinstance(item, str) for item in uni_msg):
return [{"type": "text", "data": {"text": item}} for item in uni_msg]
result = []
for item in uni_msg:
if hasattr(item, "__iter__") and not isinstance(item, str | bytes):
result.extend(uni_message_to_v11_list_of_dicts(item))
elif hasattr(item, "text") and not isinstance(item, str | bytes):
text_value = getattr(item, "text", "")
result.append({"type": "text", "data": {"text": str(text_value)}})
elif hasattr(item, "url") and not isinstance(item, str | bytes):
url_value = getattr(item, "url", "")
if isinstance(item, Video):
result.append(
{"type": "video", "data": {"file": str(url_value)}}
)
else:
result.append(
{"type": "image", "data": {"file": str(url_value)}}
)
else:
try:
result.append({"type": "text", "data": {"text": str(item)}})
except Exception as e:
logger.warning(f"无法转换列表元素: {item}, 错误: {e}", "广播")
return result
except Exception as e:
logger.warning(f"消息转换过程中出错: {e}", "广播")
return [{"type": "text", "data": {"text": str(uni_msg)}}]
def custom_nodes_to_v11_nodes(custom_nodes: list[CustomNode]) -> list[dict]:
"""CustomNode列表转V11节点"""
v11_nodes = []
for node in custom_nodes:
v11_content_list = uni_message_to_v11_list_of_dicts(node.content)
if v11_content_list:
v11_nodes.append(
{
"type": "node",
"data": {
"user_id": str(node.uid),
"nickname": node.name,
"content": v11_content_list,
},
}
)
else:
logger.warning(
f"CustomNode (uid={node.uid}) 内容转换后为空,跳过此节点", "广播"
)
return v11_nodes

View File

@ -29,8 +29,7 @@ from .public import init_public
__plugin_meta__ = PluginMetadata(
name="WebUi",
description="WebUi API",
usage="""
""".strip(),
usage='"""\n """.strip(),',
extra=PluginExtraData(
author="HibiKier",
version="0.1",
@ -83,7 +82,6 @@ BaseApiRouter.include_router(plugin_router)
BaseApiRouter.include_router(system_router)
BaseApiRouter.include_router(menu_router)
WsApiRouter = APIRouter(prefix="/zhenxun/socket")
WsApiRouter.include_router(ws_log_routes)
@ -94,6 +92,8 @@ WsApiRouter.include_router(chat_routes)
@driver.on_startup
async def _():
try:
# 存储任务引用的列表,防止任务被垃圾回收
_tasks = []
async def log_sink(message: str):
loop = None
@ -104,7 +104,8 @@ async def _():
logger.warning("Web Ui log_sink", e=e)
if not loop:
loop = asyncio.new_event_loop()
loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))) # noqa: RUF006
# 存储任务引用到外部列表中
_tasks.append(loop.create_task(LOG_STORAGE.add(message.rstrip("\n"))))
logger_.add(
log_sink, colorize=True, filter=default_filter, format=default_format

View File

@ -46,7 +46,10 @@ class MenuManage:
icon="database",
),
MenuItem(
name="系统信息", module="system", router="/system", icon="system"
name="文件管理", module="system", router="/system", icon="system"
),
MenuItem(
name="关于我们", module="about", router="/about", icon="about"
),
]
self.save()

View File

@ -16,7 +16,7 @@ from zhenxun.utils.platform import PlatformUtils
from ....base_model import Result
from ....config import QueryDateType
from ....utils import authentication, get_system_status
from ....utils import authentication, clear_help_image, get_system_status
from .data_source import ApiDataSource
from .model import (
ActiveGroup,
@ -234,6 +234,7 @@ async def _(param: BotManageUpdateParam):
bot_data.block_plugins = CommonUtils.convert_module_format(param.block_plugins)
bot_data.block_tasks = CommonUtils.convert_module_format(param.block_tasks)
await bot_data.save(update_fields=["block_plugins", "block_tasks"])
clear_help_image()
return Result.ok()
except Exception as e:
logger.error(f"{router.prefix}/update_bot_manage 调用错误", "WebUi", e=e)

View File

@ -92,7 +92,7 @@ class ApiDataSource:
"""
version_file = Path() / "__version__"
if version_file.exists():
if text := version_file.open().read():
if text := version_file.open(encoding="utf-8").read():
return text.replace("__version__: ", "").strip()
return "unknown"

View File

@ -1,3 +1,5 @@
from datetime import datetime
from fastapi import APIRouter
import nonebot
from nonebot import on_message
@ -49,13 +51,14 @@ async def message_handle(
message: UniMsg,
group_id: str | None,
):
time = str(datetime.now().replace(microsecond=0))
messages = []
for m in message:
if isinstance(m, Text | str):
messages.append(MessageItem(type="text", msg=str(m)))
messages.append(MessageItem(type="text", msg=str(m), time=time))
elif isinstance(m, Image):
if m.url:
messages.append(MessageItem(type="img", msg=m.url))
messages.append(MessageItem(type="img", msg=m.url, time=time))
elif isinstance(m, At):
if group_id:
if m.target == "0":
@ -72,9 +75,9 @@ async def message_handle(
uname = group_user.user_name
if m.target not in ID2NAME[group_id]:
ID2NAME[group_id][m.target] = uname
messages.append(MessageItem(type="at", msg=f"@{uname}"))
messages.append(MessageItem(type="at", msg=f"@{uname}", time=time))
elif isinstance(m, Hyper):
messages.append(MessageItem(type="text", msg="[分享消息]"))
messages.append(MessageItem(type="text", msg="[分享消息]", time=time))
return messages

View File

@ -237,6 +237,8 @@ class MessageItem(BaseModel):
"""消息类型"""
msg: str
"""内容"""
time: str
"""发送日期"""
class Message(BaseModel):

View File

@ -6,13 +6,16 @@ from zhenxun.services.log import logger
from zhenxun.utils.enum import BlockType, PluginType
from ....base_model import Result
from ....utils import authentication
from ....utils import authentication, clear_help_image
from .data_source import ApiDataSource
from .model import (
BatchUpdatePlugins,
BatchUpdateResult,
PluginCount,
PluginDetail,
PluginInfo,
PluginSwitch,
RenameMenuTypePayload,
UpdatePlugin,
)
@ -30,9 +33,8 @@ async def _(
plugin_type: list[PluginType] = Query(None), menu_type: str | None = None
) -> Result[list[PluginInfo]]:
try:
return Result.ok(
await ApiDataSource.get_plugin_list(plugin_type, menu_type), "拿到信息啦!"
)
result = await ApiDataSource.get_plugin_list(plugin_type, menu_type)
return Result.ok(result, "拿到信息啦!")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin_list 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
@ -78,6 +80,7 @@ async def _() -> Result[PluginCount]:
async def _(param: UpdatePlugin) -> Result:
try:
await ApiDataSource.update_plugin(param)
clear_help_image()
return Result.ok(info="已经帮你写好啦!")
except (ValueError, KeyError):
return Result.fail("插件数据不存在...")
@ -105,6 +108,7 @@ async def _(param: PluginSwitch) -> Result:
db_plugin.block_type = None
db_plugin.status = True
await db_plugin.save()
clear_help_image()
return Result.ok(info="成功改变了开关状态!")
except Exception as e:
logger.error(f"{router.prefix}/change_switch 调用错误", "WebUi", e=e)
@ -144,11 +148,68 @@ async def _() -> Result[list[str]]:
)
async def _(module: str) -> Result[PluginDetail]:
try:
return Result.ok(
await ApiDataSource.get_plugin_detail(module), "已经帮你写好啦!"
)
detail = await ApiDataSource.get_plugin_detail(module)
return Result.ok(detail, "已经帮你写好啦!")
except (ValueError, KeyError):
return Result.fail("插件数据不存在...")
except Exception as e:
logger.error(f"{router.prefix}/get_plugin 调用错误", "WebUi", e=e)
return Result.fail(f"{type(e)}: {e}")
@router.put(
"/plugins/batch_update",
dependencies=[authentication()],
response_model=Result[BatchUpdateResult],
response_class=JSONResponse,
summary="批量更新插件配置",
)
async def batch_update_plugin_config_api(
params: BatchUpdatePlugins,
) -> Result[BatchUpdateResult]:
"""批量更新插件配置,如开关、类型等"""
try:
result_dict = await ApiDataSource.batch_update_plugins(params=params)
result_model = BatchUpdateResult(
success=result_dict["success"],
updated_count=result_dict["updated_count"],
errors=result_dict["errors"],
)
clear_help_image()
return Result.ok(result_model, "插件配置更新完成")
except Exception as e:
logger.error(f"{router.prefix}/plugins/batch_update 调用错误", "WebUi", e=e)
return Result.fail(f"发生了一点错误捏 {type(e)}: {e}")
# 新增:重命名菜单类型路由
@router.put(
"/menu_type/rename",
dependencies=[authentication()],
response_model=Result,
summary="重命名菜单类型",
)
async def rename_menu_type_api(payload: RenameMenuTypePayload) -> Result:
try:
result = await ApiDataSource.rename_menu_type(
old_name=payload.old_name, new_name=payload.new_name
)
if result.get("success"):
clear_help_image()
return Result.ok(
info=result.get(
"info",
f"成功将 {result.get('updated_count', 0)} 个插件的菜单类型从 "
f"'{payload.old_name}' 修改为 '{payload.new_name}'",
)
)
else:
return Result.fail(info=result.get("info", "重命名失败"))
except ValueError as ve:
return Result.fail(info=str(ve))
except RuntimeError as re:
logger.error(f"{router.prefix}/menu_type/rename 调用错误", "WebUi", e=re)
return Result.fail(info=str(re))
except Exception as e:
logger.error(f"{router.prefix}/menu_type/rename 调用错误", "WebUi", e=e)
return Result.fail(info=f"发生未知错误: {type(e).__name__}")

View File

@ -2,13 +2,20 @@ import re
import cattrs
from fastapi import Query
from tortoise.exceptions import DoesNotExist
from zhenxun.configs.config import Config
from zhenxun.configs.utils import ConfigGroup
from zhenxun.models.plugin_info import PluginInfo as DbPluginInfo
from zhenxun.utils.enum import BlockType, PluginType
from .model import PluginConfig, PluginDetail, PluginInfo, UpdatePlugin
from .model import (
BatchUpdatePlugins,
PluginConfig,
PluginDetail,
PluginInfo,
UpdatePlugin,
)
class ApiDataSource:
@ -44,6 +51,11 @@ class ApiDataSource:
level=plugin.level,
status=plugin.status,
author=plugin.author,
block_type=plugin.block_type,
is_builtin="builtin_plugins" in plugin.module_path
or plugin.plugin_type == PluginType.HIDDEN,
allow_setting=plugin.plugin_type != PluginType.HIDDEN,
allow_switch=plugin.plugin_type != PluginType.HIDDEN,
)
plugin_list.append(plugin_info)
return plugin_list
@ -69,7 +81,6 @@ class ApiDataSource:
db_plugin.block_type = param.block_type
db_plugin.status = param.block_type != BlockType.ALL
await db_plugin.save()
# 配置项
if param.configs and (configs := Config.get(param.module)):
for key in param.configs:
if c := configs.configs.get(key):
@ -80,6 +91,87 @@ class ApiDataSource:
Config.save(save_simple_data=True)
return db_plugin
@classmethod
async def batch_update_plugins(cls, params: BatchUpdatePlugins) -> dict:
"""批量更新插件数据
参数:
params: BatchUpdatePlugins
返回:
dict: 更新结果, 例如 {'success': True, 'updated_count': 5, 'errors': []}
"""
plugins_to_update_other_fields = []
other_update_fields = set()
updated_count = 0
errors = []
for item in params.updates:
try:
db_plugin = await DbPluginInfo.get(module=item.module)
plugin_changed_other = False
plugin_changed_block = False
if db_plugin.block_type != item.block_type:
db_plugin.block_type = item.block_type
db_plugin.status = item.block_type != BlockType.ALL
plugin_changed_block = True
if item.menu_type is not None and db_plugin.menu_type != item.menu_type:
db_plugin.menu_type = item.menu_type
other_update_fields.add("menu_type")
plugin_changed_other = True
if (
item.default_status is not None
and db_plugin.default_status != item.default_status
):
db_plugin.default_status = item.default_status
other_update_fields.add("default_status")
plugin_changed_other = True
if plugin_changed_block:
try:
await db_plugin.save(update_fields=["block_type", "status"])
updated_count += 1
except Exception as e_save:
errors.append(
{
"module": item.module,
"error": f"Save block_type failed: {e_save!s}",
}
)
plugin_changed_other = False
if plugin_changed_other:
plugins_to_update_other_fields.append(db_plugin)
except DoesNotExist:
errors.append({"module": item.module, "error": "Plugin not found"})
except Exception as e:
errors.append({"module": item.module, "error": str(e)})
bulk_updated_count = 0
if plugins_to_update_other_fields and other_update_fields:
try:
await DbPluginInfo.bulk_update(
plugins_to_update_other_fields, list(other_update_fields)
)
bulk_updated_count = len(plugins_to_update_other_fields)
except Exception as e_bulk:
errors.append(
{
"module": "batch_update_other",
"error": f"Bulk update failed: {e_bulk!s}",
}
)
return {
"success": len(errors) == 0,
"updated_count": updated_count + bulk_updated_count,
"errors": errors,
}
@classmethod
def __build_plugin_config(
cls, module: str, cfg: str, config: ConfigGroup
@ -115,6 +207,41 @@ class ApiDataSource:
type_inner=type_inner, # type: ignore
)
@classmethod
async def rename_menu_type(cls, old_name: str, new_name: str) -> dict:
"""重命名菜单类型,并更新所有相关插件
参数:
old_name: 旧菜单类型名称
new_name: 新菜单类型名称
返回:
dict: 更新结果, 例如 {'success': True, 'updated_count': 3}
"""
if not old_name or not new_name:
raise ValueError("旧名称和新名称都不能为空")
if old_name == new_name:
return {
"success": True,
"updated_count": 0,
"info": "新旧名称相同,无需更新",
}
# 检查新名称是否已存在(理论上前端会校验,后端再保险一次)
exists = await DbPluginInfo.filter(menu_type=new_name).exists()
if exists:
raise ValueError(f"新的菜单类型名称 '{new_name}' 已被其他插件使用")
try:
# 使用 filter().update() 进行批量更新
updated_count = await DbPluginInfo.filter(menu_type=old_name).update(
menu_type=new_name
)
return {"success": True, "updated_count": updated_count}
except Exception as e:
# 可以添加更详细的日志记录
raise RuntimeError(f"数据库更新菜单类型失败: {e!s}")
@classmethod
async def get_plugin_detail(cls, module: str) -> PluginDetail:
"""获取插件详情

View File

@ -1,6 +1,6 @@
from typing import Any
from pydantic import BaseModel
from pydantic import BaseModel, Field
from zhenxun.utils.enum import BlockType
@ -37,19 +37,19 @@ class UpdatePlugin(BaseModel):
module: str
"""模块"""
default_status: bool
"""默认开关"""
"""是否默认开启"""
limit_superuser: bool
"""限制超级用户"""
cost_gold: int
"""金币花费"""
menu_type: str
"""插件菜单类型"""
"""是否限制超级用户"""
level: int
"""插件所需群权限"""
"""等级"""
cost_gold: int
"""花费金币"""
menu_type: str
"""菜单类型"""
block_type: BlockType | None = None
"""禁用类型"""
configs: dict[str, Any] | None = None
"""配置项"""
"""置项"""
class PluginInfo(BaseModel):
@ -58,27 +58,33 @@ class PluginInfo(BaseModel):
"""
module: str
"""插件名称"""
"""模块"""
plugin_name: str
"""插件中文名称"""
"""插件名称"""
default_status: bool
"""默认开关"""
"""是否默认开启"""
limit_superuser: bool
"""限制超级用户"""
"""是否限制超级用户"""
level: int
"""等级"""
cost_gold: int
"""花费金币"""
menu_type: str
"""插件菜单类型"""
"""菜单类型"""
version: str
"""插件版本"""
level: int
"""群权限"""
"""版本"""
status: bool
"""当前状态"""
"""状态"""
author: str | None = None
"""作者"""
block_type: BlockType | None = None
"""禁用类型"""
block_type: BlockType | None = Field(None, description="插件禁用状态 (None: 启用)")
"""禁用状态"""
is_builtin: bool = False
"""是否为内置插件"""
allow_switch: bool = True
"""是否允许开关"""
allow_setting: bool = True
"""是否允许设置"""
class PluginConfig(BaseModel):
@ -86,20 +92,13 @@ class PluginConfig(BaseModel):
插件配置项
"""
module: str
"""模块"""
key: str
""""""
value: Any
""""""
help: str | None = None
"""帮助"""
default_value: Any
"""默认值"""
type: Any = None
"""值类型"""
type_inner: list[str] | None = None
"""List Tuple等内部类型检验"""
module: str = Field(..., description="模块名")
key: str = Field(..., description="")
value: Any = Field(None, description="")
help: str | None = Field(None, description="帮助信息")
default_value: Any = Field(None, description="默认值")
type: str | None = Field(None, description="类型")
type_inner: list[str] | None = Field(None, description="内部类型")
class PluginCount(BaseModel):
@ -117,6 +116,21 @@ class PluginCount(BaseModel):
"""其他插件"""
class BatchUpdatePluginItem(BaseModel):
module: str = Field(..., description="插件模块名")
default_status: bool | None = Field(None, description="默认状态(开关)")
menu_type: str | None = Field(None, description="菜单类型")
block_type: BlockType | None = Field(
None, description="插件禁用状态 (None: 启用, ALL: 禁用)"
)
class BatchUpdatePlugins(BaseModel):
updates: list[BatchUpdatePluginItem] = Field(
..., description="要批量更新的插件列表"
)
class PluginDetail(PluginInfo):
"""
插件详情
@ -125,6 +139,26 @@ class PluginDetail(PluginInfo):
config_list: list[PluginConfig]
class RenameMenuTypePayload(BaseModel):
old_name: str = Field(..., description="旧菜单类型名称")
new_name: str = Field(..., description="新菜单类型名称")
class PluginIr(BaseModel):
id: int
"""插件id"""
class BatchUpdateResult(BaseModel):
"""
批量更新插件结果
"""
success: bool = Field(..., description="是否全部成功")
"""是否全部成功"""
updated_count: int = Field(..., description="更新成功的数量")
"""更新成功的数量"""
errors: list[dict[str, str]] = Field(
default_factory=list, description="错误信息列表"
)
"""错误信息列表"""

View File

@ -36,6 +36,8 @@ async def _(path: str | None = None) -> Result[list[DirFile]]:
is_image=is_image,
name=file,
parent=path,
size=None if file_path.is_dir() else file_path.stat().st_size,
mtime=file_path.stat().st_mtime,
)
)
return Result.ok(data_list)
@ -215,3 +217,13 @@ async def _(full_path: str) -> Result[str]:
return Result.ok(BuildImage.open(path).pic2bs4())
except Exception as e:
return Result.warning_(f"获取图片失败: {e!s}")
@router.get(
"/ping",
response_model=Result[str],
response_class=JSONResponse,
description="检查服务器状态",
)
async def _() -> Result[str]:
return Result.ok("pong")

View File

@ -14,6 +14,10 @@ class DirFile(BaseModel):
"""文件夹或文件名称"""
parent: str | None = None
"""父级"""
size: int | None = None
"""文件大小"""
mtime: float | None = None
"""修改时间"""
class DeleteFile(BaseModel):

View File

@ -11,7 +11,7 @@ import psutil
import ujson as json
from zhenxun.configs.config import Config
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
from .base_model import SystemFolderSize, SystemStatus, User
@ -28,6 +28,22 @@ if token_file.exists():
token_data = json.load(open(token_file, encoding="utf8"))
GROUP_HELP_PATH = DATA_PATH / "group_help"
SIMPLE_HELP_IMAGE = IMAGE_PATH / "SIMPLE_HELP.png"
SIMPLE_DETAIL_HELP_IMAGE = IMAGE_PATH / "SIMPLE_DETAIL_HELP.png"
def clear_help_image():
"""清理帮助图片"""
if SIMPLE_HELP_IMAGE.exists():
SIMPLE_HELP_IMAGE.unlink()
if SIMPLE_DETAIL_HELP_IMAGE.exists():
SIMPLE_DETAIL_HELP_IMAGE.unlink()
for file in GROUP_HELP_PATH.iterdir():
if file.is_file():
file.unlink()
def get_user(uname: str) -> User | None:
"""获取账号密码

View File

@ -41,9 +41,9 @@ def convert_module_format(data: str | list[str]) -> str | list[str]:
str | list[str]: 根据输入类型返回转换后的数据
"""
if isinstance(data, str):
return [item.strip(",") for item in data.split("<") if item]
return [item.strip(",") for item in data.split("<") if item.strip()]
else:
return "".join(format(item) for item in data)
return "".join(add_disable_marker(item) for item in data)
class GroupConsole(Model):

View File

@ -0,0 +1,123 @@
from datetime import datetime
from typing_extensions import Self
from tortoise import fields
from zhenxun.services.db_context import Model
from .mahiro_bank_log import BankHandleType, MahiroBankLog
class MahiroBank(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255, description="用户id")
"""用户id"""
amount = fields.BigIntField(default=0, description="存款")
"""用户存款"""
rate = fields.FloatField(default=0.0005, description="小时利率")
"""小时利率"""
loan_amount = fields.BigIntField(default=0, description="贷款")
"""用户贷款"""
loan_rate = fields.FloatField(default=0.0005, description="贷款利率")
"""贷款利率"""
update_time = fields.DatetimeField(auto_now=True)
"""修改时间"""
create_time = fields.DatetimeField(auto_now_add=True)
"""创建时间"""
class Meta: # pyright: ignore [reportIncompatibleVariableOverride]
table = "mahiro_bank"
table_description = "小真寻银行"
@classmethod
async def deposit(cls, user_id: str, amount: int, rate: float) -> Self:
"""存款
参数:
user_id: 用户id
amount: 金币数量
rate: 小时利率
返回:
Self: MahiroBank
"""
effective_hour = int(24 - datetime.now().hour)
user, _ = await cls.get_or_create(user_id=user_id)
user.amount += amount
await user.save(update_fields=["amount", "rate"])
await MahiroBankLog.create(
user_id=user_id,
amount=amount,
rate=rate,
effective_hour=effective_hour,
handle_type=BankHandleType.DEPOSIT,
)
return user
@classmethod
async def withdraw(cls, user_id: str, amount: int) -> Self:
"""取款
参数:
user_id: 用户id
amount: 金币数量
返回:
Self: MahiroBank
"""
if amount <= 0:
raise ValueError("取款金额必须大于0")
user, _ = await cls.get_or_create(user_id=user_id)
if user.amount < amount:
raise ValueError("取款金额不能大于存款金额")
user.amount -= amount
await user.save(update_fields=["amount"])
await MahiroBankLog.create(
user_id=user_id, amount=amount, handle_type=BankHandleType.WITHDRAW
)
return user
@classmethod
async def loan(cls, user_id: str, amount: int, rate: float) -> Self:
"""贷款
参数:
user_id: 用户id
amount: 贷款金额
rate: 贷款利率
返回:
Self: MahiroBank
"""
user, _ = await cls.get_or_create(user_id=user_id)
user.loan_amount += amount
user.loan_rate = rate
await user.save(update_fields=["loan_amount", "loan_rate"])
await MahiroBankLog.create(
user_id=user_id, amount=amount, rate=rate, handle_type=BankHandleType.LOAN
)
return user
@classmethod
async def repayment(cls, user_id: str, amount: int) -> Self:
"""还款
参数:
user_id: 用户id
amount: 还款金额
返回:
Self: MahiroBank
"""
if amount <= 0:
raise ValueError("还款金额必须大于0")
user, _ = await cls.get_or_create(user_id=user_id)
if user.loan_amount < amount:
raise ValueError("还款金额不能大于贷款金额")
user.loan_amount -= amount
await user.save(update_fields=["loan_amount"])
await MahiroBankLog.create(
user_id=user_id, amount=amount, handle_type=BankHandleType.REPAYMENT
)
return user

View File

@ -0,0 +1,31 @@
from tortoise import fields
from zhenxun.services.db_context import Model
from zhenxun.utils.enum import BankHandleType
class MahiroBankLog(Model):
id = fields.IntField(pk=True, generated=True, auto_increment=True)
"""自增id"""
user_id = fields.CharField(255, description="用户id")
"""用户id"""
amount = fields.BigIntField(default=0, description="存款")
"""金币数量"""
rate = fields.FloatField(default=0, description="小时利率")
"""小时利率"""
handle_type = fields.CharEnumField(
BankHandleType, null=True, description="处理类型"
)
"""处理类型"""
is_completed = fields.BooleanField(default=False, description="是否完成")
"""是否完成"""
effective_hour = fields.IntField(default=0, description="有效小时")
"""有效小时"""
update_time = fields.DatetimeField(auto_now=True)
"""修改时间"""
create_time = fields.DatetimeField(auto_now_add=True)
"""创建时间"""
class Meta: # pyright: ignore [reportIncompatibleVariableOverride]
table = "mahiro_bank_log"
table_description = "小真寻银行日志"

View File

@ -60,27 +60,41 @@ class PluginInfo(Model):
table_description = "插件基本信息"
@classmethod
async def get_plugin(cls, load_status: bool = True, **kwargs) -> Self | None:
async def get_plugin(
cls, load_status: bool = True, filter_parent: bool = True, **kwargs
) -> Self | None:
"""获取插件列表
参数:
load_status: 加载状态.
filter_parent: 过滤父组件
返回:
Self | None: 插件
"""
if filter_parent:
return await cls.get_or_none(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
)
return await cls.get_or_none(load_status=load_status, **kwargs)
@classmethod
async def get_plugins(cls, load_status: bool = True, **kwargs) -> list[Self]:
async def get_plugins(
cls, load_status: bool = True, filter_parent: bool = True, **kwargs
) -> list[Self]:
"""获取插件列表
参数:
load_status: 加载状态.
filter_parent: 过滤父组件
返回:
list[Self]: 插件列表
"""
if filter_parent:
return await cls.filter(
load_status=load_status, plugin_type__not=PluginType.PARENT, **kwargs
).all()
return await cls.filter(load_status=load_status, **kwargs).all()
@classmethod

View File

@ -5,6 +5,18 @@ class BotSentType(StrEnum):
GROUP = "GROUP"
PRIVATE = "PRIVATE"
class BankHandleType(StrEnum):
DEPOSIT = "DEPOSIT"
"""存款"""
WITHDRAW = "WITHDRAW"
"""取款"""
LOAN = "LOAN"
"""贷款"""
REPAYMENT = "REPAYMENT"
"""还款"""
INTEREST = "INTEREST"
"""利息"""
class GoldHandle(StrEnum):
"""