♻️ 使用Uninfo重构PlatformUtils基础方法

This commit is contained in:
HibiKier 2024-12-20 15:48:12 +08:00
parent 176b5c9afd
commit 5a92a4fe3d
4 changed files with 113 additions and 225 deletions

View File

@ -37,8 +37,7 @@ async def _():
update_list = []
if modules := await TaskInfo.annotate().values_list("module", flat=True):
for bot in nonebot.get_bots().values():
group_list, _ = await PlatformUtils.get_group_list(bot)
group_list = [g for g in group_list if g.channel_id is None]
group_list, _ = await PlatformUtils.get_group_list(bot, True)
for group in group_list:
try:
last_message = (

View File

@ -47,8 +47,7 @@ class BotManage:
bot_info = BotInfo(
self_id=bot.self_id, nickname=nickname, ava_url=ava_url, platform=platform
)
group_list, _ = await PlatformUtils.get_group_list(bot)
group_list = [g for g in group_list if g.channel_id is None]
group_list, _ = await PlatformUtils.get_group_list(bot, True)
friend_list, _ = await PlatformUtils.get_friend_list(bot)
bot_info.group_count = len(group_list)
bot_info.friend_count = len(friend_list)

View File

@ -61,12 +61,16 @@ async def _(bot_id: str | None = None) -> Result[list[BaseInfo]]:
if bots := nonebot.get_bots():
select_bot: BaseInfo
for _, bot in bots.items():
login_info = await bot.get_login_info()
login_info = None
try:
login_info = await bot.get_login_info()
except Exception as e:
logger.warning("调用接口get_login_info失败", "webui", e=e)
bot_list.append(
TemplateBaseInfo(
bot=bot, # type: ignore
self_id=bot.self_id,
nickname=login_info["nickname"],
nickname=login_info["nickname"] if login_info else bot.self_id,
ava_url=AVA_URL.format(bot.self_id),
)
)
@ -83,9 +87,11 @@ async def _(bot_id: str | None = None) -> Result[list[BaseInfo]]:
create_time__gte=now - timedelta(hours=now.hour),
).count()
# 群聊数量
select_bot.group_count = len(await select_bot.bot.get_group_list())
select_bot.group_count = len(await PlatformUtils.get_group_list(select_bot.bot))
# 好友数量
select_bot.friend_count = len(await select_bot.bot.get_friend_list())
select_bot.friend_count = len(
await PlatformUtils.get_friend_list(select_bot.bot)
)
for bot in bot_list:
bot.bot = None # type: ignore
# 插件加载数量

View File

@ -5,14 +5,11 @@ from typing import Literal
import httpx
import nonebot
from nonebot.adapters import Bot
from nonebot.adapters.dodo import Bot as DodoBot
from nonebot.adapters.kaiheila import Bot as KaiheilaBot
from nonebot.adapters.onebot.v11 import Bot as v11Bot
from nonebot.adapters.onebot.v12 import Bot as v12Bot
from nonebot.utils import is_coroutine_callable
from nonebot_plugin_alconna import SupportScope
from nonebot_plugin_alconna.uniseg import Receipt, Target, UniMessage
from nonebot_plugin_uninfo import Uninfo, get_interface
from nonebot_plugin_uninfo import SceneType, Uninfo, get_interface
from nonebot_plugin_uninfo.model import Member
from pydantic import BaseModel
from zhenxun.configs.config import BotConfig
@ -35,6 +32,8 @@ class UserData(BaseModel):
"""用户id"""
group_id: str | None = None
"""群组id"""
channel_id: str | None = None
"""频道id"""
role: str | None = None
"""角色"""
avatar_url: str | None = None
@ -68,7 +67,7 @@ class PlatformUtils:
group_id: 群组id
duration: 禁言时长(分钟)
"""
if isinstance(bot, v11Bot):
if cls.get_platform(bot) == "qq":
await bot.set_group_ban(
group_id=int(group_id),
user_id=int(user_id),
@ -116,148 +115,80 @@ class PlatformUtils:
返回:
list[UserData]: 用户数据列表
"""
if isinstance(bot, v11Bot):
if member_list := await bot.get_group_member_list(group_id=int(group_id)):
return [
UserData(
name=user["nickname"],
card=user["card"],
user_id=user["user_id"],
group_id=user["group_id"],
role=user["role"],
join_time=user["join_time"],
)
for user in member_list
]
if isinstance(bot, v12Bot):
if member_list := await bot.get_group_member_list(group_id=group_id):
return [
UserData(
name=user["user_name"],
card=user["user_displayname"],
user_id=user["user_id"],
group_id=group_id,
)
for user in member_list
]
if isinstance(bot, DodoBot):
if result_data := await bot.get_member_list(
island_source_id=group_id, page_size=100, max_id=0
):
max_id = result_data.max_id
result_list = result_data.list
while max_id == 100:
result_data = await bot.get_member_list(
island_source_id=group_id, page_size=100, max_id=0
)
result_list += result_data.list
max_id = result_data.max_id
return [
UserData(
name=user.nick_name,
card=user.personal_nick_name,
avatar_url=user.avatar_url,
user_id=user.dodo_source_id,
group_id=user.island_source_id,
join_time=int(user.join_time.timestamp()),
)
for user in result_list
]
if isinstance(bot, KaiheilaBot):
if result_data := await bot.guild_userList(guild_id=group_id):
if result_data.users:
data_list = []
for user in result_data.users:
second = None
if user.joined_at:
second = int(user.joined_at / 1000)
data_list.append(
UserData(
name=user.nickname or "",
avatar_url=user.avatar,
user_id=user.id_, # type: ignore
group_id=group_id,
join_time=second,
)
)
return data_list
if interface := get_interface(bot):
members: list[Member] = await interface.get_members(
SceneType.GROUP, group_id
)
return [
UserData(
name=member.user.name or "",
card=member.nick,
user_id=member.user.id,
group_id=group_id,
role=member.role.id if member.role else "",
avatar_url=member.user.avatar,
join_time=int(member.joined_at.timestamp())
if member.joined_at
else None,
)
for member in members
]
return []
@classmethod
async def get_user(
cls, bot: Bot, user_id: str, group_id: str | None = None
cls,
bot: Bot,
user_id: str,
group_id: str | None = None,
channel_id: str | None = None,
) -> UserData | None:
"""获取用户信息
参数:
bot: Bot
user_id: 用户id
group_id: 群组/频道id.
group_id: 群组id.
channel_id: 频道id.
返回:
UserData | None: 用户数据
"""
if isinstance(bot, v11Bot):
if group_id:
if user := await bot.get_group_member_info(
group_id=int(group_id), user_id=int(user_id)
):
return UserData(
name=user["nickname"],
card=user["card"],
user_id=user["user_id"],
group_id=user["group_id"],
role=user["role"],
join_time=user["join_time"],
)
elif friend_list := await bot.get_friend_list():
for f in friend_list:
if f["user_id"] == int(user_id):
return UserData(
name=f["nickname"],
card=f["remark"],
user_id=f["user_id"],
)
if isinstance(bot, v12Bot):
if group_id:
if user := await bot.get_group_member_info(
group_id=group_id, user_id=user_id
):
return UserData(
name=user["user_name"],
card=user["user_displayname"],
user_id=user["user_id"],
group_id=group_id,
)
elif friend_list := await bot.get_friend_list():
for f in friend_list:
if f["user_id"] == int(user_id):
return UserData(
name=f["user_name"],
card=f["user_remark"],
user_id=f["user_id"],
)
if isinstance(bot, DodoBot) and group_id:
if user := await bot.get_member_info(
island_source_id=group_id, dodo_source_id=user_id
):
return UserData(
name=user.nick_name,
card=user.personal_nick_name,
avatar_url=user.avatar_url,
user_id=user.dodo_source_id,
group_id=user.island_source_id,
join_time=int(user.join_time.timestamp()),
if interface := get_interface(bot):
member = None
user = None
if channel_id:
member = await interface.get_member(
SceneType.CHANNEL_TEXT, channel_id, user_id
)
if isinstance(bot, KaiheilaBot) and group_id:
if user := await bot.user_view(guild_id=group_id, user_id=user_id):
second = int(user.joined_at / 1000) if user.joined_at else None
if member:
user = member.user
elif group_id:
member = await interface.get_member(SceneType.GROUP, group_id, user_id)
if member:
user = member.user
else:
user = await interface.get_user(user_id)
if not user:
return None
if member:
return UserData(
name=user.nickname or "",
avatar_url=user.avatar,
user_id=user_id,
name=user.name or "",
card=member.nick,
user_id=user.id,
group_id=group_id,
join_time=second,
channel_id=channel_id,
role=member.role.id if member.role else None,
join_time=int(member.joined_at.timestamp())
if member.joined_at
else None,
)
else:
return UserData(
name=user.name or "",
user_id=user.id,
group_id=group_id,
channel_id=channel_id,
)
return None
@ -361,7 +292,9 @@ class PlatformUtils:
group_list, platform = await cls.get_group_list(bot)
if group_list:
db_group = await GroupConsole.all()
db_group_id = [(group.group_id, group.channel_id) for group in db_group]
db_group_id: list[tuple[str, str]] = [
(group.group_id, group.channel_id) for group in db_group
]
for group in group_list:
group.platform = platform
if (group.group_id, group.channel_id) not in db_group_id:
@ -411,69 +344,43 @@ class PlatformUtils:
return "unknown"
@classmethod
async def get_group_list(cls, bot: Bot) -> tuple[list[GroupConsole], str]:
async def get_group_list(
cls, bot: Bot, only_group: bool = False
) -> tuple[list[GroupConsole], str]:
"""获取群组列表
参数:
bot: Bot
only_group: 是否只获取群组不获取channel
返回:
tuple[list[GroupConsole], str]: 群组列表, 平台
"""
if isinstance(bot, v11Bot):
group_list = await bot.get_group_list()
return [
GroupConsole(
group_id=str(g["group_id"]),
group_name=g["group_name"],
max_member_count=g["max_member_count"],
member_count=g["member_count"],
)
for g in group_list
], "qq"
if isinstance(bot, v12Bot):
group_list = await bot.get_group_list()
return [
GroupConsole(
group_id=g.group_id, # type: ignore
user_name=g.group_name, # type: ignore
)
for g in group_list
], "qq"
if isinstance(bot, DodoBot):
island_list = await bot.get_island_list()
source_id_list = [
(g.island_source_id, g.island_name)
for g in island_list
if g.island_source_id
]
group_list = []
for id, name in source_id_list:
channel_list = await bot.get_channel_list(island_source_id=id)
group_list.append(GroupConsole(group_id=id, group_name=name))
group_list += [
if interface := get_interface(bot):
platform = cls.get_platform(bot)
result_list = []
scenes = await interface.get_scenes(SceneType.GROUP)
for scene in scenes:
group_id = scene.id
result_list.append(
GroupConsole(
group_id=id, group_name=c.channel_name, channel_id=c.channel_id
group_id=scene.id,
group_name=scene.name,
)
for c in channel_list
]
return group_list, "dodo"
if isinstance(bot, KaiheilaBot):
group_list = []
guilds = await bot.guild_list()
if guilds.guilds:
for guild_id, name in [(g.id_, g.name) for g in guilds.guilds if g.id_]:
view = await bot.guild_view(guild_id=guild_id)
group_list.append(GroupConsole(group_id=guild_id, group_name=name))
if view.channels:
group_list += [
GroupConsole(
group_id=guild_id, group_name=c.name, channel_id=c.id_
)
if not only_group and platform != "qq":
if channel_list := await interface.get_scenes(
parent_scene_id=group_id
):
for channel in channel_list:
result_list.append(
GroupConsole(
group_id=scene.id,
group_name=channel.name,
channel_id=channel.id,
)
)
for c in view.channels
if c.type != 0
]
return group_list, "kaiheila"
return result_list, platform
return [], ""
@classmethod
@ -508,30 +415,11 @@ class PlatformUtils:
返回:
list[FriendUser]: 好友列表
"""
if isinstance(bot, v11Bot):
friend_list = await bot.get_friend_list()
if interface := get_interface(bot):
user_list = await interface.get_users()
return [
FriendUser(user_id=str(f["user_id"]), user_name=f["nickname"])
for f in friend_list
], "qq"
if isinstance(bot, v12Bot):
friend_list = await bot.get_friend_list()
return [
FriendUser(
user_id=f.user_id, # type: ignore
user_name=f.user_displayname or f.user_remark or f.user_name, # type: ignore
)
for f in friend_list
], "qq"
# if isinstance(bot, DodoBot):
# # TODO: dodo好友列表
# pass
# if isinstance(bot, KaiheilaBot):
# # TODO: kaiheila好友列表
# pass
# if isinstance(bot, DiscordBot):
# # TODO: discord好友列表
# pass
FriendUser(user_id=u.id, user_name=u.name) for u in user_list
], cls.get_platform(bot)
return [], ""
@classmethod
@ -554,16 +442,12 @@ class PlatformUtils:
target: 对应平台Target
"""
target = None
if isinstance(bot, v11Bot | v12Bot):
if group_id:
target = Target(group_id)
elif user_id:
target = Target(user_id, private=True)
elif isinstance(bot, DodoBot | KaiheilaBot):
if group_id and channel_id:
target = Target(channel_id, parent_id=group_id, channel=True)
elif user_id:
target = Target(user_id, private=True)
if group_id and channel_id:
target = Target(channel_id, parent_id=group_id, channel=True)
elif group_id:
target = Target(group_id)
elif user_id:
target = Target(user_id, private=True)
return target