diff --git a/zhenxun/plugins/one_friend/__init__.py b/zhenxun/plugins/one_friend/__init__.py new file mode 100644 index 00000000..67c5082e --- /dev/null +++ b/zhenxun/plugins/one_friend/__init__.py @@ -0,0 +1,79 @@ +import random +from io import BytesIO + +from nonebot.adapters import Bot +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import Alconna, Args +from nonebot_plugin_alconna import At as alcAt +from nonebot_plugin_alconna import Match, on_alconna +from nonebot_plugin_saa import Image, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.utils import PluginExtraData +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.image_utils import BuildImage +from zhenxun.utils.platform import PlatformUtils + +__plugin_meta__ = PluginMetadata( + name="我有一个朋友", + description="我有一个朋友想问问...", + usage=""" + 指令: + 我有一个朋友想问问 [文本] ?[at]: 当at时你的朋友就是艾特对象 + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + ).dict(), +) + +_matcher = on_alconna( + Alconna("one-friend", Args["text", str]["at?", alcAt]), priority=5, block=True +) + +_matcher.shortcut( + "^我.{0,4}朋友.{0,2}(?:想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问)(?P.{0,30})$", + command="one-friend", + arguments=["{content}"], + prefix=True, +) + + +@_matcher.handle() +async def _(bot: Bot, text: str, at: Match[alcAt], session: EventSession): + gid = session.id3 or session.id2 + if not gid: + await Text("群组id为空...").finish() + if not session.id1: + await Text("用户id为空...").finish() + at_user = None + if at.available: + at_user = at.result.target + user = None + if at_user: + user = await PlatformUtils.get_user(bot, at_user, gid) + else: + if member_list := await PlatformUtils.get_group_member_list(bot, gid): + text = text.replace("他", "我").replace("她", "我").replace("它", "我") + user = random.choice(member_list) + if user: + ava_data = None + if PlatformUtils.get_platform(bot) == "qq": + ava_data = await PlatformUtils.get_user_avatar(user.user_id, "qq") + elif user.avatar_url: + ava_data = (await AsyncHttpx.get(user.avatar_url)).content + ava_img = BuildImage(200, 100, color=(0, 0, 0, 0)) + if ava_data: + ava_img = BuildImage(200, 100, background=BytesIO(ava_data)) + await ava_img.circle() + user_name = "朋友" + content = BuildImage(400, 30, font_size=30) + await content.text((0, 0), user_name) + A = BuildImage(700, 150, font_size=25, color="white") + await A.paste(ava_img, (30, 25)) + await A.paste(content, (150, 38)) + await A.text((150, 85), text, (125, 125, 125)) + logger.info(f"发送有一个朋友: {text}", "我有一个朋友", session=session) + await Image(A.pic2bytes()).finish() + await Text("获取用户信息失败...").send() diff --git a/zhenxun/utils/http_utils.py b/zhenxun/utils/http_utils.py index ea9516ef..142d2ceb 100644 --- a/zhenxun/utils/http_utils.py +++ b/zhenxun/utils/http_utils.py @@ -72,7 +72,7 @@ class AsyncHttpx: cls, url: str, *, - data: Dict[str, str] | None = None, + data: Dict[str, Any] | None = None, content: Any = None, files: Any = None, verify: bool = True, diff --git a/zhenxun/utils/platform.py b/zhenxun/utils/platform.py index 19163cc5..12502fd8 100644 --- a/zhenxun/utils/platform.py +++ b/zhenxun/utils/platform.py @@ -20,14 +20,209 @@ from nonebot_plugin_saa import ( TargetQQPrivate, Text, ) +from pydantic import BaseModel from zhenxun.models.friend_user import FriendUser from zhenxun.models.group_console import GroupConsole from zhenxun.services.log import logger +class UserData(BaseModel): + + name: str + """昵称""" + card: str | None = None + """名片/备注""" + user_id: str + """用户id""" + group_id: str | None = None + """群组id""" + role: str | None = None + """角色""" + avatar_url: str | None = None + """头像url""" + join_time: int | None = None + """加入时间""" + + class PlatformUtils: + @classmethod + async def get_group_member_list(cls, bot: Bot, group_id: str) -> list[UserData]: + """获取群组/频道成员列表 + + 参数: + bot: Bot + group_id: 群组/频道id + + 返回: + list[UserData]: 用户数据列表 + """ + if isinstance(bot, v11Bot): + if member_list := await bot.get_group_member_list(group_id=int(group_id)): + return [ + UserData( + name=user["nickname"], + card=user["card"], + user_id=user["user_id"], + group_id=user["group_id"], + role=user["role"], + join_time=user["join_time"], + ) + for user in member_list + ] + if isinstance(bot, v12Bot): + if member_list := await bot.get_group_member_list(group_id=group_id): + return [ + UserData( + name=user["user_name"], + card=user["user_displayname"], + user_id=user["user_id"], + group_id=group_id, + ) + for user in member_list + ] + if isinstance(bot, DodoBot): + if result_data := await bot.get_member_list( + island_source_id=group_id, page_size=100, max_id=0 + ): + max_id = result_data.max_id + result_list = result_data.list + data_list = [] + while max_id == 100: + result_data = await bot.get_member_list( + island_source_id=group_id, page_size=100, max_id=0 + ) + result_list += result_data.list + max_id = result_data.max_id + for user in result_list: + data_list.append( + UserData( + name=user.nick_name, + card=user.personal_nick_name, + avatar_url=user.avatar_url, + user_id=user.dodo_source_id, + group_id=user.island_source_id, + join_time=int(user.join_time.timestamp()), + ) + ) + return data_list + if isinstance(bot, KaiheilaBot): + if result_data := await bot.guild_userList(guild_id=group_id): + if result_data.users: + data_list = [] + for user in result_data.users: + second = None + if user.joined_at: + second = int(user.joined_at / 1000) + data_list.append( + UserData( + name=user.nickname or "", + avatar_url=user.avatar, + user_id=user.id_, # type: ignore + group_id=group_id, + join_time=second, + ) + ) + return data_list + if isinstance(bot, DiscordBot): + # TODO: discord获取用户 + pass + return [] + + @classmethod + async def get_user( + cls, bot: Bot, user_id: str, group_id: str | None = None + ) -> UserData | None: + """获取用户信息 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组/频道id. + + 返回: + UserData | None: 用户数据 + """ + if isinstance(bot, v11Bot): + if group_id: + if user := await bot.get_group_member_info( + group_id=int(group_id), user_id=int(user_id) + ): + return UserData( + name=user["nickname"], + card=user["card"], + user_id=user["user_id"], + group_id=user["group_id"], + role=user["role"], + join_time=user["join_time"], + ) + else: + if friend_list := await bot.get_friend_list(): + for f in friend_list: + if f["user_id"] == int(user_id): + return UserData( + name=f["nickname"], + card=f["remark"], + user_id=f["user_id"], + ) + if isinstance(bot, v12Bot): + if group_id: + if user := await bot.get_group_member_info( + group_id=group_id, user_id=user_id + ): + return UserData( + name=user["user_name"], + card=user["user_displayname"], + user_id=user["user_id"], + group_id=group_id, + ) + else: + if friend_list := await bot.get_friend_list(): + for f in friend_list: + if f["user_id"] == int(user_id): + return UserData( + name=f["user_name"], + card=f["user_remark"], + user_id=f["user_id"], + ) + if isinstance(bot, DodoBot): + if group_id: + if user := await bot.get_member_info( + island_source_id=group_id, dodo_source_id=user_id + ): + return UserData( + name=user.nick_name, + card=user.personal_nick_name, + avatar_url=user.avatar_url, + user_id=user.dodo_source_id, + group_id=user.island_source_id, + join_time=int(user.join_time.timestamp()), + ) + else: + # TODO: DoDo个人数据 + pass + if isinstance(bot, KaiheilaBot): + if group_id: + if user := await bot.user_view(guild_id=group_id, user_id=user_id): + second = None + if user.joined_at: + second = int(user.joined_at / 1000) + return UserData( + name=user.nickname or "", + avatar_url=user.avatar, + user_id=user_id, + group_id=group_id, + join_time=second, + ) + else: + # TODO: kaiheila用户详情 + pass + if isinstance(bot, DiscordBot): + # TODO: discord获取用户 + pass + return None + @classmethod async def get_user_avatar(cls, user_id: str, platform: str) -> bytes | None: """快捷获取用户头像