feat: 添加hook,更新其他

This commit is contained in:
HibiKier 2024-02-26 03:04:32 +08:00
parent eb0572ea77
commit 7b3793728a
22 changed files with 1064 additions and 117 deletions

View File

@ -8,6 +8,7 @@
"arclet",
"Arparma",
"displayname",
"flmt",
"getbbox",
"httpx",
"kaiheila",

22
poetry.lock generated
View File

@ -96,13 +96,13 @@ reference = "ali"
[[package]]
name = "arclet-alconna"
version = "1.7.42"
version = "1.7.44"
description = "A High-performance, Generality, Humane Command Line Arguments Parser Library."
optional = false
python-versions = ">=3.8"
files = [
{file = "arclet_alconna-1.7.42-py3-none-any.whl", hash = "sha256:fa78944121d4afa4e2c0247a98967ddb1e76cf63b94c8c3f4f393c52f6d23e75"},
{file = "arclet_alconna-1.7.42.tar.gz", hash = "sha256:a5a1cca37d0c3d58607ee22485e636fa0b01d40eb43194e542b2c3d6a5d2e70b"},
{file = "arclet_alconna-1.7.44-py3-none-any.whl", hash = "sha256:e5751a2aa854b7b2c01cac87986ad11b397986a725c9536d5f9ff81a84e85614"},
{file = "arclet_alconna-1.7.44.tar.gz", hash = "sha256:9c8a70a3f75e8358fa9c71befd3687c8c9781a19b1d28cb53cbe08fbc36cf720"},
]
[package.dependencies]
@ -1356,20 +1356,20 @@ reference = "ali"
[[package]]
name = "nonebot-plugin-alconna"
version = "0.36.3"
version = "0.37.1"
description = "Alconna Adapter for Nonebot"
optional = false
python-versions = ">=3.8"
files = [
{file = "nonebot_plugin_alconna-0.36.3-py3-none-any.whl", hash = "sha256:8f26f96c711d3adadc538ebf40d51ba2249c18fe1689bf36baed0e4d1e05246a"},
{file = "nonebot_plugin_alconna-0.36.3.tar.gz", hash = "sha256:ed8e4f2fd845d0c3d8becdd68678c203ee76109b9104a3b1c18f63525e85c6d4"},
{file = "nonebot_plugin_alconna-0.37.1-py3-none-any.whl", hash = "sha256:fcc46f04ac89bf43730afebd97fa46e5910bc404a9e24cab7950da58be36246d"},
{file = "nonebot_plugin_alconna-0.37.1.tar.gz", hash = "sha256:5e9989ee7debd79d61c97aa41c88aac5fe452cc9c47f2d48b829d81d26dfe130"},
]
[package.dependencies]
arclet-alconna = ">=1.7.42,<2.0.0"
arclet-alconna-tools = ">=0.6.11,<0.7.0"
nepattern = ">=0.5.14,<0.6.0"
nonebot2 = ">=2.1.0"
arclet-alconna = ">=1.7.44"
arclet-alconna-tools = ">=0.6.11"
nepattern = ">=0.5.15"
nonebot2 = ">=2.2.0"
[package.source]
type = "legacy"
@ -2982,4 +2982,4 @@ reference = "ali"
[metadata]
lock-version = "2.0"
python-versions = "^3.10"
content-hash = "2e5c4963196533949601dff69762b6f5586056a8775419c2ee1aef0df91b016a"
content-hash = "858e616442c77d1a328e37af331056a7b870611b22247fcebfe5dbe41a3fd4f0"

View File

@ -12,7 +12,6 @@ url = "https://mirrors.aliyun.com/pypi/simple/"
[tool.poetry.dependencies]
python = "^3.10"
nonebot-plugin-alconna = "^0.36.0"
playwright = "^1.41.1"
nonebot-adapter-onebot = "^2.3.1"
nonebot-plugin-apscheduler = "^0.3.0"
@ -33,6 +32,7 @@ retrying = "^1.3.4"
aiofiles = "^23.2.1"
nonebot-plugin-htmlrender = "^0.3.0"
nonebot-plugin-userinfo = "^0.1.3"
nonebot-plugin-alconna = "^0.37.1"
[tool.poetry.dev-dependencies]

View File

@ -21,6 +21,7 @@ from zhenxun.utils.enum import BlockType, PluginType
from zhenxun.utils.rules import admin_check, ensure_group
from ._data_source import PluginManage, build_plugin, build_task
from .command import _group_status_matcher, _status_matcher
base_config = Config.get("admin_bot_manage")
@ -53,60 +54,70 @@ __plugin_meta__ = PluginMetadata(
)
_status_matcher = on_alconna(
Alconna(
"switch",
Option("-t|--task", action=store_true, help_text="被动技能"),
Subcommand(
"open",
Args["name", str],
Option(
"-g|--group",
Args["group_id", str],
),
),
Subcommand(
"close",
Args["name", str],
Option(
"-t|--type",
Args["block_type", ["all", "a", "private", "p", "group", "g"]],
),
Option(
"-g|--group",
Args["group_id", str],
),
),
),
rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"),
priority=5,
block=True,
)
# _status_matcher = on_alconna(
# Alconna(
# "switch",
# Option("-t|--task", action=store_true, help_text="被动技能"),
# Subcommand(
# "open",
# Args["name", str],
# Option(
# "-g|--group",
# Args["group_id", str],
# ),
# ),
# Subcommand(
# "close",
# Args["name", str],
# Option(
# "-t|--type",
# Args["block_type", ["all", "a", "private", "p", "group", "g"]],
# ),
# Option(
# "-g|--group",
# Args["group_id", str],
# ),
# ),
# ),
# rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"),
# priority=5,
# block=True,
# )
# TODO: shortcut
# # TODO: shortcut
_group_status_matcher = on_alconna(
Alconna("group-status", Args["status", ["sleep", "wake"]]),
rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL") & ensure_group,
priority=5,
block=True,
)
# _group_status_matcher = on_alconna(
# Alconna("group-status", Args["status", ["sleep", "wake"]]),
# rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL") & ensure_group,
# priority=5,
# block=True,
# )
@_status_matcher.assign("$main")
async def _(bot: Bot, session: EventSession, arparma: Arparma):
image = None
if arparma.find("task"):
image = await build_task(session.id3 or session.id2)
elif session.id1 in bot.config.superusers:
if session.id1 in bot.config.superusers:
image = await build_plugin()
if image:
await Image(image.pic2bs4()).send(reply=True)
logger.info(
f"查看{'被动' if arparma.find('task') else '功能'}列表",
arparma.header_result,
session=session,
)
logger.info(
f"查看功能列表",
arparma.header_result,
session=session,
)
@_status_matcher.assign("task")
async def _(bot: Bot, session: EventSession, arparma: Arparma):
image = None
if image := await build_task(session.id3 or session.id2):
await Image(image.pic2bs4()).send(reply=True)
logger.info(
f"查看被动列表",
arparma.header_result,
session=session,
)
@_status_matcher.assign("open")
@ -122,13 +133,14 @@ async def _(
await Text(result).send(reply=True)
logger.info(f"开启功能 {name}", arparma.header_result, session=session)
elif session.id1 in bot.config.superusers:
result = await PluginManage.superuser_block(name, None, group.result)
group_id = group.result if group.available else None
result = await PluginManage.superuser_block(name, None, group_id)
await Text(result).send(reply=True)
logger.info(
f"超级用户开启功能 {name}",
arparma.header_result,
session=session,
target=group.result,
target=group_id,
)
@ -146,17 +158,39 @@ async def _(
await Text(result).send(reply=True)
logger.info(f"关闭功能 {name}", arparma.header_result, session=session)
elif session.id1 in bot.config.superusers:
group_id = group.result if group.available else None
_type = BlockType.ALL
if block_type.available:
if block_type.result in ["p", "private"]:
_type = BlockType.FRIEND
_type = BlockType.PRIVATE
elif block_type.result in ["g", "group"]:
_type = BlockType.GROUP
result = await PluginManage.superuser_block(name, _type, group.result)
result = await PluginManage.superuser_block(name, _type, group_id)
await Text(result).send(reply=True)
logger.info(
f"超级用户关闭功能 {name}, 禁用类型: {_type}",
arparma.header_result,
session=session,
target=group.result,
target=group_id,
)
@_group_status_matcher.handle()
async def _(
bot: Bot,
session: EventSession,
arparma: Arparma,
status: str,
):
if gid := session.id3 or session.id2:
if status == "sleep":
await PluginManage.sleep(gid)
logger.info("进行休眠", arparma.header_result, session=session)
await Text("那我先睡觉了...").finish()
else:
if PluginManage.is_wake(gid):
await Text("我还醒着呢!").finish()
await PluginManage.wake(gid)
logger.info("醒来", arparma.header_result, session=session)
await Text("呜..醒来了...").finish()
return Text("群组id为空...").send()

View File

@ -119,7 +119,7 @@ async def build_task(group_id: str | None) -> BuildImage:
task.name,
"开启" if task.module not in group.block_task else "关闭",
"开启" if task.status else "关闭",
task.run_time,
task.run_time or "-",
]
)
else:
@ -129,7 +129,7 @@ async def build_task(group_id: str | None) -> BuildImage:
task.module,
task.name,
"开启" if task.status else "关闭",
task.run_time,
task.run_time or "-",
]
)
return await ImageTemplate.table_page(
@ -143,6 +143,20 @@ async def build_task(group_id: str | None) -> BuildImage:
class PluginManage:
@classmethod
async def is_wake(cls, group_id: str) -> bool:
if c := await GroupConsole.get_or_none(group_id=group_id):
return c.status
return False
@classmethod
async def sleep(cls, group_id: str):
await GroupConsole.filter(group_id=group_id).update(status=False)
@classmethod
async def wake(cls, group_id: str):
await GroupConsole.filter(group_id=group_id).update(status=True)
@classmethod
async def block(cls, module: str):
await PluginInfo.filter(module=module).update(status=False)
@ -191,8 +205,13 @@ class PluginManage:
返回:
str: 返回信息
"""
if plugin_name.isdigit():
plugin = await PluginInfo.get_or_none(id=int(plugin_name))
else:
plugin = await PluginInfo.get_or_none(name=plugin_name)
status_str = "开启" if status else "关闭"
if plugin := await PluginInfo.get_or_none(name=plugin_name):
if plugin:
group, _ = await GroupConsole.get_or_create(group_id=group_id)
if status:
if plugin.module in group.block_plugin:
@ -200,12 +219,12 @@ class PluginManage:
f"{plugin.module},", ""
)
await group.save(update_fields=["block_plugin"])
return f"已成功{status_str} {plugin_name} 功能!"
return f"已成功{status_str} {plugin.name} 功能!"
else:
if plugin.module not in group.block_plugin:
group.block_plugin += f"{plugin.module},"
await group.save(update_fields=["block_plugin"])
return f"已成功{status_str} {plugin_name} 功能!"
return f"已成功{status_str} {plugin.name} 功能!"
return f"该功能已经{status_str}了喔,不要重复{status_str}..."
return "没有找到这个功能喔..."
@ -223,7 +242,11 @@ class PluginManage:
返回:
str: 返回信息
"""
if plugin := await PluginInfo.get_or_none(name=plugin_name):
if plugin_name.isdigit():
plugin = await PluginInfo.get_or_none(id=int(plugin_name))
else:
plugin = await PluginInfo.get_or_none(name=plugin_name)
if plugin:
if group_id:
if group := await GroupConsole.get_or_none(group_id=group_id):
if f"super:{plugin_name}," not in group.block_plugin:
@ -238,7 +261,7 @@ class PluginManage:
plugin.status = not bool(block_type)
await plugin.save(update_fields=["status", "block_type"])
if not block_type:
return f"已成功将 {plugin_name} 全局启用!"
return f"已成功将 {plugin.name} 全局启用!"
else:
return f"已成功将 {plugin_name} 全局关闭!"
return f"已成功将 {plugin.name} 全局关闭!"
return "没有找到这个功能喔..."

View File

@ -0,0 +1,94 @@
from nonebot.rule import to_me
from nonebot_plugin_alconna import (
Alconna,
Args,
Option,
Subcommand,
on_alconna,
store_true,
)
from zhenxun.utils.rules import admin_check, ensure_group
_status_matcher = on_alconna(
Alconna(
"switch",
Option("-t|--task", action=store_true, help_text="被动技能"),
Subcommand(
"open",
Args["name", [str, int]],
Option(
"-g|--group",
Args["group_id", str],
),
),
Subcommand(
"close",
Args["name", [str, int]],
Option(
"-t|--type",
Args["block_type", ["all", "a", "private", "p", "group", "g"]],
),
Option(
"-g|--group",
Args["group_id", str],
),
),
),
rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL"),
priority=5,
block=True,
)
# TODO: shortcut
_group_status_matcher = on_alconna(
Alconna("group-status", Args["status", ["sleep", "wake"]]),
rule=admin_check("admin_bot_manage", "CHANGE_GROUP_SWITCH_LEVEL")
& ensure_group
& to_me(),
priority=5,
block=True,
)
_status_matcher.shortcut(
r"插件列表",
command="switch",
arguments=[],
prefix=True,
)
_status_matcher.shortcut(
r"群被动状态",
command="switch",
arguments=["--task"],
prefix=True,
)
_status_matcher.shortcut(
r"开启(?P<name>.+)",
command="switch",
arguments=["open", "{name}"],
prefix=True,
)
_status_matcher.shortcut(
r"关闭(?P<name>.+)",
command="switch",
arguments=["close", "{name}"],
prefix=True,
)
_group_status_matcher.shortcut(
r"醒来",
command="group-status",
arguments=["wake"],
prefix=True,
)
_group_status_matcher.shortcut(
r"休息吧",
command="group-status",
arguments=["sleep"],
prefix=True,
)

View File

@ -84,7 +84,7 @@ class HelpImageBuild:
sta = 2
if not group_id and plugin.block_type in [
BlockType.ALL,
BlockType.FRIEND,
BlockType.PRIVATE,
]:
sta = 2
if group_id and (

View File

@ -0,0 +1,455 @@
from typing import Dict
from unittest import result
from nonebot.adapters import Bot, Event
from nonebot.exception import IgnoredException
from nonebot.matcher import Matcher
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_saa import Mention, MessageFactory, Text
from nonebot_plugin_session import EventSession
from pydantic import BaseModel
from zhenxun.configs.config import Config
from zhenxun.models.group_console import GroupConsole
from zhenxun.models.level_user import LevelUser
from zhenxun.models.plugin_info import PluginInfo
from zhenxun.models.plugin_limit import PluginLimit
from zhenxun.models.user_console import UserConsole
from zhenxun.services.log import logger
from zhenxun.utils.enum import (
BlockType,
GoldHandle,
LimitWatchType,
PluginLimitType,
PluginType,
)
from zhenxun.utils.utils import CountLimiter, FreqLimiter, UserBlockLimiter
class Limit(BaseModel):
limit: PluginLimit
limiter: FreqLimiter | UserBlockLimiter | CountLimiter
class Config:
arbitrary_types_allowed = True
class LimitManage:
add_module = []
cd_limit: Dict[str, Limit] = {}
block_limit: Dict[str, Limit] = {}
count_limit: Dict[str, Limit] = {}
@classmethod
def add_limit(cls, limit: PluginLimit):
"""添加限制
参数:
limit: PluginLimit
"""
if limit.module not in cls.add_module:
cls.add_module.append(limit.module)
if limit.limit_type == PluginLimitType.BLOCK:
cls.block_limit[limit.module] = Limit(
limit=limit, limiter=UserBlockLimiter()
)
elif limit.limit_type == PluginLimitType.CD:
cls.cd_limit[limit.module] = Limit(
limit=limit, limiter=FreqLimiter(limit.cd)
)
elif limit.limit_type == PluginLimitType.COUNT:
cls.count_limit[limit.module] = Limit(
limit=limit, limiter=CountLimiter(limit.max_count)
)
@classmethod
def unblock(
cls, module: str, user_id: str, group_id: str | None, channel_id: str | None
):
"""解除插件block
参数:
module: 模块名
user_id: 用户id
group_id: 群组id
channel_id: 频道id
"""
if limit_model := cls.block_limit.get(module):
limit = limit_model.limit
limiter: UserBlockLimiter = limit_model.limiter # type: ignore
key_type = user_id
if group_id and limit.watch_type == LimitWatchType.GROUP:
key_type = channel_id or group_id
limiter.set_false(key_type)
@classmethod
async def check(
cls,
module: str,
user_id: str,
group_id: str | None,
channel_id: str | None,
session: EventSession,
):
"""检测限制
参数:
module: 模块名
user_id: 用户id
group_id: 群组id
channel_id: 频道id
session: Session
异常:
IgnoredException: IgnoredException
"""
if limit_model := cls.cd_limit.get(module):
await cls.__check(limit_model, user_id, group_id, channel_id, session)
if limit_model := cls.block_limit.get(module):
await cls.__check(limit_model, user_id, group_id, channel_id, session)
if limit_model := cls.count_limit.get(module):
await cls.__check(limit_model, user_id, group_id, channel_id, session)
@classmethod
async def __check(
cls,
limit_model: Limit,
user_id: str,
group_id: str | None,
channel_id: str | None,
session: EventSession,
):
"""检测限制
参数:
limit_model: Limit
user_id: 用户id
group_id: 群组id
channel_id: 频道id
session: Session
异常:
IgnoredException: IgnoredException
"""
if limit_model:
limit = limit_model.limit
limiter = limit_model.limiter
is_limit = (
LimitWatchType.ALL
or (group_id and limit.watch_type == LimitWatchType.GROUP)
or (not group_id and limit.watch_type == LimitWatchType.USER)
)
key_type = user_id
if group_id and limit.watch_type == LimitWatchType.GROUP:
key_type = channel_id or group_id
if is_limit and limiter.check(key_type):
if limit.result:
await Text(limit.result).send()
logger.debug(
f"{limit.module}({limit.limit_type}) 正在限制中...",
"HOOK",
session=session,
)
raise IgnoredException(f"{limit.module} 正在cd中...")
else:
if isinstance(limiter, FreqLimiter):
limiter.start_cd(key_type)
if isinstance(limiter, UserBlockLimiter):
limiter.set_true(key_type)
if isinstance(limiter, CountLimiter):
limiter.increase(key_type)
class IsSuperuserException(Exception):
pass
class AuthChecker:
"""
权限检查
"""
def __init__(self):
check_notice_info_cd = Config.get_config("hook", "CHECK_NOTICE_INFO_CD")
if check_notice_info_cd is None or check_notice_info_cd < 0:
raise ValueError("模块: [hook], 配置项: [CHECK_NOTICE_INFO_CD] 为空或小于0")
self._flmt = FreqLimiter(check_notice_info_cd)
self._flmt_g = FreqLimiter(check_notice_info_cd)
self._flmt_s = FreqLimiter(check_notice_info_cd)
self._flmt_c = FreqLimiter(check_notice_info_cd)
async def auth(
self,
matcher: Matcher,
bot: Bot,
session: EventSession,
message: UniMsg,
):
"""权限检查
参数:
matcher: matcher
bot: bot
session: EventSession
message: UniMsg
"""
is_ignore = False
cost_gold = 0
user_id = session.id1
group_id = session.id3
channel_id = session.id2
if not group_id:
group_id = channel_id
channel_id = None
if user_id and matcher.plugin and (module := matcher.plugin.name):
user = await UserConsole.get_user(user_id, session.platform)
if plugin := await PluginInfo.get_or_none(module=module):
try:
cost_gold = await self.auth_cost(user, plugin, session)
if session.id1 in bot.config.superusers:
if plugin.plugin_type == PluginType.SUPERUSER:
raise IsSuperuserException()
if not plugin.limit_superuser:
cost_gold = 0
raise IsSuperuserException()
await self.auth_group(plugin, session, message)
await self.auth_admin(plugin, session)
await self.auth_plugin(plugin, session)
await self.auth_limit(plugin, session)
except IsSuperuserException:
logger.debug(
f"超级用户或被ban跳过权限检测...", "HOOK", session=session
)
except IgnoredException:
is_ignore = True
LimitManage.unblock(
matcher.plugin.name, user_id, group_id, channel_id
)
if cost_gold and user_id:
"""花费金币"""
await UserConsole.reduce_gold(
user_id,
cost_gold,
GoldHandle.PLUGIN,
matcher.plugin.name if matcher.plugin else "",
session.platform,
)
logger.debug(f"调用功能花费金币: {cost_gold}", "HOOK", session=session)
if is_ignore:
raise IgnoredException("权限检测 ignore")
async def auth_limit(self, plugin: PluginInfo, session: EventSession):
"""插件限制
参数:
plugin: PluginInfo
session: EventSession
"""
user_id = session.id1
group_id = session.id3
channel_id = session.id2
if not group_id:
group_id = channel_id
channel_id = None
limit_list: list[PluginLimit] = await plugin.plugin_limit.all() # type: ignore
for limit in limit_list:
LimitManage.add_limit(limit)
if user_id:
await LimitManage.check(
plugin.module, user_id, group_id, channel_id, session
)
async def auth_plugin(self, plugin: PluginInfo, session: EventSession):
"""插件状态
参数:
plugin: PluginInfo
session: EventSession
"""
user_id = session.id1
group_id = session.id3
channel_id = session.id2
if not group_id:
group_id = channel_id
channel_id = None
if user_id:
if group_id:
if await GroupConsole.is_block_plugin(
group_id, plugin.module, channel_id
):
"""群组插件状态"""
if self._flmt_s.check(group_id or user_id):
self._flmt_s.start_cd(group_id or user_id)
await Text("该群未开启此功能...").send(reply=True)
logger.debug(
f"{plugin.name}({plugin.module}) 未开启此功能...",
"HOOK",
session=session,
)
raise IgnoredException("该群未开启此功能...")
if await GroupConsole.is_super_block_plugin(
group_id, plugin.module, channel_id
):
"""群组插件状态"""
if self._flmt_s.check(group_id or user_id):
self._flmt_s.start_cd(group_id or user_id)
await Text("超级管理员禁用了该群此功能...").send(reply=True)
logger.debug(
f"{plugin.name}({plugin.module}) 超级管理员禁用了该群此功能...",
"HOOK",
session=session,
)
raise IgnoredException("超级管理员禁用了该群此功能...")
# 群聊禁用
if not plugin.status and plugin.block_type == BlockType.GROUP:
try:
if self._flmt_c.check(group_id):
self._flmt_c.start_cd(group_id)
await Text("该功能在群聊中已被禁用...").send(reply=True)
except Exception:
pass
logger.debug(
f"{plugin.name}({plugin.module}) 该插件在群聊中已被禁用...",
"HOOK",
session=session,
)
raise IgnoredException("该插件在群聊中已被禁用...")
else:
# 私聊禁用
if not plugin.status and plugin.block_type == BlockType.PRIVATE:
try:
if self._flmt_c.check(user_id):
self._flmt_c.start_cd(user_id)
await Text("该功能在私聊中已被禁用...").send()
except Exception:
pass
logger.debug(
f"{plugin.name}({plugin.module}) 该插件在私聊中已被禁用...",
"HOOK",
session=session,
)
raise IgnoredException("该插件在私聊中已被禁用...")
if not plugin.status and plugin.block_type == BlockType.ALL:
"""全局状态"""
if group_id:
if await GroupConsole.is_super_group(group_id, channel_id):
raise IsSuperuserException()
if self._flmt_s.check(group_id or user_id):
self._flmt_s.start_cd(group_id or user_id)
await Text("全局未开启此功能...").send()
logger.debug(
f"{plugin.name}({plugin.module}) 全局未开启此功能...",
"HOOK",
session=session,
)
raise IgnoredException("全局未开启此功能...")
async def auth_admin(self, plugin: PluginInfo, session: EventSession):
"""管理员命令 个人权限
参数:
plugin: PluginInfo
session: EventSession
"""
user_id = session.id1
group_id = session.id3 or session.id2
if user_id and group_id and plugin.admin_level:
if group_id:
if await LevelUser.check_level(user_id, group_id, plugin.admin_level):
try:
if self._flmt.check(user_id):
self._flmt.start_cd(user_id)
await MessageFactory(
[
Mention(user_id),
Text(
f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}"
),
]
).finish(reply=True)
except Exception:
pass
logger.debug(
f"{plugin.name}({plugin.module}) 管理员权限不足...",
"HOOK",
session=session,
)
raise IgnoredException("管理员权限不足...")
else:
if not await LevelUser.check_level(user_id, "", plugin.admin_level):
try:
await Text(
f"你的权限不足喔,该功能需要的权限等级: {plugin.admin_level}"
).finish()
except Exception:
pass
logger.debug(
f"{plugin.name}({plugin.module}) 管理员权限不足...",
"HOOK",
session=session,
)
raise IgnoredException("权限不足")
async def auth_group(
self, plugin: PluginInfo, session: EventSession, message: UniMsg
):
"""群黑名单检测 群总开关检测
参数:
plugin: PluginInfo
session: EventSession
message: UniMsg
"""
if group_id := session.id3 or session.id2:
text = message.extract_plain_text()
group, _ = await GroupConsole.get_or_create(group_id=group_id)
if group.level < -1:
"""群权限小于0"""
logger.debug(
f"{plugin.name}({plugin.module}) 群黑名单, 群权限-1...",
"HOOK",
session=session,
)
raise IgnoredException("群黑名单")
if not group.status:
"""群休眠"""
if text.strip() != "醒来":
logger.debug(
f"{plugin.name}({plugin.module}) 功能总开关关闭状态...",
"HOOK",
session=session,
)
raise IgnoredException("功能总开关关闭状态")
async def auth_cost(
self, user: UserConsole, plugin: PluginInfo, session: EventSession
) -> int:
"""检测是否满足金币条件
参数:
user: UserConsole
plugin: PluginInfo
session: EventSession
返回:
int: 需要消耗的金币
"""
if user.gold < plugin.cost_gold:
"""插件消耗金币不足"""
try:
await Text(f"金币不足..该功能需要{plugin.cost_gold}金币..").send()
except Exception:
pass
logger.debug(
f"{plugin.name}({plugin.module}) 金币限制..该功能需要{plugin.cost_gold}金币..",
"HOOK",
session=session,
)
raise IgnoredException(f"{plugin.name}({plugin.module}) 金币限制...")
return plugin.cost_gold
checker = AuthChecker()

View File

@ -0,0 +1,35 @@
from typing import Optional
from nonebot.adapters.onebot.v11 import Bot, Event, MessageEvent
from nonebot.matcher import Matcher
from nonebot.message import run_postprocessor, run_preprocessor
from nonebot_plugin_alconna import UniMsg
from nonebot_plugin_session import EventSession
from ._auth_checker import LimitManage, checker
# # 权限检测
@run_preprocessor
async def _(matcher: Matcher, bot: Bot, session: EventSession, message: UniMsg):
await checker.auth(matcher, bot, session, message)
# 解除命令block阻塞
@run_postprocessor
async def _(
matcher: Matcher,
exception: Optional[Exception],
bot: Bot,
event: Event,
session: EventSession,
):
user_id = session.id1
group_id = session.id3
channel_id = session.id2
if not group_id:
group_id = channel_id
channel_id = None
if user_id and matcher.plugin:
module = matcher.plugin.name
LimitManage.unblock(module, user_id, group_id, channel_id)

View File

@ -136,7 +136,7 @@ async def _():
create_list = []
update_list = []
for task in task_list:
if task.module not in module_list:
if task.module not in module_dict:
create_list.append(task)
else:
task.id = module_dict[task.module]

View File

@ -36,9 +36,7 @@ class ShopManage:
goods = filter_goods[0]
else:
return "道具名称不存在..."
user, _ = await UserConsole.get_or_create(
user_id=user_id, defaults={"platform": platform}
)
user = await UserConsole.get_user(user_id, platform)
price = goods.goods_price * num * goods.goods_discount
if user.gold < price:
return "糟糕! 您的金币好像不太够哦..."
@ -77,9 +75,7 @@ class ShopManage:
返回:
BuildImage | None: 道具背包图片
"""
user, _ = await UserConsole.get_or_create(
user_id=user_id, defaults={"platform": platform}
)
user = await UserConsole.get_user(user_id, platform)
if not user.props:
return None
result = await GoodsInfo.filter(uuid__in=user.props.keys()).all()
@ -113,9 +109,7 @@ class ShopManage:
返回:
int: 金币数量
"""
user, _ = await UserConsole.get_or_create(
user_id=user_id, defaults={"platform": platform}
)
user = await UserConsole.get_user(user_id, platform)
return user.gold
@classmethod

View File

@ -94,13 +94,7 @@ class SignManage:
if not session.id1:
return None
now = datetime.now(pytz.timezone("Asia/Shanghai"))
user_console, _ = await UserConsole.get_or_create(
user_id=session.id1,
defaults={
"uid": await UserConsole.get_new_uid(),
"platform": session.platform,
},
)
user_console = await UserConsole.get_user(session.id1, session.platform)
user, _ = await SignUser.get_or_create(
user_id=session.id1,
defaults={"user_console": user_console, "platform": session.platform},
@ -112,7 +106,6 @@ class SignManage:
or (new_log and now > new_log.create_time)
or file_name in os.listdir(SIGN_TODAY_CARD_PATH)
):
user_console, _ = await UserConsole.get_or_create(user_id=session.id1)
path = await get_card(user, nickname, -1, user_console.gold, "")
else:
path = await cls._handle_sign_in(user, nickname, session, is_view_card)

View File

@ -35,19 +35,14 @@ async def _():
**{"好感度双倍加持卡_prob": 0.1, "好感度双倍加持卡Ⅱ_prob": 0.2, "好感度双倍加持卡Ⅲ_prob": 0.3}, # type: ignore
)
async def _(session: EventSession, user_id: int, group_id: int, prob: float):
user_console, _ = await UserConsole.get_or_create(
user_id=session.id1,
defaults={
"uid": await UserConsole.get_new_uid(),
"platform": session.platform,
},
)
user, _ = await SignUser.get_or_create(
user_id=user_id,
defaults={"platform": session.platform, "user_console": user_console},
)
user.add_probability = Decimal(prob)
await user.save(update_fields=["add_probability"])
if session.id1:
user_console = await UserConsole.get_user(session.id1, session.platform)
user, _ = await SignUser.get_or_create(
user_id=user_id,
defaults={"platform": session.platform, "user_console": user_console},
)
user.add_probability = Decimal(prob)
await user.save(update_fields=["add_probability"])
@shop_register(
name="测试道具A",

View File

@ -0,0 +1,154 @@
from nonebot.adapters import Bot
from nonebot.adapters.onebot.v11 import Bot as v11Bot
from nonebot.params import Depends
from nonebot.permission import SUPERUSER
from nonebot.plugin import PluginMetadata
from nonebot.typing import T_State
from nonebot_plugin_alconna import (
Alconna,
Args,
Arparma,
Match,
Option,
Subcommand,
on_alconna,
store_true,
)
from nonebot_plugin_saa import Text
from nonebot_plugin_session import EventSession
from zhenxun.configs.config import NICKNAME
from zhenxun.configs.utils import PluginExtraData
from zhenxun.models.group_console import GroupConsole
from zhenxun.services.log import logger
from zhenxun.utils.enum import PluginType
__plugin_meta__ = PluginMetadata(
name="管理群操作",
description="管理群操作",
usage="""
群权限 | 群白名单 | 退出群 操作
退群添加/删除群白名单添加/删除群认证当在群组中这五个命令且没有指定群号时默认指定当前群组
指令:
退群 ?[group_id]
修改群权限 [group_id] [等级]
修改群权限 [等级]: 该命令仅在群组时生效默认修改当前群组
添加群白名单 ?*[group_id]
删除群白名单 ?*[group_id]
添加群认证 ?*[group_id]
删除群认证 ?*[group_id]
""".strip(),
extra=PluginExtraData(
author="HibiKier",
version="0.1",
plugin_type=PluginType.SUPERUSER,
).dict(),
)
_matcher = on_alconna(
Alconna(
"group-manage",
Subcommand(
"modify-level", Args["level", int]["group_id?", int], help_text="修改群权限"
),
Subcommand(
"super-handle",
Option("--del", action=store_true, help_text="删除"),
Args["group_id", int],
help_text="添加/删除群白名单",
),
Subcommand(
"auth-handle",
Option("--del", action=store_true, help_text="删除"),
Args["group_id", int],
help_text="添加群白名单",
),
Subcommand("del-group", Args["group_id", int], help_text="退出群组"),
),
permission=SUPERUSER,
priority=1,
block=True,
)
def CheckGroupId():
"""
检测群组id
"""
async def dependency(
session: EventSession,
group_id: Match[int],
state: T_State,
):
gid = session.id3 or session.id2
if group_id.available:
gid = group_id.result
if not gid:
await Text("群组id不能为空...").finish()
state["group_id"] = gid
return Depends(dependency)
@_matcher.assign("modify-level", parameterless=[])
async def _(session: EventSession, arparma: Arparma, state: T_State, level: int):
gid = state["group_id"]
group, _ = await GroupConsole.get_or_create(group_id=gid)
old_level = group.level
group.level = level
await group.save(update_fields=["level"])
await Text("群权限修改成功!").send(reply=True)
logger.info(
f"修改群权限: {old_level} -> {level}",
arparma.header_result,
session=session,
target=gid,
)
@_matcher.assign("super-handle")
async def _(session: EventSession, arparma: Arparma, state: T_State):
gid = state["group_id"]
group = await GroupConsole.get_or_none(group_id=gid)
if not group:
await Text("群组信息不存在, 请更新群组信息...").finish()
s = "删除" if arparma.find("del") else "添加"
group.is_super = not arparma.find("del")
await group.save(update_fields=["is_super"])
await Text(f"{s}群白名单成功!").send(reply=True)
logger.info(f"{s}群白名单", arparma.header_result, session=session, target=gid)
@_matcher.assign("auth-handle")
async def _(session: EventSession, arparma: Arparma, state: T_State):
gid = state["group_id"]
await GroupConsole.update_or_create(
group_id=gid, defaults={"group_flag": 0 if arparma.find("del") else 1}
)
s = "删除" if arparma.find("del") else "添加"
await Text(f"{s}群认证成功!").send(reply=True)
logger.info(f"{s}群白名单", arparma.header_result, session=session, target=gid)
@_matcher.assign("del-group")
async def _(bot: Bot, session: EventSession, arparma: Arparma, group_id: int):
if isinstance(bot, v11Bot):
group_list = [g["group_id"] for g in await bot.get_group_list()]
if group_id not in group_list:
logger.debug("群组不存在", "退群", session=session, target=group_id)
await Text(f"{NICKNAME}未在该群组中...").finish()
try:
await bot.set_group_leave(group_id=group_id)
logger.info(
f"{NICKNAME}退出群组成功", "退群", session=session, target=group_id
)
await Text(f"退出群组 {group_id} 成功!").send()
await GroupConsole.filter(group_id=group_id).delete()
except Exception as e:
logger.error(f"退出群组失败", "退群", session=session, target=group_id, e=e)
await Text(f"退出群组 {group_id} 失败...").send()
else:
# TODO: 其他平台的退群操作
await Text(f"暂未支持退群操作...").send()

View File

@ -17,6 +17,14 @@ class GroupConsole(Model):
"""最大人数"""
member_count = fields.IntField(default=0, description="当前人数")
"""当前人数"""
status = fields.BooleanField(default=True, description="群状态")
"""群状态"""
level = fields.IntField(default=5, description="群权限")
"""群权限"""
is_super = fields.BooleanField(
default=False, description="超级用户指定,可以使用全局关闭的功能"
)
"""超级用户指定群,可以使用全局关闭的功能"""
group_flag = fields.IntField(default=0, description="群认证标记")
"""群认证标记"""
block_plugin = fields.TextField(default="", description="禁用插件")
@ -31,6 +39,61 @@ class GroupConsole(Model):
table_description = "群组信息表"
unique_together = ("group_id", "channel_id")
@classmethod
async def is_super_group(cls, group_id: str, channel_id: str | None = None) -> bool:
"""是否超级用户指定群
参数:
group_id: 群组id
channel_id: 频道id.
返回:
bool: 是否超级用户指定群
"""
if group := await cls.get_or_none(group_id=group_id):
return group.is_super
return False
@classmethod
async def is_super_block_plugin(
cls, group_id: str, module: str, channel_id: str | None = None
) -> bool:
"""查看群组是否超级用户禁用功能
参数:
group_id: 群组id
module: 模块名称
channel_id: 频道id
返回:
bool: 是否禁用被动
"""
return await cls.exists(
group_id=group_id,
channel_id=channel_id,
block_plugin__contains=f"super:{module},",
)
@classmethod
async def is_block_plugin(
cls, group_id: str, module: str, channel_id: str | None = None
) -> bool:
"""查看群组是否禁用功能
参数:
group_id: 群组id
module: 模块名称
channel_id: 频道id
返回:
bool: 是否禁用被动
"""
return await cls.exists(
group_id=group_id,
channel_id=channel_id,
block_plugin__contains=f"{module},",
)
@classmethod
async def is_block_task(
cls, group_id: str, task: str, channel_id: str | None = None

View File

@ -26,7 +26,7 @@ class PluginLimit(Model):
limit_type = fields.CharEnumField(PluginLimitType, description="限制类型")
"""限制类型"""
watch_type = fields.CharEnumField(LimitWatchType, description="监听类型")
"""限制类型"""
"""监听类型"""
status = fields.BooleanField(default=True, description="限制的开关状态")
"""限制的开关状态"""
check_type = fields.CharEnumField(

View File

@ -4,6 +4,7 @@ from tortoise import fields
from zhenxun.services.db_context import Model
from zhenxun.utils.enum import GoldHandle
from zhenxun.utils.exception import InsufficientGold
from .user_gold_log import UserGoldLog
@ -32,7 +33,29 @@ class UserConsole(Model):
table_description = "用户数据表"
@classmethod
async def get_new_uid(cls):
async def get_user(cls, user_id: str, platform: str | None = None) -> "UserConsole":
"""获取用户
参数:
user_id: 用户id
platform: 平台.
返回:
UserConsole: UserConsole
"""
user, _ = await UserConsole.get_or_create(
user_id=user_id,
defaults={"platform": platform, "uid": await cls.get_new_uid()},
)
return user
@classmethod
async def get_new_uid(cls) -> int:
"""获取最新uid
返回:
int: 最新uid
"""
if user := await cls.annotate().order_by("uid").first():
return user.uid + 1
return 1
@ -58,6 +81,38 @@ class UserConsole(Model):
user_id=user_id, gold=gold, handle=GoldHandle.GET, source=source
)
@classmethod
async def reduce_gold(
cls,
user_id: str,
gold: int,
handle: GoldHandle,
plugin_module: str,
platform: str | None = None,
):
"""消耗金币
参数:
user_id: 用户id
gold: 金币
handle: 金币处理
plugin_name: 插件模块
platform: 平台.
异常:
InsufficientGold: 金币不足
"""
user, _ = await cls.get_or_create(
user_id=user_id, defaults={"platform": platform, "uid": cls.get_new_uid()}
)
if user.gold < gold:
raise InsufficientGold()
user.gold -= gold
await user.save(update_fields=["gold"])
await UserGoldLog.create(
user_id=user_id, gold=gold, handle=handle, source=plugin_module
)
@classmethod
async def add_props(
cls, user_id: str, goods_uuid: str, num: int = 1, platform: str | None = None

View File

@ -453,14 +453,24 @@ class BuildImage:
return self
def pic2bs4(self) -> str:
"""
BuildImage base64
"""BuildImage 转 base64
返回:
str: base64
"""
buf = BytesIO()
self.markImg.save(buf, format="PNG")
base64_str = base64.b64encode(buf.getvalue()).decode()
return "base64://" + base64_str
def pic2io(self) -> BytesIO:
"""图片转 BytesIO
返回:
BytesIO: BytesIO
"""
return BytesIO(self.tobytes())
def convert(self, type_: ModeType) -> Self:
"""
修改图片类型

View File

@ -10,6 +10,8 @@ class GoldHandle(StrEnum):
"""购买"""
GET = "GET"
"""获取"""
PLUGIN = "PLUGIN"
"""插件花费"""
class PropHandle(StrEnum):
@ -40,7 +42,7 @@ class BlockType(StrEnum):
禁用状态
"""
FRIEND = "PRIVATE"
PRIVATE = "PRIVATE"
GROUP = "GROUP"
ALL = "ALL"
@ -72,6 +74,7 @@ class LimitWatchType(StrEnum):
USER = "USER"
GROUP = "GROUP"
ALL = "ALL"
class RequestType(StrEnum):

View File

@ -1,14 +1,38 @@
class NotFoundError(Exception):
"""
未发现
"""
pass
class GroupInfoNotFound(Exception):
"""
群组未找到
"""
pass
class EmptyError(Exception):
"""
空错误
"""
pass
class UserAndGroupIsNone(Exception):
"""
用户和群组为空
"""
pass
class InsufficientGold(Exception):
"""
金币不足
"""
pass

View File

@ -27,7 +27,9 @@ def admin_check(a: int | str, key: str | None = None) -> Rule:
if type(a) == str and key:
level = Config.get_config(a, key)
if level is not None:
return bool(LevelUser.check_level(session.id1, session.id2, int(level)))
return bool(
await LevelUser.check_level(session.id1, session.id2, int(level))
)
return False
return Rule(_rule)

View File

@ -1,10 +1,12 @@
import os
import time
from collections import defaultdict
from datetime import datetime
from pathlib import Path
from typing import Any
import httpx
import pytz
from zhenxun.services.log import logger
@ -78,21 +80,31 @@ class ResourceDirManager:
class CountLimiter:
"""
次数检测工具检测调用次数是否超过设定值
每日调用命令次数限制
"""
def __init__(self, max_count: int):
tz = pytz.timezone("Asia/Shanghai")
def __init__(self, max_num):
self.today = -1
self.count = defaultdict(int)
self.max_count = max_count
self.max = max_num
def add(self, key: Any):
self.count[key] += 1
def check(self, key) -> bool:
day = datetime.now(self.tz).day
if day != self.today:
self.today = day
self.count.clear()
return bool(self.count[key] < self.max)
def check(self, key: Any) -> bool:
if self.count[key] >= self.max_count:
self.count[key] = 0
return True
return False
def get_num(self, key):
return self.count[key]
def increase(self, key, num=1):
self.count[key] += num
def reset(self, key):
self.count[key] = 0
class UserBlockLimiter: