zhenxun_plugin_farm/command.py

426 lines
12 KiB
Python
Raw Normal View History

from nonebot.adapters import Event, MessageTemplate
2025-03-16 19:11:05 +08:00
from nonebot.rule import to_me
2025-04-27 17:00:58 +08:00
from nonebot_plugin_alconna import (Alconna, AlconnaMatch, AlconnaQuery, Args,
Arparma, At, Match, MultiVar, Option,
Query, Subcommand, on_alconna, store_true)
2025-03-16 19:11:05 +08:00
from nonebot_plugin_uninfo import Uninfo
from nonebot_plugin_waiter import waiter
2025-03-16 19:11:05 +08:00
from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils
2025-03-16 19:11:05 +08:00
2025-04-29 18:11:09 +08:00
from .dbService import g_pDBService
from .farm.farm import g_pFarmManager
from .farm.shop import g_pShopManager
2025-03-16 19:11:05 +08:00
async def isRegisteredByUid(uid: str) -> bool:
2025-04-29 18:11:09 +08:00
result = await g_pDBService.user.isUserExist(uid)
2025-04-29 18:11:09 +08:00
if not result:
await MessageUtils.build_message("尚未开通农场快at我发送 开通农场 开通吧").send()
return False
return True
2025-03-16 19:11:05 +08:00
diuse_register = on_alconna(
Alconna("开通农场"),
priority=5,
2025-03-16 19:11:05 +08:00
rule=to_me(),
block=True,
2025-03-16 19:11:05 +08:00
)
@diuse_register.handle()
2025-04-26 10:25:06 +08:00
async def handle_register(session: Uninfo):
2025-03-16 19:11:05 +08:00
uid = str(session.user.id)
2025-04-29 18:11:09 +08:00
user = await g_pDBService.user.getUserInfoByUid(uid)
2025-03-16 19:11:05 +08:00
if user:
2025-04-26 10:25:06 +08:00
await MessageUtils.build_message("🎉 您已经开通农场啦~").send(reply_to=True)
return
try:
# 获取原始用户名并安全处理
raw_name = str(session.user.name)
safe_name = sanitize_username(raw_name)
2025-04-27 17:00:58 +08:00
2025-04-26 10:25:06 +08:00
# 初始化用户信息
2025-04-29 18:11:09 +08:00
success = await g_pDBService.user.initUserInfoByUid(
2025-04-26 10:25:06 +08:00
uid=uid,
name=safe_name,
exp=0,
2025-04-27 17:00:58 +08:00
point=500
2025-04-26 10:25:06 +08:00
)
msg = (
2025-04-27 17:00:58 +08:00
"✅ 农场开通成功!\n💼 初始资金500农场币"
if success
2025-04-26 10:25:06 +08:00
else "⚠️ 开通失败,请稍后再试"
)
logger.info(f"用户注册 {'成功' if success else '失败'}{uid}")
2025-04-26 10:25:06 +08:00
except Exception as e:
msg = "⚠️ 系统繁忙,请稍后再试"
logger.error(f"注册异常 | UID:{uid} | 错误:{str(e)}")
2025-04-27 17:00:58 +08:00
2025-04-26 10:25:06 +08:00
await MessageUtils.build_message(msg).send(reply_to=True)
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
def sanitize_username(username: str, max_length: int = 15) -> str:
"""
安全处理用户名
功能
1. 移除首尾空白
2. 过滤危险字符
3. 转义单引号
4. 处理空值
5. 限制长度
"""
# 处理空值
if not username:
return "神秘农夫"
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 基础清洗
cleaned = username.strip()
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 允许的字符白名单(可自定义扩展)
safe_chars = {
'_', '-', '!', '@', '#', '$', '%', '^', '&', '*', '(', ')',
2025-04-27 17:00:58 +08:00
'+', '=', '.', ',', '~', '·', ' ',
2025-04-26 10:29:59 +08:00
'a','b','c','d','e','f','g','h','i','j','k','l','m',
'n','o','p','q','r','s','t','u','v','w','x','y','z',
'A','B','C','D','E','F','G','H','I','J','K','L','M',
'N','O','P','Q','R','S','T','U','V','W','X','Y','Z',
'0','1','2','3','4','5','6','7','8','9',
}
# 添加常用中文字符Unicode范围
safe_chars.update(chr(c) for c in range(0x4E00, 0x9FFF+1))
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 过滤危险字符
filtered = [
2025-04-27 17:00:58 +08:00
c if c in safe_chars or 0x4E00 <= ord(c) <= 0x9FFF
else ''
2025-04-26 10:29:59 +08:00
for c in cleaned
]
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 合并处理结果
safe_str = ''.join(filtered)
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 转义单引号(双重保障)
escaped = safe_str.replace("'", "''")
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 处理空结果
if not escaped:
return "神秘农夫"
2025-04-27 17:00:58 +08:00
2025-04-26 10:29:59 +08:00
# 长度限制
return escaped[:max_length]
diuse_farm = on_alconna(
Alconna(
"我的农场",
Option("--all", action=store_true),
Subcommand("my-point", help_text="我的农场币"),
Subcommand("seed-shop", Args["num?", int], help_text="种子商店"),
Subcommand("buy-seed", Args["name?", str]["num?", int], help_text="购买种子"),
Subcommand("my-seed", help_text="我的种子"),
Subcommand("sowing", Args["name?", str]["num?", int], help_text="播种"),
Subcommand("harvest", help_text="收获"),
Subcommand("eradicate", help_text="铲除"),
Subcommand("my-plant", help_text="我的作物"),
Subcommand("sell-plant", Args["name?", str]["num?", int], help_text="出售作物"),
Subcommand("stealing", Args["target?", At], help_text="偷菜"),
Subcommand("buy-point", Args["num?", int], help_text="购买农场币"),
#Subcommand("sell-point", Args["num?", int], help_text="转换金币")
2025-04-27 17:00:58 +08:00
Subcommand("change-name", Args["name?", str], help_text="更改农场名")
),
priority=5,
block=True,
)
@diuse_farm.assign("$main")
2025-04-27 17:00:58 +08:00
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
2025-04-27 17:00:58 +08:00
image = await g_pFarmManager.drawFarmByUid(uid)
await MessageUtils.build_message(image).send(reply_to=True)
diuse_farm.shortcut(
"我的农场币",
command="我的农场",
arguments=["my-point"],
prefix=True,
)
@diuse_farm.assign("my-point")
async def _(session: Uninfo):
uid = str(session.user.id)
2025-04-29 18:11:09 +08:00
point = await g_pDBService.user.getUserPointByUid(uid)
if point < 0:
await MessageUtils.build_message("尚未开通农场快at我发送 开通农场 开通吧").send()
return False
await MessageUtils.build_message(f"你的当前农场币为: {point}").send(reply_to=True)
diuse_farm.shortcut(
"种子商店(.*?)",
command="我的农场",
arguments=["seed-shop"],
prefix=True,
)
@diuse_farm.assign("seed-shop")
async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 1)):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
image = await g_pShopManager.getSeedShopImage(num.result)
await MessageUtils.build_message(image).send()
diuse_farm.shortcut(
"购买种子(?P<name>.*?)",
command="我的农场",
arguments=["buy-seed", "{name}"],
prefix=True,
)
@diuse_farm.assign("buy-seed")
async def _(session: Uninfo, name: Match[str], num: Query[int] = AlconnaQuery("num", 1),):
if not name.available:
await MessageUtils.build_message(
"请在指令后跟需要购买的种子名称"
).finish(reply_to=True)
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pShopManager.buySeed(uid, name.result, num.result)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"我的种子",
command="我的农场",
arguments=["my-seed"],
prefix=True,
)
@diuse_farm.assign("my-seed")
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.getUserSeedByUid(uid)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"播种(?P<name>.*?)",
command="我的农场",
arguments=["sowing", "{name}"],
prefix=True,
)
@diuse_farm.assign("sowing")
async def _(session: Uninfo, name: Match[str], num: Query[int] = AlconnaQuery("num", -1),):
if not name.available:
await MessageUtils.build_message(
"请在指令后跟需要播种的种子名称"
).finish(reply_to=True)
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.sowing(uid, name.result, num.result)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"收获",
command="我的农场",
arguments=["harvest"],
prefix=True,
)
@diuse_farm.assign("harvest")
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.harvest(uid)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"铲除",
command="我的农场",
arguments=["eradicate"],
prefix=True,
)
@diuse_farm.assign("eradicate")
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.eradicate(uid)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"我的作物",
command="我的农场",
arguments=["my-plant"],
prefix=True,
)
@diuse_farm.assign("my-plant")
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.getUserPlantByUid(uid)
await MessageUtils.build_message(result).send(reply_to=True)
reclamation = on_alconna(
Alconna("开垦"),
priority=5,
block=True,
)
@reclamation.handle()
async def _(session: Uninfo):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
condition = await g_pFarmManager.reclamationCondition(uid)
condition += "\n 回复是将执行开垦"
await MessageUtils.build_message(condition).send(reply_to=True)
@waiter(waits=["message"], keep_session=True)
async def check(event: Event):
return event.get_plaintext()
resp = await check.wait(timeout=60)
if resp is None:
await MessageUtils.build_message("等待超时").send(reply_to=True)
return
if not resp == "":
return
res = await g_pFarmManager.reclamation(uid)
await MessageUtils.build_message(res).send(reply_to=True)
diuse_farm.shortcut(
"出售作物(?P<name>.*?)",
command="我的农场",
arguments=["sell-plant", "{name}"],
prefix=True,
)
@diuse_farm.assign("sell-plant")
2025-04-27 17:00:58 +08:00
async def _(session: Uninfo, name: Match[str], num: Query[int] = AlconnaQuery("num", -1),):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pShopManager.sellPlantByUid(uid, name.result, num.result)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"偷菜",
command="我的农场",
arguments=["stealing"],
prefix=True,
)
@diuse_farm.assign("stealing")
async def _(session: Uninfo, target: Match[At]):
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
if not target.available:
await MessageUtils.build_message("请在指令后跟需要at的人").finish(reply_to=True)
tar = target.result
2025-04-29 18:11:09 +08:00
result = await g_pDBService.user.isUserExist(tar.target)
2025-04-29 18:11:09 +08:00
if not result:
await MessageUtils.build_message("目标尚未开通农场快邀请ta开通吧").send()
return None
result = await g_pFarmManager.stealing(uid, tar.target)
await MessageUtils.build_message(result).send(reply_to=True)
diuse_farm.shortcut(
"购买农场币(.*?)",
command="我的农场",
arguments=["buy-point"],
prefix=True,
)
@diuse_farm.assign("buy-point")
async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 0)):
if num.result <= 0:
await MessageUtils.build_message(
"请在指令后跟需要购买农场币的数量"
).finish(reply_to=True)
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
result = await g_pFarmManager.buyPointByUid(uid, num.result)
await MessageUtils.build_message(result).send(reply_to=True)
2025-04-27 17:00:58 +08:00
diuse_farm.shortcut(
"更改农场名(?P<name>)",
command="我的农场",
arguments=["change-name", "{name}"],
prefix=True,
)
@diuse_farm.assign("change-name")
async def _(session: Uninfo, name: Match[str]):
if not name.available:
await MessageUtils.build_message(
"请在指令后跟需要更改的用户名"
).finish(reply_to=True)
uid = str(session.user.id)
if await isRegisteredByUid(uid) == False:
return
safeName = sanitize_username(name.result)
2025-04-29 18:11:09 +08:00
result = await g_pDBService.user.updateUserNameByUid(uid, safeName)
2025-04-27 17:00:58 +08:00
if result == True:
await MessageUtils.build_message("更新用户名成功!").send(reply_to=True)
else:
await MessageUtils.build_message("更新用户名失败!").send(reply_to=True)