feat: 我有一个朋友

This commit is contained in:
HibiKier 2024-05-16 19:54:30 +08:00
parent e2bb4d2e56
commit f03604d3bc
3 changed files with 275 additions and 1 deletions

View File

@ -0,0 +1,79 @@
import random
from io import BytesIO
from nonebot.adapters import Bot
from nonebot.plugin import PluginMetadata
from nonebot_plugin_alconna import Alconna, Args
from nonebot_plugin_alconna import At as alcAt
from nonebot_plugin_alconna import Match, on_alconna
from nonebot_plugin_saa import Image, Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.utils import PluginExtraData
from zhenxun.services.log import logger
from zhenxun.utils.http_utils import AsyncHttpx
from zhenxun.utils.image_utils import BuildImage
from zhenxun.utils.platform import PlatformUtils
__plugin_meta__ = PluginMetadata(
name="我有一个朋友",
description="我有一个朋友想问问...",
usage="""
指令
我有一个朋友想问问 [文本] ?[at]: 当at时你的朋友就是艾特对象
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
).dict(),
)
_matcher = on_alconna(
Alconna("one-friend", Args["text", str]["at?", alcAt]), priority=5, block=True
)
_matcher.shortcut(
"^我.{0,4}朋友.{0,2}(?:想问问|说|让我问问|想问|让我问|想知道|让我帮他问问|让我帮他问|让我帮忙问|让我帮忙问问|问)(?P<content>.{0,30})$",
command="one-friend",
arguments=["{content}"],
prefix=True,
)
@_matcher.handle()
async def _(bot: Bot, text: str, at: Match[alcAt], session: EventSession):
gid = session.id3 or session.id2
if not gid:
await Text("群组id为空...").finish()
if not session.id1:
await Text("用户id为空...").finish()
at_user = None
if at.available:
at_user = at.result.target
user = None
if at_user:
user = await PlatformUtils.get_user(bot, at_user, gid)
else:
if member_list := await PlatformUtils.get_group_member_list(bot, gid):
text = text.replace("", "").replace("", "").replace("", "")
user = random.choice(member_list)
if user:
ava_data = None
if PlatformUtils.get_platform(bot) == "qq":
ava_data = await PlatformUtils.get_user_avatar(user.user_id, "qq")
elif user.avatar_url:
ava_data = (await AsyncHttpx.get(user.avatar_url)).content
ava_img = BuildImage(200, 100, color=(0, 0, 0, 0))
if ava_data:
ava_img = BuildImage(200, 100, background=BytesIO(ava_data))
await ava_img.circle()
user_name = "朋友"
content = BuildImage(400, 30, font_size=30)
await content.text((0, 0), user_name)
A = BuildImage(700, 150, font_size=25, color="white")
await A.paste(ava_img, (30, 25))
await A.paste(content, (150, 38))
await A.text((150, 85), text, (125, 125, 125))
logger.info(f"发送有一个朋友: {text}", "我有一个朋友", session=session)
await Image(A.pic2bytes()).finish()
await Text("获取用户信息失败...").send()

View File

@ -72,7 +72,7 @@ class AsyncHttpx:
cls, cls,
url: str, url: str,
*, *,
data: Dict[str, str] | None = None, data: Dict[str, Any] | None = None,
content: Any = None, content: Any = None,
files: Any = None, files: Any = None,
verify: bool = True, verify: bool = True,

View File

@ -20,14 +20,209 @@ from nonebot_plugin_saa import (
TargetQQPrivate, TargetQQPrivate,
Text, Text,
) )
from pydantic import BaseModel
from zhenxun.models.friend_user import FriendUser from zhenxun.models.friend_user import FriendUser
from zhenxun.models.group_console import GroupConsole from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger from zhenxun.services.log import logger
class UserData(BaseModel):
name: str
"""昵称"""
card: str | None = None
"""名片/备注"""
user_id: str
"""用户id"""
group_id: str | None = None
"""群组id"""
role: str | None = None
"""角色"""
avatar_url: str | None = None
"""头像url"""
join_time: int | None = None
"""加入时间"""
class PlatformUtils: class PlatformUtils:
@classmethod
async def get_group_member_list(cls, bot: Bot, group_id: str) -> list[UserData]:
"""获取群组/频道成员列表
参数:
bot: Bot
group_id: 群组/频道id
返回:
list[UserData]: 用户数据列表
"""
if isinstance(bot, v11Bot):
if member_list := await bot.get_group_member_list(group_id=int(group_id)):
return [
UserData(
name=user["nickname"],
card=user["card"],
user_id=user["user_id"],
group_id=user["group_id"],
role=user["role"],
join_time=user["join_time"],
)
for user in member_list
]
if isinstance(bot, v12Bot):
if member_list := await bot.get_group_member_list(group_id=group_id):
return [
UserData(
name=user["user_name"],
card=user["user_displayname"],
user_id=user["user_id"],
group_id=group_id,
)
for user in member_list
]
if isinstance(bot, DodoBot):
if result_data := await bot.get_member_list(
island_source_id=group_id, page_size=100, max_id=0
):
max_id = result_data.max_id
result_list = result_data.list
data_list = []
while max_id == 100:
result_data = await bot.get_member_list(
island_source_id=group_id, page_size=100, max_id=0
)
result_list += result_data.list
max_id = result_data.max_id
for user in result_list:
data_list.append(
UserData(
name=user.nick_name,
card=user.personal_nick_name,
avatar_url=user.avatar_url,
user_id=user.dodo_source_id,
group_id=user.island_source_id,
join_time=int(user.join_time.timestamp()),
)
)
return data_list
if isinstance(bot, KaiheilaBot):
if result_data := await bot.guild_userList(guild_id=group_id):
if result_data.users:
data_list = []
for user in result_data.users:
second = None
if user.joined_at:
second = int(user.joined_at / 1000)
data_list.append(
UserData(
name=user.nickname or "",
avatar_url=user.avatar,
user_id=user.id_, # type: ignore
group_id=group_id,
join_time=second,
)
)
return data_list
if isinstance(bot, DiscordBot):
# TODO: discord获取用户
pass
return []
@classmethod
async def get_user(
cls, bot: Bot, user_id: str, group_id: str | None = None
) -> UserData | None:
"""获取用户信息
参数:
bot: Bot
user_id: 用户id
group_id: 群组/频道id.
返回:
UserData | None: 用户数据
"""
if isinstance(bot, v11Bot):
if group_id:
if user := await bot.get_group_member_info(
group_id=int(group_id), user_id=int(user_id)
):
return UserData(
name=user["nickname"],
card=user["card"],
user_id=user["user_id"],
group_id=user["group_id"],
role=user["role"],
join_time=user["join_time"],
)
else:
if friend_list := await bot.get_friend_list():
for f in friend_list:
if f["user_id"] == int(user_id):
return UserData(
name=f["nickname"],
card=f["remark"],
user_id=f["user_id"],
)
if isinstance(bot, v12Bot):
if group_id:
if user := await bot.get_group_member_info(
group_id=group_id, user_id=user_id
):
return UserData(
name=user["user_name"],
card=user["user_displayname"],
user_id=user["user_id"],
group_id=group_id,
)
else:
if friend_list := await bot.get_friend_list():
for f in friend_list:
if f["user_id"] == int(user_id):
return UserData(
name=f["user_name"],
card=f["user_remark"],
user_id=f["user_id"],
)
if isinstance(bot, DodoBot):
if group_id:
if user := await bot.get_member_info(
island_source_id=group_id, dodo_source_id=user_id
):
return UserData(
name=user.nick_name,
card=user.personal_nick_name,
avatar_url=user.avatar_url,
user_id=user.dodo_source_id,
group_id=user.island_source_id,
join_time=int(user.join_time.timestamp()),
)
else:
# TODO: DoDo个人数据
pass
if isinstance(bot, KaiheilaBot):
if group_id:
if user := await bot.user_view(guild_id=group_id, user_id=user_id):
second = None
if user.joined_at:
second = int(user.joined_at / 1000)
return UserData(
name=user.nickname or "",
avatar_url=user.avatar,
user_id=user_id,
group_id=group_id,
join_time=second,
)
else:
# TODO: kaiheila用户详情
pass
if isinstance(bot, DiscordBot):
# TODO: discord获取用户
pass
return None
@classmethod @classmethod
async def get_user_avatar(cls, user_id: str, platform: str) -> bytes | None: async def get_user_avatar(cls, user_id: str, platform: str) -> bytes | None:
"""快捷获取用户头像 """快捷获取用户头像