mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
✨ unban支持使用id (#1681)
This commit is contained in:
parent
a28a60d9c8
commit
3117dc264f
@ -30,7 +30,7 @@ __plugin_meta__ = PluginMetadata(
|
|||||||
usage="""
|
usage="""
|
||||||
普通管理员
|
普通管理员
|
||||||
格式:
|
格式:
|
||||||
ban [At用户] -t [时长(分钟)]
|
ban [At用户] ?[-t [时长(分钟)]]
|
||||||
|
|
||||||
示例:
|
示例:
|
||||||
ban @用户 : 永久拉黑用户
|
ban @用户 : 永久拉黑用户
|
||||||
@ -44,7 +44,8 @@ __plugin_meta__ = PluginMetadata(
|
|||||||
superuser_help="""
|
superuser_help="""
|
||||||
超级管理员额外命令
|
超级管理员额外命令
|
||||||
格式:
|
格式:
|
||||||
ban [At用户/用户Id] [时长]
|
ban [At用户/用户Id] ?[-t [时长]]
|
||||||
|
unban --id [idx] : 通过id来进行unban操作
|
||||||
ban列表: 获取所有Ban数据
|
ban列表: 获取所有Ban数据
|
||||||
|
|
||||||
群组ban列表: 获取群组Ban数据
|
群组ban列表: 获取群组Ban数据
|
||||||
@ -98,6 +99,7 @@ _unban_matcher = on_alconna(
|
|||||||
"unban",
|
"unban",
|
||||||
Args["user?", [str, At]],
|
Args["user?", [str, At]],
|
||||||
Option("-g|--group", Args["group_id", str]),
|
Option("-g|--group", Args["group_id", str]),
|
||||||
|
Option("--id", Args["idx", int]),
|
||||||
),
|
),
|
||||||
rule=admin_check("ban", "BAN_LEVEL"),
|
rule=admin_check("ban", "BAN_LEVEL"),
|
||||||
priority=5,
|
priority=5,
|
||||||
@ -238,8 +240,10 @@ async def _(
|
|||||||
arparma: Arparma,
|
arparma: Arparma,
|
||||||
user: Match[str | At],
|
user: Match[str | At],
|
||||||
group_id: Match[str],
|
group_id: Match[str],
|
||||||
|
idx: Match[int],
|
||||||
):
|
):
|
||||||
user_id = ""
|
user_id = ""
|
||||||
|
_idx = idx.result if idx.available else None
|
||||||
if user.available:
|
if user.available:
|
||||||
if isinstance(user.result, At):
|
if isinstance(user.result, At):
|
||||||
user_id = user.result.target
|
user_id = user.result.target
|
||||||
@ -248,48 +252,44 @@ async def _(
|
|||||||
await MessageUtils.build_message("权限不足捏...").finish(reply_to=True)
|
await MessageUtils.build_message("权限不足捏...").finish(reply_to=True)
|
||||||
user_id = user.result
|
user_id = user.result
|
||||||
if gid := session.id3 or session.id2:
|
if gid := session.id3 or session.id2:
|
||||||
u_d = user_id
|
|
||||||
if group_id.available:
|
if group_id.available:
|
||||||
u_d = gid
|
|
||||||
gid = group_id.result
|
gid = group_id.result
|
||||||
if await BanManage.unban(
|
is_unban, result = await BanManage.unban(
|
||||||
user_id, gid, session, session.id1 in bot.config.superusers
|
user_id, gid, session, _idx, session.id1 in bot.config.superusers
|
||||||
):
|
)
|
||||||
logger.info(
|
if not is_unban:
|
||||||
"管理员UnBan",
|
await MessageUtils.build_message(result).finish(reply_to=True)
|
||||||
arparma.header_result,
|
logger.info(
|
||||||
session=session,
|
"管理员UnBan",
|
||||||
target=f"{gid}:{user_id}",
|
arparma.header_result,
|
||||||
)
|
session=session,
|
||||||
await MessageUtils.build_message(
|
target=f"{gid}:{result}",
|
||||||
[
|
)
|
||||||
"将 ",
|
await MessageUtils.build_message(
|
||||||
(
|
[
|
||||||
At(flag="user", target=user_id)
|
"将 ",
|
||||||
if isinstance(user.result, At)
|
(
|
||||||
else u_d
|
At(flag="user", target=user_id)
|
||||||
), # type: ignore
|
if isinstance(user.result, At)
|
||||||
" 从黑屋中拉了出来并急救了一下!",
|
else result
|
||||||
]
|
), # type: ignore
|
||||||
).finish(reply_to=True)
|
" 从黑屋中拉了出来并急救了一下!",
|
||||||
else:
|
]
|
||||||
await MessageUtils.build_message("该用户不在黑名单中捏...").finish(
|
).finish(reply_to=True)
|
||||||
reply_to=True
|
|
||||||
)
|
|
||||||
elif session.id1 in bot.config.superusers:
|
elif session.id1 in bot.config.superusers:
|
||||||
_group_id = group_id.result if group_id.available else None
|
_group_id = group_id.result if group_id.available else None
|
||||||
if await BanManage.unban(user_id, _group_id, session, True):
|
is_unban, result = await BanManage.unban(
|
||||||
logger.info(
|
user_id, _group_id, session, _idx, True
|
||||||
"超级用户UnBan",
|
)
|
||||||
arparma.header_result,
|
if not is_unban:
|
||||||
session=session,
|
await MessageUtils.build_message(result).finish(reply_to=True)
|
||||||
target=f"{_group_id}:{user_id}",
|
logger.info(
|
||||||
)
|
"超级用户UnBan",
|
||||||
at_msg = user_id or f"群组:{_group_id}"
|
arparma.header_result,
|
||||||
await MessageUtils.build_message(
|
session=session,
|
||||||
f"对 {at_msg} 从黑屋中拉了出来并急救了一下!"
|
target=f"{_group_id}:{user_id}",
|
||||||
).finish(reply_to=True)
|
)
|
||||||
else:
|
at_msg = user_id or f"群组:{result}"
|
||||||
await MessageUtils.build_message("该用户不在黑名单中捏...").finish(
|
await MessageUtils.build_message(
|
||||||
reply_to=True
|
f"对 {at_msg} 从黑屋中拉了出来并急救了一下!"
|
||||||
)
|
).finish(reply_to=True)
|
||||||
|
|||||||
@ -32,11 +32,10 @@ class BanManage:
|
|||||||
query = query.filter(user_id=user_id)
|
query = query.filter(user_id=user_id)
|
||||||
elif group_id:
|
elif group_id:
|
||||||
query = query.filter(group_id=group_id)
|
query = query.filter(group_id=group_id)
|
||||||
else:
|
elif filter_type == "user":
|
||||||
if filter_type == "user":
|
query = query.filter(group_id__isnull=True)
|
||||||
query = query.filter(group_id__isnull=True)
|
elif filter_type == "group":
|
||||||
elif filter_type == "group":
|
query = query.filter(user_id__isnull=True)
|
||||||
query = query.filter(user_id__isnull=True)
|
|
||||||
data_list = await query.all()
|
data_list = await query.all()
|
||||||
if not data_list:
|
if not data_list:
|
||||||
return None
|
return None
|
||||||
@ -85,26 +84,36 @@ class BanManage:
|
|||||||
user_id: str | None,
|
user_id: str | None,
|
||||||
group_id: str | None,
|
group_id: str | None,
|
||||||
session: EventSession,
|
session: EventSession,
|
||||||
|
idx: int | None = None,
|
||||||
is_superuser: bool = False,
|
is_superuser: bool = False,
|
||||||
) -> bool:
|
) -> tuple[bool, str]:
|
||||||
"""unban目标用户
|
"""unban目标用户
|
||||||
|
|
||||||
参数:
|
参数:
|
||||||
user_id: 用户id
|
user_id: 用户id
|
||||||
group_id: 群组id
|
group_id: 群组id
|
||||||
session: Session
|
session: Session
|
||||||
|
idx: 指定id
|
||||||
is_superuser: 是否为超级用户操作
|
is_superuser: 是否为超级用户操作
|
||||||
|
|
||||||
返回:
|
返回:
|
||||||
bool: 是否unban成功
|
tuple[bool, str]: 是否unban成功, 群组/用户id或提示
|
||||||
"""
|
"""
|
||||||
user_level = 9999
|
user_level = 9999
|
||||||
if not is_superuser and user_id and session.id1:
|
if not is_superuser and user_id and session.id1:
|
||||||
user_level = await LevelUser.get_user_level(session.id1, group_id)
|
user_level = await LevelUser.get_user_level(session.id1, group_id)
|
||||||
if await BanConsole.check_ban_level(user_id, group_id, user_level):
|
if idx:
|
||||||
|
ban_data = await BanConsole.get_or_none(id=idx)
|
||||||
|
if not ban_data:
|
||||||
|
return False, "该用户/群组不在黑名单中不足捏..."
|
||||||
|
if ban_data.ban_level > user_level:
|
||||||
|
return False, "unBan权限等级不足捏..."
|
||||||
|
await ban_data.delete()
|
||||||
|
return True, str(ban_data.user_id or ban_data.group_id)
|
||||||
|
elif await BanConsole.check_ban_level(user_id, group_id, user_level):
|
||||||
await BanConsole.unban(user_id, group_id)
|
await BanConsole.unban(user_id, group_id)
|
||||||
return True
|
return True, str(group_id)
|
||||||
return False
|
return False, "该用户/群组不在黑名单中不足捏..."
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def ban(
|
async def ban(
|
||||||
|
|||||||
@ -1,15 +1,14 @@
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from tortoise import fields
|
|
||||||
from typing_extensions import Self
|
from typing_extensions import Self
|
||||||
|
|
||||||
from zhenxun.services.db_context import Model
|
from tortoise import fields
|
||||||
|
|
||||||
from zhenxun.services.log import logger
|
from zhenxun.services.log import logger
|
||||||
|
from zhenxun.services.db_context import Model
|
||||||
from zhenxun.utils.exception import UserAndGroupIsNone
|
from zhenxun.utils.exception import UserAndGroupIsNone
|
||||||
|
|
||||||
|
|
||||||
class BanConsole(Model):
|
class BanConsole(Model):
|
||||||
|
|
||||||
id = fields.IntField(pk=True, generated=True, auto_increment=True)
|
id = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||||
"""自增id"""
|
"""自增id"""
|
||||||
user_id = fields.CharField(255, null=True)
|
user_id = fields.CharField(255, null=True)
|
||||||
@ -25,9 +24,9 @@ class BanConsole(Model):
|
|||||||
operator = fields.CharField(255)
|
operator = fields.CharField(255)
|
||||||
"""使用Ban命令的用户"""
|
"""使用Ban命令的用户"""
|
||||||
|
|
||||||
class Meta:
|
class Meta: # type: ignore
|
||||||
table = "ban_console"
|
table = "ban_console"
|
||||||
table_description = ".ban/b了 封禁人员/群组数据表"
|
table_description = "封禁人员/群组数据表"
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def _get_data(cls, user_id: str | None, group_id: str | None) -> Self | None:
|
async def _get_data(cls, user_id: str | None, group_id: str | None) -> Self | None:
|
||||||
@ -45,16 +44,14 @@ class BanConsole(Model):
|
|||||||
"""
|
"""
|
||||||
if not user_id and not group_id:
|
if not user_id and not group_id:
|
||||||
raise UserAndGroupIsNone()
|
raise UserAndGroupIsNone()
|
||||||
user = None
|
|
||||||
if user_id:
|
if user_id:
|
||||||
if group_id:
|
return (
|
||||||
user = await cls.get_or_none(user_id=user_id, group_id=group_id)
|
await cls.get_or_none(user_id=user_id, group_id=group_id)
|
||||||
else:
|
if group_id
|
||||||
user = await cls.get_or_none(user_id=user_id, group_id__isnull=True)
|
else await cls.get_or_none(user_id=user_id, group_id__isnull=True)
|
||||||
|
)
|
||||||
else:
|
else:
|
||||||
if group_id:
|
return await cls.get_or_none(user_id=user_id, group_id=group_id)
|
||||||
user = await cls.get_or_none(user_id__isnull=True, group_id=group_id)
|
|
||||||
return user
|
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def check_ban_level(
|
async def check_ban_level(
|
||||||
@ -91,7 +88,7 @@ class BanConsole(Model):
|
|||||||
返回:
|
返回:
|
||||||
int: ban剩余时长,-1时为永久ban,0表示未被ban
|
int: ban剩余时长,-1时为永久ban,0表示未被ban
|
||||||
"""
|
"""
|
||||||
logger.debug(f"获取用户ban时长", target=f"{group_id}:{user_id}")
|
logger.debug("获取用户ban时长", target=f"{group_id}:{user_id}")
|
||||||
user = await cls._get_data(user_id, group_id)
|
user = await cls._get_data(user_id, group_id)
|
||||||
if not user and user_id:
|
if not user and user_id:
|
||||||
user = await cls._get_data(user_id, None)
|
user = await cls._get_data(user_id, None)
|
||||||
@ -99,9 +96,7 @@ class BanConsole(Model):
|
|||||||
if user.duration == -1:
|
if user.duration == -1:
|
||||||
return -1
|
return -1
|
||||||
_time = time.time() - (user.ban_time + user.duration)
|
_time = time.time() - (user.ban_time + user.duration)
|
||||||
if _time > 0:
|
return 0 if _time > 0 else int(time.time() - user.ban_time - user.duration)
|
||||||
return 0
|
|
||||||
return int(time.time() - user.ban_time - user.duration)
|
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
@ -114,7 +109,7 @@ class BanConsole(Model):
|
|||||||
返回:
|
返回:
|
||||||
bool: 是否被ban
|
bool: 是否被ban
|
||||||
"""
|
"""
|
||||||
logger.debug(f"检测是否被ban", target=f"{group_id}:{user_id}")
|
logger.debug("检测是否被ban", target=f"{group_id}:{user_id}")
|
||||||
if await cls.check_ban_time(user_id, group_id):
|
if await cls.check_ban_time(user_id, group_id):
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
@ -143,8 +138,8 @@ class BanConsole(Model):
|
|||||||
f"封禁用户/群组,等级:{ban_level},时长: {duration}",
|
f"封禁用户/群组,等级:{ban_level},时长: {duration}",
|
||||||
target=f"{group_id}:{user_id}",
|
target=f"{group_id}:{user_id}",
|
||||||
)
|
)
|
||||||
user = await cls._get_data(user_id, group_id)
|
target = await cls._get_data(user_id, group_id)
|
||||||
if user:
|
if target:
|
||||||
await cls.unban(user_id, group_id)
|
await cls.unban(user_id, group_id)
|
||||||
await cls.create(
|
await cls.create(
|
||||||
user_id=user_id,
|
user_id=user_id,
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user