2024-12-25 12:03:49 +08:00
|
|
|
|
from typing import overload
|
|
|
|
|
|
|
2024-10-18 18:57:55 +08:00
|
|
|
|
from nonebot.adapters import Bot
|
2024-12-10 19:49:11 +08:00
|
|
|
|
from nonebot_plugin_uninfo import Session, SupportScope, Uninfo, get_interface
|
2024-10-18 18:57:55 +08:00
|
|
|
|
|
2024-09-07 16:46:56 +08:00
|
|
|
|
from zhenxun.configs.config import BotConfig
|
2024-08-29 22:01:34 +08:00
|
|
|
|
from zhenxun.models.ban_console import BanConsole
|
2024-12-10 20:16:14 +08:00
|
|
|
|
from zhenxun.models.bot_console import BotConsole
|
2024-08-29 22:01:34 +08:00
|
|
|
|
from zhenxun.models.group_console import GroupConsole
|
2024-12-10 19:49:11 +08:00
|
|
|
|
from zhenxun.models.task_info import TaskInfo
|
|
|
|
|
|
from zhenxun.services.log import logger
|
2024-08-29 22:01:34 +08:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class CommonUtils:
    """Helpers for passive-task blocking checks and module-list conversion."""

    @classmethod
    async def task_is_block(
        cls, session: Uninfo | Bot, module: str, group_id: str | None = None
    ) -> bool:
        """Decide whether the passive task ``module`` is blocked.

        The checks run in this order: official-QQ-bot short circuit, global
        task status, bot status, per-bot disabled tasks, then (only when a
        group id is known) the group's disabled tasks, group level and group
        ban state.

        NOTE(review): every "disabled / banned" branch returns True, so True
        appears to mean "blocked, do not send" -- confirm against callers.

        Args:
            session: A uninfo session or a raw bot instance.
            module: Module name of the passive task.
            group_id: Group id; when omitted and ``session`` is a ``Session``,
                it is taken from ``session.group``.

        Returns:
            bool: True when one of the blocking conditions matched, False
            otherwise (including both official-QQ-bot short circuits).
        """
        # Official QQ bot, detected through the bot's interface info: log and
        # stop checking.
        if isinstance(session, Bot) and (bot_interface := get_interface(session)):
            if bot_interface.basic_info()["scope"] == SupportScope.qq_api:
                logger.info("q官bot放弃所有被动技能发言...")
                return False

        # Same short circuit for the session object itself.
        # NOTE(review): this compares ``scene`` with a ``SupportScope`` member
        # while the branch above compares the "scope" field -- confirm that
        # ``scene`` is the intended attribute.
        if session.scene == SupportScope.qq_api:
            logger.info("q官bot放弃所有被动技能发言...")
            return False

        # Fall back to the group carried by the session when none was given.
        if not group_id and isinstance(session, Session):
            group_id = session.group.id if session.group else None

        # Global switch of the passive task.
        task_record = await TaskInfo.get_or_none(module=module)
        if task_record and not task_record.status:
            return True

        # Bot status (bot is "sleeping").
        bot_id = session.self_id
        if not await BotConsole.get_bot_status(bot_id):
            return True

        # Passive task disabled for this bot.
        disabled_tasks = await BotConsole.get_tasks(session.self_id, False)
        if module in disabled_tasks:
            return True

        # The remaining checks are group specific.
        if not group_id:
            return False

        # Passive task disabled for this group.
        if await GroupConsole.is_block_task(group_id, module):
            return True

        # Group level below zero.
        group_record = await GroupConsole.get_or_none(
            group_id=group_id, channel_id__isnull=True
        )
        if group_record and group_record.level < 0:
            return True

        # Group is banned.
        if await BanConsole.is_ban(None, group_id):
            return True

        return False

    @staticmethod
    def format(name: str) -> str:
        """Wrap ``name`` as one ``<name,`` segment of the stored module string."""
        return "<{},".format(name)

    @overload
    @classmethod
    def convert_module_format(cls, data: str) -> list[str]: ...

    @overload
    @classmethod
    def convert_module_format(cls, data: list[str]) -> str: ...

    @classmethod
    def convert_module_format(cls, data: str | list[str]) -> str | list[str]:
        """Convert between ``<aaa,<bbb,<ccc,`` and ``["aaa", "bbb", "ccc"]``.

        Args:
            data: Either the formatted string or a list of module names.

        Returns:
            str | list[str]: A list of names for string input, the formatted
            string for list input.
        """
        if isinstance(data, str):
            names: list[str] = []
            for segment in data.split("<"):
                # Splitting on "<" yields empty pieces (e.g. before the
                # first "<"); those are skipped.
                if segment:
                    names.append(segment.strip(","))
            return names
        if isinstance(data, list):
            return "".join(map(cls.format, data))
        # Any other input type falls through and yields None.
|
|
|
|
|
|
|
2024-09-07 16:46:56 +08:00
|
|
|
|
|
|
|
|
|
|
class SqlUtils:
    """SQL helpers that depend on the configured database type."""

    @classmethod
    def random(cls, query, limit: int = 1) -> str:
        """Append a random-order clause and a row limit to ``query``'s SQL.

        The database type is read from ``BotConfig.get_sql_type()``:
        ``RANDOM()`` is used when it contains "postgres" or "sqlite",
        ``RAND()`` when it contains "mysql".

        NOTE(review): for any other database type a warning is logged and
        ``query`` itself is returned unchanged, which is not a ``str``
        despite the annotation -- confirm callers handle this.

        Args:
            query: Object exposing ``sql()`` that returns the base statement.
            limit: Maximum number of rows to select.

        Returns:
            str: The SQL statement with ``ORDER BY ... LIMIT ...`` appended.
        """
        sql_type = BotConfig.get_sql_type()

        if any(name in sql_type for name in ("postgres", "sqlite")):
            return f"{query.sql()} ORDER BY RANDOM() LIMIT {limit};"

        if "mysql" in sql_type:
            return f"{query.sql()} ORDER BY RAND() LIMIT {limit};"

        logger.warning(f"Unsupported database type: {sql_type}", query.__module__)
        return query
|