diff --git a/poetry.lock b/poetry.lock index 0a13029c..e41ee7de 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1859,6 +1859,22 @@ type = "legacy" url = "https://mirrors.aliyun.com/pypi/simple" reference = "ali" +[[package]] +name = "pypinyin" +version = "0.51.0" +description = "汉字拼音转换模块/工具." +optional = false +python-versions = ">=2.6, !=3.0.*, !=3.1.*, !=3.2.*, <4" +files = [ + {file = "pypinyin-0.51.0-py2.py3-none-any.whl", hash = "sha256:ae8878f08fee15d0c5c11053a737e68a4158c22c63dc632b4de060af5c95bf84"}, + {file = "pypinyin-0.51.0.tar.gz", hash = "sha256:cede34fc35a79ef6c799f161e2c280e7b6755ee072fb741cae5ce2a60c4ae0c5"}, +] + +[package.source] +type = "legacy" +url = "https://mirrors.aliyun.com/pypi/simple" +reference = "ali" + [[package]] name = "python-dateutil" version = "2.8.2" @@ -2982,4 +2998,4 @@ reference = "ali" [metadata] lock-version = "2.0" python-versions = "^3.10" -content-hash = "858e616442c77d1a328e37af331056a7b870611b22247fcebfe5dbe41a3fd4f0" +content-hash = "535f64938d522045aff2fa03ec967470085477b9d5bf1b9b803bcfceac60c7b6" diff --git a/pyproject.toml b/pyproject.toml index 33e40319..f3363021 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -33,6 +33,7 @@ aiofiles = "^23.2.1" nonebot-plugin-htmlrender = "^0.3.0" nonebot-plugin-userinfo = "^0.1.3" nonebot-plugin-alconna = "^0.37.1" +pypinyin = "^0.51.0" [tool.poetry.dev-dependencies] diff --git a/zhenxun/builtin_plugins/admin/ban/__init__.py b/zhenxun/builtin_plugins/admin/ban/__init__.py index 11a0608b..330eaeb5 100644 --- a/zhenxun/builtin_plugins/admin/ban/__init__.py +++ b/zhenxun/builtin_plugins/admin/ban/__init__.py @@ -31,7 +31,7 @@ __plugin_meta__ = PluginMetadata( usage=""" 普通管理员 格式: - ban [At用户] [时长] + ban [At用户] [时长(分钟)] 示例: ban @用户 : 永久拉黑用户 diff --git a/zhenxun/builtin_plugins/hooks/__init__.py b/zhenxun/builtin_plugins/hooks/__init__.py index 41912be2..80aa7181 100644 --- a/zhenxun/builtin_plugins/hooks/__init__.py +++ b/zhenxun/builtin_plugins/hooks/__init__.py @@ -40,4 +40,4 @@ Config.add_plugin_config( type=int, ) -# nonebot.load_plugins(str(Path(__file__).parent.resolve())) +nonebot.load_plugins(str(Path(__file__).parent.resolve())) diff --git a/zhenxun/builtin_plugins/hooks/ban_hook.py b/zhenxun/builtin_plugins/hooks/ban_hook.py index 3f10e078..74d456f3 100644 --- a/zhenxun/builtin_plugins/hooks/ban_hook.py +++ b/zhenxun/builtin_plugins/hooks/ban_hook.py @@ -38,10 +38,8 @@ async def _( ban_result = Config.get_config("hook", "BAN_RESULT") if user_id in bot.config.superusers: return - if await BanConsole.is_ban(user_id) or await BanConsole.is_ban( - user_id, group_id - ): - time = await BanConsole.check_ban_time(user_id) + if await BanConsole.is_ban(user_id, group_id): + time = await BanConsole.check_ban_time(user_id, group_id) if time == -1: time_str = "∞" else: @@ -49,7 +47,13 @@ async def _( if time < 60: time_str = str(time) + " 秒" else: - time_str = str(int(time / 60)) + " 分钟" + minute = int(time / 60) + if minute > 60: + hours = int(minute / 60) + minute = minute % 60 + time_str = f"{hours} 小时 {minute}分钟" + else: + time_str = f"{minute} 分钟" if ban_result and _flmt.check(user_id): _flmt.start_cd(user_id) await MessageFactory( diff --git a/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py b/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py index 4781aff3..66a6033b 100644 --- a/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py +++ b/zhenxun/builtin_plugins/superuser/broadcast/_data_source.py @@ -6,11 +6,7 @@ from nonebot.adapters.kaiheila import Bot as KaiheilaBot from nonebot.adapters.onebot.v11 import Bot as v11Bot from nonebot.adapters.onebot.v12 import Bot as v12Bot from nonebot_plugin_alconna import UniMsg -from nonebot_plugin_saa import ( - Image, - MessageFactory, - Text, -) +from nonebot_plugin_saa import Image, MessageFactory, Text from nonebot_plugin_session import EventSession from zhenxun.models.group_console import GroupConsole @@ -49,7 +45,7 @@ class BroadcastManage: group.group_id, "broadcast", group.channel_id ): target = PlatformManage.get_target( - bot, group.group_id, group.channel_id + bot, None, group.group_id, group.channel_id ) if target: await MessageFactory(message_list).send_to(target, bot) diff --git a/zhenxun/models/ban_console.py b/zhenxun/models/ban_console.py index 5996e55f..d15c5401 100644 --- a/zhenxun/models/ban_console.py +++ b/zhenxun/models/ban_console.py @@ -116,6 +116,8 @@ class BanConsole(Model): if await cls.check_ban_time(user_id, group_id): return True else: + if await cls.check_ban_time(user_id): + return True await cls.unban(user_id, group_id) return False @@ -126,7 +128,7 @@ class BanConsole(Model): group_id: str | None, ban_level: int, duration: int, - operator: str | None, + operator: str | None = None, ): """ban掉目标用户 @@ -134,7 +136,7 @@ class BanConsole(Model): user_id: 用户id group_id: 群组id ban_level: 使用命令者的权限等级 - duration: 时长,秒 + duration: 时长,分钟,-1时为永久 operator: 操作者id """ logger.debug( diff --git a/zhenxun/plugins/alapi/comments_163.py b/zhenxun/plugins/alapi/comments_163.py index 5b29ab5c..bac2587e 100644 --- a/zhenxun/plugins/alapi/comments_163.py +++ b/zhenxun/plugins/alapi/comments_163.py @@ -9,11 +9,6 @@ from zhenxun.services.log import logger from ._data_source import get_data -comments_163 = on_regex( - "^(网易云热评|网易云评论|到点了|12点了)$", priority=5, block=True -) - - comments_163_url = "https://v2.alapi.cn/api/comment" __plugin_meta__ = PluginMetadata( @@ -44,7 +39,7 @@ _matcher.shortcut( ) -@comments_163.handle() +@_matcher.handle() async def _(session: EventSession, arparma: Arparma): data, code = await get_data(comments_163_url) if code != 200 and isinstance(data, str): diff --git a/zhenxun/plugins/black_word/__init__.py b/zhenxun/plugins/black_word/__init__.py new file mode 100644 index 00000000..6af840a7 --- /dev/null +++ b/zhenxun/plugins/black_word/__init__.py @@ -0,0 +1,281 @@ +from datetime import datetime +from typing import Any, List + +from nonebot import on_message +from nonebot.adapters import Bot, Event +from nonebot.matcher import Matcher +from nonebot.message import run_preprocessor +from nonebot.permission import SUPERUSER +from nonebot.plugin import PluginMetadata +from nonebot_plugin_alconna import ( + Alconna, + Args, + Arparma, + Match, + Option, + UniMsg, + on_alconna, +) +from nonebot_plugin_saa import Image, Text +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import NICKNAME, Config +from zhenxun.configs.utils import PluginExtraData, RegisterConfig +from zhenxun.models.ban_console import BanConsole +from zhenxun.models.group_console import GroupConsole +from zhenxun.services.log import logger +from zhenxun.utils.enum import PluginType +from zhenxun.utils.image_utils import BuildImage + +from .data_source import set_user_punish, show_black_text_image +from .utils import black_word_manager + +__plugin_meta__ = PluginMetadata( + name="敏感词检测", + description="请注意你的发言!!", + usage=""" + 惩罚机制: 检测内容提示 + 设置惩罚 [uid] [id] [level]: 设置惩罚内容, 此id需要通过`记录名单 -u:uid`来获取 + 记录名单: 查看检测记录名单 + 记录名单: + -u [uid] 指定用户记录名单 + -g [gid] 指定群组记录名单 + -d [date] 指定日期 + -dt ['=', '>', '<'] 大于小于等于指定日期 + + 示例: + 设置惩罚 123123123 0 1 + 记录名单 -u 123123123 + 记录名单 -g 333333 + 记录名单 -d 2022-11-11 + 记录名单 -d 2022-11-11 -dt > + """.strip(), + extra=PluginExtraData( + author="HibiKier", + version="0.1", + plugin_type=PluginType.SUPERUSER, + menu_type="其他", + configs=[ + RegisterConfig( + key="CYCLE_DAYS", + value=30, + help="黑名单词汇记录周期", + default_value=30, + type=int, + ), + RegisterConfig( + key="TOLERATE_COUNT", + value=[5, 1, 1, 1, 1], + help="各个级别惩罚的容忍次数, 依次为: 1, 2, 3, 4, 5", + default_value=[5, 1, 1, 1, 1], + type=List[int], + ), + RegisterConfig( + key="AUTO_PUNISH", + value=True, + help="是否启动自动惩罚机制", + default_value=True, + type=bool, + ), + RegisterConfig( + key="BAN_4_DURATION", + value=360, + help="Ban时长(分钟),四级惩罚,可以为指定数字或指定列表区间(随机),例如 [30, 360]", + default_value=360, + type=int, + ), + RegisterConfig( + key="BAN_3_DURATION", + value=7, + help="Ban时长(天),三级惩罚,可以为指定数字或指定列表区间(随机),例如 [7, 30]", + default_value=7, + type=int, + ), + RegisterConfig( + key="WARNING_RESULT", + value=f"请注意对{NICKNAME}的发言内容", + help="口头警告内容", + default_value=None, + ), + RegisterConfig( + key="AUTO_ADD_PUNISH_LEVEL", + value=360, + help="自动提级机制,当周期内处罚次数大于某一特定值就提升惩罚等级", + default_value=360, + type=int, + ), + RegisterConfig( + key="ADD_PUNISH_LEVEL_TO_COUNT", + value=3, + help="在CYCLE_DAYS周期内触发指定惩罚次数后提升惩罚等级", + default_value=3, + type=int, + ), + RegisterConfig( + key="ALAPI_CHECK_FLAG", + value=False, + help="当未检测到已收录的敏感词时,开启ALAPI文本检测并将疑似文本发送给超级用户", + default_value=False, + type=bool, + ), + RegisterConfig( + key="CONTAIN_BLACK_STOP_PROPAGATION", + value=True, + help="当文本包含任意敏感词时,停止向下级插件传递,即不触发ai", + default_value=True, + type=bool, + ), + ], + ).dict(), +) + + +_message_matcher = on_message(priority=1, block=False) + +_punish_matcher = on_alconna( + Alconna("设置惩罚", Args["uid", str]["id", int]["punish_level", int]), + priority=1, + permission=SUPERUSER, + block=True, +) + + +_show_matcher = on_alconna( + Alconna( + "记录名单", + Option("-u|--uid", Args["uid", str]), + Option("-g|--group", Args["gid", str]), + Option("-d|--date", Args["date", str]), + Option("-dt|--type", Args["date_type", ["=", ">", "<"]], default="="), + ), + priority=1, + permission=SUPERUSER, + block=True, +) + +_show_punish_matcher = on_alconna( + Alconna("惩罚机制"), aliases={"敏感词检测"}, priority=1, block=True +) + + +# 黑名单词汇检测 +@run_preprocessor +async def _( + bot: Bot, message: UniMsg, matcher: Matcher, event: Event, session: EventSession +): + gid = session.id3 or session.id2 + if session.id1: + if ( + event.is_tome() + and matcher.plugin_name == "black_word" + and not await BanConsole.is_ban(session.id1, gid) + ): + msg = message.extract_plain_text() + if session.id1 in bot.config.superusers: + return logger.debug( + f"超级用户跳过黑名单词汇检查 Message: {msg}", target=session.id1 + ) + if gid: + """屏蔽群权限-1的群""" + group, _ = await GroupConsole.get_or_create( + group_id=gid, channel_id__isnull=True + ) + if group.level < 0: + return + if await black_word_manager.check(bot, session, msg) and Config.get_config( + "black_word", "CONTAIN_BLACK_STOP_PROPAGATION" + ): + matcher.stop_propagation() + + +@_show_matcher.handle() +async def _( + bot: Bot, uid: Match[str], gid: Match[str], date: Match[str], date_type: Match[str] +): + user_id = None + group_id = None + date_ = None + date_str = None + date_type_ = "=" + if uid.available: + user_id = uid.result + if gid.available: + group_id = gid.result + if date.available: + date_str = date.result + if date_type.available: + date_type_ = date_type.result + if date_str: + try: + date_ = datetime.strptime(date_str, "%Y-%m-%d") + except ValueError: + await Text("日期格式错误,需要:年-月-日").finish() + result = await show_black_text_image( + user_id, + group_id, + date_, + date_type_, + ) + await Image(result.pic2bytes()).send() + + +@_show_punish_matcher.handle() +async def _(): + text = f""" + ** 惩罚机制 ** + + 惩罚前包含容忍机制,在指定周期内会容忍偶尔少次数的敏感词只会进行警告提醒 + + 多次触发同级惩罚会使惩罚等级提高,即惩罚自动提级机制 + + 目前公开的惩罚等级: + + 1级:永久ban + + 2级:删除好友 + + 3级:ban指定/随机天数 + + 4级:ban指定/随机时长 + + 5级:警告 + + 备注: + + 该功能为测试阶段,如果你有被误封情况,请联系管理员,会从数据库中提取出你的数据进行审核后判断 + + 目前该功能暂不完善,部分情况会由管理员鉴定,请注意对真寻的发言 + + 关于敏感词: + + 记住不要骂{NICKNAME}就对了! + """.strip() + max_width = 0 + for m in text.split("\n"): + max_width = len(m) * 20 if len(m) * 20 > max_width else max_width + max_height = len(text.split("\n")) * 24 + A = BuildImage( + max_width, max_height, font="CJGaoDeGuo.otf", font_size=24, color="#E3DBD1" + ) + await A.text((10, 10), text) + await Image(A.pic2bytes()).send() + + +@_punish_matcher.handle() +async def _( + bot: Bot, + session: EventSession, + arparma: Arparma, + uid: str, + id: int, + punish_level: int, +): + result = await set_user_punish( + bot, uid, session.id2 or session.id3, id, punish_level + ) + await Text(result).send(reply=True) + logger.info( + f"设置惩罚 uid:{uid} id_:{id} punish_level:{punish_level} --> {result}", + arparma.header_result, + session=session, + ) diff --git a/zhenxun/plugins/black_word/data_source.py b/zhenxun/plugins/black_word/data_source.py new file mode 100644 index 00000000..e985facc --- /dev/null +++ b/zhenxun/plugins/black_word/data_source.py @@ -0,0 +1,103 @@ +from datetime import datetime + +from nonebot.adapters import Bot + +from zhenxun.models.friend_user import FriendUser +from zhenxun.models.group_member_info import GroupInfoUser +from zhenxun.utils.image_utils import BuildImage, ImageTemplate + +from .model import BlackWord +from .utils import Config, _get_punish + + +async def show_black_text_image( + user_id: str | None, + group_id: str | None, + date: datetime | None, + data_type: str = "=", +) -> BuildImage: + """展示记录名单 + + 参数: + bot: bot + user: 用户id + group_id: 群组id + date: 日期 + data_type: 日期搜索类型 + + 返回: + BuildImage: 数据图片 + """ + data_list = await BlackWord.get_black_data(user_id, group_id, date, data_type) + column_name = [ + "ID", + "昵称", + "UID", + "GID", + "文本", + "检测内容", + "检测等级", + "惩罚", + "平台", + "记录日期", + ] + column_list = [] + uid_list = [u for u in data_list] + uid2name = { + u.user_id: u.user_name for u in await FriendUser.filter(user_id__in=uid_list) + } + for i, data in enumerate(data_list): + uname = uid2name.get(data.user_id) + if not uname: + if u := await GroupInfoUser.get_or_none( + user_id=data.user_id, group_id=data.group_id + ): + uname = u.user_name + if len(data.plant_text) > 30: + data.plant_text = data.plant_text[:30] + "..." + column_list.append( + [ + i, + uname or data.user_id, + data.user_id, + data.group_id, + data.plant_text, + data.black_word, + data.punish_level, + data.punish, + data.platform, + data.create_time, + ] + ) + A = await ImageTemplate.table_page( + "记录名单", "一个都不放过!", column_name, column_list + ) + return A + + +async def set_user_punish( + bot: Bot, user_id: str, group_id: str | None, id_: int, punish_level: int +) -> str: + """设置惩罚 + + 参数: + user_id: 用户id + group_id: 群组id或频道id + id_: 记录下标 + punish_level: 惩罚等级 + + 返回: + str: 结果 + """ + result = await _get_punish(bot, punish_level, user_id, group_id) + punish = { + 1: "永久ban", + 2: "删除好友", + 3: f"ban {result} 天", + 4: f"ban {result} 分钟", + 5: "口头警告", + } + if await BlackWord.set_user_punish(user_id, punish[punish_level], id_=id_): + return f"已对 USER {user_id} 进行 {punish[punish_level]} 处罚。" + else: + return "操作失败,可能未找到用户,id或敏感词" diff --git a/zhenxun/plugins/black_word/model.py b/zhenxun/plugins/black_word/model.py new file mode 100644 index 00000000..ef81c0ba --- /dev/null +++ b/zhenxun/plugins/black_word/model.py @@ -0,0 +1,154 @@ +from datetime import datetime, timedelta +from email.policy import default + +import pytz +from tortoise import fields + +from zhenxun.services.db_context import Model + + +class BlackWord(Model): + + id = fields.IntField(pk=True, generated=True, auto_increment=True) + """自增id""" + user_id = fields.CharField(255) + """用户id""" + group_id = fields.CharField(255, null=True) + """群聊id""" + plant_text = fields.TextField() + """检测文本""" + black_word = fields.TextField() + """黑名单词语""" + punish = fields.TextField(default="") + """惩罚内容""" + punish_level = fields.IntField() + """惩罚等级""" + create_time = fields.DatetimeField(auto_now_add=True) + """创建时间""" + platform = fields.CharField(255, null=True) + """平台""" + + class Meta: + table = "black_word" + table_description = "惩罚机制数据表" + + @classmethod + async def set_user_punish( + cls, + user_id: str, + punish: str, + black_word: str | None = None, + id_: int | None = None, + ) -> bool: + """设置处罚 + + 参数: + user_id: 用户id + punish: 处罚 + black_word: 黑名单词汇 + id_: 记录下标 + """ + user = None + if (not black_word and id_ is None) or not punish: + return False + if black_word: + user = ( + await cls.filter(user_id=user_id, black_word=black_word, punish="") + .order_by("id") + .first() + ) + elif id_ is not None: + user_list = await cls.filter(user_id=user_id).order_by("id").all() + if len(user_list) == 0 or (id_ < 0 or id_ > len(user_list)): + return False + user = user_list[id_] + if not user: + return False + user.punish = f"{user.punish}{punish} " + await user.save(update_fields=["punish"]) + return True + + @classmethod + async def get_user_count( + cls, user_id: str, days: int = 7, punish_level: int | None = None + ) -> int: + """获取用户规定周期内的犯事次数 + + 参数: + user_id: 用户id + days: 周期天数 + punish_level: 惩罚等级 + """ + query = cls.filter( + user_id=user_id, + create_time__gte=datetime.now() - timedelta(days=days), + punish_level__not_in=[-1], + ) + if punish_level is not None: + query = query.filter(punish_level=punish_level) + return await query.count() + + @classmethod + async def get_user_punish_level(cls, user_id: str, days: int = 7) -> int | None: + """获取用户最近一次的惩罚记录等级 + + 参数: + user_id: 用户id + days: 周期天数 + """ + if ( + user := await cls.filter( + user_id=user_id, + create_time__gte=datetime.now() - timedelta(days=days), + ) + .order_by("id") + .first() + ): + return user.punish_level + return None + + @classmethod + async def get_black_data( + cls, + user_id: str | None, + group_id: str | None, + date: datetime | None, + date_type: str = "=", + ) -> list["BlackWord"]: + """通过指定条件查询数据 + + 参数: + user_id: 用户id + group_id: 群号 + date: 日期 + date_type: 日期查询类型 + """ + query = cls + if user_id: + query = query.filter(user_id=user_id) + if group_id: + query = query.filter(group_id=group_id) + if date: + if date_type == "=": + query = query.filter( + create_time__range=[date, date + timedelta(days=1)] + ) + elif date_type == ">": + query = query.filter(create_time__gte=date) + elif date_type == "<": + query = query.filter(create_time__lte=date) + data_list = await query.all().order_by("id") + for data in data_list: + data.create_time = data.create_time.astimezone( + pytz.timezone("Asia/Shanghai") + ) + return data_list # type: ignore + + @classmethod + async def _run_script(cls): + return [ + "ALTER TABLE black_word RENAME COLUMN user_qq TO user_id;", # 将user_qq改为user_id + "ALTER TABLE black_word ALTER COLUMN user_id TYPE character varying(255);", + "ALTER TABLE black_word ALTER COLUMN group_id TYPE character varying(255);", + "ALTER TABLE black_word ADD COLUMN platform character varying(255);", + ] diff --git a/zhenxun/plugins/black_word/utils.py b/zhenxun/plugins/black_word/utils.py new file mode 100644 index 00000000..29145dfb --- /dev/null +++ b/zhenxun/plugins/black_word/utils.py @@ -0,0 +1,374 @@ +import random +from pathlib import Path + +import ujson as json +from nonebot.adapters import Bot +from nonebot.adapters.onebot.v11 import ActionFailed +from nonebot_plugin_session import EventSession + +from zhenxun.configs.config import Config +from zhenxun.configs.path_config import DATA_PATH +from zhenxun.models.ban_console import BanConsole +from zhenxun.models.group_member_info import GroupInfoUser +from zhenxun.services.log import logger +from zhenxun.utils.http_utils import AsyncHttpx +from zhenxun.utils.platform import PlatformManage +from zhenxun.utils.utils import cn2py + +from .model import BlackWord + + +class BlackWordManager: + """ + 敏感词管理( 拒绝恶意 + """ + + def __init__(self, word_file: Path, py_file: Path): + self._word_list = { + "1": [], + "2": [], + "3": [], + "4": ["sb", "nmsl", "mdzz", "2b", "jb", "操", "废物", "憨憨", "cnm", "rnm"], + "5": [], + } + self._py_list = { + "1": [], + "2": [], + "3": [], + "4": [ + "shabi", + "wocaonima", + "sima", + "sabi", + "zhizhang", + "naocan", + "caonima", + "rinima", + "simadongxi", + "simawanyi", + "hanbi", + "hanpi", + "laji", + "fw", + ], + "5": [], + } + word_file.parent.mkdir(parents=True, exist_ok=True) + if word_file.exists(): + # 清空默认配置 + with open(word_file, "r", encoding="utf8") as f: + self._word_list = json.load(f) + else: + with open(word_file, "w", encoding="utf8") as f: + json.dump( + self._word_list, + f, + ensure_ascii=False, + indent=4, + ) + if py_file.exists(): + # 清空默认配置 + with open(py_file, "r", encoding="utf8") as f: + self._py_list = json.load(f) + else: + with open(py_file, "w", encoding="utf8") as f: + json.dump( + self._py_list, + f, + ensure_ascii=False, + indent=4, + ) + + async def check( + self, bot: Bot, session: EventSession, message: str + ) -> str | bool | None: + """检查是否包含黑名单词汇 + + 参数: + bot: Bot + session: EventSession + message: 消息 + """ + logger.debug( + f"检查文本是否含有黑名单词汇: {message}", "敏感词检测", session=session + ) + if session.id1: + if data := self._check(message): + if data[0]: + await _add_user_black_word( + bot, + session.id1, + session.id2 or session.id3, + data[0], + message, + int(data[1]), + ) + return True + if Config.get_config( + "black_word", "ALAPI_CHECK_FLAG" + ) and not await check_text(message): + await send_msg( + bot, + "", + None, + f"用户 {session.id1} 群组 {session.id3 or session.id2} ALAPI 疑似检测:{message}", + ) + return False + + def _check(self, message: str) -> tuple[str | None, int]: + """检测文本是否违规 + + 参数: + message: 检测消息 + """ + # 移除空格 + message = message.replace(" ", "") + py_msg = cn2py(message).lower() + # 完全匹配 + for x in [self._word_list, self._py_list]: + for level in x: + if message in x[level] or py_msg in x[level]: + return message if message in x[level] else py_msg, int(level) + # 模糊匹配 + for x in [self._word_list, self._py_list]: + for level in x: + for m in x[level]: + if m in message or m in py_msg: + return m, -1 + return None, 0 + + +async def _add_user_black_word( + bot: Bot, + user_id: str, + group_id: str | None, + black_word: str, + message: str, + punish_level: int, +): + """添加敏感词数据 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id或频道id + black_word: 触发的黑名单词汇 + message: 原始文本 + punish_level: 惩罚等级 + """ + cycle_days = Config.get_config("black_word", "CYCLE_DAYS") or 7 + user_count = await BlackWord.get_user_count(user_id, cycle_days, punish_level) + add_punish_level_to_count = Config.get_config( + "black_word", "ADD_PUNISH_LEVEL_TO_COUNT" + ) + # 周期内超过次数直接提升惩罚 + if ( + Config.get_config("black_word", "AUTO_ADD_PUNISH_LEVEL") + and add_punish_level_to_count + ): + punish_level -= 1 + await BlackWord.create( + user_id=user_id, + group_id=group_id, + plant_text=message, + black_word=black_word, + punish_level=punish_level, + platform=PlatformManage.get_platform(bot), + ) + logger.info( + f"已将 USER {user_id} GROUP {group_id} 添加至黑名单词汇记录 Black_word:{black_word} Plant_text:{message}" + ) + # 自动惩罚 + if Config.get_config("black_word", "AUTO_PUNISH") and punish_level != -1: + await _punish_handle(bot, user_id, group_id, punish_level, black_word) + + +async def _punish_handle( + bot: Bot, + user_id: str, + group_id: str | None, + punish_level: int, + black_word: str, +): + """惩罚措施,级别越低惩罚越严 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id或频道id + black_word: 触发的黑名单词汇 + channel_id: 频道id + """ + logger.info(f"BlackWord USER {user_id} 触发 {punish_level} 级惩罚...") + # 周期天数 + cycle_days = Config.get_config("black_word", "CYCLE_DAYS") or 7 + # 用户周期内触发punish_level级惩罚的次数 + user_count = await BlackWord.get_user_count(user_id, cycle_days, punish_level) + # 获取最近一次的惩罚等级,将在此基础上增加 + punish_level = ( + await BlackWord.get_user_punish_level(user_id, cycle_days) or punish_level + ) + # 容忍次数:List[int] + tolerate_count = Config.get_config("black_word", "TOLERATE_COUNT") + if not tolerate_count or len(tolerate_count) < 5: + tolerate_count = [5, 2, 2, 2, 2] + if punish_level == 1 and user_count > tolerate_count[punish_level - 1]: + # 永久ban + await _get_punish(bot, 1, user_id, group_id) + await BlackWord.set_user_punish(user_id, "永久ban 删除好友", black_word) + elif punish_level == 2 and user_count > tolerate_count[punish_level - 1]: + # 删除好友 + await _get_punish(bot, 2, user_id, group_id) + await BlackWord.set_user_punish(user_id, "删除好友", black_word) + elif punish_level == 3 and user_count > tolerate_count[punish_level - 1]: + # 永久ban + ban_day = await _get_punish(bot, 3, user_id, group_id) + await BlackWord.set_user_punish(user_id, f"ban {ban_day} 天", black_word) + elif punish_level == 4 and user_count > tolerate_count[punish_level - 1]: + # ban指定时长 + ban_time = await _get_punish(bot, 4, user_id, group_id) + await BlackWord.set_user_punish(user_id, f"ban {ban_time} 分钟", black_word) + elif punish_level == 5 and user_count > tolerate_count[punish_level - 1]: + # 口头警告 + warning_result = await _get_punish(bot, 5, user_id, group_id) + await BlackWord.set_user_punish( + user_id, f"口头警告:{warning_result}", black_word + ) + else: + await BlackWord.set_user_punish(user_id, f"提示!", black_word) + await send_msg( + bot, + user_id, + group_id, + f"BlackWordChecker:该条发言已被记录,目前你在{cycle_days}天内的发表{punish_level}级" + f"言论记录次数为:{user_count}次,请注意你的发言\n" + f"* 如果你不清楚惩罚机制,请发送“惩罚机制” *", + ) + + +async def _get_punish( + bot: Bot, + id_: int, + user_id: str, + group_id: str | None = None, +) -> int | str | None: + """通过id_获取惩罚 + + 参数: + bot: Bot + id_: id + user_id: 用户id + group_id: 群组id或频道id + """ + # 忽略的群聊 + # _ignore_group = Config.get_config("black_word", "IGNORE_GROUP") + # 处罚 id 4 ban 时间:int,List[int] + ban_3_duration = Config.get_config("black_word", "BAN_3_DURATION") or 7 + # 处罚 id 4 ban 时间:int,List[int] + ban_4_duration = Config.get_config("black_word", "BAN_4_DURATION") or 360 + # 口头警告内容 + warning_result = Config.get_config("black_word", "WARNING_RESULT") + if user := await GroupInfoUser.get_or_none(user_id=user_id, group_id=group_id): + uname = user.user_name + else: + uname = user_id + # 永久ban + if id_ == 1: + if str(user_id) not in bot.config.superusers: + await BanConsole.ban(user_id, group_id, 10, -1, None) + await send_msg( + bot, + user_id, + group_id, + f"BlackWordChecker 永久ban USER {uname}({user_id})", + ) + logger.info(f"BlackWord 永久封禁 USER {user_id}...") + # 删除好友(有的话 + elif id_ == 2: + if str(user_id) not in bot.config.superusers: + try: + await bot.delete_friend(user_id=user_id) + await send_msg( + bot, + user_id, + group_id, + f"BlackWordChecker 删除好友 USER {uname}({user_id})", + ) + logger.info(f"BlackWord 删除好友 {user_id}...") + except ActionFailed: + pass + # 封禁用户指定时间,默认7天 + elif id_ == 3: + if isinstance(ban_3_duration, list): + ban_3_duration = random.randint(ban_3_duration[0], ban_3_duration[1]) + await BanConsole.ban(user_id, group_id, 9, ban_4_duration * 60 * 24) + await send_msg( + bot, + user_id, + group_id, + f"BlackWordChecker 对用户 USER {uname}({user_id}) 进行封禁 {ban_3_duration} 天处罚。", + ) + logger.info(f"BlackWord 封禁 USER {uname}({user_id}) {ban_3_duration} 天...") + return ban_3_duration + # 封禁用户指定时间,默认360分钟 + elif id_ == 4: + if isinstance(ban_4_duration, list): + ban_4_duration = random.randint(ban_4_duration[0], ban_4_duration[1]) + await BanConsole.ban(user_id, group_id, 9, ban_4_duration * 60) + await send_msg( + bot, + user_id, + group_id, + f"BlackWordChecker 对用户 USER {uname}({user_id}) 进行封禁 {ban_4_duration} 分钟处罚。", + ) + logger.info(f"BlackWord 封禁 USER {uname}({user_id}) {ban_4_duration} 分钟...") + return ban_4_duration + # 口头警告 + elif id_ == 5: + await PlatformManage.send_message(bot, user_id, group_id, warning_result) + logger.info(f"BlackWord 口头警告 USER {user_id}") + return warning_result + return None + + +async def send_msg(bot: Bot, user_id: str, group_id: str | None, message: str): + """发送消息 + + 参数: + bot: Bot + user_id: user_id + group_id: group_id + message: message + """ + if not user_id: + platform = PlatformManage.get_platform(bot) + user_id = bot.config.platform_superusers[platform][0] + await PlatformManage.send_message(bot, user_id, group_id, message) + + +async def check_text(text: str) -> bool: + """ALAPI文本检测,检测输入违规 + + 参数: + text: 回复 + """ + if not Config.get_config("alapi", "ALAPI_TOKEN"): + return True + params = {"token": Config.get_config("alapi", "ALAPI_TOKEN"), "text": text} + try: + data = ( + await AsyncHttpx.get( + "https://v2.alapi.cn/api/censor/text", timeout=4, params=params + ) + ).json() + if data["code"] == 200: + return data["data"]["conclusion_type"] == 2 + except Exception as e: + logger.error(f"检测违规文本错误...", e=e) + return True + + +black_word_manager = BlackWordManager( + DATA_PATH / "black_word" / "black_word.json", + DATA_PATH / "black_word" / "black_py.json", +) diff --git a/zhenxun/utils/platform.py b/zhenxun/utils/platform.py index fb34e81f..dba7bfe3 100644 --- a/zhenxun/utils/platform.py +++ b/zhenxun/utils/platform.py @@ -9,10 +9,14 @@ from nonebot.adapters.onebot.v11 import Bot as v11Bot from nonebot.adapters.onebot.v12 import Bot as v12Bot from nonebot.utils import is_coroutine_callable from nonebot_plugin_saa import ( + Image, MessageFactory, TargetDoDoChannel, + TargetDoDoPrivate, TargetKaiheilaChannel, + TargetKaiheilaPrivate, TargetQQGroup, + TargetQQPrivate, Text, ) @@ -23,6 +27,31 @@ from zhenxun.services.log import logger class PlatformManage: + @classmethod + async def send_message( + cls, + bot: Bot, + user_id: str | None, + group_id: str | None, + message: str | Text | MessageFactory | Image, + ) -> bool: + """发送消息 + + 参数: + bot: Bot + user_id: 用户id + group_id: 群组id或频道id + message: 消息文本 + + 返回: + bool: 是否发送成功 + """ + if target := cls.get_target(bot, user_id, group_id): + send_message = Text(message) if isinstance(message, str) else message + await send_message.send_to(target, bot) + return True + return False + @classmethod async def update_group(cls, bot: Bot) -> int: """更新群组信息 @@ -198,13 +227,18 @@ class PlatformManage: return [], "" @classmethod - def get_target(cls, bot: Bot, group_id: str | None, channel_id: str | None): + def get_target( + cls, + bot: Bot, + user_id: str | None = None, + group_id: str | None = None, + ): """获取发生Target 参数: bot: Bot group_id: 群组id - channel_id: 频道id + channel_id: 频道id或群组id 返回: target: 对应平台Target @@ -213,11 +247,19 @@ class PlatformManage: if isinstance(bot, (v11Bot, v12Bot)): if group_id: target = TargetQQGroup(group_id=int(group_id)) - if channel_id: - if isinstance(bot, DodoBot): - target = TargetDoDoChannel(channel_id=channel_id) - elif isinstance(bot, KaiheilaBot): - target = TargetKaiheilaChannel(channel_id=channel_id) + elif user_id: + target = TargetQQPrivate(user_id=int(user_id)) + elif isinstance(bot, DodoBot): + if group_id: + target = TargetDoDoChannel(channel_id=group_id) + elif user_id: + # target = TargetDoDoPrivate(user_id=user_id) + pass + elif isinstance(bot, KaiheilaBot): + if group_id: + target = TargetKaiheilaChannel(channel_id=group_id) + elif user_id: + target = TargetKaiheilaPrivate(user_id=user_id) return target @@ -294,7 +336,7 @@ async def broadcast_group( if is_continue: continue target = PlatformManage.get_target( - _bot, group.group_id, group.channel_id + _bot, None, group.group_id, group.channel_id ) if target: _used_group.append(key) diff --git a/zhenxun/utils/utils.py b/zhenxun/utils/utils.py index 5fe35211..366a5283 100644 --- a/zhenxun/utils/utils.py +++ b/zhenxun/utils/utils.py @@ -6,6 +6,7 @@ from pathlib import Path from typing import Any import httpx +import pypinyin import pytz from zhenxun.services.log import logger @@ -151,6 +152,18 @@ class FreqLimiter: return self.next_time[key] - time.time() +def cn2py(word: str) -> str: + """将字符串转化为拼音 + + 参数: + word: 文本 + """ + temp = "" + for i in pypinyin.pinyin(word, style=pypinyin.NORMAL): + temp += "".join(i) + return temp + + async def get_user_avatar(uid: int | str) -> bytes | None: """快捷获取用户头像