zhenxun_plugin_farm/database/plant.py

294 lines
8.5 KiB
Python
Raw Normal View History

from contextlib import asynccontextmanager
2025-06-29 01:45:24 +08:00
import os
import aiosqlite
2025-06-29 01:45:24 +08:00
from zhenxun.configs.config import Config
from zhenxun.services.log import logger
2025-06-29 01:45:24 +08:00
from ..config import g_bIsDebug, g_sPlantPath, g_sResourcePath
from ..request import g_pRequestManager
class CPlantManager:
def __init__(self):
try:
os.mkdir(g_sPlantPath)
except FileExistsError:
pass
@classmethod
async def cleanup(cls):
if hasattr(cls, "m_pDB") and cls.m_pDB:
await cls.m_pDB.close()
@classmethod
async def init(cls) -> bool:
try:
_ = os.path.exists(g_sPlantPath)
if g_bIsDebug:
cls.m_pDB = await aiosqlite.connect(
str(g_sPlantPath.parent / "plant-test.db")
)
else:
cls.m_pDB = await aiosqlite.connect(str(g_sPlantPath))
cls.m_pDB.row_factory = aiosqlite.Row
return True
except Exception as e:
logger.warning("初始化植物数据库失败", e=e)
return False
@classmethod
@asynccontextmanager
async def _transaction(cls):
await cls.m_pDB.execute("BEGIN;")
try:
yield
except:
await cls.m_pDB.execute("ROLLBACK;")
raise
else:
await cls.m_pDB.execute("COMMIT;")
@classmethod
async def executeDB(cls, command: str) -> bool:
"""执行自定义SQL
Args:
command (str): SQL语句
Returns:
bool: 是否执行成功
"""
if not command:
logger.warning("数据库语句长度为空!")
return False
try:
async with cls._transaction():
await cls.m_pDB.execute(command)
return True
except Exception as e:
logger.warning(f"数据库语句执行出错: {command}", e=e)
return False
@classmethod
async def getPlantByName(cls, name: str) -> dict | None:
"""根据作物名称查询记录
Args:
name (str): 作物名称
Returns:
dict | None: 返回记录字典未找到返回None
"""
try:
async with cls.m_pDB.execute(
"SELECT * FROM plant WHERE name = ?", (name,)
) as cursor:
row = await cursor.fetchone()
return dict(row) if row else None
except Exception as e:
logger.warning(f"查询作物失败: {name}", e=e)
return None
2025-05-28 19:48:12 +08:00
@classmethod
async def getPlantPhaseByName(cls, name: str) -> list[int]:
2025-05-28 19:48:12 +08:00
"""根据作物名称获取作物各个阶段
Args:
name (str): 作物名称
Returns:
list: 阶段数组
"""
try:
async with cls.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor:
row = await cursor.fetchone()
if not row:
return []
phase = row[0].split(",")
2025-05-28 19:48:12 +08:00
seen = set()
result = []
2025-05-28 19:48:12 +08:00
for x in phase:
num = int(x)
if num not in seen:
seen.add(num)
result.append(num)
2025-05-28 19:48:12 +08:00
return result
except Exception as e:
logger.warning(f"查询作物阶段失败: {name}", e=e)
return []
@classmethod
async def getPlantPhaseNumberByName(cls, name: str) -> int:
"""根据作物名称获取作物总阶段数
Args:
name (str): 作物名称
Returns:
int: 总阶段数
"""
try:
async with cls.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor:
row = await cursor.fetchone()
if not row:
return -1
phase = row[0].split(",")
2025-05-28 19:48:12 +08:00
# 去重
2025-05-28 19:48:12 +08:00
seen = set()
result = []
for x in phase:
if x not in seen:
seen.add(x)
result.append(x)
return len(result)
except Exception as e:
logger.warning(f"查询作物阶段失败: {name}", e=e)
return -1
@classmethod
async def getPlantAgainByName(cls, name: str) -> int:
"""根据作物名称获取作物再次成熟时间
Args:
name (str): 作物名称
Returns:
int: 再次成熟时间 单位:h
"""
try:
async with cls.m_pDB.execute(
"SELECT phase FROM plant WHERE name = ?", (name,)
) as cursor:
row = await cursor.fetchone()
if not row:
return -1
phase = row[0].split(",")
2025-05-28 19:48:12 +08:00
again = phase[-1] - phase[3] / 60 / 60
return again
except Exception as e:
logger.warning(f"查询作物阶段失败: {name}", e=e)
return -1
@classmethod
async def existsPlant(cls, name: str) -> bool:
"""判断作物是否存在
Args:
name (str): 作物名称
Returns:
bool: 存在返回True否则False
"""
try:
async with cls.m_pDB.execute(
"SELECT 1 FROM plant WHERE name = ? LIMIT 1", (name,)
) as cursor:
row = await cursor.fetchone()
return True if row else False
except Exception as e:
logger.warning(f"检查作物存在性失败: {name}", e=e)
return False
@classmethod
async def countPlants(cls, onlyBuy: bool = False) -> int:
"""获取作物总数
Args:
onlyBuy (bool): 若为True仅统计isBuy=1的记录默认False
Returns:
int: 符合条件的记录数
"""
try:
if onlyBuy:
sql = "SELECT COUNT(*) FROM plant WHERE isBuy = 1"
params: tuple = ()
else:
sql = "SELECT COUNT(*) FROM plant"
params: tuple = ()
async with cls.m_pDB.execute(sql, params) as cursor:
row = await cursor.fetchone()
return row[0] if row else 0
except Exception as e:
logger.warning(f"统计作物数量失败, onlyBuy={onlyBuy}", e=e)
return 0
@classmethod
async def listPlants(cls) -> list[dict]:
"""查询所有作物记录"""
try:
async with cls.m_pDB.execute(
"SELECT * FROM plant ORDER BY level"
) as cursor:
rows = await cursor.fetchall()
return [dict(r) for r in rows]
except Exception as e:
logger.warning("查询所有作物失败", e=e)
return []
2025-06-29 01:45:24 +08:00
@classmethod
async def downloadPlant(cls) -> bool:
"""遍历所有作物下载各阶段图片及icon文件到指定文件夹
Returns:
bool: 全部下载完成返回True如有失败返回False
"""
success = True
baseUrl = Config.get_config("zhenxun_plugin_farm", "服务地址")
baseUrl = baseUrl.rstrip("/") + ":8998/file"
try:
plants = await cls.listPlants()
for plant in plants:
name = plant["name"]
phaseCount = await cls.getPlantPhaseNumberByName(name)
saveDir = os.path.join(g_sResourcePath, "plant", name)
begin = 0 if plant["general"] == 0 else 1
for idx in range(begin, phaseCount + 1):
fileName = f"{idx}.png"
fullPath = os.path.join(saveDir, fileName)
if os.path.exists(fullPath):
continue
url = f"{baseUrl}/{name}/{idx}.png"
if not await g_pRequestManager.download(url, saveDir, f"{idx}.png"):
success = False
iconName = "icon.png"
iconPath = os.path.join(saveDir, iconName)
if not os.path.exists(iconPath):
iconUrl = f"{baseUrl}/{name}/{iconName}"
if not await g_pRequestManager.download(iconUrl, saveDir, iconName):
success = False
return success
except Exception as e:
logger.warning(f"下载作物资源异常: {e}")
return False