mirror of
https://github.com/zhenxun-org/zhenxun_bot.git
synced 2025-12-15 06:12:53 +08:00
🐛 修复获取群组时会修改群组插件关闭状态 (#1869)
* 🐛 修复获取群组时会修改群组插件关闭状态 * ✨ 支持https图片地址转换 * ✨ 支持https图片地址转换 * 🎨 移除多余导入 * 🎨 优化GroupConsole设置插件默认状态代码结构 * 🎨 优化群组表代码结构 * ⚡ 移除build_message的https默认转图片
This commit is contained in:
parent
737a740968
commit
36bbaa3ae1
5
.vscode/settings.json
vendored
5
.vscode/settings.json
vendored
@ -16,6 +16,7 @@
|
||||
"jsdelivr",
|
||||
"kaiheila",
|
||||
"lolicon",
|
||||
"Mahiro",
|
||||
"nonebot",
|
||||
"onebot",
|
||||
"pixiv",
|
||||
@ -24,9 +25,9 @@
|
||||
"tobytes",
|
||||
"ujson",
|
||||
"unban",
|
||||
"Uninfo",
|
||||
"userinfo",
|
||||
"zhenxun",
|
||||
"jsdelivr"
|
||||
"zhenxun"
|
||||
],
|
||||
"python.analysis.autoImportCompletions": true,
|
||||
"python.testing.pytestArgs": ["tests"],
|
||||
|
||||
@ -15,7 +15,8 @@ async def get_task() -> dict[str, str] | None:
|
||||
return {
|
||||
"name": "被动技能",
|
||||
"description": "控制群组中的被动技能状态",
|
||||
"usage": "通过 开启/关闭群被动 来控制群被<br>----------<br>"
|
||||
"usage": "通过 开启/关闭群被动 来控制群被动 <br>"
|
||||
+ " 示例:开启/关闭群被动早晚安 <br> ---------- <br> "
|
||||
+ "<br>".join([task.name for task in task_list]),
|
||||
}
|
||||
return None
|
||||
|
||||
@ -1,3 +1,5 @@
|
||||
import os
|
||||
|
||||
from zhenxun.configs.path_config import DATA_PATH, IMAGE_PATH
|
||||
from zhenxun.models.group_console import GroupConsole
|
||||
from zhenxun.models.plugin_info import PluginInfo
|
||||
@ -14,9 +16,9 @@ GROUP_HELP_PATH = DATA_PATH / "group_help"
|
||||
def delete_help_image(gid: str | None = None):
|
||||
"""删除帮助图片"""
|
||||
if gid:
|
||||
file = GROUP_HELP_PATH / f"{gid}.png"
|
||||
if file.exists():
|
||||
file.unlink()
|
||||
for file in os.listdir(GROUP_HELP_PATH):
|
||||
if file.startswith(f"{gid}"):
|
||||
os.remove(GROUP_HELP_PATH / file)
|
||||
else:
|
||||
if HELP_FILE.exists():
|
||||
HELP_FILE.unlink()
|
||||
@ -196,7 +198,7 @@ class PluginManage:
|
||||
await PluginInfo.filter(plugin_type=PluginType.NORMAL).update(
|
||||
default_status=status
|
||||
)
|
||||
return f'成功将所有功能进群默认状态修改为: {"开启" if status else "关闭"}'
|
||||
return f"成功将所有功能进群默认状态修改为: {'开启' if status else '关闭'}"
|
||||
if group_id:
|
||||
if group := await GroupConsole.get_or_none(
|
||||
group_id=group_id, channel_id__isnull=True
|
||||
@ -213,12 +215,12 @@ class PluginManage:
|
||||
module_list = [f"<{module}" for module in module_list]
|
||||
group.block_plugin = ",".join(module_list) + "," # type: ignore
|
||||
await group.save(update_fields=["block_plugin"])
|
||||
return f'成功将此群组所有功能状态修改为: {"开启" if status else "关闭"}'
|
||||
return f"成功将此群组所有功能状态修改为: {'开启' if status else '关闭'}"
|
||||
return "获取群组失败..."
|
||||
await PluginInfo.filter(plugin_type=PluginType.NORMAL).update(
|
||||
status=status, block_type=None if status else BlockType.ALL
|
||||
)
|
||||
return f'成功将所有功能全局状态修改为: {"开启" if status else "关闭"}'
|
||||
return f"成功将所有功能全局状态修改为: {'开启' if status else '关闭'}"
|
||||
|
||||
@classmethod
|
||||
async def is_wake(cls, group_id: str) -> bool:
|
||||
|
||||
@ -15,7 +15,8 @@ async def get_task() -> dict[str, str] | None:
|
||||
return {
|
||||
"name": "被动技能",
|
||||
"description": "控制群组中的被动技能状态",
|
||||
"usage": "通过 开启/关闭群被动 来控制群被动 <br> ---------- <br> "
|
||||
"usage": "通过 开启/关闭群被动 来控制群被动 <br>"
|
||||
+ " 示例:开启/关闭群被动早晚安 <br> ---------- <br> "
|
||||
+ "<br>".join([task.name for task in task_list]),
|
||||
}
|
||||
return None
|
||||
|
||||
@ -1,4 +1,4 @@
|
||||
from typing import Any, overload
|
||||
from typing import Any, cast, overload
|
||||
from typing_extensions import Self
|
||||
|
||||
from tortoise import fields
|
||||
@ -10,6 +10,42 @@ from zhenxun.services.db_context import Model
|
||||
from zhenxun.utils.enum import PluginType
|
||||
|
||||
|
||||
def add_disable_marker(name: str) -> str:
|
||||
"""添加模块禁用标记符
|
||||
|
||||
Args:
|
||||
name: 模块名称
|
||||
|
||||
Returns:
|
||||
添加了禁用标记的模块名 (前缀'<'和后缀',')
|
||||
"""
|
||||
return f"<{name},"
|
||||
|
||||
|
||||
@overload
|
||||
def convert_module_format(data: str) -> list[str]: ...
|
||||
|
||||
|
||||
@overload
|
||||
def convert_module_format(data: list[str]) -> str: ...
|
||||
|
||||
|
||||
def convert_module_format(data: str | list[str]) -> str | list[str]:
|
||||
"""
|
||||
在 `<aaa,<bbb,<ccc,` 和 `["aaa", "bbb", "ccc"]` (即禁用启用)之间进行相互转换。
|
||||
|
||||
参数:
|
||||
data: 要转换的数据
|
||||
|
||||
返回:
|
||||
str | list[str]: 根据输入类型返回转换后的数据。
|
||||
"""
|
||||
if isinstance(data, str):
|
||||
return [item.strip(",") for item in data.split("<") if item]
|
||||
else:
|
||||
return "".join(format(item) for item in data)
|
||||
|
||||
|
||||
class GroupConsole(Model):
|
||||
id = fields.IntField(pk=True, generated=True, auto_increment=True)
|
||||
"""自增id"""
|
||||
@ -51,33 +87,34 @@ class GroupConsole(Model):
|
||||
table_description = "群组信息表"
|
||||
unique_together = ("group_id", "channel_id")
|
||||
|
||||
@staticmethod
|
||||
def format(name: str) -> str:
|
||||
return f"<{name},"
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
def convert_module_format(cls, data: str) -> list[str]: ...
|
||||
|
||||
@overload
|
||||
@classmethod
|
||||
def convert_module_format(cls, data: list[str]) -> str: ...
|
||||
|
||||
@classmethod
|
||||
def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
|
||||
"""
|
||||
在 `<aaa,<bbb,<ccc,` 和 `["aaa", "bbb", "ccc"]` 之间进行相互转换。
|
||||
|
||||
参数:
|
||||
data (str | list[str]): 输入数据,可能是格式化字符串或字符串列表。
|
||||
async def _get_task_modules(cls, *, default_status: bool) -> list[str]:
|
||||
"""获取默认禁用的任务模块
|
||||
|
||||
返回:
|
||||
str | list[str]: 根据输入类型返回转换后的数据。
|
||||
list[str]: 任务模块列表
|
||||
"""
|
||||
if isinstance(data, str):
|
||||
return [item.strip(",") for item in data.split("<") if item]
|
||||
elif isinstance(data, list):
|
||||
return "".join(cls.format(item) for item in data)
|
||||
return cast(
|
||||
list[str],
|
||||
await TaskInfo.filter(default_status=default_status).values_list(
|
||||
"module", flat=True
|
||||
),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def _get_plugin_modules(cls, *, default_status: bool) -> list[str]:
|
||||
"""获取默认禁用的插件模块
|
||||
|
||||
返回:
|
||||
list[str]: 插件模块列表
|
||||
"""
|
||||
return cast(
|
||||
list[str],
|
||||
await PluginInfo.filter(
|
||||
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
|
||||
default_status=default_status,
|
||||
).values_list("module", flat=True),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
async def create(
|
||||
@ -85,20 +122,44 @@ class GroupConsole(Model):
|
||||
) -> Self:
|
||||
"""覆盖create方法"""
|
||||
group = await super().create(using_db=using_db, **kwargs)
|
||||
if modules := await TaskInfo.filter(default_status=False).values_list(
|
||||
"module", flat=True
|
||||
):
|
||||
group.block_task = cls.convert_module_format(modules) # type: ignore
|
||||
if modules := await PluginInfo.filter(
|
||||
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
|
||||
default_status=False,
|
||||
).values_list("module", flat=True):
|
||||
group.block_plugin = cls.convert_module_format(modules) # type: ignore
|
||||
await group.save(
|
||||
using_db=using_db, update_fields=["block_plugin", "block_task"]
|
||||
)
|
||||
|
||||
task_modules = await cls._get_task_modules(default_status=False)
|
||||
plugin_modules = await cls._get_plugin_modules(default_status=False)
|
||||
|
||||
if task_modules or plugin_modules:
|
||||
await cls._update_modules(group, task_modules, plugin_modules, using_db)
|
||||
|
||||
return group
|
||||
|
||||
@classmethod
|
||||
async def _update_modules(
|
||||
cls,
|
||||
group: Self,
|
||||
task_modules: list[str],
|
||||
plugin_modules: list[str],
|
||||
using_db: BaseDBAsyncClient | None = None,
|
||||
) -> None:
|
||||
"""更新模块设置
|
||||
|
||||
参数:
|
||||
group: 群组实例
|
||||
task_modules: 任务模块列表
|
||||
plugin_modules: 插件模块列表
|
||||
using_db: 数据库连接
|
||||
"""
|
||||
update_fields = []
|
||||
|
||||
if task_modules:
|
||||
group.block_task = convert_module_format(task_modules)
|
||||
update_fields.append("block_task")
|
||||
|
||||
if plugin_modules:
|
||||
group.block_plugin = convert_module_format(plugin_modules)
|
||||
update_fields.append("block_plugin")
|
||||
|
||||
if update_fields:
|
||||
await group.save(using_db=using_db, update_fields=update_fields)
|
||||
|
||||
@classmethod
|
||||
async def get_or_create(
|
||||
cls,
|
||||
@ -110,20 +171,15 @@ class GroupConsole(Model):
|
||||
group, is_create = await super().get_or_create(
|
||||
defaults=defaults, using_db=using_db, **kwargs
|
||||
)
|
||||
if is_create and (
|
||||
modules := await TaskInfo.filter(default_status=False).values_list(
|
||||
"module", flat=True
|
||||
)
|
||||
):
|
||||
group.block_task = cls.convert_module_format(modules) # type: ignore
|
||||
if modules := await PluginInfo.filter(
|
||||
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
|
||||
default_status=False,
|
||||
).values_list("module", flat=True):
|
||||
group.block_plugin = cls.convert_module_format(modules) # type: ignore
|
||||
await group.save(
|
||||
using_db=using_db, update_fields=["block_plugin", "block_task"]
|
||||
)
|
||||
if not is_create:
|
||||
return group, is_create
|
||||
|
||||
task_modules = await cls._get_task_modules(default_status=False)
|
||||
plugin_modules = await cls._get_plugin_modules(default_status=False)
|
||||
|
||||
if task_modules or plugin_modules:
|
||||
await cls._update_modules(group, task_modules, plugin_modules, using_db)
|
||||
|
||||
return group, is_create
|
||||
|
||||
@classmethod
|
||||
@ -137,20 +193,15 @@ class GroupConsole(Model):
|
||||
group, is_create = await super().update_or_create(
|
||||
defaults=defaults, using_db=using_db, **kwargs
|
||||
)
|
||||
if is_create and (
|
||||
modules := await TaskInfo.filter(default_status=False).values_list(
|
||||
"module", flat=True
|
||||
)
|
||||
):
|
||||
group.block_task = cls.convert_module_format(modules) # type: ignore
|
||||
if modules := await PluginInfo.filter(
|
||||
plugin_type__in=[PluginType.NORMAL, PluginType.DEPENDANT],
|
||||
default_status=False,
|
||||
).values_list("module", flat=True):
|
||||
group.block_plugin = cls.convert_module_format(modules) # type: ignore
|
||||
await group.save(
|
||||
using_db=using_db, update_fields=["block_plugin", "block_task"]
|
||||
)
|
||||
if not is_create:
|
||||
return group, is_create
|
||||
|
||||
task_modules = await cls._get_task_modules(default_status=False)
|
||||
plugin_modules = await cls._get_plugin_modules(default_status=False)
|
||||
|
||||
if task_modules or plugin_modules:
|
||||
await cls._update_modules(group, task_modules, plugin_modules, using_db)
|
||||
|
||||
return group, is_create
|
||||
|
||||
@classmethod
|
||||
@ -195,7 +246,7 @@ class GroupConsole(Model):
|
||||
"""
|
||||
return await cls.exists(
|
||||
group_id=group_id,
|
||||
superuser_block_plugin__contains=f"<{module},",
|
||||
superuser_block_plugin__contains=add_disable_marker(module),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@ -209,10 +260,11 @@ class GroupConsole(Model):
|
||||
返回:
|
||||
bool: 是否禁用插件
|
||||
"""
|
||||
module = add_disable_marker(module)
|
||||
return await cls.exists(
|
||||
group_id=group_id, block_plugin__contains=f"<{module},"
|
||||
group_id=group_id, block_plugin__contains=module
|
||||
) or await cls.exists(
|
||||
group_id=group_id, superuser_block_plugin__contains=f"<{module},"
|
||||
group_id=group_id, superuser_block_plugin__contains=module
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@ -234,12 +286,22 @@ class GroupConsole(Model):
|
||||
group, _ = await cls.get_or_create(
|
||||
group_id=group_id, defaults={"platform": platform}
|
||||
)
|
||||
update_fields = []
|
||||
if is_superuser:
|
||||
if f"<{module}," not in group.superuser_block_plugin:
|
||||
group.superuser_block_plugin += f"<{module},"
|
||||
elif f"<{module}," not in group.block_plugin:
|
||||
group.block_plugin += f"<{module},"
|
||||
await group.save(update_fields=["block_plugin", "superuser_block_plugin"])
|
||||
superuser_block_plugin = convert_module_format(group.superuser_block_plugin)
|
||||
if module not in superuser_block_plugin:
|
||||
superuser_block_plugin.append(module)
|
||||
group.superuser_block_plugin = convert_module_format(
|
||||
superuser_block_plugin
|
||||
)
|
||||
update_fields.append("superuser_block_plugin")
|
||||
elif add_disable_marker(module) not in group.block_plugin:
|
||||
block_plugin = convert_module_format(group.block_plugin)
|
||||
block_plugin.append(module)
|
||||
group.block_plugin = convert_module_format(block_plugin)
|
||||
update_fields.append("block_plugin")
|
||||
if update_fields:
|
||||
await group.save(update_fields=update_fields)
|
||||
|
||||
@classmethod
|
||||
async def set_unblock_plugin(
|
||||
@ -260,14 +322,22 @@ class GroupConsole(Model):
|
||||
group, _ = await cls.get_or_create(
|
||||
group_id=group_id, defaults={"platform": platform}
|
||||
)
|
||||
update_fields = []
|
||||
if is_superuser:
|
||||
if f"<{module}," in group.superuser_block_plugin:
|
||||
group.superuser_block_plugin = group.superuser_block_plugin.replace(
|
||||
f"<{module},", ""
|
||||
superuser_block_plugin = convert_module_format(group.superuser_block_plugin)
|
||||
if module in superuser_block_plugin:
|
||||
superuser_block_plugin.remove(module)
|
||||
group.superuser_block_plugin = convert_module_format(
|
||||
superuser_block_plugin
|
||||
)
|
||||
elif f"<{module}," in group.block_plugin:
|
||||
group.block_plugin = group.block_plugin.replace(f"<{module},", "")
|
||||
await group.save(update_fields=["block_plugin", "superuser_block_plugin"])
|
||||
update_fields.append("superuser_block_plugin")
|
||||
elif add_disable_marker(module) in group.block_plugin:
|
||||
block_plugin = convert_module_format(group.block_plugin)
|
||||
block_plugin.remove(module)
|
||||
group.block_plugin = convert_module_format(block_plugin)
|
||||
update_fields.append("block_plugin")
|
||||
if update_fields:
|
||||
await group.save(update_fields=update_fields)
|
||||
|
||||
@classmethod
|
||||
async def is_normal_block_plugin(
|
||||
@ -302,7 +372,7 @@ class GroupConsole(Model):
|
||||
"""
|
||||
return await cls.exists(
|
||||
group_id=group_id,
|
||||
superuser_block_task__contains=f"<{task},",
|
||||
superuser_block_task__contains=add_disable_marker(task),
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@ -319,22 +389,23 @@ class GroupConsole(Model):
|
||||
返回:
|
||||
bool: 是否禁用被动
|
||||
"""
|
||||
task = add_disable_marker(task)
|
||||
if not channel_id:
|
||||
return await cls.exists(
|
||||
group_id=group_id,
|
||||
channel_id__isnull=True,
|
||||
block_task__contains=f"<{task},",
|
||||
block_task__contains=task,
|
||||
) or await cls.exists(
|
||||
group_id=group_id,
|
||||
channel_id__isnull=True,
|
||||
superuser_block_task__contains=f"<{task},",
|
||||
superuser_block_task__contains=task,
|
||||
)
|
||||
return await cls.exists(
|
||||
group_id=group_id, channel_id=channel_id, block_task__contains=f"<{task},"
|
||||
group_id=group_id, channel_id=channel_id, block_task__contains=task
|
||||
) or await cls.exists(
|
||||
group_id=group_id,
|
||||
channel_id__isnull=True,
|
||||
superuser_block_task__contains=f"<{task},",
|
||||
superuser_block_task__contains=task,
|
||||
)
|
||||
|
||||
@classmethod
|
||||
@ -356,12 +427,20 @@ class GroupConsole(Model):
|
||||
group, _ = await cls.get_or_create(
|
||||
group_id=group_id, defaults={"platform": platform}
|
||||
)
|
||||
update_fields = []
|
||||
if is_superuser:
|
||||
if f"<{task}," not in group.superuser_block_task:
|
||||
group.superuser_block_task += f"<{task},"
|
||||
elif f"<{task}," not in group.block_task:
|
||||
group.block_task += f"<{task},"
|
||||
await group.save(update_fields=["block_task", "superuser_block_task"])
|
||||
superuser_block_task = convert_module_format(group.superuser_block_task)
|
||||
if task not in group.superuser_block_task:
|
||||
superuser_block_task.append(task)
|
||||
group.superuser_block_task = convert_module_format(superuser_block_task)
|
||||
update_fields.append("superuser_block_task")
|
||||
elif add_disable_marker(task) not in group.block_task:
|
||||
block_task = convert_module_format(group.block_task)
|
||||
block_task.append(task)
|
||||
group.block_task = convert_module_format(block_task)
|
||||
update_fields.append("block_task")
|
||||
if update_fields:
|
||||
await group.save(update_fields=update_fields)
|
||||
|
||||
@classmethod
|
||||
async def set_unblock_task(
|
||||
@ -382,14 +461,20 @@ class GroupConsole(Model):
|
||||
group, _ = await cls.get_or_create(
|
||||
group_id=group_id, defaults={"platform": platform}
|
||||
)
|
||||
update_fields = []
|
||||
if is_superuser:
|
||||
if f"<{task}," in group.superuser_block_task:
|
||||
group.superuser_block_task = group.superuser_block_task.replace(
|
||||
f"<{task},", ""
|
||||
)
|
||||
elif f"<{task}," in group.block_task:
|
||||
group.block_task = group.block_task.replace(f"<{task},", "")
|
||||
await group.save(update_fields=["block_task", "superuser_block_task"])
|
||||
superuser_block_task = convert_module_format(group.superuser_block_task)
|
||||
if task in superuser_block_task:
|
||||
superuser_block_task.remove(task)
|
||||
group.superuser_block_task = convert_module_format(superuser_block_task)
|
||||
update_fields.append("superuser_block_task")
|
||||
elif add_disable_marker(task) in group.block_task:
|
||||
block_task = convert_module_format(group.block_task)
|
||||
block_task.remove(task)
|
||||
group.block_task = convert_module_format(block_task)
|
||||
update_fields.append("block_task")
|
||||
if update_fields:
|
||||
await group.save(update_fields=update_fields)
|
||||
|
||||
@classmethod
|
||||
def _run_script(cls):
|
||||
|
||||
@ -64,8 +64,6 @@ class MessageUtils:
|
||||
if isinstance(msg, str):
|
||||
if msg.startswith("base64://"):
|
||||
message_list.append(Image(raw=BytesIO(base64.b64decode(msg[9:]))))
|
||||
elif msg.startswith("http://"):
|
||||
message_list.append(Image(url=msg))
|
||||
else:
|
||||
message_list.append(Text(msg))
|
||||
elif isinstance(msg, int | float):
|
||||
|
||||
Loading…
Reference in New Issue
Block a user