zhenxun_bot/zhenxun/utils/platform.py
2024-09-29 17:00:31 +08:00

632 lines
22 KiB
Python

import random
from typing import Literal
from collections.abc import Callable, Awaitable
import httpx
import nonebot
from pydantic import BaseModel
from nonebot.adapters import Bot
from nonebot.utils import is_coroutine_callable
from nonebot.adapters.dodo import Bot as DodoBot
from nonebot.adapters.onebot.v11 import Bot as v11Bot
from nonebot.adapters.onebot.v12 import Bot as v12Bot
from nonebot.adapters.discord import Bot as DiscordBot
from nonebot.adapters.kaiheila import Bot as KaiheilaBot
from nonebot_plugin_alconna.uniseg import Target, Receipt, UniMessage
from zhenxun.services.log import logger
from zhenxun.configs.config import BotConfig
from zhenxun.utils.message import MessageUtils
from zhenxun.models.friend_user import FriendUser
from zhenxun.utils.exception import NotFindSuperuser
from zhenxun.models.group_console import GroupConsole
class UserData(BaseModel):
    """Platform-independent description of a user or group member."""

    # Nickname.
    name: str
    # Group card / remark name.
    card: str | None = None
    # User id.
    user_id: str
    # Group (or guild) id the record belongs to, if any.
    group_id: str | None = None
    # Role inside the group.
    role: str | None = None
    # Avatar URL.
    avatar_url: str | None = None
    # Join time.
    join_time: int | None = None
class PlatformUtils:
    """Adapter-aware helpers for users, groups and message delivery.

    Each method checks the concrete ``Bot`` type (OneBot v11/v12, DoDo,
    Kaiheila, Discord) and calls the matching adapter API.
    """

    @staticmethod
    def _field(entry, key: str):
        """Read ``key`` from an API payload entry.

        The OneBot v12 branches of this class are written against both
        mapping-style and attribute-style payloads, so accept either shape.
        Returns None when the field is absent.
        """
        if isinstance(entry, dict):
            return entry.get(key)
        return getattr(entry, key, None)

    @classmethod
    async def _download(
        cls, url: str, error_info: str, target: str, platform: str
    ) -> bytes | None:
        """Fetch ``url`` and return the response body, trying up to 3 times.

        Failures are logged (with the exception attached) and never raised;
        None is returned once every attempt has failed.
        """
        async with httpx.AsyncClient() as client:
            for _ in range(3):
                try:
                    return (await client.get(url)).content
                except Exception as e:
                    # Best effort: log and retry instead of propagating.
                    logger.error(
                        error_info,
                        "Util",
                        target=target,
                        platform=platform,
                        e=e,
                    )
        return None

    @classmethod
    async def ban_user(cls, bot: Bot, user_id: str, group_id: str, duration: int):
        """Mute a group member.

        Only OneBot v11 bots are handled; for other adapters this is a no-op.

        Args:
            bot: Bot
            user_id: user id
            group_id: group id
            duration: mute length in minutes (converted to seconds for the API)
        """
        if isinstance(bot, v11Bot):
            await bot.set_group_ban(
                group_id=int(group_id),
                user_id=int(user_id),
                duration=duration * 60,
            )

    @classmethod
    async def send_superuser(
        cls,
        bot: Bot,
        message: UniMessage | str,
        superuser_id: str | None = None,
    ) -> Receipt | None:
        """Send a private message to a superuser.

        When ``superuser_id`` is not given, one of the superusers configured
        for the bot's platform is picked at random.

        Args:
            bot: Bot
            message: message to send
            superuser_id: explicit superuser id.

        Raises:
            NotFindSuperuser: the platform has no configured superuser

        Returns:
            Receipt | None: the send receipt, or None when no target could be
            built (e.g. the bot's platform is not recognised)
        """
        if not superuser_id:
            if platform := cls.get_platform(bot):
                if platform_superusers := BotConfig.get_superuser(platform):
                    superuser_id = random.choice(platform_superusers)
                else:
                    raise NotFindSuperuser()
        if isinstance(message, str):
            message = MessageUtils.build_message(message)
        return await cls.send_message(bot, superuser_id, None, message)

    @classmethod
    async def get_group_member_list(cls, bot: Bot, group_id: str) -> list[UserData]:
        """Fetch the members of a group / guild.

        Args:
            bot: Bot
            group_id: group or guild id

        Returns:
            list[UserData]: member records; empty when the adapter is not
            supported or the API returned nothing
        """
        if isinstance(bot, v11Bot):
            if member_list := await bot.get_group_member_list(group_id=int(group_id)):
                return [
                    UserData(
                        name=user["nickname"],
                        card=user["card"],
                        # v11 ids are integers; UserData stores strings.
                        user_id=str(user["user_id"]),
                        group_id=str(user["group_id"]),
                        role=user["role"],
                        join_time=user["join_time"],
                    )
                    for user in member_list
                ]
        if isinstance(bot, v12Bot):
            if member_list := await bot.get_group_member_list(group_id=group_id):
                return [
                    UserData(
                        name=user["user_name"],
                        card=user["user_displayname"],
                        user_id=user["user_id"],
                        group_id=group_id,
                    )
                    for user in member_list
                ]
        if isinstance(bot, DodoBot):
            page_size = 100
            page = await bot.get_member_list(
                island_source_id=group_id, page_size=page_size, max_id=0
            )
            if page:
                members = list(page.list)
                # A full page means more members may follow; the next request
                # continues from the max_id returned by the previous page.
                while len(page.list) == page_size:
                    page = await bot.get_member_list(
                        island_source_id=group_id,
                        page_size=page_size,
                        max_id=page.max_id,
                    )
                    if not page or not page.list:
                        break
                    members.extend(page.list)
                return [
                    UserData(
                        name=user.nick_name,
                        card=user.personal_nick_name,
                        avatar_url=user.avatar_url,
                        user_id=user.dodo_source_id,
                        group_id=user.island_source_id,
                        join_time=int(user.join_time.timestamp()),
                    )
                    for user in members
                ]
        if isinstance(bot, KaiheilaBot):
            if result_data := await bot.guild_userList(guild_id=group_id):
                if result_data.users:
                    return [
                        UserData(
                            name=user.nickname or "",
                            avatar_url=user.avatar,
                            user_id=user.id_,  # type: ignore
                            group_id=group_id,
                            # joined_at is divided by 1000 to get join_time.
                            join_time=(
                                int(user.joined_at / 1000) if user.joined_at else None
                            ),
                        )
                        for user in result_data.users
                    ]
        return []

    @classmethod
    async def get_user(
        cls, bot: Bot, user_id: str, group_id: str | None = None
    ) -> UserData | None:
        """Fetch information about a single user.

        With ``group_id`` the user is looked up as a member of that group;
        without it (OneBot only) the bot's friend list is searched.

        Args:
            bot: Bot
            user_id: user id
            group_id: group or guild id.

        Returns:
            UserData | None: the user record, or None when not found
        """
        if isinstance(bot, v11Bot):
            if group_id:
                if user := await bot.get_group_member_info(
                    group_id=int(group_id), user_id=int(user_id)
                ):
                    return UserData(
                        name=user["nickname"],
                        card=user["card"],
                        # v11 ids are integers; UserData stores strings.
                        user_id=str(user["user_id"]),
                        group_id=str(user["group_id"]),
                        role=user["role"],
                        join_time=user["join_time"],
                    )
            elif friend_list := await bot.get_friend_list():
                for f in friend_list:
                    if f["user_id"] == int(user_id):
                        return UserData(
                            name=f["nickname"],
                            card=f["remark"],
                            user_id=str(f["user_id"]),
                        )
        if isinstance(bot, v12Bot):
            if group_id:
                if user := await bot.get_group_member_info(
                    group_id=group_id, user_id=user_id
                ):
                    return UserData(
                        name=user["user_name"],
                        card=user["user_displayname"],
                        user_id=user["user_id"],
                        group_id=group_id,
                    )
            elif friend_list := await bot.get_friend_list():
                for f in friend_list:
                    # Compare as strings: v12 ids are not guaranteed numeric,
                    # so int() conversion could never match or would raise.
                    if str(f["user_id"]) == str(user_id):
                        return UserData(
                            name=f["user_name"],
                            card=f["user_remark"],
                            user_id=f["user_id"],
                        )
        if isinstance(bot, DodoBot) and group_id:
            if user := await bot.get_member_info(
                island_source_id=group_id, dodo_source_id=user_id
            ):
                return UserData(
                    name=user.nick_name,
                    card=user.personal_nick_name,
                    avatar_url=user.avatar_url,
                    user_id=user.dodo_source_id,
                    group_id=user.island_source_id,
                    join_time=int(user.join_time.timestamp()),
                )
        if isinstance(bot, KaiheilaBot) and group_id:
            if user := await bot.user_view(guild_id=group_id, user_id=user_id):
                second = int(user.joined_at / 1000) if user.joined_at else None
                return UserData(
                    name=user.nickname or "",
                    avatar_url=user.avatar,
                    user_id=user_id,
                    group_id=group_id,
                    join_time=second,
                )
        return None

    @classmethod
    async def get_user_avatar(cls, user_id: str, platform: str) -> bytes | None:
        """Download a user's avatar.

        Args:
            user_id: user id
            platform: platform name; only "qq" has an avatar URL

        Returns:
            bytes | None: image bytes, or None for unsupported platforms or
            when every download attempt failed
        """
        url = cls.get_user_avatar_url(user_id, platform)
        if not url:
            return None
        return await cls._download(url, "获取用户头像错误", user_id, platform)

    @classmethod
    def get_user_avatar_url(cls, user_id: str, platform: str) -> str | None:
        """Build the avatar URL of a user.

        Args:
            user_id: user id
            platform: platform name

        Returns:
            str | None: the URL for "qq", None for any other platform
        """
        if platform == "qq":
            return f"http://q1.qlogo.cn/g?b=qq&nk={user_id}&s=160"
        return None

    @classmethod
    async def get_group_avatar(cls, gid: str, platform: str) -> bytes | None:
        """Download a group's avatar.

        Args:
            gid: group id
            platform: platform name; only "qq" has an avatar URL

        Returns:
            bytes | None: image bytes, or None for unsupported platforms or
            when every download attempt failed
        """
        if platform != "qq":
            return None
        url = f"http://p.qlogo.cn/gh/{gid}/{gid}/640/"
        return await cls._download(url, "获取群头像错误", gid, platform)

    @classmethod
    async def send_message(
        cls,
        bot: Bot,
        user_id: str | None,
        group_id: str | None,
        message: str | UniMessage,
    ) -> Receipt | None:
        """Send a message to a user or a group.

        Args:
            bot: Bot
            user_id: user id
            group_id: group id or guild id
            message: text or ready-made UniMessage

        Returns:
            Receipt | None: the send receipt, or None when no target could be
            built for this bot / id combination
        """
        if target := cls.get_target(bot, user_id, group_id):
            send_message = (
                MessageUtils.build_message(message)
                if isinstance(message, str)
                else message
            )
            return await send_message.send(target=target, bot=bot)
        return None

    @classmethod
    async def update_group(cls, bot: Bot) -> int:
        """Synchronise the bot's groups into the GroupConsole table.

        Groups missing from the table are created; existing rows get their
        name and member counts refreshed.

        Args:
            bot: Bot

        Returns:
            int: number of newly created rows
        """
        create_list = []
        update_list = []
        group_list, platform = await cls.get_group_list(bot)
        if group_list:
            # Index existing rows by (group_id, channel_id); setdefault keeps
            # the first row when the table holds duplicates.
            db_groups = {}
            for db_group in await GroupConsole.all():
                db_groups.setdefault((db_group.group_id, db_group.channel_id), db_group)
            for group in group_list:
                group.platform = platform
                existing = db_groups.get((group.group_id, group.channel_id))
                if existing is None:
                    create_list.append(group)
                    logger.debug(
                        "群聊信息更新成功",
                        "更新群信息",
                        target=f"{group.group_id}:{group.channel_id}",
                    )
                else:
                    existing.group_name = group.group_name
                    existing.max_member_count = group.max_member_count
                    existing.member_count = group.member_count
                    update_list.append(existing)
        if create_list:
            await GroupConsole.bulk_create(create_list, 10)
        # Guard on update_list itself so an empty batch is never submitted.
        if update_list:
            await GroupConsole.bulk_update(
                update_list, ["group_name", "max_member_count", "member_count"], 10
            )
        return len(create_list)

    @classmethod
    def get_platform(cls, bot: Bot) -> str | None:
        """Map a bot instance to its platform name.

        Args:
            bot: Bot

        Returns:
            str | None: "qq", "dodo", "kaiheila" or "discord"; None for any
            other adapter
        """
        if isinstance(bot, v11Bot | v12Bot):
            return "qq"
        if isinstance(bot, DodoBot):
            return "dodo"
        if isinstance(bot, KaiheilaBot):
            return "kaiheila"
        return "discord" if isinstance(bot, DiscordBot) else None

    @classmethod
    async def get_group_list(cls, bot: Bot) -> tuple[list[GroupConsole], str]:
        """List the groups (and channels) the bot can see.

        For DoDo and Kaiheila each guild yields one entry for the guild itself
        plus one entry per channel.

        Args:
            bot: Bot

        Returns:
            tuple[list[GroupConsole], str]: unsaved GroupConsole objects and the
            platform name; ``([], "")`` for unsupported adapters
        """
        if isinstance(bot, v11Bot):
            group_list = await bot.get_group_list()
            return [
                GroupConsole(
                    group_id=str(g["group_id"]),
                    group_name=g["group_name"],
                    max_member_count=g["max_member_count"],
                    member_count=g["member_count"],
                )
                for g in group_list
            ], "qq"
        if isinstance(bot, v12Bot):
            group_list = await bot.get_group_list()
            return [
                GroupConsole(
                    group_id=cls._field(g, "group_id"),
                    # The name belongs in group_name (was passed as user_name).
                    group_name=cls._field(g, "group_name"),
                )
                for g in group_list
            ], "qq"
        if isinstance(bot, DodoBot):
            island_list = await bot.get_island_list()
            source_id_list = [
                (g.island_source_id, g.island_name)
                for g in island_list
                if g.island_source_id
            ]
            group_list = []
            for island_id, name in source_id_list:
                channel_list = await bot.get_channel_list(island_source_id=island_id)
                group_list.append(GroupConsole(group_id=island_id, group_name=name))
                group_list += [
                    GroupConsole(
                        group_id=island_id,
                        group_name=c.channel_name,
                        channel_id=c.channel_id,
                    )
                    for c in channel_list
                ]
            return group_list, "dodo"
        if isinstance(bot, KaiheilaBot):
            group_list = []
            guilds = await bot.guild_list()
            if guilds.guilds:
                for guild_id, name in [(g.id_, g.name) for g in guilds.guilds if g.id_]:
                    view = await bot.guild_view(guild_id=guild_id)
                    group_list.append(GroupConsole(group_id=guild_id, group_name=name))
                    if view.channels:
                        group_list += [
                            GroupConsole(
                                group_id=guild_id, group_name=c.name, channel_id=c.id_
                            )
                            for c in view.channels
                            if c.type != 0
                        ]
            return group_list, "kaiheila"
        return [], ""

    @classmethod
    async def update_friend(cls, bot: Bot) -> int:
        """Insert friends that are not yet stored in the FriendUser table.

        Args:
            bot: Bot

        Returns:
            int: number of newly created rows
        """
        create_list = []
        friend_list, platform = await cls.get_friend_list(bot)
        if friend_list:
            # A set makes the per-friend membership test O(1).
            known_ids = set(await FriendUser.all().values_list("user_id", flat=True))
            for friend in friend_list:
                friend.platform = platform
                if friend.user_id not in known_ids:
                    create_list.append(friend)
        if create_list:
            await FriendUser.bulk_create(create_list, 10)
        return len(create_list)

    @classmethod
    async def get_friend_list(cls, bot: Bot) -> tuple[list[FriendUser], str]:
        """List the bot's friends.

        Only OneBot v11/v12 are handled; DoDo, Kaiheila and Discord are not
        implemented yet.

        Args:
            bot: Bot

        Returns:
            tuple[list[FriendUser], str]: unsaved FriendUser objects and the
            platform name; ``([], "")`` for unsupported adapters
        """
        if isinstance(bot, v11Bot):
            friend_list = await bot.get_friend_list()
            return [
                FriendUser(user_id=str(f["user_id"]), user_name=f["nickname"])
                for f in friend_list
            ], "qq"
        if isinstance(bot, v12Bot):
            friend_list = await bot.get_friend_list()
            return [
                FriendUser(
                    user_id=cls._field(f, "user_id"),
                    # Prefer the display name, then the remark, then the name.
                    user_name=cls._field(f, "user_displayname")
                    or cls._field(f, "user_remark")
                    or cls._field(f, "user_name"),
                )
                for f in friend_list
            ], "qq"
        # TODO: friend lists for DoDo, Kaiheila and Discord
        return [], ""

    @classmethod
    def get_target(
        cls,
        bot: Bot,
        user_id: str | None = None,
        group_id: str | None = None,
        channel_id: str | None = None,
    ) -> Target | None:
        """Build the send Target for a bot.

        OneBot: a group target when ``group_id`` is given, else a private
        target for ``user_id``. DoDo / Kaiheila: a channel target when both
        ``group_id`` and ``channel_id`` are given, else a private target for
        ``user_id``.

        Args:
            bot: Bot
            user_id: user id
            group_id: group id or guild id
            channel_id: channel id

        Returns:
            Target | None: the target, or None when the adapter is unsupported
            or the ids are insufficient
        """
        target = None
        if isinstance(bot, v11Bot | v12Bot):
            if group_id:
                target = Target(group_id)
            elif user_id:
                target = Target(user_id, private=True)
        elif isinstance(bot, DodoBot | KaiheilaBot):
            if group_id and channel_id:
                target = Target(channel_id, parent_id=group_id, channel=True)
            elif user_id:
                target = Target(user_id, private=True)
        return target
async def broadcast_group(
    message: str | UniMessage,
    bot: Bot | list[Bot] | None = None,
    bot_id: str | set[str] | None = None,
    ignore_group: set[int] | None = None,
    check_func: Callable[[str], Awaitable] | None = None,
    log_cmd: str | None = None,
    platform: Literal["qq", "dodo", "kaiheila"] | None = None,
):
    """Broadcast a message to the groups of all bots or of selected bots.

    Bots are chosen in this order: the explicit ``bot`` argument, then
    ``bot_id``, otherwise every connected bot. Each group/channel receives the
    message at most once even when several bots share it. Failures for one
    group or one bot are logged and do not stop the broadcast.

    Args:
        message: content to broadcast
        bot: bot object(s) to use.
        bot_id: id(s) of the bots to use.
        ignore_group: group / channel ids that must not receive the message.
        check_func: optional predicate (sync or async) called with the group
            id before sending; a falsy result skips the group.
        log_cmd: tag used in log records.
        platform: restrict the broadcast to bots of this platform

    Raises:
        ValueError: unsupported ``platform`` or empty ``message``
    """
    if platform and platform not in ["qq", "dodo", "kaiheila"]:
        raise ValueError("指定平台不支持")
    if not message:
        raise ValueError("群聊广播消息不能为空")
    bot_dict = nonebot.get_bots()
    bot_list: list[Bot] = []
    if bot:
        bot_list = bot if isinstance(bot, list) else [bot]
    elif bot_id:
        wanted_ids = [bot_id] if isinstance(bot_id, str) else bot_id
        for id_ in wanted_ids:
            # Look up each id individually (not the whole bot_id argument).
            if id_ in bot_dict:
                bot_list.append(bot_dict[id_])
            else:
                logger.warning(f"Bot:{id_} 对象未连接或不存在")
    else:
        bot_list = list(bot_dict.values())
    # Group ids are stored as strings, so normalise the ignore set to strings.
    ignored = {str(g) for g in ignore_group} if ignore_group else set()
    # Keys of groups already handled, to avoid sending twice via another bot.
    used_groups: set[str] = set()
    for _bot in bot_list:
        try:
            if platform and platform != PlatformUtils.get_platform(_bot):
                continue
            group_list, _ = await PlatformUtils.get_group_list(_bot)
            for group in group_list or []:
                key = f"{group.group_id}:{group.channel_id}"
                try:
                    if key in used_groups:
                        continue
                    if (
                        str(group.group_id) in ignored
                        or str(group.channel_id) in ignored
                    ):
                        continue
                    # Without a check_func every group is eligible.
                    if check_func:
                        if is_coroutine_callable(check_func):
                            is_run = await check_func(group.group_id)
                        else:
                            is_run = check_func(group.group_id)
                        if not is_run:
                            continue
                    target = PlatformUtils.get_target(
                        _bot, None, group.group_id, group.channel_id
                    )
                    if target:
                        # Marked before sending, so a failed send is not retried
                        # through another bot.
                        used_groups.add(key)
                        await MessageUtils.build_message(message).send(target, _bot)
                        logger.debug("发送成功", log_cmd, target=key)
                    else:
                        logger.warning("target为空", log_cmd, target=key)
                except Exception as e:
                    logger.error("发送失败", log_cmd, target=key, e=e)
        except Exception as e:
            logger.error(f"Bot: {_bot.self_id} 获取群聊列表失败", command=log_cmd, e=e)