Compare commits

...

13 Commits
main ... Beta

30 changed files with 2573 additions and 1176 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
/config/sign_in.json /config/sign_in.json
/resource/*
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files
__pycache__/ __pycache__/

View File

@ -7,13 +7,15 @@ from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from .command import diuse_farm, diuse_register, reclamation from .command import diuse_farm, diuse_register, reclamation
from .database.database import g_pSqlManager from .core.database.database import g_pSqlManager
from .dbService import g_pDBService from .core.dbService import g_pDBService
from .core.farm import g_pFarmManager
from .core.help import g_pHelpManager
from .core.player.playerPool import g_pUserPool
from .core.shop import g_pShopManager
from .event.event import g_pEventManager from .event.event import g_pEventManager
from .farm.farm import g_pFarmManager from .utils.json import g_pJsonManager
from .farm.shop import g_pShopManager from .utils.request import g_pRequestManager
from .json import g_pJsonManager
from .request import g_pRequestManager
__plugin_meta__ = PluginMetadata( __plugin_meta__ = PluginMetadata(
name="真寻农场", name="真寻农场",
@ -85,25 +87,29 @@ driver = get_driver()
# 构造函数 # 构造函数
@driver.on_startup @driver.on_startup
async def start(): async def start():
# 初始化数据库 # 数据库加载
await g_pSqlManager.init() await g_pSqlManager.init()
# 初始化读取Json # 初始化读取Json
await g_pJsonManager.init() await g_pJsonManager.init()
# 初始化数据库变量 和 加载作物数据库
await g_pDBService.init() await g_pDBService.init()
# 检查作物文件是否缺失 or 更新 # 检查作物文件是否缺失 or 更新
await g_pRequestManager.initPlantDBFile() await g_pRequestManager.initPlantDBFile()
await g_pHelpManager.createHelpImage()
# 析构函数 # 析构函数
@driver.on_shutdown @driver.on_shutdown
async def shutdown(): async def shutdown():
await g_pSqlManager.cleanup() # 单独卸载 作物数据库
await g_pDBService.cleanup() await g_pDBService.cleanup()
await g_pSqlManager.cleanup()
@scheduler.scheduled_job(trigger="cron", hour=4, minute=30, id="signInFile") @scheduler.scheduled_job(trigger="cron", hour=4, minute=30, id="signInFile")
async def signInFile(): async def signInFile():

View File

@ -19,15 +19,17 @@ from nonebot_plugin_uninfo import Uninfo
from nonebot_plugin_waiter import waiter from nonebot_plugin_waiter import waiter
from zhenxun.configs.config import BotConfig from zhenxun.configs.config import BotConfig
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from .config import g_bSignStatus, g_sTranslation from .core.dbService import g_pDBService
from .dbService import g_pDBService from .core.farm import g_pFarmManager
from .farm.farm import g_pFarmManager from .core.shop import g_pShopManager
from .farm.shop import g_pShopManager from .utils.config import g_bSignStatus, g_sTranslation
from .json import g_pJsonManager from .utils.json import g_pJsonManager
from .tool import g_pToolManager from .utils.tool import g_pToolManager
diuse_register = on_alconna( diuse_register = on_alconna(
Alconna("开通农场"), Alconna("开通农场"),
@ -41,10 +43,9 @@ diuse_register = on_alconna(
@diuse_register.handle() @diuse_register.handle()
async def handle_register(session: Uninfo): async def handle_register(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
user = await g_pDBService.user.getUserInfoByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if player is not None and await player.isRegistered():
if user: await MessageUtils.build_message(g_sTranslation["register"]["already"]).send(
await MessageUtils.build_message(g_sTranslation["register"]["repeat"]).send(
reply_to=True reply_to=True
) )
return return
@ -53,10 +54,9 @@ async def handle_register(session: Uninfo):
raw_name = str(session.user.name) raw_name = str(session.user.name)
safe_name = g_pToolManager.sanitize_username(raw_name) safe_name = g_pToolManager.sanitize_username(raw_name)
# 初始化用户信息 success = await g_pDBService.user.initUserInfo(uid, safe_name)
success = await g_pDBService.user.initUserInfoByUid(
uid=uid, name=safe_name, exp=0, point=500 logger.info(f"用户 {uid} 选择的农场名称为: {raw_name} | 过滤后为: {safe_name}")
)
msg = ( msg = (
g_sTranslation["register"]["success"].format(point=500) g_sTranslation["register"]["success"].format(point=500)
@ -96,6 +96,8 @@ diuse_farm = on_alconna(
Subcommand("admin-up", Args["num?", int], help_text="农场下阶段"), Subcommand("admin-up", Args["num?", int], help_text="农场下阶段"),
Subcommand("point-to-vipPoint", Args["num?", int], help_text="点券兑换"), Subcommand("point-to-vipPoint", Args["num?", int], help_text="点券兑换"),
Subcommand("my-vipPoint", help_text="我的点券"), Subcommand("my-vipPoint", help_text="我的点券"),
Subcommand("farm-help", help_text="农场帮助"),
Subcommand("vipSeed-shop", Args["res?", MultiVar(str)], help_text="种子商店"),
), ),
priority=5, priority=5,
block=True, block=True,
@ -106,8 +108,9 @@ diuse_farm = on_alconna(
@diuse_farm.assign("$main") @diuse_farm.assign("$main")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
image = await g_pFarmManager.drawFarmByUid(uid) image = await g_pFarmManager.drawFarmByUid(uid)
@ -125,8 +128,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("detail") @diuse_farm.assign("detail")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
info = await g_pFarmManager.drawDetailFarmByUid(uid) info = await g_pFarmManager.drawDetailFarmByUid(uid)
@ -147,7 +151,12 @@ diuse_farm.shortcut(
@diuse_farm.assign("my-point") @diuse_farm.assign("my-point")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
point = await g_pDBService.user.getUserPointByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return
point = player.user["point"]
if point < 0: if point < 0:
await MessageUtils.build_message(g_sTranslation["basic"]["notFarm"]).send() await MessageUtils.build_message(g_sTranslation["basic"]["notFarm"]).send()
@ -169,8 +178,10 @@ diuse_farm.shortcut(
@diuse_farm.assign("seed-shop") @diuse_farm.assign("seed-shop")
async def _(session: Uninfo, res: Match[tuple[str, ...]]): async def _(session: Uninfo, res: Match[tuple[str, ...]]):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
if res.result is inspect._empty: if res.result is inspect._empty:
@ -197,9 +208,9 @@ async def _(session: Uninfo, res: Match[tuple[str, ...]]):
page = int(raw[1]) page = int(raw[1])
if filterKey is None: if filterKey is None:
image = await g_pShopManager.getSeedShopImage(page) image = await g_pShopManager.getSeedShopImage(page, 0, 0)
else: else:
image = await g_pShopManager.getSeedShopImage(filterKey, page) image = await g_pShopManager.getSeedShopImage(filterKey, page, 0)
await MessageUtils.build_message(image).send() await MessageUtils.build_message(image).send()
@ -222,8 +233,9 @@ async def _(
) )
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pShopManager.buySeed(uid, name.result, num.result) result = await g_pShopManager.buySeed(uid, name.result, num.result)
@ -241,8 +253,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("my-seed") @diuse_farm.assign("my-seed")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.getUserSeedByUid(uid) result = await g_pFarmManager.getUserSeedByUid(uid)
@ -267,8 +280,9 @@ async def _(
) )
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.sowing(uid, name.result, num.result) result = await g_pFarmManager.sowing(uid, name.result, num.result)
@ -286,8 +300,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("harvest") @diuse_farm.assign("harvest")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.harvest(uid) result = await g_pFarmManager.harvest(uid)
@ -305,8 +320,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("eradicate") @diuse_farm.assign("eradicate")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.eradicate(uid) result = await g_pFarmManager.eradicate(uid)
@ -324,8 +340,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("my-plant") @diuse_farm.assign("my-plant")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.getUserPlantByUid(uid) result = await g_pFarmManager.getUserPlantByUid(uid)
@ -343,8 +360,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("lock-plant") @diuse_farm.assign("lock-plant")
async def _(session: Uninfo, name: Match[str]): async def _(session: Uninfo, name: Match[str]):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.lockUserPlantByUid(uid, name.result, 1) result = await g_pFarmManager.lockUserPlantByUid(uid, name.result, 1)
@ -362,8 +380,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("unlock-plant") @diuse_farm.assign("unlock-plant")
async def _(session: Uninfo, name: Match[str]): async def _(session: Uninfo, name: Match[str]):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.lockUserPlantByUid(uid, name.result, 0) result = await g_pFarmManager.lockUserPlantByUid(uid, name.result, 0)
@ -383,8 +402,9 @@ async def _(
session: Uninfo, name: Match[str], num: Query[int] = AlconnaQuery("num", -1) session: Uninfo, name: Match[str], num: Query[int] = AlconnaQuery("num", -1)
): ):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pShopManager.sellPlantByUid(uid, name.result, num.result) result = await g_pShopManager.sellPlantByUid(uid, name.result, num.result)
@ -402,8 +422,9 @@ reclamation = on_alconna(
@reclamation.handle() @reclamation.handle()
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
condition = await g_pFarmManager.reclamationCondition(uid) condition = await g_pFarmManager.reclamationCondition(uid)
@ -438,8 +459,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("stealing") @diuse_farm.assign("stealing")
async def _(session: Uninfo, target: Match[At]): async def _(session: Uninfo, target: Match[At]):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
if not target.available: if not target.available:
@ -448,7 +470,7 @@ async def _(session: Uninfo, target: Match[At]):
) )
tar = target.result tar = target.result
result = await g_pDBService.user.isUserExist(tar.target) result = await g_pDBService.user.isRegistered(tar.target)
if not result: if not result:
await MessageUtils.build_message( await MessageUtils.build_message(
@ -476,8 +498,9 @@ async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 0)):
) )
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.buyPointByUid(uid, num.result) result = await g_pFarmManager.buyPointByUid(uid, num.result)
@ -500,28 +523,22 @@ async def _(session: Uninfo, name: Match[str]):
) )
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
safeName = g_pToolManager.sanitize_username(name.result) player = await g_pToolManager.getPlayerByUid(uid)
if player is None:
if safeName == "神秘农夫":
await MessageUtils.build_message(g_sTranslation["changeName"]["error"]).send(
reply_to=True
)
return
result = await g_pDBService.user.updateUserNameByUid(uid, safeName)
if result:
await MessageUtils.build_message(g_sTranslation["changeName"]["success"]).send(
reply_to=True
)
else:
await MessageUtils.build_message(g_sTranslation["changeName"]["error1"]).send( await MessageUtils.build_message(g_sTranslation["changeName"]["error1"]).send(
reply_to=True reply_to=True
) )
return
result = await player.updateName(name.result)
await MessageUtils.build_message(g_sTranslation["changeName"][result]).send(
reply_to=True
)
diuse_farm.shortcut( diuse_farm.shortcut(
@ -535,8 +552,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("sign-in") @diuse_farm.assign("sign-in")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
# 判断签到是否正常加载 # 判断签到是否正常加载
@ -590,8 +608,6 @@ async def _(session: Uninfo):
await MessageUtils.build_message(message).send() await MessageUtils.build_message(message).send()
# await MessageUtils.alc_forward_msg([info], session.self_id, BotConfig.self_nickname).send(reply_to=True)
soil_upgrade = on_alconna( soil_upgrade = on_alconna(
Alconna("土地升级", Args["index", int]), Alconna("土地升级", Args["index", int]),
@ -604,8 +620,9 @@ soil_upgrade = on_alconna(
@soil_upgrade.handle() @soil_upgrade.handle()
async def _(session: Uninfo, index: Query[int] = AlconnaQuery("index", 1)): async def _(session: Uninfo, index: Query[int] = AlconnaQuery("index", 1)):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
condition = await g_pFarmManager.soilUpgradeCondition(uid, index.result) condition = await g_pFarmManager.soilUpgradeCondition(uid, index.result)
@ -643,8 +660,9 @@ diuse_farm.shortcut(
@diuse_farm.assign("admin-up") @diuse_farm.assign("admin-up")
async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 0)): async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 0)):
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
await g_pDBService.userSoil.nextPhase(uid, num.result) await g_pDBService.userSoil.nextPhase(uid, num.result)
@ -666,8 +684,9 @@ async def _(session: Uninfo, num: Query[int] = AlconnaQuery("num", 0)):
) )
uid = str(session.user.id) uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if not await g_pToolManager.isRegisteredByUid(uid): if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return return
result = await g_pFarmManager.pointToVipPointByUid(uid, num.result) result = await g_pFarmManager.pointToVipPointByUid(uid, num.result)
@ -685,11 +704,75 @@ diuse_farm.shortcut(
@diuse_farm.assign("my-vipPoint") @diuse_farm.assign("my-vipPoint")
async def _(session: Uninfo): async def _(session: Uninfo):
uid = str(session.user.id) uid = str(session.user.id)
vipPoint = await g_pDBService.user.getUserVipPointByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if player is None or not await player.isRegistered():
if not await g_pToolManager.isRegisteredByUid(uid): await g_pToolManager.repeat()
return return
await MessageUtils.build_message( await MessageUtils.build_message(
g_sTranslation["basic"]["vipPoint"].format(vipPoint=vipPoint) g_sTranslation["basic"]["vipPoint"].format(vipPoint=player.user["vipPoint"])
).send(reply_to=True) ).send(reply_to=True)
diuse_farm.shortcut(
"农场帮助",
command="我的农场",
arguments=["farm-help"],
prefix=True,
)
@diuse_farm.assign("farm-help")
async def _(session: Uninfo):
savePath = DATA_PATH / "farm_res/html/help.png"
image = BuildImage(background=savePath)
await MessageUtils.build_message(image).send(reply_to=True)
diuse_farm.shortcut(
"点券商店(.*?)",
command="我的农场",
arguments=["vipSeed-shop"],
prefix=True,
)
@diuse_farm.assign("vipSeed-shop")
async def _(session: Uninfo, res: Match[tuple[str, ...]]):
uid = str(session.user.id)
player = await g_pToolManager.getPlayerByUid(uid)
if player is None or not await player.isRegistered():
await g_pToolManager.repeat()
return
if res.result is inspect._empty:
raw = []
else:
raw = res.result
filterKey: str | int | None = None
page: int = 1
if len(raw) >= 1 and raw[0] is not None:
first = raw[0]
if isinstance(first, str) and first.isdigit():
page = int(first)
else:
filterKey = first
if (
len(raw) >= 2
and raw[1] is not None
and isinstance(raw[1], str)
and raw[1].isdigit()
):
page = int(raw[1])
if filterKey is None:
image = await g_pShopManager.getSeedShopImage(page, 0, 1)
else:
image = await g_pShopManager.getSeedShopImage(filterKey, page, 1)
await MessageUtils.build_message(image).send()

108
core/activity/activity.py Normal file
View File

@ -0,0 +1,108 @@
class ActivityManager:
"""活动管理器"""
def __init__(self):
self.activities = {} # 活动ID -> 活动配置
self.active_activities = set() # 当前活跃的活动ID
self.effect_handlers = {} # 效果类型 -> 处理器
# 注册效果处理器
self._register_handlers()
def _register_handlers(self):
"""注册所有效果处理器"""
self.effect_handlers[EffectType.MULTIPLIER] = MultiplierHandler()
self.effect_handlers[EffectType.FIXED_BONUS] = FixedBonusHandler()
self.effect_handlers[EffectType.BUFF_APPLICATION] = BuffApplicationHandler()
self.effect_handlers[EffectType.QUEST_TRIGGER] = QuestTriggerHandler()
def load_activities_from_config(self, config_path: str):
"""从配置文件加载活动"""
with open(config_path, encoding="utf-8") as f:
config = json.load(f)
for activity_data in config["activities"]:
activity = Activity(activity_data)
self.activities[activity.id] = activity
print(f"加载活动: {activity.name}")
def update_activity_status(self):
"""更新活动状态(定时调用)"""
now = datetime.now()
for activity in self.activities.values():
is_active = activity.start_time <= now <= activity.end_time
if is_active and activity.id not in self.active_activities:
# 活动开始
self.active_activities.add(activity.id)
print(f"活动开始: {activity.name}")
elif not is_active and activity.id in self.active_activities:
# 活动结束
self.active_activities.remove(activity.id)
print(f"活动结束: {activity.name}")
def get_active_activities(self, activity_type: ActivityType = None) -> List:
"""获取当前活跃的活动"""
active_list = []
for activity_id in self.active_activities:
activity = self.activities[activity_id]
if activity_type is None or activity.activity_type == activity_type:
active_list.append(activity)
return active_list
def apply_activity_effects(
self,
activity_type: ActivityType,
player: Player,
base_value: int,
context: Dict = None,
) -> int:
"""应用活动效果"""
if context is None:
context = {}
context["base_value"] = base_value
result = base_value
# 获取同类型的所有活跃活动
active_activities = self.get_active_activities(activity_type)
for activity in active_activities:
print(f"为玩家 {player.player_id} 应用活动: {activity.name}")
for effect in activity.effects:
handler = self.effect_handlers.get(effect.type)
if handler:
try:
effect_result = handler.execute(effect.params, player, context)
if effect_result is not None:
result = effect_result
context["base_value"] = result # 更新基础值供后续效果使用
except Exception as e:
print(f"效果执行失败: {effect.type}, 错误: {e}")
return result
class Activity:
"""活动类"""
def __init__(self, data: Dict):
self.id = data["id"]
self.name = data["name"]
self.activity_type = ActivityType(data["activity_type"])
self.start_time = datetime.strptime(data["start_time"], "%Y-%m-%d %H:%M:%S")
self.end_time = datetime.strptime(data["end_time"], "%Y-%m-%d %H:%M:%S")
self.effects = [Effect(effect_data) for effect_data in data["effects"]]
class Effect:
"""效果类"""
def __init__(self, data: Dict):
self.type = EffectType(data["type"])
self.params = data.get("params", {})

95
core/activity/effect.py Normal file
View File

@ -0,0 +1,95 @@
from enum import Enum
from typing import Any
# 活动类型枚举
class ActivityType(Enum):
PLANTING = "planting" # 种植活动
HARVESTING = "harvesting" # 收获活动
FISHING = "fishing" # 钓鱼活动
COMBAT = "combat" # 战斗活动
# 效果类型枚举
class EffectType(Enum):
MULTIPLIER = "multiplier" # 倍数加成
FIXED_BONUS = "fixed_bonus" # 固定加成
BUFF_APPLICATION = "buff_application" # 施加BUFF
QUEST_TRIGGER = "quest_trigger" # 任务触发
class EffectHandler:
"""
效果处理器基类
"""
def execute(self, params: dict, uid: str, context: dict) -> Any:
raise NotImplementedError
class MultiplierHandler(EffectHandler):
"""
倍数效果处理器
"""
def execute(self, params: dict, uid: str, context: dict) -> float:
base_value = context.get("base_value", 0)
multiplier = params.get("value", 1.0)
# 检查条件
if self._check_conditions(params.get("conditions", {}), player):
result = base_value * multiplier
print(f"倍数效果: {base_value} × {multiplier} = {result}")
return result
return base_value
def _check_conditions(self, conditions: dict, player: Player) -> bool:
"""检查生效条件"""
# 等级要求
min_level = conditions.get("min_level", 0)
if player.level < min_level:
return False
# 需要特定物品
required_items = conditions.get("required_items", [])
for item in required_items:
if player.inventory.get(item, 0) <= 0:
return False
return True
class FixedBonusHandler(EffectHandler):
"""固定加成处理器"""
def execute(self, params: dict, player: Player, context: dict) -> int:
base_value = context.get("base_value", 0)
bonus = params.get("value", 0)
result = base_value + bonus
print(f"固定加成: {base_value} + {bonus} = {result}")
return result
class BuffApplicationHandler(EffectHandler):
"""BUFF应用处理器"""
def execute(self, params: dict, player: Player, context: dict) -> None:
buff_id = params["buff_id"]
duration = params.get("duration", 3600) # 默认1小时
properties = params.get("properties", {})
player.add_buff(buff_id, duration, properties)
# 立即应用BUFF效果如果有
if "immediate_effect" in params:
self._apply_immediate_effect(params["immediate_effect"], player)
class QuestTriggerHandler(EffectHandler):
"""任务触发处理器"""
def execute(self, params: dict, player: Player, context: dict) -> None:
quest_id = params["quest_id"]
print(f"为玩家 {player.player_id} 触发任务: {quest_id}")
# 这里会调用任务系统来分配任务

399
core/database/database.py Normal file
View File

@ -0,0 +1,399 @@
from contextlib import asynccontextmanager
import os
from pathlib import Path
import re
from typing import Any
import aiosqlite
from zhenxun.services.log import logger
from ...utils.config import g_sDBFilePath, g_sDBPath
class CSqlManager:
def __init__(self):
dbPath = Path(g_sDBPath)
if dbPath and not dbPath.exists():
os.makedirs(dbPath, exist_ok=True)
@classmethod
async def cleanup(cls):
if hasattr(cls, "m_pDB") and cls.m_pDB:
await cls.m_pDB.close()
@classmethod
async def init(cls) -> bool:
try:
cls.m_pDB = await aiosqlite.connect(g_sDBFilePath)
cls.m_pDB.row_factory = aiosqlite.Row
return True
except Exception as e:
logger.debug("真寻农场初始化总数据库失败", e=e)
return False
@classmethod
@asynccontextmanager
async def _transaction(cls):
await cls.m_pDB.execute("BEGIN;")
try:
yield
except:
await cls.m_pDB.execute("ROLLBACK;")
raise
else:
await cls.m_pDB.execute("COMMIT;")
@classmethod
async def getTableInfo(cls, tableName: str) -> list:
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", tableName):
raise ValueError(f"Illegal table name: {tableName}")
try:
cursor = await cls.m_pDB.execute(f'PRAGMA table_info("{tableName}")')
rows = await cursor.fetchall()
return [{"name": row[1], "type": row[2]} for row in rows]
except aiosqlite.Error:
return []
@classmethod
async def ensureTableSchema(cls, tableName: str, columns: dict) -> bool:
"""由AI生成
创建表或为已存在表添加缺失字段
返回 True 表示有变更创建或新增列False 则无操作
Args:
tableName (_type_): 表名
columns (_type_): 字典
Returns:
_type_: _description_
"""
info = await cls.getTableInfo(tableName)
existing = {col["name"]: col["type"].upper() for col in info}
desired = {k: v.upper() for k, v in columns.items() if k != "PRIMARY KEY"}
primaryKey = columns.get("PRIMARY KEY", "")
if not existing:
colsDef = ", ".join(f'"{k}" {v}' for k, v in desired.items())
if primaryKey:
colsDef += f", PRIMARY KEY {primaryKey}"
await cls.m_pDB.execute(f'CREATE TABLE "{tableName}" ({colsDef});')
return True
toAdd = [k for k in desired if k not in existing]
toRemove = [k for k in existing if k not in desired]
typeMismatch = [
k for k in desired if k in existing and existing[k] != desired[k]
]
if toAdd and not toRemove and not typeMismatch:
for col in toAdd:
await cls.m_pDB.execute(
f'ALTER TABLE "{tableName}" ADD COLUMN "{col}" {columns[col]}'
)
return True
async with cls._transaction():
tmpTable = f"{tableName}_new"
colsDef = ", ".join(f'"{k}" {v}' for k, v in desired.items())
if primaryKey:
colsDef += f", PRIMARY KEY {primaryKey}"
await cls.m_pDB.execute(f'CREATE TABLE "{tmpTable}" ({colsDef});')
commonCols = [k for k in desired if k in existing]
if commonCols:
colsStr = ", ".join(f'"{c}"' for c in commonCols)
sql = (
f'INSERT INTO "{tmpTable}" ({colsStr}) '
f"SELECT {colsStr} "
f'FROM "{tableName}";'
)
await cls.m_pDB.execute(sql)
await cls.m_pDB.execute(f'DROP TABLE "{tableName}";')
await cls.m_pDB.execute(
f'ALTER TABLE "{tmpTable}" RENAME TO "{tableName}";'
)
return True
@classmethod
async def executeDB(cls, command: str) -> bool:
"""执行自定义SQL
Args:
command (str): SQL语句
Returns:
bool: 是否执行成功
"""
if not command:
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(command)
return True
except Exception:
return False
@classmethod
async def insert(cls, tableName: str, data: dict) -> bool:
"""
插入数据
Args:
tableName: 表名
data: 要插入的数据字典键为字段名值为字段值
Returns:
bool: 是否执行成功
"""
if not data:
return False
try:
# 构建参数化查询
columns = ", ".join(f'"{k}"' for k in data.keys())
placeholders = ", ".join("?" for _ in data.keys())
values = list(data.values())
sql = f'INSERT INTO "{tableName}" ({columns}) VALUES ({placeholders})'
async with cls._transaction():
await cls.m_pDB.execute(sql, values)
return True
except Exception as e:
logger.debug("真寻农场插入数据失败!", e=e)
return False
@classmethod
async def batch_insert(cls, tableName: str, data_list: list) -> bool:
"""
批量插入数据
Args:
tableName: 表名
data_list: 要插入的数据字典列表
Returns:
bool: 是否执行成功
"""
if not data_list:
return False
try:
# 使用第一个字典的键作为所有记录的字段
columns = ", ".join(f'"{k}"' for k in data_list[0].keys())
placeholders = ", ".join("?" for _ in data_list[0].keys())
sql = f'INSERT INTO "{tableName}" ({columns}) VALUES ({placeholders})'
async with cls._transaction():
await cls.m_pDB.executemany(
sql, [list(data.values()) for data in data_list]
)
return True
except Exception as e:
logger.debug("真寻农场批量插入数据失败!", e=e)
return False
@classmethod
async def select(
cls,
tableName: str,
columns: list[Any] | None = None,
where: dict[str, Any] | None = None,
order_by: str | None = None,
limit: int | None = None,
) -> list[dict]:
"""
查询数据
Args:
tableName: 表名
columns: 要查询的字段列表None表示所有字段
where: 查询条件字典
order_by: 排序字段
limit: 限制返回记录数
Returns:
list: 查询结果列表每个元素是一个字典
"""
try:
# 构建SELECT部分
if columns:
select_clause = ", ".join(f'"{col}"' for col in columns)
else:
select_clause = "*"
sql = f'SELECT {select_clause} FROM "{tableName}"'
# 构建WHERE部分
params = []
if where:
where_conditions = []
for key, value in where.items():
if isinstance(value, (list, tuple)):
# 处理IN查询
placeholders = ", ".join("?" for _ in value)
where_conditions.append(f'"{key}" IN ({placeholders})')
params.extend(value)
else:
where_conditions.append(f'"{key}" = ?')
params.append(value)
if where_conditions:
sql += " WHERE " + " AND ".join(where_conditions)
# 构建ORDER BY部分
if order_by:
sql += f" ORDER BY {order_by}"
# 构建LIMIT部分
if limit:
sql += f" LIMIT {limit}"
cursor = await cls.m_pDB.execute(sql, params)
rows = await cursor.fetchall()
# 转换为字典列表
result = []
for row in rows:
result.append(dict(row))
return result
except Exception as e:
logger.debug("真寻农场查询数据失败!", e=e)
return []
@classmethod
async def update(cls, tableName: str, data: dict, where: dict) -> bool:
"""
更新数据
Args:
tableName: 表名
data: 要更新的数据字典
where: 更新条件字典
Returns:
bool: 是否执行成功
"""
if not data:
return False
if not where:
return False
try:
# 构建SET部分
set_conditions = []
params = []
for key, value in data.items():
set_conditions.append(f'"{key}" = ?')
params.append(value)
# 构建WHERE部分
where_conditions = []
for key, value in where.items():
where_conditions.append(f'"{key}" = ?')
params.append(value)
sql = f'UPDATE "{tableName}" SET {", ".join(set_conditions)} WHERE {" AND ".join(where_conditions)}'
async with cls._transaction():
cursor = await cls.m_pDB.execute(sql, params)
# 检查是否影响了行
return cursor.rowcount > 0
except Exception as e:
logger.debug("真寻农场更新数据失败!", e=e)
return False
@classmethod
async def delete(cls, tableName: str, where: dict) -> bool:
"""
删除数据
Args:
tableName: 表名
where: 删除条件字典
Returns:
bool: 是否执行成功
"""
if not where:
return False
try:
# 构建WHERE部分
where_conditions = []
params = []
for key, value in where.items():
where_conditions.append(f'"{key}" = ?')
params.append(value)
sql = f'DELETE FROM "{tableName}" WHERE {" AND ".join(where_conditions)}'
async with cls._transaction():
cursor = await cls.m_pDB.execute(sql, params)
# 检查是否影响了行
return cursor.rowcount > 0
except Exception as e:
logger.debug("真寻农场删除数据失败!", e=e)
return False
@classmethod
async def exists(cls, tableName: str, where: dict) -> bool:
"""
检查记录是否存在
Args:
tableName: 表名
where: 查询条件字典
Returns:
bool: 是否存在符合条件的记录
"""
try:
result = await cls.select(tableName, columns=["1"], where=where, limit=1)
return len(result) > 0
except Exception as e:
logger.debug("真寻农场检查数据失败!", e=e)
return False
@classmethod
async def count(cls, tableName: str, where: dict = {}) -> int:
"""
统计记录数量
Args:
tableName: 表名
where: 查询条件字典
Returns:
int: 记录数量
"""
try:
# 构建WHERE部分
sql = f'SELECT COUNT(*) as count FROM "{tableName}"'
params = []
if where:
where_conditions = []
for key, value in where.items():
where_conditions.append(f'"{key}" = ?')
params.append(value)
sql += " WHERE " + " AND ".join(where_conditions)
cursor = await cls.m_pDB.execute(sql, params)
row = await cursor.fetchone()
return row["count"] if row else 0
except Exception as e:
logger.debug("真寻农场统计数据失败!", e=e)
return 0
g_pSqlManager = CSqlManager()

View File

@ -6,8 +6,8 @@ import aiosqlite
from zhenxun.configs.config import Config from zhenxun.configs.config import Config
from zhenxun.services.log import logger from zhenxun.services.log import logger
from ..config import g_bIsDebug, g_sPlantPath, g_sResourcePath from ...utils.config import g_bIsDebug, g_sPlantPath, g_sResourcePath
from ..request import g_pRequestManager from ...utils.request import g_pRequestManager
class CPlantManager: class CPlantManager:
@ -17,43 +17,39 @@ class CPlantManager:
except FileExistsError: except FileExistsError:
pass pass
@classmethod async def cleanup(self):
async def cleanup(cls): if hasattr(self, "m_pDB") and self.m_pDB:
if hasattr(cls, "m_pDB") and cls.m_pDB: await self.m_pDB.close()
await cls.m_pDB.close()
@classmethod async def init(self) -> bool:
async def init(cls) -> bool:
try: try:
_ = os.path.exists(g_sPlantPath) _ = os.path.exists(g_sPlantPath)
if g_bIsDebug: if g_bIsDebug:
cls.m_pDB = await aiosqlite.connect( self.m_pDB = await aiosqlite.connect(
str(g_sPlantPath.parent / "plant-test.db") str(g_sPlantPath.parent / "plant-test.db")
) )
else: else:
cls.m_pDB = await aiosqlite.connect(str(g_sPlantPath)) self.m_pDB = await aiosqlite.connect(str(g_sPlantPath))
cls.m_pDB.row_factory = aiosqlite.Row self.m_pDB.row_factory = aiosqlite.Row
return True return True
except Exception as e: except Exception as e:
logger.warning("初始化植物数据库失败", e=e) logger.warning("初始化植物数据库失败", e=e)
return False return False
@classmethod
@asynccontextmanager @asynccontextmanager
async def _transaction(cls): async def _transaction(self):
await cls.m_pDB.execute("BEGIN;") await self.m_pDB.execute("BEGIN;")
try: try:
yield yield
except: except:
await cls.m_pDB.execute("ROLLBACK;") await self.m_pDB.execute("ROLLBACK;")
raise raise
else: else:
await cls.m_pDB.execute("COMMIT;") await self.m_pDB.execute("COMMIT;")
@classmethod async def executeDB(self, command: str) -> bool:
async def executeDB(cls, command: str) -> bool:
"""执行自定义SQL """执行自定义SQL
Args: Args:
@ -67,15 +63,14 @@ class CPlantManager:
return False return False
try: try:
async with cls._transaction(): async with self._transaction():
await cls.m_pDB.execute(command) await self.m_pDB.execute(command)
return True return True
except Exception as e: except Exception as e:
logger.warning(f"数据库语句执行出错: {command}", e=e) logger.warning(f"数据库语句执行出错: {command}", e=e)
return False return False
@classmethod async def getPlantByName(self, name: str) -> dict | None:
async def getPlantByName(cls, name: str) -> dict | None:
"""根据作物名称查询记录 """根据作物名称查询记录
Args: Args:
@ -85,7 +80,7 @@ class CPlantManager:
dict | None: 返回记录字典未找到返回None dict | None: 返回记录字典未找到返回None
""" """
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT * FROM plant WHERE name = ?", (name,) "SELECT * FROM plant WHERE name = ?", (name,)
) as cursor: ) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -94,8 +89,7 @@ class CPlantManager:
logger.warning(f"查询作物失败: {name}", e=e) logger.warning(f"查询作物失败: {name}", e=e)
return None return None
@classmethod async def getPlantPhaseByName(self, name: str) -> list[int]:
async def getPlantPhaseByName(cls, name: str) -> list[int]:
"""根据作物名称获取作物各个阶段 """根据作物名称获取作物各个阶段
Args: Args:
@ -105,7 +99,7 @@ class CPlantManager:
list: 阶段数组 list: 阶段数组
""" """
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,) "SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor: ) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -130,8 +124,7 @@ class CPlantManager:
logger.warning(f"查询作物阶段失败: {name}", e=e) logger.warning(f"查询作物阶段失败: {name}", e=e)
return [] return []
@classmethod async def getPlantPhaseNumberByName(self, name: str) -> int:
async def getPlantPhaseNumberByName(cls, name: str) -> int:
"""根据作物名称获取作物总阶段数 """根据作物名称获取作物总阶段数
Args: Args:
@ -141,7 +134,7 @@ class CPlantManager:
int: 总阶段数 int: 总阶段数
""" """
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,) "SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor: ) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -164,8 +157,7 @@ class CPlantManager:
logger.warning(f"查询作物阶段失败: {name}", e=e) logger.warning(f"查询作物阶段失败: {name}", e=e)
return -1 return -1
@classmethod async def getPlantAgainByName(self, name: str) -> int:
async def getPlantAgainByName(cls, name: str) -> int:
"""根据作物名称获取作物再次成熟时间 """根据作物名称获取作物再次成熟时间
Args: Args:
@ -176,7 +168,7 @@ class CPlantManager:
""" """
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,) "SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor: ) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -193,8 +185,7 @@ class CPlantManager:
logger.warning(f"查询作物阶段失败: {name}", e=e) logger.warning(f"查询作物阶段失败: {name}", e=e)
return -1 return -1
@classmethod async def existsPlant(self, name: str) -> bool:
async def existsPlant(cls, name: str) -> bool:
"""判断作物是否存在 """判断作物是否存在
Args: Args:
@ -204,7 +195,7 @@ class CPlantManager:
bool: 存在返回True否则False bool: 存在返回True否则False
""" """
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT 1 FROM plant WHERE name = ? LIMIT 1", (name,) "SELECT 1 FROM plant WHERE name = ? LIMIT 1", (name,)
) as cursor: ) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
@ -213,8 +204,7 @@ class CPlantManager:
logger.warning(f"检查作物存在性失败: {name}", e=e) logger.warning(f"检查作物存在性失败: {name}", e=e)
return False return False
@classmethod async def countPlants(self, onlyBuy: bool = False) -> int:
async def countPlants(cls, onlyBuy: bool = False) -> int:
"""获取作物总数 """获取作物总数
Args: Args:
@ -230,18 +220,17 @@ class CPlantManager:
else: else:
sql = "SELECT COUNT(*) FROM plant" sql = "SELECT COUNT(*) FROM plant"
params: tuple = () params: tuple = ()
async with cls.m_pDB.execute(sql, params) as cursor: async with self.m_pDB.execute(sql, params) as cursor:
row = await cursor.fetchone() row = await cursor.fetchone()
return row[0] if row else 0 return row[0] if row else 0
except Exception as e: except Exception as e:
logger.warning(f"统计作物数量失败, onlyBuy={onlyBuy}", e=e) logger.warning(f"统计作物数量失败, onlyBuy={onlyBuy}", e=e)
return 0 return 0
@classmethod async def listPlants(self) -> list[dict]:
async def listPlants(cls) -> list[dict]:
"""查询所有作物记录""" """查询所有作物记录"""
try: try:
async with cls.m_pDB.execute( async with self.m_pDB.execute(
"SELECT * FROM plant ORDER BY level" "SELECT * FROM plant ORDER BY level"
) as cursor: ) as cursor:
rows = await cursor.fetchall() rows = await cursor.fetchall()
@ -250,8 +239,7 @@ class CPlantManager:
logger.warning("查询所有作物失败", e=e) logger.warning("查询所有作物失败", e=e)
return [] return []
@classmethod async def downloadPlant(self) -> bool:
async def downloadPlant(cls) -> bool:
"""遍历所有作物下载各阶段图片及icon文件到指定文件夹 """遍历所有作物下载各阶段图片及icon文件到指定文件夹
Returns: Returns:
@ -262,10 +250,10 @@ class CPlantManager:
baseUrl = baseUrl.rstrip("/") + ":8998/file" baseUrl = baseUrl.rstrip("/") + ":8998/file"
try: try:
plants = await cls.listPlants() plants = await self.listPlants()
for plant in plants: for plant in plants:
name = plant["name"] name = plant["name"]
phaseCount = await cls.getPlantPhaseNumberByName(name) phaseCount = await self.getPlantPhaseNumberByName(name)
saveDir = os.path.join(g_sResourcePath, "plant", name) saveDir = os.path.join(g_sResourcePath, "plant", name)
begin = 0 if plant["general"] == 0 else 1 begin = 0 if plant["general"] == 0 else 1

211
core/database/user.py Normal file
View File

@ -0,0 +1,211 @@
import math
from ...utils.tool import g_pToolManager
from .database import CSqlManager
class CUserDB(CSqlManager):
def __init__(self):
self.currencies: list[str] = ["point", "vipPoint"]
async def initDB(self):
userInfo = {
"uid": "TEXT PRIMARY KEY", # 用户Uid
"name": "TEXT NOT NULL", # 农场名称
"exp": "INTEGER DEFAULT 0", # 经验值
"point": "INTEGER DEFAULT 0", # 金币
"vipPoint": "INTEGER DEFAULT 0", # 点券
"soil": "INTEGER DEFAULT 3", # 解锁土地数量
"stealTime": "TEXT DEFAULT ''", # 偷菜时间字符串
"stealCount": "INTEGER DEFAULT 0", # 剩余偷菜次数
}
await self.ensureTableSchema("user", userInfo)
async def initUserInfo(self, uid: str, name: str) -> bool:
"""初始化用户信息
Args:
uid (str): 用户ID
name (str): 农场名称
Returns:
bool: 是否成功初始化用户信息
"""
nowStr = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d")
result = await self.insert(
"user",
data={
"uid": uid,
"name": name,
"exp": 0,
"point": 500,
"soil": 3,
"stealTime": nowStr,
"stealCount": 5,
},
)
return result
async def getUserInfoByUid(self, uid: str) -> dict:
"""根据用户ID获取用户信息
Args:
uid (str): 用户ID
Returns:
dict: 用户信息字典未找到返回空字典
"""
if uid == "":
return {}
records = await self.select("user", where={"uid": uid})
return records[0] if records else {}
async def isRegistered(self, uid: str) -> bool:
"""检查用户是否注册农场
Args:
uid (str): 用户ID
Returns:
bool: 是否注册农场
"""
if uid == "":
return False
return await self.exists("user", where={"uid": uid})
async def updatePoint(self, uid: str, type: str, index: int) -> bool:
"""更新货币
Args:
uid (str): 用户ID
type (str): 货币类型 point/vipPoint
index (int): 更新后的数量
Returns:
bool: 是否成功更新货币
"""
if type not in self.currencies:
return False
if index < 0:
index = 0
await self.update("user", {type: index}, {"uid": uid})
return True
async def updateExp(self, uid: str, exp: int) -> bool:
"""更新经验值
Args:
uid (str): 用户ID
exp (int): 更新后的经验值
Returns:
bool: 是否成功更新经验值
"""
if exp < 0:
exp = 0
return await self.update("user", {"exp": exp}, {"uid": uid})
async def updateName(self, uid: str, name: str) -> str:
"""更新农场名称
Args:
uid (str): 用户ID
name (str): 农场名称
Returns:
bool: 是否成功更新农场名称
"""
safeName = g_pToolManager.sanitize_username(name)
if safeName == "神秘农夫":
return "error"
if await self.update("user", {"name": safeName}, {"uid": uid}):
return "success"
return "error1"
async def getUserLevelByUid(self, uid: str) -> tuple[int, int, int]:
"""获取用户等级信息
Args:
uid (str): 用户Uid
Returns:
tuple[int, int, int]: 成功返回(当前等级, 升至下级还需经验, 当前等级已获经验)
失败返回(-1, -1, -1)
"""
if not uid:
return -1, -1, -1
records = await self.select("user", where={"uid": uid}, columns=["exp"])
if not records:
return -1, -1, -1
try:
exp = int(records[0].get("exp", 0))
except Exception:
exp = 0
levelStep = 200 # 每级经验增量
discriminant = 1 + 8 * exp / levelStep
level = int((-1 + math.sqrt(discriminant)) // 2)
if level < 0:
level = 0
def cumExp(k: int) -> int:
return levelStep * k * (k + 1) // 2
totalExpCurrentLevel = cumExp(level)
totalExpNextLevel = cumExp(level + 1)
currentExp = exp - totalExpCurrentLevel
return level, totalExpNextLevel, currentExp
async def updateStealCountByUid(
self, uid: str, stealTime: str, stealCount: int
) -> bool:
"""根据用户Uid更新剩余偷菜次数
Args:
uid (str): 用户Uid
stealTime (str): 偷菜日期
stealCount (int): 新剩余偷菜次数
Returns:
bool: 是否更新成功
"""
if not uid or stealCount < 0:
return False
return await self.update(
"user", {"stealTime": stealTime, "stealCount": stealCount}, {"uid": uid}
)
async def updateFieldByUid(self, uid: str, field: str, value) -> bool:
"""更新单字段信息
Args:
uid (str): 用户Uid
field (str): 字段名称
Returns:
bool: 是否成功更新字段
"""
if not uid or not field:
return False
return await self.update("user", {field: value}, {"uid": uid})

View File

@ -5,10 +5,10 @@ import random
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils._build_image import BuildImage from zhenxun.utils._build_image import BuildImage
from ..config import g_bIsDebug from ...utils.config import g_bIsDebug
from ...utils.json import g_pJsonManager
from ...utils.tool import g_pToolManager
from ..dbService import g_pDBService from ..dbService import g_pDBService
from ..json import g_pJsonManager
from ..tool import g_pToolManager
from .database import CSqlManager from .database import CSqlManager
@ -124,6 +124,10 @@ class CUserSignDB(CSqlManager):
bool: 0: 签到失败 1: 签到成功 2: 重复签到 bool: 0: 签到失败 1: 签到成功 2: 重复签到
""" """
try: try:
player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return 0
if not signDate: if not signDate:
signDate = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d") signDate = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d")
@ -233,17 +237,11 @@ class CUserSignDB(CSqlManager):
exp += 9999 exp += 9999
# 向数据库更新 # 向数据库更新
currentExp = await g_pDBService.user.getUserExpByUid(uid) await player.addExp(exp)
await g_pDBService.user.updateUserExpByUid(uid, currentExp + exp) await player.addPoint("point", point)
currentPoint = await g_pDBService.user.getUserPointByUid(uid)
await g_pDBService.user.updateUserPointByUid(uid, currentPoint + point)
if vipPoint > 0: if vipPoint > 0:
currentVipPoint = await g_pDBService.user.getUserVipPointByUid(uid) await player.addPoint("vipPoint", vipPoint)
await g_pDBService.user.updateUserVipPointByUid(
uid, currentVipPoint + vipPoint
)
return 1 return 1
except Exception as e: except Exception as e:

View File

@ -2,9 +2,9 @@ import math
from zhenxun.services.log import logger from zhenxun.services.log import logger
from ..config import g_bIsDebug from ...utils.config import g_bIsDebug
from ...utils.tool import g_pToolManager
from ..dbService import g_pDBService from ..dbService import g_pDBService
from ..tool import g_pToolManager
from .database import CSqlManager from .database import CSqlManager
@ -117,56 +117,6 @@ class CUserSoilDB(CSqlManager):
columns = [description[0] for description in cursor.description] columns = [description[0] for description in cursor.description]
return dict(zip(columns, row)) return dict(zip(columns, row))
@classmethod
async def migrateOldFarmData(cls) -> bool:
"""迁移旧土地数据到新表 userSoil 并删除旧表
Returns:
bool: 如果旧表不存在则返回 False否则迁移并删除后返回 True
"""
# 检查旧表是否存在
cursor = await cls.m_pDB.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='soil'"
)
if not await cursor.fetchone():
return False
async with cls._transaction():
users = await g_pDBService.user.getAllUsers()
for uid in users:
farmInfo = await cls.getUserFarmByUid(uid)
for i in range(1, 31):
key = f"soil{i}"
data = farmInfo.get(key)
if not data:
continue
if data == ",,,4,":
continue
parts = data.split(",")
if len(parts) < 3:
continue
name = parts[0]
pt = int(parts[1])
mt = int(parts[2])
await cls.m_pDB.execute(
"""
INSERT INTO userSoil
(uid,soilIndex,plantName,plantTime,matureTime,harvestCount)
VALUES (?,?,?,?,?,?)
""",
(uid, i, name, pt, mt, 0),
)
await cls.m_pDB.execute("DROP TABLE soil")
logger.info("数据库迁移完毕!")
return True
@classmethod @classmethod
async def insertUserSoil(cls, soilInfo: dict): async def insertUserSoil(cls, soilInfo: dict):
"""插入一条新的 userSoil 记录 """插入一条新的 userSoil 记录

View File

@ -44,7 +44,7 @@ class CUserStealDB(CSqlManager):
return False return False
@classmethod @classmethod
async def getStealRecordsByUid(cls, uid: str) -> list: async def getStealRecordsByUid(cls, uid: str) -> dict:
"""根据用户Uid获取所有偷菜记录 """根据用户Uid获取所有偷菜记录
Args: Args:
@ -59,20 +59,16 @@ class CUserStealDB(CSqlManager):
'SELECT soilIndex, stealerUid, stealCount, stealTime FROM "userSteal" WHERE uid=?;', 'SELECT soilIndex, stealerUid, stealCount, stealTime FROM "userSteal" WHERE uid=?;',
(uid,), (uid,),
) )
rows = await cursor.fetchall() row = await cursor.fetchone()
return [
{ if not row:
"uid": uid, return {}
"soilIndex": row[0],
"stealerUid": row[1], result = dict(row)
"stealCount": row[2], return result
"stealTime": row[3],
}
for row in rows
]
except Exception as e: except Exception as e:
logger.warning("获取偷菜记录失败", e=e) logger.warning("获取偷菜记录失败", e=e)
return [] return {}
@classmethod @classmethod
async def getStealRecord(cls, uid: str, soilIndex: int) -> list: async def getStealRecord(cls, uid: str, soilIndex: int) -> list:

40
core/dbService.py Normal file
View File

@ -0,0 +1,40 @@
class CDBService:
async def init(self):
from .database.plant import CPlantManager
from .database.user import CUserDB
from .database.userItem import CUserItemDB
from .database.userPlant import CUserPlantDB
from .database.userSeed import CUserSeedDB
from .database.userSign import CUserSignDB
from .database.userSoil import CUserSoilDB
from .database.userSteal import CUserStealDB
self.plant = CPlantManager()
await self.plant.init()
self.user = CUserDB()
await self.user.initDB()
self.userSoil = CUserSoilDB()
await self.userSoil.initDB()
self.userPlant = CUserPlantDB()
await self.userPlant.initDB()
self.userSeed = CUserSeedDB()
await self.userSeed.initDB()
self.userItem = CUserItemDB()
await self.userItem.initDB()
self.userSteal = CUserStealDB()
await self.userSteal.initDB()
self.userSign = CUserSignDB()
await self.userSign.initDB()
async def cleanup(self):
await self.plant.cleanup()
g_pDBService = CDBService()

View File

@ -9,11 +9,11 @@ from zhenxun.utils.enum import GoldHandle
from zhenxun.utils.image_utils import ImageTemplate from zhenxun.utils.image_utils import ImageTemplate
from zhenxun.utils.platform import PlatformUtils from zhenxun.utils.platform import PlatformUtils
from ..config import g_bIsDebug, g_iSoilLevelMax, g_sResourcePath, g_sTranslation from ..core.dbService import g_pDBService
from ..dbService import g_pDBService
from ..event.event import g_pEventManager from ..event.event import g_pEventManager
from ..json import g_pJsonManager from ..utils.config import g_bIsDebug, g_iSoilLevelMax, g_sResourcePath, g_sTranslation
from ..tool import g_pToolManager from ..utils.json import g_pJsonManager
from ..utils.tool import g_pToolManager
class CFarmManager: class CFarmManager:
@ -36,20 +36,26 @@ class CFarmManager:
return f"你的金币不足或不足承担手续费。当前手续费为{fee}" return f"你的金币不足或不足承担手续费。当前手续费为{fee}"
await UserConsole.reduce_gold( await UserConsole.reduce_gold(
uid, num, GoldHandle.PLUGIN, "zhenxun_plugin_farm" uid,
num,
GoldHandle.PLUGIN, # type: ignore
"zhenxun_plugin_farm",
) )
await UserConsole.reduce_gold( await UserConsole.reduce_gold(
uid, fee, GoldHandle.PLUGIN, "zhenxun_plugin_farm" uid,
) # type: ignore fee,
GoldHandle.PLUGIN, # type: ignore
"zhenxun_plugin_farm",
)
point = num * pro point = num * pro
player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
p = await g_pDBService.user.getUserPointByUid(uid) await player.addPoint("point", int(point))
number = point + p
await g_pDBService.user.updateUserPointByUid(uid, int(number)) return f"充值{point}农场币成功,手续费{tax}金币,当前农场币:{player.user.get('point', 0)}"
return f"充值{point}农场币成功,手续费{tax}金币,当前农场币:{number}"
@classmethod @classmethod
async def drawFarmByUid(cls, uid: str) -> bytes: async def drawFarmByUid(cls, uid: str) -> bytes:
@ -69,9 +75,11 @@ class CFarmManager:
await grass.resize(0, soilSize[0], soilSize[1]) await grass.resize(0, soilSize[0], soilSize[1])
soilPos = g_pJsonManager.m_pSoil["soil"] soilPos = g_pJsonManager.m_pSoil["soil"]
player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return img.pic2bytes()
userInfo = await g_pDBService.user.getUserInfoByUid(uid) soilUnlock = int(player.user.get("soil", 3))
soilUnlock = int(userInfo["soil"])
x = 0 x = 0
y = 0 y = 0
@ -168,12 +176,12 @@ class CFarmManager:
# 用户名 # 用户名
nameImg = await BuildImage.build_text_image( nameImg = await BuildImage.build_text_image(
userInfo["name"], size=24, font_color=(77, 35, 4) player.user["name"], size=24, font_color=(77, 35, 4)
) )
await img.paste(nameImg, (300, 92)) await img.paste(nameImg, (300, 92))
# 经验值 # 经验值
level = await g_pDBService.user.getUserLevelByUid(uid) level = await player.getUserLevel()
beginX = 309 beginX = 309
endX = 627 endX = 627
@ -194,7 +202,7 @@ class CFarmManager:
# 金币 # 金币
pointImg = await BuildImage.build_text_image( pointImg = await BuildImage.build_text_image(
str(userInfo["point"]), size=24, font_color=(253, 253, 253) str(player.user["point"]), size=24, font_color=(253, 253, 253)
) )
await img.paste(pointImg, (330, 255)) await img.paste(pointImg, (330, 255))
@ -220,8 +228,10 @@ class CFarmManager:
@classmethod @classmethod
async def drawDetailFarmByUid(cls, uid: str) -> list: async def drawDetailFarmByUid(cls, uid: str) -> list:
info = [] info = []
farm = await cls.drawFarmByUid(uid) farm = await cls.drawFarmByUid(uid)
player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return info
info.append(BuildImage.open(farm)) info.append(BuildImage.open(farm))
@ -238,7 +248,7 @@ class CFarmManager:
] ]
icon = "" icon = ""
soilNumber = await g_pDBService.user.getUserSoilByUid(uid) soilNumber = player.user.get("soil", 3)
for i in range(1, soilNumber + 1): for i in range(1, soilNumber + 1):
soilInfo = await g_pDBService.userSoil.getUserSoil(uid, i) soilInfo = await g_pDBService.userSoil.getUserSoil(uid, i)
@ -490,7 +500,10 @@ class CFarmManager:
return g_sTranslation["sowing"]["noNum"].format(name=name, num=count) return g_sTranslation["sowing"]["noNum"].format(name=name, num=count)
# 获取用户土地数量 # 获取用户土地数量
soilNumber = await g_pDBService.user.getUserSoilByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
soilNumber = player.user.get("soil", 3)
# 如果播种数量为 -1表示播种所有可播种的土地 # 如果播种数量为 -1表示播种所有可播种的土地
if num == -1: if num == -1:
@ -546,7 +559,10 @@ class CFarmManager:
try: try:
await g_pEventManager.m_beforeHarvest.emit(uid=uid) # type: ignore await g_pEventManager.m_beforeHarvest.emit(uid=uid) # type: ignore
soilNumber = await g_pDBService.user.getUserSoilByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
soilNumber = player.user.get("soil", 3)
harvestRecords = [] # 收获日志记录 harvestRecords = [] # 收获日志记录
experience = 0 # 总经验值 experience = 0 # 总经验值
@ -644,8 +660,8 @@ class CFarmManager:
) )
if experience > 0: if experience > 0:
exp = await g_pDBService.user.getUserExpByUid(uid) exp = player.user.get("exp", 0)
await g_pDBService.user.updateUserExpByUid(uid, exp + experience) await player.addExp(exp + experience)
harvestRecords.append( harvestRecords.append(
g_sTranslation["harvest"]["exp"].format( g_sTranslation["harvest"]["exp"].format(
exp=experience, exp=experience,
@ -671,7 +687,10 @@ class CFarmManager:
Returns: Returns:
str: 返回 str: 返回
""" """
soilNumber = await g_pDBService.user.getUserSoilByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
soilNumber = player.user.get("soil", 3)
await g_pEventManager.m_beforeEradicate.emit(uid=uid) # type: ignore await g_pEventManager.m_beforeEradicate.emit(uid=uid) # type: ignore
@ -715,8 +734,8 @@ class CFarmManager:
await g_pEventManager.m_afterEradicate.emit(uid=uid, soilIndex=i) # type: ignore await g_pEventManager.m_afterEradicate.emit(uid=uid, soilIndex=i) # type: ignore
if experience > 0: if experience > 0:
exp = await g_pDBService.user.getUserExpByUid(uid) exp = player.user.get("exp", 0)
await g_pDBService.user.updateUserExpByUid(uid, exp + experience) await player.addExp(exp + experience)
return g_sTranslation["eradicate"]["success"].format(exp=experience) return g_sTranslation["eradicate"]["success"].format(exp=experience)
else: else:
@ -828,10 +847,12 @@ class CFarmManager:
str: 返回 str: 返回
""" """
# 用户信息 # 用户信息
userInfo = await g_pDBService.user.getUserInfoByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
stealTime = userInfo.get("stealTime", "") stealTime = player.user.get("stealTime", "")
stealCount = int(userInfo["stealCount"]) stealCount = int(player.user["stealCount"])
if stealTime == "" or not stealTime: if stealTime == "" or not stealTime:
stealTime = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d") stealTime = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d")
@ -847,7 +868,7 @@ class CFarmManager:
return g_sTranslation["stealing"]["max"] return g_sTranslation["stealing"]["max"]
# 获取用户解锁地块数量 # 获取用户解锁地块数量
soilNumber = await g_pDBService.user.getUserSoilByUid(target) soilNumber = player.user.get("soil", 3)
harvestRecords: list[str] = [] harvestRecords: list[str] = []
isStealingNumber = 0 isStealingNumber = 0
isStealingPlant = 0 isStealingPlant = 0
@ -952,7 +973,7 @@ class CFarmManager:
else: else:
stealCount -= 1 stealCount -= 1
await g_pDBService.user.updateStealCountByUid(uid, stealTime, stealCount) await player.updateStealCountByUid(uid, stealTime, stealCount)
return "\n".join(harvestRecords) return "\n".join(harvestRecords)
@ -966,14 +987,16 @@ class CFarmManager:
Returns: Returns:
str: 返回条件文本信息 str: 返回条件文本信息
""" """
userInfo = await g_pDBService.user.getUserInfoByUid(uid)
rec = g_pJsonManager.m_pLevel["reclamation"] rec = g_pJsonManager.m_pLevel["reclamation"]
player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
try: try:
if userInfo["soil"] >= 30: if player.user["soil"] >= 30:
return g_sTranslation["reclamation"]["perfect"] return g_sTranslation["reclamation"]["perfect"]
rec = rec[f"{userInfo['soil'] + 1}"] rec = rec[f"{player.user['soil'] + 1}"]
level = rec["level"] level = rec["level"]
point = rec["point"] point = rec["point"]
@ -1003,16 +1026,18 @@ class CFarmManager:
Returns: Returns:
str: _description_ str: _description_
""" """
userInfo = await g_pDBService.user.getUserInfoByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
level = await g_pDBService.user.getUserLevelByUid(uid) if not player:
return g_sTranslation["basic"]["error"]
level = await player.getUserLevel()
rec = g_pJsonManager.m_pLevel["reclamation"] rec = g_pJsonManager.m_pLevel["reclamation"]
try: try:
if userInfo["soil"] >= 30: if player.user["soil"] >= 30:
return g_sTranslation["reclamation"]["perfect"] return g_sTranslation["reclamation"]["perfect"]
rec = rec[f"{userInfo['soil'] + 1}"] rec = rec[f"{player.user['soil'] + 1}"]
levelFileter = rec["level"] levelFileter = rec["level"]
point = rec["point"] point = rec["point"]
@ -1023,12 +1048,12 @@ class CFarmManager:
level=level[0], next=levelFileter level=level[0], next=levelFileter
) )
if userInfo["point"] < point: if player.user["point"] < point:
return g_sTranslation["reclamation"]["noNum"].format(num=point) return g_sTranslation["reclamation"]["noNum"].format(num=point)
# TODO 缺少判断消耗的item # TODO 缺少判断消耗的item
await g_pDBService.user.updateUserPointByUid(uid, userInfo["point"] - point) await player.subPoint("point", point)
await g_pDBService.user.updateUserSoilByUid(uid, userInfo["soil"] + 1) await player.updateField("soil", player.user["soil"] + 1)
return g_sTranslation["reclamation"]["success"] return g_sTranslation["reclamation"]["success"]
except Exception: except Exception:
@ -1094,9 +1119,11 @@ class CFarmManager:
Returns: Returns:
str: str:
""" """
userInfo = await g_pDBService.user.getUserInfoByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
soilInfo = await g_pDBService.userSoil.getUserSoil(uid, soilIndex) if not player:
return g_sTranslation["basic"]["error"]
soilInfo = await g_pDBService.userSoil.getUserSoil(uid, soilIndex)
if not soilInfo: if not soilInfo:
return g_sTranslation["soilInfo"]["error"] return g_sTranslation["soilInfo"]["error"]
@ -1110,9 +1137,9 @@ class CFarmManager:
fileter = g_pJsonManager.m_pSoil["upgrade"][soilLevelText][countSoil] fileter = g_pJsonManager.m_pSoil["upgrade"][soilLevelText][countSoil]
getters = { getters = {
"level": (await g_pDBService.user.getUserLevelByUid(uid))[0], "level": (await player.getUserLevel())[0],
"point": userInfo.get("point", 0), "point": player.user.get("point", 0),
"vipPoint": userInfo.get("vipPoint", 0), "vipPoint": player.user.get("vipPoint", 0),
} }
requirements = { requirements = {
@ -1137,11 +1164,8 @@ class CFarmManager:
await g_pDBService.userSoil.matureNow(uid, soilIndex) await g_pDBService.userSoil.matureNow(uid, soilIndex)
# 更新数据库字段 # 更新数据库字段
point = userInfo.get("point", 0) - fileter.get("point", 0) await player.subPoint("point", fileter.get("point", 0))
await g_pDBService.user.updateUserPointByUid(uid, point) await player.subPoint("vipPoint", fileter.get("vipPoint", 0))
vipPoint = userInfo.get("vipPoint", 0) - fileter.get("vipPoint", 0)
await g_pDBService.user.updateUserVipPointByUid(uid, vipPoint)
return g_sTranslation["soilInfo"]["success"].format( return g_sTranslation["soilInfo"]["success"].format(
name=await g_pDBService.userSoil.getSoilLevelText(soilLevel), name=await g_pDBService.userSoil.getSoilLevelText(soilLevel),
@ -1151,11 +1175,11 @@ class CFarmManager:
@classmethod @classmethod
async def pointToVipPointByUid(cls, uid: str, num: int) -> str: async def pointToVipPointByUid(cls, uid: str, num: int) -> str:
"""点券兑换 """点券兑换
num:用户传参,即将兑换的点券
pro:兑换倍数;兑换倍数乘以num即为需要消耗的农场币
Args: Args:
uid (str): 用户Uid uid (str): 用户Uid
num (int): 兑换点券数量 num (int): 兑换点券数量
Returns: Returns:
str: 返回结果 str: 返回结果
兑换比例在配置文件中配置 兑换比例在配置文件中配置
@ -1173,12 +1197,14 @@ class CFarmManager:
pro = int(Config.get_config("zhenxun_plugin_farm", "点券兑换倍数")) pro = int(Config.get_config("zhenxun_plugin_farm", "点券兑换倍数"))
pro *= num pro *= num
point = await g_pDBService.user.getUserPointByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
if not player:
return g_sTranslation["basic"]["error"]
point = player.user.get("point", 0)
if point < pro: if point < pro:
return f"你的农场币不足,当前农场币为{point},兑换还需要{pro - point}农场币" return f"你的农场币不足,当前农场币为{point},兑换还需要{pro - point}农场币"
p = await g_pDBService.user.getUserVipPointByUid(uid)
giftPoints: int giftPoints: int
if num < 2000: if num < 2000:
giftPoints = 0 giftPoints = 0
@ -1189,11 +1215,9 @@ class CFarmManager:
else: else:
giftPoints = 3000 giftPoints = 3000
number = num + p + giftPoints number = num + giftPoints
await g_pDBService.user.updateUserVipPointByUid(uid, int(number)) await player.addPoint("vipPoint", number)
await player.subPoint("point", pro)
point -= pro
await g_pDBService.user.updateUserPointByUid(uid, int(point))
return f"兑换{num}点券成功,当前点券:{number},赠送点券:{giftPoints},当前农场币:{point}" return f"兑换{num}点券成功,当前点券:{number},赠送点券:{giftPoints},当前农场币:{point}"

123
core/help.py Normal file
View File

@ -0,0 +1,123 @@
from pathlib import Path
from jinja2 import Template
from playwright.async_api import async_playwright
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
from ..utils.config import g_sResourcePath
class CHelpManager:
@classmethod
def renderHtmlToFile(
cls, path: Path | str, context: dict, output: Path | str
) -> None:
"""
使用 Jinja2 渲染 HTML 模板并保存到指定文件会自动创建父目录
Args:
path (str): 模板 HTML 路径
context (dict): 用于渲染的上下文字典
output (str): 输出 HTML 文件路径
"""
templatePath = str(path)
outputPath = str(output)
templateStr = Path(templatePath).read_text(encoding="utf-8")
template = Template(templateStr)
rendered = template.render(**context)
# 自动创建目录
Path(outputPath).parent.mkdir(parents=True, exist_ok=True)
Path(outputPath).write_text(rendered, encoding="utf-8")
@classmethod
async def screenshotHtmlToBytes(cls, path: str) -> bytes:
"""
使用 Playwright 截图本地 HTML 文件并返回 PNG 图片字节数据
Args:
path (str): 本地 HTML 文件路径
Returns:
bytes: PNG 图片的原始字节内容
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page(
viewport={"width": 1200, "height": 900}, device_scale_factor=1
)
file_url = Path(path).resolve().as_uri()
await page.goto(file_url, wait_until="networkidle")
await page.evaluate("""() => {
return new Promise(r => setTimeout(r, 200));
}""")
image_bytes = await page.screenshot(full_page=True)
await browser.close()
return image_bytes
@classmethod
async def screenshotSave(
cls, path: str, save: str, width: int, height: int
) -> None:
"""
使用 Playwright 渲染本地 HTML 并将截图保存到指定路径
Args:
path (str): HTML 文件路径
save (str): PNG 保存路径 output/image.png
width (int): 图片宽度
height (int): 图片高度
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page(
viewport={"width": width, "height": height}, device_scale_factor=1
)
file_url = Path(path).resolve().as_uri()
await page.goto(file_url, wait_until="networkidle")
await page.evaluate("""() => {
return new Promise(r => setTimeout(r, 200));
}""")
# 确保保存目录存在
Path(save).parent.mkdir(parents=True, exist_ok=True)
# 截图并保存到本地文件
await page.screenshot(path=save, full_page=True)
await browser.close()
@classmethod
async def createHelpImage(cls) -> bool:
templatePath = g_sResourcePath / "html/help.html"
outputPath = g_sResourcePath / "temp_html/help.html"
savePath = DATA_PATH / "farm_res/html/help.png"
context = {
"main_title": "真寻农场帮助菜单",
"subtitle": "[]中为可选参数",
"page_title": "真寻农场帮助菜单",
"font_family": "MyFont",
"contents": [
{"title": "主要指令", "commands": ["指令A", "指令B"]},
{"title": "B", "commands": ["指令D", "指令E", "指令M", "指令i"]},
],
}
try:
cls.renderHtmlToFile(templatePath, context, outputPath)
bytes = await cls.screenshotSave(str(outputPath), str(savePath), 1500, 2300)
except Exception as e:
logger.warning("绘制农场帮助菜单失败", e=e)
return False
return True
g_pHelpManager = CHelpManager()

219
core/player/player.py Normal file
View File

@ -0,0 +1,219 @@
from ..dbService import g_pDBService
class CPlayer:
def __init__(self):
self.user = {
"uid": "", # 用户Uid
"name": "", # 农场名称
"exp": 0, # 经验值
"point": 0, # 金币
"vipPoint": 0, # 点券
"soil": 3, # 解锁土地数量
"stealTime": "", # 偷菜时间字符串
"stealCount": 0, # 剩余偷菜次数
}
async def init(self, uid: str) -> bool:
self.user["uid"] = uid
return await self.loadFormDB()
async def loadFormDB(self) -> bool:
uid = self.user.get("uid", "")
if uid == "":
return False
self.user = await g_pDBService.user.getUserInfoByUid(uid)
return True
async def isRegistered(self) -> bool:
"""检查用户是否注册农场
Returns:
bool: 是否注册农场
"""
uid = self.user.get("uid", "")
if uid == "":
return False
return await g_pDBService.user.isRegistered(uid)
async def addPoint(self, type: str, index: int) -> bool:
"""增加货币
Args:
type (str): 货币类型 point/vipPoint
index (int): 增加的数量
Returns:
bool: 是否成功增加货币
"""
uid = self.user.get("uid", "")
if uid == "" or type not in g_pDBService.user.currencies:
return False
if index == 0:
return True
nowIndex = self.user.get(type, 0) + index
if nowIndex < 0:
nowIndex = 0
if await g_pDBService.user.updatePoint(uid, type, nowIndex):
self.user[type] = nowIndex
return True
return False
async def subPoint(self, type: str, index: int) -> bool:
"""减少货币
Args:
type (str): 货币类型 point/vipPoint
index (int): 减少的数量
Returns:
bool: 是否成功减少货币
"""
uid = self.user.get("uid", "")
if uid == "" or type not in g_pDBService.user.currencies:
return False
if index == 0:
return True
nowIndex = self.user.get(type, 0) - index
if nowIndex < 0:
nowIndex = 0
if await g_pDBService.user.updatePoint(uid, type, nowIndex):
self.user[type] = nowIndex
return True
return False
async def addExp(self, exp: int) -> bool:
"""增加经验值
Args:
exp (int): 增加的经验值
Returns:
bool: 是否成功增加经验值
"""
uid = self.user.get("uid", "")
if uid == "":
return False
if exp == 0:
return True
nowExp = self.user.get("exp", 0) + exp
if nowExp < 0:
nowExp = 0
if await g_pDBService.user.updateExp(uid, nowExp):
self.user["exp"] = nowExp
return True
return False
async def subExp(self, exp: int) -> bool:
"""减少经验值
Args:
exp (int): 减少的经验值
Returns:
bool: 是否成功减少经验值
"""
uid = self.user.get("uid", "")
if uid == "":
return False
if exp == 0:
return True
nowExp = self.user.get("exp", 0) - exp
if nowExp < 0:
nowExp = 0
if await g_pDBService.user.updateExp(uid, nowExp):
self.user["exp"] = nowExp
return True
return False
async def updateName(self, name: str) -> str:
"""更新农场名称
Args:
name (str): 农场名称
Returns:
str: success/error/error1
"""
uid = self.user.get("uid", "")
if uid == "":
return "error"
return await g_pDBService.user.updateName(uid, name)
async def getUserLevel(self) -> tuple[int, int, int]:
"""获取用户等级信息
Returns:
tuple[int, int, int]: 成功返回(当前等级, 升至下级还需经验, 当前等级已获经验)
失败返回(-1, -1, -1)
"""
uid = self.user.get("uid", "")
if uid == "":
return -1, -1, -1
return await g_pDBService.user.getUserLevelByUid(uid)
async def updateStealCountByUid(
self, uid: str, stealTime: str, stealCount: int
) -> bool:
"""根据用户Uid更新剩余偷菜次数
Args:
uid (str): 用户Uid
stealTime (str): 偷菜日期
stealCount (int): 新剩余偷菜次数
Returns:
bool: 是否更新成功
"""
uid = self.user.get("uid", "")
if uid == "":
return False
return await g_pDBService.user.updateStealCountByUid(uid, stealTime, stealCount)
async def updateField(self, field: str, value) -> bool:
"""更新单字段信息
Returns:
bool: 是否成功更新单字段信息
"""
uid = self.user.get("uid", "")
if uid == "":
return False
return await g_pDBService.user.updateFieldByUid(uid, field, value)

216
core/player/playerPool.py Normal file
View File

@ -0,0 +1,216 @@
import threading
import time
from typing import Any
from zhenxun.services.log import logger
class CPlayerPool:
"""
用户池管理类
管理用户对象的生命周期支持自动清理超时用户
"""
def __init__(self, timeoutSeconds: int = 300, cleanupInterval: int = 3600):
"""
初始化用户池
Args:
timeoutSeconds: 用户超时时间默认5分钟
cleanupInterval: 清理间隔默认1小时
"""
self._players: dict[str, dict[str, Any]] = {}
self._lock = threading.RLock()
self.timeoutSeconds = timeoutSeconds
self.cleanupInterval = cleanupInterval
# 启动后台清理线程
self._cleanupThread = threading.Thread(target=self._cleanupWorker, daemon=True)
self._running = True
self._cleanupThread.start()
logger.debug(
f"用户池初始化完成,超时时间: {timeoutSeconds}秒, 清理间隔: {cleanupInterval}"
)
def createUser(self, uid: str, userObj: Any) -> bool:
"""
创建并管理用户对象
Args:
uid: 用户ID
userObj: 用户对象
Returns:
bool: 是否创建成功
"""
with self._lock:
if uid in self._players:
logger.debug(f"用户 {uid} 已存在,正在覆盖")
# 可以选择返回False或者覆盖这里选择覆盖
# return False
self._players[uid] = {
"object": userObj,
"lastActive": time.time(),
"activeCount": 0,
}
logger.debug(f"用户 {uid} 创建并开始管理")
return True
def getUser(self, uid: str) -> Any | None:
"""
获取用户对象并刷新活跃时间
Args:
uid: 用户ID
Returns:
Optional[Any]: 用户对象如果不存在或已超时则返回None
"""
with self._lock:
userData = self._players.get(uid)
if not userData:
logger.debug(f"用户 {uid} 不存在")
return None
# 检查是否已超时(防御性检查)
currentTime = time.time()
if currentTime - userData["lastActive"] > self.timeoutSeconds:
logger.debug(f"用户 {uid} 在获取操作期间已超时")
self._removeUser(uid)
return None
# 刷新活跃时间
userData["lastActive"] = currentTime
userData["activeCount"] += 1
logger.debug(f"用户 {uid} 获取成功,活跃次数: {userData['activeCount']}")
return userData["object"]
def updateUser(self, uid: str, userObj: Any) -> bool:
"""
更新用户对象
Args:
uid: 用户ID
userObj: 新的用户对象
Returns:
bool: 是否更新成功
"""
with self._lock:
if uid not in self._players:
logger.debug(f"用户 {uid} 不存在,无法更新")
return False
self._players[uid]["object"] = userObj
self._players[uid]["lastActive"] = time.time()
logger.debug(f"用户 {uid} 更新成功")
return True
def removeUser(self, uid: str) -> bool:
"""
主动移除用户
Args:
uid: 用户ID
Returns:
bool: 是否移除成功
"""
with self._lock:
return self._removeUser(uid)
def _removeUser(self, uid: str) -> bool:
"""内部移除用户方法"""
if uid in self._players:
userData = self._players.pop(uid)
# 如果需要清理资源,可以在这里处理
if hasattr(userData["object"], "close"):
try:
userData["object"].close()
except Exception as e:
logger.debug(f"关闭用户 {uid} 时出错: {e}")
logger.debug(f"用户 {uid} 已移除,总活跃次数: {userData['activeCount']}")
return True
return False
def _cleanupWorker(self):
"""后台清理线程的工作函数"""
while self._running:
try:
self._cleanupExpiredUsers()
except Exception as e:
logger.debug(f"清理工作线程出错: {e}")
# 休眠指定间隔
time.sleep(self.cleanupInterval)
def _cleanupExpiredUsers(self):
"""清理超时用户"""
currentTime = time.time()
expiredUsers = []
# 首先收集过期的用户ID避免在迭代中修改字典
with self._lock:
for uid, userData in self._players.items():
if currentTime - userData["lastActive"] > self.timeoutSeconds:
expiredUsers.append(uid)
# 移除过期用户
for uid in expiredUsers:
with self._lock:
# 再次检查,防止在收集和移除之间用户被更新
if (
uid in self._players
and currentTime - self._players[uid]["lastActive"]
> self.timeoutSeconds
):
self._removeUser(uid)
if expiredUsers:
logger.debug(f"已清理 {len(expiredUsers)} 个过期用户: {expiredUsers}")
def getActiveUsers(self) -> dict[str, dict[str, Any]]:
"""
获取当前活跃用户信息
Returns:
Dict: 用户信息字典
"""
with self._lock:
# 返回副本避免外部修改
return {
uid: {
"lastActive": data["lastActive"],
"activeCount": data["activeCount"],
"timeRemaining": self.timeoutSeconds
- (time.time() - data["lastActive"]),
}
for uid, data in self._players.items()
}
def userCount(self) -> int:
"""获取当前用户数量"""
with self._lock:
return len(self._players)
def shutdown(self):
"""关闭用户池,清理资源"""
self._running = False
if self._cleanupThread.is_alive():
self._cleanupThread.join(timeout=5)
# 清理所有用户
with self._lock:
uids = list(self._players.keys())
for uid in uids:
self._removeUser(uid)
logger.debug("用户池关闭完成")
g_pUserPool = CPlayerPool()

View File

@ -2,13 +2,16 @@ import math
from zhenxun.utils.image_utils import ImageTemplate from zhenxun.utils.image_utils import ImageTemplate
from ..config import g_sResourcePath, g_sTranslation from ..core.dbService import g_pDBService
from ..dbService import g_pDBService from ..utils.config import g_sResourcePath, g_sTranslation
from ..utils.tool import g_pToolManager
class CShopManager: class CShopManager:
@classmethod @classmethod
async def getSeedShopImage(cls, filterKey: str | int = 1, num: int = 1) -> bytes: async def getSeedShopImage(
cls, filterKey: str | int = 1, num: int = 1, isVip: int = 0
) -> bytes:
"""获取商店页面 """获取商店页面
Args: Args:
@ -33,7 +36,6 @@ class CShopManager:
"-", "-",
"种子名称", "种子名称",
"农场币", "农场币",
"点券",
"解锁等级", "解锁等级",
"果实单价", "果实单价",
"收获经验", "收获经验",
@ -46,7 +48,26 @@ class CShopManager:
# 查询所有可购买作物,并根据筛选关键字过滤 # 查询所有可购买作物,并根据筛选关键字过滤
plants = await g_pDBService.plant.listPlants() plants = await g_pDBService.plant.listPlants()
filteredPlants = [] filteredPlants = []
# 如果是点券商店
if isVip:
columnName[2] = "点券"
for plant in plants: for plant in plants:
# 只留下点券购买的种子
if plant["isVip"] == 0:
continue
# 跳过未解锁购买的种子
if plant["isBuy"] == 0:
continue
# 字符串筛选
if filterStr and filterStr not in plant["name"]:
continue
filteredPlants.append(plant)
else:
for plant in plants:
# 只留下农场币购买的种子
if plant["isVip"] == 1:
continue
# 跳过未解锁购买的种子 # 跳过未解锁购买的种子
if plant["isBuy"] == 0: if plant["isBuy"] == 0:
continue continue
@ -78,7 +99,6 @@ class CShopManager:
icon, icon,
plant["name"], # 种子名称 plant["name"], # 种子名称
plant["buy"], # 农场币种子单价 plant["buy"], # 农场币种子单价
plant["vipBuy"], # 点券种子单价
plant["level"], # 解锁等级 plant["level"], # 解锁等级
plant["price"], # 果实单价 plant["price"], # 果实单价
plant["experience"], # 收获经验 plant["experience"], # 收获经验
@ -88,6 +108,8 @@ class CShopManager:
sell, # 是否可上架交易行 sell, # 是否可上架交易行
] ]
) )
if isVip:
dataList[-1][2] = plant["vipBuy"] # 点券种子单价
# 页码标题 # 页码标题
title = f"种子商店 页数: {page}/{pageCount}" title = f"种子商店 页数: {page}/{pageCount}"
@ -113,47 +135,38 @@ class CShopManager:
Returns: Returns:
str: str:
""" """
if num <= 0: if num <= 0:
return g_sTranslation["buySeed"]["notNum"] return g_sTranslation["buySeed"]["notNum"]
player = await g_pToolManager.getPlayerByUid(uid)
plantInfo = await g_pDBService.plant.getPlantByName(name) plantInfo = await g_pDBService.plant.getPlantByName(name)
if not plantInfo: if not plantInfo or not player:
return g_sTranslation["buySeed"]["error"] return g_sTranslation["buySeed"]["error"]
level = await g_pDBService.user.getUserLevelByUid(uid) level = player.user.get("level", 0)
if level[0] < int(plantInfo["level"]): if level < int(plantInfo["level"]):
return g_sTranslation["buySeed"]["noLevel"] return g_sTranslation["buySeed"]["noLevel"]
""" vipSeed = plantInfo.get("isVip", 0) == 1
logger.debug( currencyType = "vipPoint" if vipSeed else "point"
f"用户:{uid}购买{name},数量为{num}。用户农场币为{point},购买需要{total}" price = int(plantInfo["vipBuy" if vipSeed else "buy"])
) totalCost = price * num
"""
if plantInfo["isVip"] == 1: currentCurrency = player.user.get(currencyType, 0)
vipPoint = await g_pDBService.user.getUserVipPointByUid(uid) if currentCurrency < totalCost:
total = int(plantInfo["vipBuy"]) * num return g_sTranslation["buySeed"][f"no{'Vip' if vipSeed else ''}Point"]
if vipPoint < total:
return g_sTranslation["buySeed"]["noVipPoint"] await player.addPoint(currencyType, currentCurrency - totalCost)
await g_pDBService.user.updateUserVipPointByUid(uid, vipPoint - total)
else:
point = await g_pDBService.user.getUserPointByUid(uid)
total = int(plantInfo["buy"]) * num
if point < total:
return g_sTranslation["buySeed"]["noPoint"]
await g_pDBService.user.updateUserPointByUid(uid, point - total)
if not await g_pDBService.userSeed.addUserSeedByUid(uid, name, num): if not await g_pDBService.userSeed.addUserSeedByUid(uid, name, num):
return g_sTranslation["buySeed"]["errorSql"] return g_sTranslation["buySeed"]["errorSql"]
if plantInfo["isVip"] == 1: success_key = "vipSuccess" if vipSeed else "success"
return g_sTranslation["buySeed"]["vipSuccess"].format( remaining_currency = currentCurrency - totalCost
name=name, total=total, point=vipPoint - total
) return g_sTranslation["buySeed"][success_key].format(
else: name=name, total=totalCost, point=remaining_currency
return g_sTranslation["buySeed"]["success"].format(
name=name, total=total, point=point - total
) )
@classmethod @classmethod
@ -215,17 +228,18 @@ class CShopManager:
totalPoint = totalSold * price totalPoint = totalSold * price
currentPoint = await g_pDBService.user.getUserPointByUid(uid) player = await g_pToolManager.getPlayerByUid(uid)
await g_pDBService.user.updateUserPointByUid(uid, currentPoint + totalPoint) if not player:
return g_sTranslation["basic"]["error"]
if name == "": currentPoint = player.user.get("point", 0)
return g_sTranslation["sellPlant"]["success"].format( await player.addPoint("point", currentPoint + totalPoint)
result = "success1" if name == "" else "success"
return g_sTranslation["sellPlant"][result].format(
point=totalPoint, num=currentPoint + totalPoint point=totalPoint, num=currentPoint + totalPoint
) )
else:
return g_sTranslation["sellPlant"]["success1"].format(
name=name, point=totalPoint, num=currentPoint + totalPoint
)
g_pShopManager = CShopManager() g_pShopManager = CShopManager()

View File

@ -1,143 +0,0 @@
from contextlib import asynccontextmanager
import os
from pathlib import Path
import re
import aiosqlite
from zhenxun.services.log import logger
from ..config import g_sDBFilePath, g_sDBPath
class CSqlManager:
def __init__(self):
dbPath = Path(g_sDBPath)
if dbPath and not dbPath.exists():
os.makedirs(dbPath, exist_ok=True)
@classmethod
async def cleanup(cls):
if hasattr(cls, "m_pDB") and cls.m_pDB:
await cls.m_pDB.close()
@classmethod
async def init(cls) -> bool:
try:
cls.m_pDB = await aiosqlite.connect(g_sDBFilePath)
cls.m_pDB.row_factory = aiosqlite.Row
return True
except Exception as e:
logger.warning("初始化总数据库失败", e=e)
return False
@classmethod
@asynccontextmanager
async def _transaction(cls):
await cls.m_pDB.execute("BEGIN;")
try:
yield
except:
await cls.m_pDB.execute("ROLLBACK;")
raise
else:
await cls.m_pDB.execute("COMMIT;")
@classmethod
async def getTableInfo(cls, tableName: str) -> list:
if not re.match(r"^[A-Za-z_][A-Za-z0-9_]*$", tableName):
raise ValueError(f"Illegal table name: {tableName}")
try:
cursor = await cls.m_pDB.execute(f'PRAGMA table_info("{tableName}")')
rows = await cursor.fetchall()
return [{"name": row[1], "type": row[2]} for row in rows]
except aiosqlite.Error:
return []
@classmethod
async def ensureTableSchema(cls, tableName: str, columns: dict) -> bool:
"""由AI生成
创建表或为已存在表添加缺失字段
返回 True 表示有变更创建或新增列False 则无操作
Args:
tableName (_type_): 表名
columns (_type_): 字典
Returns:
_type_: _description_
"""
info = await cls.getTableInfo(tableName)
existing = {col["name"]: col["type"].upper() for col in info}
desired = {k: v.upper() for k, v in columns.items() if k != "PRIMARY KEY"}
primaryKey = columns.get("PRIMARY KEY", "")
if not existing:
colsDef = ", ".join(f'"{k}" {v}' for k, v in desired.items())
if primaryKey:
colsDef += f", PRIMARY KEY {primaryKey}"
await cls.m_pDB.execute(f'CREATE TABLE "{tableName}" ({colsDef});')
return True
toAdd = [k for k in desired if k not in existing]
toRemove = [k for k in existing if k not in desired]
typeMismatch = [
k for k in desired if k in existing and existing[k] != desired[k]
]
if toAdd and not toRemove and not typeMismatch:
for col in toAdd:
await cls.m_pDB.execute(
f'ALTER TABLE "{tableName}" ADD COLUMN "{col}" {columns[col]}'
)
return True
async with cls._transaction():
tmpTable = f"{tableName}_new"
colsDef = ", ".join(f'"{k}" {v}' for k, v in desired.items())
if primaryKey:
colsDef += f", PRIMARY KEY {primaryKey}"
await cls.m_pDB.execute(f'CREATE TABLE "{tmpTable}" ({colsDef});')
commonCols = [k for k in desired if k in existing]
if commonCols:
colsStr = ", ".join(f'"{c}"' for c in commonCols)
sql = (
f'INSERT INTO "{tmpTable}" ({colsStr}) '
f"SELECT {colsStr} "
f'FROM "{tableName}";'
)
await cls.m_pDB.execute(sql)
await cls.m_pDB.execute(f'DROP TABLE "{tableName}";')
await cls.m_pDB.execute(
f'ALTER TABLE "{tmpTable}" RENAME TO "{tableName}";'
)
return True
@classmethod
async def executeDB(cls, command: str) -> bool:
"""执行自定义SQL
Args:
command (str): SQL语句
Returns:
bool: 是否执行成功
"""
if not command:
logger.warning("数据库语句长度为空!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(command)
return True
except Exception as e:
logger.warning(f"数据库语句执行出错: {command}", e=e)
return False
g_pSqlManager = CSqlManager()

View File

@ -1,479 +0,0 @@
import math
from zhenxun.services.log import logger
from ..tool import g_pToolManager
from .database import CSqlManager
class CUserDB(CSqlManager):
@classmethod
async def initDB(cls):
"""初始化用户表结构确保user表存在且字段完整"""
userInfo = {
"uid": "TEXT PRIMARY KEY", # 用户Uid
"name": "TEXT NOT NULL", # 农场名称
"exp": "INTEGER DEFAULT 0", # 经验值
"point": "INTEGER DEFAULT 0", # 金币
"vipPoint": "INTEGER DEFAULT 0", # 点券
"soil": "INTEGER DEFAULT 3", # 解锁土地数量
"stealTime": "TEXT DEFAULT ''", # 偷菜时间字符串
"stealCount": "INTEGER DEFAULT 0", # 剩余偷菜次数
}
await cls.ensureTableSchema("user", userInfo)
@classmethod
async def initUserInfoByUid(
cls, uid: str, name: str = "", exp: int = 0, point: int = 500
) -> bool | str:
"""初始化用户信息,包含初始偷菜时间字符串与次数
Args:
uid (str): 用户Uid
name (str): 农场名称
exp (int): 农场经验
point (int): 农场币
Returns:
bool | str: False 表示失败字符串表示成功信息
"""
nowStr = g_pToolManager.dateTime().date().today().strftime("%Y-%m-%d")
sql = (
f"INSERT INTO user (uid, name, exp, point, soil, stealTime, stealCount) "
f"VALUES ({uid}, '{name}', {exp}, {point}, 3, '{nowStr}', 5)"
)
try:
async with cls._transaction():
await cls.m_pDB.execute(sql)
return "开通农场成功"
except Exception as e:
logger.warning("initUserInfoByUid 事务执行失败!", e=e)
return False
@classmethod
async def getAllUsers(cls) -> list[str]:
"""获取所有用户UID列表
Returns:
list[str]: 用户UID列表
"""
cursor = await cls.m_pDB.execute("SELECT uid FROM user")
rows = await cursor.fetchall()
return [row[0] for row in rows]
@classmethod
async def isUserExist(cls, uid: str) -> bool:
"""判断用户是否存在
Args:
uid (str): 用户Uid
Returns:
bool: 如果用户存在返回True否则返回False
"""
if not uid:
return False
try:
async with cls.m_pDB.execute(
"SELECT 1 FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return row is not None
except Exception as e:
logger.warning("isUserExist 查询失败!", e=e)
return False
@classmethod
async def getUserInfoByUid(cls, uid: str) -> dict:
"""获取指定用户完整信息
Args:
uid (str): 用户Uid
Returns:
dict: 包含所有用户字段的字典
"""
if not uid:
return {}
try:
async with cls.m_pDB.execute(
"SELECT * FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
if not row:
return {}
result = dict(row)
return result
except Exception as e:
logger.warning("getUserInfoByUid 查询失败!", e=e)
return {}
@classmethod
async def getUserNameByUid(cls, uid: str) -> str:
"""根据用户Uid获取用户名
Args:
uid (str): 用户Uid
Returns:
str: 用户名失败返回空字符串
"""
if not uid:
return ""
try:
async with cls.m_pDB.execute(
"SELECT name FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return row["name"] if row else ""
except Exception as e:
logger.warning("getUserNameByUid 查询失败!", e=e)
return ""
@classmethod
async def updateUserNameByUid(cls, uid: str, name: str) -> bool:
"""根据用户Uid更新用户名
Args:
uid (str): 用户Uid
name (str): 新用户名
Returns:
bool: 是否更新成功
"""
if not uid or not name:
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET name = ? WHERE uid = ?", (name, uid)
)
return True
except Exception as e:
logger.warning("updateUserNameByUid 事务执行失败!", e=e)
return False
@classmethod
async def getUserPointByUid(cls, uid: str) -> int:
"""获取指定用户农场币
Args:
uid (str): 用户Uid
Returns:
int: 农场币数量失败返回 -1
"""
if not uid:
return -1
try:
async with cls.m_pDB.execute(
"SELECT point FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return int(row[0]) if row and row[0] is not None else -1
except Exception as e:
logger.warning("getUserPointByUid 查询失败!", e=e)
return -1
@classmethod
async def updateUserPointByUid(cls, uid: str, point: int) -> bool:
"""根据用户Uid更新农场币数量
Args:
uid (str): 用户Uid
point (int): 新农场币数量
Returns:
bool: 是否更新成功
"""
if not uid or point < 0:
logger.warning("updateUserPointByUid 参数校验失败!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET point = ? WHERE uid = ?", (point, uid)
)
return True
except Exception as e:
logger.error("updateUserPointByUid 事务执行失败!", e=e)
return False
@classmethod
async def getUserVipPointByUid(cls, uid: str) -> int:
"""获取指定用户点券
Args:
uid (str): 用户Uid
Returns:
int: 点券数量失败返回 -1
"""
if not uid:
return -1
try:
async with cls.m_pDB.execute(
"SELECT vipPoint FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return int(row[0]) if row and row[0] is not None else -1
except Exception as e:
logger.warning("getUservipPointByUid 查询失败!", e=e)
return -1
@classmethod
async def updateUserVipPointByUid(cls, uid: str, vipPoint: int) -> bool:
"""根据用户Uid更新点券数量
Args:
uid (str): 用户Uid
vipPoint (int): 新点券数量
Returns:
bool: 是否更新成功
"""
if not uid or vipPoint < 0:
logger.warning("updateUservipPointByUid 参数校验失败!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET vipPoint = ? WHERE uid = ?", (vipPoint, uid)
)
return True
except Exception as e:
logger.error("updateUservipPointByUid 事务执行失败!", e=e)
return False
@classmethod
async def getUserExpByUid(cls, uid: str) -> int:
"""获取指定用户经验值
Args:
uid (str): 用户Uid
Returns:
int: 经验值失败返回 -1
"""
if not uid:
return -1
try:
async with cls.m_pDB.execute(
"SELECT exp FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return int(row[0]) if row and row[0] is not None else -1
except Exception as e:
logger.warning("getUserExpByUid 查询失败!", e=e)
return -1
@classmethod
async def updateUserExpByUid(cls, uid: str, exp: int) -> bool:
"""根据用户Uid更新经验值
Args:
uid (str): 用户Uid
exp (int): 新经验值
Returns:
bool: 是否更新成功
"""
if not uid:
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET exp = ? WHERE uid = ?", (exp, uid)
)
return True
except Exception as e:
logger.warning("updateUserExpByUid 事务执行失败!", e=e)
return False
@classmethod
async def getUserLevelByUid(cls, uid: str) -> tuple[int, int, int]:
"""获取用户等级信息
Args:
uid (str): 用户Uid
Returns:
tuple[int, int, int]: 成功返回(当前等级, 升至下级还需经验, 当前等级已获经验)
失败返回(-1, -1, -1)
"""
if not uid:
return -1, -1, -1
try:
async with cls.m_pDB.execute(
"SELECT exp FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
if not row or row[0] is None:
return -1, -1, -1
expVal = int(row[0])
levelStep = 200 # 每级经验增量
discriminant = 1 + 8 * expVal / levelStep
level = int((-1 + math.sqrt(discriminant)) // 2)
if level < 0:
level = 0
def cumExp(k: int) -> int:
return levelStep * k * (k + 1) // 2
totalExpCurrentLevel = cumExp(level)
totalExpNextLevel = cumExp(level + 1)
currentExp = expVal - totalExpCurrentLevel
return level, totalExpNextLevel, currentExp
return -1, -1, -1
except Exception as e:
logger.warning("getUserLevelByUid 查询失败!", e=e)
return -1, -1, -1
@classmethod
async def getUserSoilByUid(cls, uid: str) -> int:
"""获取解锁土地数量
Args:
uid (str): 用户Uid
Returns:
int: 解锁土地块数失败返回0
"""
if not uid:
return 0
try:
async with cls.m_pDB.execute(
"SELECT soil FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return int(row[0]) if row and row[0] is not None else 0
except Exception as e:
logger.warning("getUserSoilByUid 查询失败!", e=e)
return 0
@classmethod
async def updateUserSoilByUid(cls, uid: str, soil: int) -> bool:
"""更新指定用户解锁土地数量
Args:
uid (str): 用户Uid
soil (int): 新土地数量
Returns:
bool: 更新成功返回True否则False
"""
if not uid or soil < 0:
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET soil = ? WHERE uid = ?", (soil, uid)
)
return True
except Exception as e:
logger.warning("updateUserSoilByUid 事务执行失败!", e=e)
return False
@classmethod
async def getStealTimeByUid(cls, uid: str) -> str:
"""根据用户Uid获取偷菜时间字符串
Args:
uid (str): 用户Uid
Returns:
str: 偷菜时间字符串失败返回空字符串
"""
if not uid:
return ""
try:
async with cls.m_pDB.execute(
"SELECT stealTime FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return row[0] if row and row[0] else ""
except Exception as e:
logger.warning("getStealTimeByUid 查询失败!", e=e)
return ""
@classmethod
async def updateStealTimeByUid(cls, uid: str, stealTime: str) -> bool:
"""根据用户Uid更新偷菜时间字符串
Args:
uid (str): 用户Uid
stealTime (str): 新偷菜时间字符串
Returns:
bool: 是否更新成功
"""
if not uid or not stealTime:
logger.warning("updateStealTimeByUid 参数校验失败!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET stealTime = ? WHERE uid = ?", (stealTime, uid)
)
return True
except Exception as e:
logger.warning("updateStealTimeByUid 事务执行失败!", e=e)
return False
@classmethod
async def getStealCountByUid(cls, uid: str) -> int:
"""根据用户Uid获取剩余偷菜次数
Args:
uid (str): 用户Uid
Returns:
int: 剩余偷菜次数失败返回 -1
"""
if not uid:
return -1
try:
async with cls.m_pDB.execute(
"SELECT stealCount FROM user WHERE uid = ?", (uid,)
) as cursor:
row = await cursor.fetchone()
return int(row[0]) if row and row[0] is not None else 0
except Exception as e:
logger.warning("getStealCountByUid 查询失败!", e=e)
return -1
@classmethod
async def updateStealCountByUid(
cls, uid: str, stealTime: str, stealCount: int
) -> bool:
"""根据用户Uid更新剩余偷菜次数
Args:
uid (str): 用户Uid
stealTime (str): 偷菜日期
stealCount (int): 新剩余偷菜次数
Returns:
bool: 是否更新成功
"""
if not uid or stealCount < 0:
logger.warning("updateStealCountByUid 参数校验失败!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(
"UPDATE user SET stealTime = ?, stealCount = ? WHERE uid = ?",
(stealTime, stealCount, uid),
)
return True
except Exception as e:
logger.warning("updateStealCountByUid 事务执行失败!", e=e)
return False

View File

@ -1,45 +0,0 @@
class CDBService:
@classmethod
async def init(cls):
from .database.plant import CPlantManager
from .database.user import CUserDB
from .database.userItem import CUserItemDB
from .database.userPlant import CUserPlantDB
from .database.userSeed import CUserSeedDB
from .database.userSign import CUserSignDB
from .database.userSoil import CUserSoilDB
from .database.userSteal import CUserStealDB
cls.plant = CPlantManager()
await cls.plant.init()
cls.user = CUserDB()
await cls.user.initDB()
cls.userSoil = CUserSoilDB()
await cls.userSoil.initDB()
cls.userPlant = CUserPlantDB()
await cls.userPlant.initDB()
cls.userSeed = CUserSeedDB()
await cls.userSeed.initDB()
cls.userItem = CUserItemDB()
await cls.userItem.initDB()
cls.userSteal = CUserStealDB()
await cls.userSteal.initDB()
cls.userSign = CUserSignDB()
await cls.userSign.initDB()
# 迁移旧数据库
await cls.userSoil.migrateOldFarmData()
@classmethod
async def cleanup(cls):
await cls.plant.cleanup()
g_pDBService = CDBService()

View File

@ -1,117 +0,0 @@
from pathlib import Path
from jinja2 import Template
from playwright.async_api import async_playwright
from zhenxun.configs.path_config import DATA_PATH
from zhenxun.services.log import logger
from ..config import g_sResourcePath
def rendeerHtmlToFile(path: Path | str, context: dict, output: Path | str) -> None:
"""
使用 Jinja2 渲染 HTML 模板并保存到指定文件会自动创建父目录
Args:
path (str): 模板 HTML 路径
context (dict): 用于渲染的上下文字典
output (str): 输出 HTML 文件路径
"""
templatePath = str(path)
outputPath = str(output)
templateStr = Path(templatePath).read_text(encoding="utf-8")
template = Template(templateStr)
rendered = template.render(**context)
# 自动创建目录
Path(outputPath).parent.mkdir(parents=True, exist_ok=True)
Path(outputPath).write_text(rendered, encoding="utf-8")
async def screenshotHtmlToBytes(path: str) -> bytes:
"""
使用 Playwright 截图本地 HTML 文件并返回 PNG 图片字节数据
Args:
path (str): 本地 HTML 文件路径
Returns:
bytes: PNG 图片的原始字节内容
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
file_url = Path(path).resolve().as_uri()
await page.goto(file_url)
image_bytes = await page.screenshot(full_page=True)
await browser.close()
return image_bytes
async def screenshotSave(path: str, save: str) -> None:
"""
使用 Playwright 渲染本地 HTML 并将截图保存到指定路径
Args:
path (str): HTML 文件路径
save (str): PNG 保存路径 output/image.png
"""
async with async_playwright() as p:
browser = await p.chromium.launch()
page = await browser.new_page()
file_url = Path(path).resolve().as_uri()
await page.goto(file_url)
# 确保保存目录存在
Path(save).parent.mkdir(parents=True, exist_ok=True)
# 截图并保存到本地文件
await page.screenshot(path=save, full_page=True)
await browser.close()
async def createHelpImage() -> bool:
templatePath = g_sResourcePath / "html/help.html"
outputPath = DATA_PATH / "farm_res/html/help.html"
context = {
"title": "功能指令总览",
"data": [
{
"command": "开通农场",
"description": "首次进入游戏开通农场",
"tip": "",
},
{
"command": "购买种子",
"description": "从商店中购买可用种子",
"tip": "",
},
{"command": "播种", "description": "将种子种入土地中", "tip": "先开垦土地"},
{
"command": "收获",
"description": "收获成熟作物获得收益",
"tip": "",
},
{
"command": "偷菜",
"description": "从好友农场中偷取成熟作物",
"tip": "1",
},
],
}
try:
rendeerHtmlToFile(templatePath, context, outputPath)
image_bytes = await screenshot_html_to_bytes(html_output_path)
except Exception as e:
logger.warning("绘制农场帮助菜单失败", e=e)
return False
return True

Binary file not shown.

View File

@ -1,75 +1,777 @@
<!DOCTYPE html> <!DOCTYPE html>
<html lang="zh"> <html lang="zh-CN">
<head> <head>
<meta charset="UTF-8"> <meta charset="UTF-8">
<title>{{ title }}</title> <meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>智能功能展示 - 参数可视化</title>
<style> <style>
body { * {
background-color: #ffe4e9;
font-family: "Microsoft YaHei", sans-serif;
margin: 0; margin: 0;
padding: 0; padding: 0;
box-sizing: border-box;
font-family: "{{ font_family }}", system-ui, -apple-system, "Helvetica Neue", Arial, sans-serif;
} }
@font-face {
font-family: "{{ font_family }}";
src: url("../font/Rounded.ttf") format("truetype");
font-weight: 400;
font-style: normal;
font-display: swap;
}
body {
background: linear-gradient(135deg, #fff9f9 0%, #f0f9ff 100%);
color: #5a5a5a;
padding: 20px;
min-height: 100vh;
}
.container { .container {
max-width: 1400px;
width: 100%;
margin: 0 auto;
}
header {
text-align: center;
margin-bottom: 40px;
padding: 30px 20px;
}
h1 {
color: #ff85a2;
font-size: 3rem;
margin-bottom: 15px;
text-shadow: 2px 2px 0px rgba(255, 133, 162, 0.2);
position: relative;
display: inline-block;
}
h1::after {
content: "";
position: absolute;
bottom: -10px;
left: 50%;
transform: translateX(-50%);
width: 100px;
height: 4px;
background: linear-gradient(90deg, #ff85a2, #a2d2ff);
border-radius: 2px;
}
.description {
font-size: 1.2rem;
color: #888;
max-width: 800px;
margin: 20px auto;
line-height: 1.6;
background-color: rgba(255, 255, 255, 0.7);
padding: 20px;
border-radius: 15px;
box-shadow: 0 4px 10px rgba(0, 0, 0, 0.05);
}
.legend {
display: flex; display: flex;
justify-content: center; justify-content: center;
padding: 40px 20px; gap: 20px;
margin: 20px 0;
flex-wrap: wrap;
} }
.content-box {
background-color: #fff0f5; .legend-item {
border-radius: 24px; display: flex;
padding: 30px; align-items: center;
box-shadow: 0 0 8px rgba(0, 0, 0, 0.1); background: white;
width: 900px; padding: 8px 15px;
border-radius: 20px;
box-shadow: 0 2px 8px rgba(0,0,0,0.1);
font-size: 0.9rem;
} }
.title {
text-align: center; .legend-color {
font-size: 32px; width: 16px;
font-weight: bold; height: 16px;
margin-bottom: 40px; border-radius: 4px;
margin-right: 8px;
} }
table {
.required-color {
background-color: #ff6b9c;
}
.optional-color {
background-color: #a2d2ff;
}
.conditional-color {
background-color: #b9fbc0;
}
.features-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(400px, 1fr));
gap: 30px;
margin-top: 40px;
}
.feature-box {
background: linear-gradient(135deg, #ffffff 0%, #f8f8f8 100%);
border-radius: 25px;
padding: 25px;
box-shadow: 0 8px 20px rgba(255, 133, 162, 0.15);
transition: all 0.3s ease;
border: 3px solid transparent;
position: relative;
overflow: hidden;
cursor: pointer;
}
.feature-box:hover {
transform: translateY(-5px);
box-shadow: 0 12px 25px rgba(255, 133, 162, 0.25);
border-color: #ffc2d1;
}
.feature-box::before {
content: "";
position: absolute;
top: 0;
left: 0;
width: 100%; width: 100%;
border-collapse: collapse; height: 8px;
background: linear-gradient(90deg, #ff85a2, #a2d2ff);
} }
th {
background-color: #ffb6c1; .feature-header {
display: flex;
align-items: center;
margin-bottom: 20px;
}
.feature-icon {
font-size: 2.5rem;
margin-right: 15px;
}
.feature-name {
font-size: 1.6rem;
font-weight: bold; font-weight: bold;
padding: 12px; color: #ff6b9c;
}
.parameters-section {
margin: 20px 0;
}
.section-title {
font-size: 1rem;
color: #888;
margin-bottom: 10px;
display: flex;
align-items: center;
}
.section-title::before {
content: "📋";
margin-right: 8px;
}
.parameters-list {
display: flex;
flex-direction: column;
gap: 8px;
}
.parameter-item {
display: flex;
align-items: center;
padding: 10px 15px;
border-radius: 12px;
font-size: 0.95rem;
transition: all 0.2s ease;
}
.parameter-item:hover {
transform: translateX(5px);
}
.required-param {
background-color: rgba(255, 107, 156, 0.1);
border-left: 4px solid #ff6b9c;
}
.optional-param {
background-color: rgba(162, 210, 255, 0.1);
border-left: 4px solid #a2d2ff;
}
.conditional-param {
background-color: rgba(185, 251, 192, 0.1);
border-left: 4px solid #b9fbc0;
}
.param-badge {
display: inline-block;
padding: 3px 10px;
border-radius: 8px;
font-size: 0.8rem;
font-weight: bold;
margin-right: 12px;
min-width: 60px;
text-align: center; text-align: center;
} }
td {
padding: 12px; .required-badge {
text-align: center; background-color: #ff6b9c;
color: white;
} }
tr:not(:last-child) {
border-bottom: 1px solid #ddd; .optional-badge {
background-color: #a2d2ff;
color: white;
}
.conditional-badge {
background-color: #b9fbc0;
color: #333;
}
.param-description {
flex: 1;
}
.logic-section {
margin-top: 20px;
padding: 15px;
background-color: rgba(255, 213, 165, 0.1);
border-radius: 12px;
border-left: 4px solid #ffd6a5;
}
.logic-title {
font-weight: bold;
color: #ff9e6d;
margin-bottom: 8px;
display: flex;
align-items: center;
}
.logic-title::before {
content: "🔍";
margin-right: 8px;
}
.logic-content {
font-size: 0.9rem;
color: #666;
line-height: 1.5;
}
.usage-example {
margin-top: 15px;
padding: 12px;
background-color: #f8f9fa;
border-radius: 8px;
font-family: 'Courier New', monospace;
font-size: 0.85rem;
color: #333;
border-left: 3px solid #888;
}
.decoration {
position: absolute;
width: 80px;
height: 80px;
opacity: 0.1;
z-index: 0;
}
.decoration-1 {
top: -20px;
left: -20px;
background-color: #ff85a2;
border-radius: 50%;
}
.decoration-2 {
bottom: -20px;
right: -20px;
background-color: #a2d2ff;
border-radius: 50%;
}
@media (max-width: 768px) {
.features-grid {
grid-template-columns: 1fr;
}
h1 {
font-size: 2.2rem;
}
}
footer {
margin-top: 50px;
text-align: center;
color: #aaa;
font-size: 0.9rem;
padding: 20px;
} }
</style> </style>
</head> </head>
<body> <body>
<div class="container"> <div class="container">
<div class="content-box"> <header>
<div class="title">{{ title }}</div> <h1>✨ 真寻农场帮助菜单 ✨</h1>
<table> <p class="description">简单介绍一下农场的各个功能和食用方法</p>
<thead>
<tr> <div class="legend">
<th>指令</th> <div class="legend-item">
<th>描述</th> <div class="legend-color required-color"></div>
<th>Tip</th> <span>必填参数</span>
</tr>
</thead>
<tbody>
{% for entry in data %}
<tr>
<td>{{ entry.command }}</td>
<td>{{ entry.description }}</td>
<td>{{ entry.tip }}</td>
</tr>
{% endfor %}
</tbody>
</table>
</div> </div>
<div class="legend-item">
<div class="legend-color optional-color"></div>
<span>可选参数</span>
</div>
<div class="legend-item">
<div class="legend-color conditional-color"></div>
<span>条件参数</span>
</div>
</div>
</header>
<main>
<div class="features-grid">
<!-- 必填 + 可选参数 -->
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">at 开通农场</div>
</div>
<div class="logic-section">
<div class="logic-title">条件逻辑</div>
<div class="logic-content">
• 需要at小真寻<br>
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">我的农场</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">农场详述</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">我的农场币</div>
</div>
</div>
<!-- 条件参数 -->
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">种子商店</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item conditional-param">
<span class="param-badge conditional-badge">条件</span>
<span class="param-description">参数 - 根据类型决定含义</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">条件逻辑</div>
<div class="logic-content">
• 如果参数是中文 → 进入筛选模式,可接页码参数<br>
• 如果参数是数字 → 直接作为页码使用<br>
</div>
</div>
<div class="usage-example">
使用示例1: 种子商店 胡萝<br>
使用示例2: 种子商店 2<br>
使用示例3: 种子商店 胡萝 3
</div>
</div>
<!-- 多个参数 -->
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">购买种子</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item required-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">作物/种子名称</span>
</div>
<div class="parameter-item optional-param">
<span class="param-badge optional-badge">可选</span>
<span class="param-description">数量</span>
</div>
</div>
</div>
<div class="usage-example">
使用示例: 购买种子 胡萝卜
使用示例: 购买种子 胡萝卜 5
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">我的种子</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">播种</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item conditional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">作物/种子名称</span>
</div>
<div class="parameter-item optional-param">
<span class="param-badge optional-badge">可选</span>
<span class="param-description">数量</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 数量不填默认将最大可能播种
</div>
</div>
<div class="usage-example">
使用示例1: 种子商店 胡萝<br>
使用示例2: 种子商店 2<br>
使用示例3: 种子商店 胡萝 3
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">收获</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">铲除</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">我的作物</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">出售作物</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item conditional-param">
<span class="param-badge optional-badge">可选</span>
<span class="param-description">作物/种子名称</span>
</div>
<div class="parameter-item optional-param">
<span class="param-badge optional-badge">可选</span>
<span class="param-description">数量</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 不填写作物名将售卖仓库种全部作物<br>
• 填作物名不填数量将指定作物全部出售
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">偷菜 at</div>
</div>
<div class="logic-section">
<div class="logic-title">条件逻辑</div>
<div class="logic-content">
• 每人每天只能偷5次
• 后续需要at目标且目标开通真寻农场
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">开垦</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">购买农场币</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">数量</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 数量为消耗金币的数量
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">更改农场名</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">新的农场名</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 仅支持部分特殊符号
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">农场签到</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">土地升级</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">地块ID</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 地块ID通过农场详述获取
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">作物加锁</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">地块ID</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 地块ID通过农场详述获取
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">作物解锁</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">地块ID</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 地块ID通过农场详述获取
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">我的点券</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">地块ID</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 地块ID通过农场详述获取
</div>
</div>
</div>
<div class="feature-box">
<div class="decoration decoration-1"></div>
<div class="decoration decoration-2"></div>
<div class="feature-header">
<div class="feature-name">点券兑换</div>
</div>
<div class="parameters-section">
<div class="section-title">参数列表</div>
<div class="parameters-list">
<div class="parameter-item optional-param">
<span class="param-badge required-badge">必填</span>
<span class="param-description">地块ID</span>
</div>
</div>
</div>
<div class="logic-section">
<div class="logic-title">操作提示</div>
<div class="logic-content">
• 地块ID通过农场详述获取
</div>
</div>
</div>
</div>
</main>
</div> </div>
</body> </body>
</html> </html>

View File

@ -15,13 +15,13 @@ g_sDBPath = DATA_PATH / "farm_db"
g_sDBFilePath = DATA_PATH / "farm_db/farm.db" g_sDBFilePath = DATA_PATH / "farm_db/farm.db"
# 农场资源文件目录 # 农场资源文件目录
g_sResourcePath = Path(__file__).resolve().parent / "resource" g_sResourcePath = Path(__file__).resolve().parent / "../resource"
# 农场作物数据库 # 农场作物数据库
g_sPlantPath = g_sResourcePath / "db/plant.db" g_sPlantPath = g_sResourcePath / "db/plant.db"
# 农场配置文件目录 # 农场配置文件目录
g_sConfigPath = Path(__file__).resolve().parent / "config" g_sConfigPath = Path(__file__).resolve().parent / "../config"
# 农场签到文件路径 # 农场签到文件路径
g_sSignInPath = g_sConfigPath / "sign_in.json" g_sSignInPath = g_sConfigPath / "sign_in.json"
@ -35,6 +35,7 @@ g_sTranslation = {
"notFarm": "尚未开通农场快at我发送 开通农场 开通吧 🌱🚜", "notFarm": "尚未开通农场快at我发送 开通农场 开通吧 🌱🚜",
"point": "你的当前农场币为: {point} 🌾💰", "point": "你的当前农场币为: {point} 🌾💰",
"vipPoint": "你的当前点券为: {vipPoint} 🌾💰", "vipPoint": "你的当前点券为: {vipPoint} 🌾💰",
"error": "❌ 农场功能异常,请稍后再试 💔",
}, },
"register": { "register": {
"success": "✅ 农场开通成功!\n💼 初始资金:{point}农场币 🥳🎉", "success": "✅ 农场开通成功!\n💼 初始资金:{point}农场币 🥳🎉",

View File

@ -10,11 +10,12 @@ from rich.progress import (
TimeRemainingColumn, TimeRemainingColumn,
TransferSpeedColumn, TransferSpeedColumn,
) )
from zhenxun.configs.config import Config from zhenxun.configs.config import Config
from zhenxun.services.log import logger from zhenxun.services.log import logger
from ..core.dbService import g_pDBService
from .config import g_sPlantPath, g_sSignInPath from .config import g_sPlantPath, g_sSignInPath
from .dbService import g_pDBService
from .tool import g_pToolManager from .tool import g_pToolManager

View File

@ -1,25 +1,31 @@
import os
from datetime import datetime from datetime import datetime
import os
from zoneinfo import ZoneInfo from zoneinfo import ZoneInfo
from zhenxun.services.log import logger from zhenxun.services.log import logger
from zhenxun.utils.message import MessageUtils from zhenxun.utils.message import MessageUtils
from .dbService import g_pDBService from ..core.player.player import CPlayer
from ..core.player.playerPool import g_pUserPool
class CToolManager: class CToolManager:
@classmethod @classmethod
async def isRegisteredByUid(cls, uid: str) -> bool: async def repeat(cls):
result = await g_pDBService.user.isUserExist(uid)
if not result:
await MessageUtils.build_message( await MessageUtils.build_message(
"尚未开通农场快at我发送 开通农场 开通吧" "尚未开通农场快at我发送 开通农场 开通吧"
).send() ).send()
return False
return True @classmethod
async def getPlayerByUid(cls, uid: str) -> CPlayer | None:
player = g_pUserPool.getUser(uid)
if player is None:
player = CPlayer()
if not await player.init(uid):
return None
g_pUserPool.createUser(uid, player)
return player
@classmethod @classmethod
def sanitize_username(cls, username: str, max_length: int = 15) -> str: def sanitize_username(cls, username: str, max_length: int = 15) -> str: