This commit is contained in:
2025-10-29 22:20:21 +08:00
commit 32b3b7b29a
111 changed files with 344425 additions and 0 deletions

35
app/core/__init__.py Normal file
View File

@@ -0,0 +1,35 @@
from .api import miaoStarsBasicApi
from .utils.morelang import lang
from .utils.config import qconfig, cfg, userConfig, policyConfig
from .utils.signal_bus import signalBus
from .utils.format import getFileIcon, formatSize, formatDate
from .services.login_thread import CaptchaThread, LoginThread, RegisterThread
from .services.user_thread import (
UserNickNameUpdateThread,
UserAvatarUpdateThread,
GetUserAvatarThread,
GetPackThread,
GetPoliciesThread,
ChangePolicyThread,
DeleteTagThread,
AddTagThread,
)
from .services.file_thread import (
ListFileThread,
CreateFolderThread,
DeleteFileThread,
ListSearchThread,
ListShareThread,
UploadThread,
DownloadShareThread,
DownloadThread,
GetShareFileInfoThread,
UpdateFileContentThread,
)
from .services.preview_thread import TextLoaderThread, ImageLoaderThread

9
app/core/api/__init__.py Normal file
View File

@@ -0,0 +1,9 @@
from .basicApi import MiaoStarsBasicApi
from ..utils.config import userConfig
miaoStarsBasicApi = MiaoStarsBasicApi()
# 从userConfig中恢复token如果有
token = userConfig.getToken()
if token:
miaoStarsBasicApi.setToken(token)

578
app/core/api/basicApi.py Normal file
View File

@@ -0,0 +1,578 @@
"""这里存放基本API包括:
登录、注册、图形验证码获取
用户配置获取、用户头像获取
用户存储策略获取、用户仓内文件获取
"""
import time
from typing import Literal, Optional
from urllib.parse import quote_plus
import requests
from loguru import logger
from PyQt6.QtCore import QBuffer, QByteArray, QIODevice
from PyQt6.QtGui import QPixmap
from ..utils import getCode
from ..utils.config import policyConfig, userConfig
class MiaoStarsBasicApi:
_publicHeader = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
}
def __init__(self, token: Optional[str] = None):
self.basicApi = "http://leonmmcoset.jjxmm.win:5212/api/v4"
self.session = requests.Session()
self.session.verify = True
self.session.headers.update(MiaoStarsBasicApi._publicHeader)
# Cloudreve V4 使用 JWT 认证
# 优先使用传入的token如果没有则尝试从userConfig获取
self.token = token or userConfig.getToken()
if self.token:
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
self.userId = None
def returnSession(self) -> requests.Session:
return self.session
def setToken(self, token: str):
"""设置 JWT token 并同步到全局配置"""
# 确保token是字符串类型
if isinstance(token, str):
self.token = token
self.session.headers.update({"Authorization": f"Bearer {token}"})
# 同步更新到全局userConfig确保认证信息持久化
userConfig.setToken(token)
elif isinstance(token, dict) and token.get("access_token"):
# 兼容处理如果传入的是token对象提取access_token
access_token = token["access_token"]
self.token = access_token
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
userConfig.setToken(access_token)
def request(
self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], url, **kwargs
) -> dict:
maxRetries = 3
timeout = 20
for attempt in range(maxRetries):
try:
# 发起请求
response = self.session.request(
method=method, url=self.basicApi + url, timeout=timeout, **kwargs
)
# 检查HTTP状态码
response.raise_for_status()
# 解析JSON响应
try:
r = response.json()
# 保留原始响应格式不要过早转换Cloudreve V4的响应
# Cloudreve V4的错误响应通常包含{code, msg},而不是{error}
# 让具体的API方法来处理响应格式转换
return r
except:
return response.content
except requests.exceptions.RequestException as e:
# 网络相关错误,进行重试
if attempt < maxRetries - 1:
time.sleep(1) # 简单固定延迟
continue
else:
logger.error(f"请求失败: {str(e)}")
return {"code": -1, "msg": str(e)}
except Exception as e:
logger.error(f"处理响应时出错: {str(e)}")
return {"code": -1, "msg": str(e)}
def getCaptcha(self):
"""获取图形验证码 (Cloudreve V4 API)"""
import logging
logger = logging.getLogger(__name__)
logger.debug(f"请求验证码API: {self.basicApi}/site/captcha")
r = self.request(method="GET", url="/site/captcha")
logger.debug(f"验证码API响应: {r}")
# 根据Cloudreve V4 API文档响应格式为
# {"code": 0, "data": {"image": "data:image/png;base64,...", "ticket": "..."}, "msg": ""}
# 转换响应格式以保持与前端的兼容性
if isinstance(r, dict):
# 检查是否是标准的Cloudreve V4成功响应
if r.get("code") == 0 and isinstance(r.get("data"), dict):
data = r["data"]
# 确保image字段存在且是有效的base64格式
if "image" in data:
# 保存ticket到实例变量供后续登录/注册使用
self.captcha_ticket = data.get("ticket", "")
logger.debug(f"成功获取验证码已保存ticket: {self.captcha_ticket}")
# 前端代码在CaptchaThread.run()中使用response["data"].split(",")[1]
# 所以需要确保返回的data格式符合前端期望
captcha_image = data["image"]
# 检查是否已经包含data:image/png;base64,前缀
if captcha_image.startswith("data:image/png;base64,"):
# 保持原样让前端去split
result = {"code": 0, "data": captcha_image}
else:
# 如果没有前缀加上前缀以确保前端能正确split
result = {"code": 0, "data": f"data:image/png;base64,{captcha_image}"}
logger.debug(f"返回前端验证码数据格式: {result}")
return result
# 处理可能的错误响应
elif "msg" in r:
logger.error(f"验证码API错误: {r['msg']}")
return {"code": -1, "msg": r["msg"]}
elif "error" in r:
logger.error(f"验证码API错误: {r['error']}")
return {"code": -1, "msg": r["error"]}
else:
logger.error(f"验证码API返回非字典格式: {type(r)}")
# 默认返回失败格式
logger.error("获取验证码失败,返回默认错误格式")
return {"code": -1, "msg": "获取验证码失败"}
def login(self, username, password, captcha):
"""登录 (Cloudreve V4 JWT认证)"""
url = "/session/token"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username, # 更正参数名username -> email
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"登录API服务器返回原始信息: {r}")
# 处理登录响应
# Cloudreve V4的响应格式{code: 0, data: {user: {...}, token: {...}}, msg: ""}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 and r.get("data"):
data = r["data"]
# 设置 JWT token
if data.get("token") and isinstance(data["token"], dict):
# 提取access_token字段作为Bearer token值
access_token = data["token"].get("access_token")
if access_token:
# 设置到当前API实例
self.setToken(access_token)
# 同时保存到全局userConfig中确保认证信息持久化
userConfig.setToken(access_token)
# 存储用户信息
user_info = {
"code": 0,
"data": {
"id": data.get("user", {}).get("id"),
"nick": data.get("user", {}).get("nickname", data.get("user", {}).get("nick")),
"email": username
}
}
self.userId = data.get("user", {}).get("id")
return user_info
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "登录失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "登录失败:响应格式错误"}
def register(self, username, password, captcha):
"""注册"""
url = "/user"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username,
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"注册API服务器返回原始信息: {r}")
# 处理注册响应
# Cloudreve V4的响应格式{code: 0, data: {...}, msg: ""} 或 {code: 错误码, msg: "错误信息"}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 or r.get("id"):
return {"code": 203, "msg": "注册成功"}
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "注册失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "注册失败:响应格式错误"}
def updateUserNickname(self, nickname):
"""更新用户昵称"""
url = "/user/profile"
r = self.request(
"PUT",
url,
json={"nick": nickname},
)
# 转换响应格式
if r.get("id"):
return {"code": 0, "msg": "更新成功"}
else:
return {"code": -1, "msg": r.get("error", "更新失败")}
def updateUserAvatar(self, qimage):
"""更新用户头像 - 专门处理 QImage 对象"""
url = "/user/avatar"
# 检查 QImage 是否有效
if qimage.isNull():
return {"code": -1, "msg": "QImage 对象为空"}
# 将 QImage 转换为 JPEG 格式字节数据
byte_array = QByteArray()
buffer = QBuffer(byte_array)
buffer.open(QIODevice.WriteOnly)
# 转换为 JPEG 格式,质量设为 90
success = qimage.save(buffer, "JPEG", 90)
buffer.close()
if not success:
return {"code": -1, "msg": "图像转换失败"}
# 获取字节数据
file_data = byte_array.data()
# 构建 multipart 请求
files = {
'avatar': ('avatar.jpg', file_data, 'image/jpeg')
}
# 移除 Content-Type 以便 requests 自动设置
headers = self.session.headers.copy()
if 'Content-Type' in headers:
del headers['Content-Type']
r = self.request("POST", url, files=files, headers=headers)
# 转换响应格式
if r.get("url"):
return {"code": 0, "msg": "头像更新成功"}
else:
return {"code": -1, "msg": r.get("error", "头像更新失败")}
def getUserAvatar(self, size: Literal["s", "m", "l"]):
"""获取用户头像"""
# Cloudreve V4 获取用户信息以获取头像URL
user_info = self.getUserInfo()
if user_info.get("code") != 0:
return QPixmap(":app/images/logo.png")
avatar_url = user_info.get("data", {}).get("avatar")
if not avatar_url:
return QPixmap(":app/images/logo.png")
try:
# 直接下载头像图片
response = requests.get(avatar_url, timeout=10)
response.raise_for_status()
pixmap = QPixmap()
if pixmap.loadFromData(response.content):
userConfig.setUserAvatarPixmap(pixmap)
return pixmap
else:
return QPixmap(":app/images/logo.png")
except Exception as e:
logger.error(f"获取头像失败: {e}")
return QPixmap(":app/images/logo.png")
def getUserInfo(self):
"""获取用户信息 (Cloudreve V4 API)"""
# 使用正确的API端点
if self.userId:
url = f"/user/info/{self.userId}"
else:
# 如果没有userId尝试获取当前用户信息
url = "/user/profile" # 这可能需要根据实际情况调整
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, dict):
return {"code": 0, "data": r}
else:
return {"code": -1, "msg": "获取用户信息失败"}
def getUserPack(self):
"""获取用户存储详细 (Cloudreve V4 API)"""
# 使用正确的API端点
url = "/user/capacity"
r = self.request("GET", url)
# 转换响应格式以保持向后兼容
if isinstance(r, dict) and "used" in r:
return {
"code": 0,
"data": {
"base": r.get("total", 0) - r.get("extra", 0),
"pack": r.get("extra", 0),
"used": r.get("used", 0),
"total": r.get("total", 0),
"packs": []
}
}
else:
return {
"code": 0,
"data": {
"base": 0,
"pack": 0,
"used": 0,
"total": 0,
"packs": []
}
}
def list(self, path="/"):
"""列出用户仓内文件 (Cloudreve V4 API)"""
# 使用正确的API端点和必需参数
url = "/file"
# 将path转换为Cloudreve V4要求的URI格式
# 根目录映射到 cloudreve://my/
if path == "/" or path == "":
uri = "cloudreve://my/"
else:
# 使用quote_plus正确编码路径确保特殊字符被正确处理
# 首先规范化路径分隔符
normalized_path = path.strip("/").replace("\\", "/")
# 对路径部分进行URL编码
encoded_path = quote_plus(normalized_path)
# 由于quote_plus会将斜杠也编码我们需要恢复它们
encoded_path = encoded_path.replace("%2F", "/")
uri = f"cloudreve://my/{encoded_path}"
# 添加必需的分页参数
params = {
"uri": uri,
"page": 0, # 从第一页开始
"page_size": 100 # 使用合理的默认值
}
logger.debug(f"发送文件列表请求: URI={uri}")
r = self.request("GET", url, params=params)
# 转换响应格式以保持向后兼容
# 根据API规范文件列表在data.files中
if isinstance(r, dict) and "data" in r and "files" in r["data"]:
return {
"code": 0,
"data": r["data"]["files"]
}
else:
logger.warning(f"无效的响应格式: {r}")
return {"code": 0, "data": []}
def getPolicy(self):
"""获取用户存储策略"""
url = "/user/policies"
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, list):
return {"code": 0, "data": r}
else:
return {"code": 0, "data": []}
def changePolicy(self, path, policy):
"""修改用户存储策略"""
url = "/file/move"
r = self.request("POST", url, json={"items": [path], "dst": path, "policy": policy})
# 转换响应格式
if r.get("count") == 1:
return {"code": 0, "msg": "存储策略修改成功"}
else:
return {"code": -1, "msg": r.get("error", "存储策略修改失败")}
def createFolder(self, name):
"""创建文件夹"""
url = "/file/create"
currentPath = policyConfig.returnCurrentPath()
# 根据Cloudreve V4 API规范构建uri参数确保不会有重复斜杠
if currentPath == "/":
uri = f"cloudreve://my/{name}"
else:
uri = f"cloudreve://my{currentPath}/{name}"
r = self.request("POST", url, json={"uri": uri, "type": "folder", "err_on_conflict": True})
# 转换响应格式
if r.get("data") and r.get("code") == 0:
return {"code": 0, "msg": "文件夹创建成功"}
else:
return {"code": -1, "msg": r.get("msg", "文件夹创建失败")}
def deleteFile(self, fileId, fileType: Literal["file", "dir"]):
"""删除文件"""
url = "/file/delete"
deleteData = {
"items": [fileId] if fileType == "file" else [],
"dirs": [fileId] if fileType == "dir" else []
}
r = self.request("POST", url, json=deleteData)
# 转换响应格式
if r.get("count") > 0:
return {"code": 0, "msg": "删除成功"}
else:
return {"code": -1, "msg": r.get("error", "删除失败")}
def wareSearch(
self,
searchContent,
searchType: Literal["keyword", "internalTag", "externalTag"],
):
"""搜索文件 (Cloudreve V4 API)"""
# 根据Cloudreve V4 API使用/file端点并添加搜索参数
url = "/file"
# 使用Cloudreve V4的URI格式
uri = "cloudreve://my/"
# 构建搜索参数
params = {
"uri": uri,
"keyword": searchContent,
"page": 0,
"page_size": 100
}
# 根据搜索类型调整参数
if searchType == "internalTag":
params["type"] = "internal"
elif searchType == "externalTag":
params["type"] = "tag"
logger.debug(f"发送文件搜索请求: 关键词={searchContent}, 类型={searchType}")
r = self.request("GET", url, params=params)
# 转换响应格式以保持向后兼容
if isinstance(r, dict):
if "data" in r:
if "files" in r["data"]:
# Cloudreve V4 API 格式
return {"code": 0, "data": {"objects": r["data"]["files"]}}
elif isinstance(r["data"], list):
# 如果data是列表直接使用
return {"code": 0, "data": {"objects": r["data"]}}
logger.warning(f"搜索响应格式不正确: {r}")
return {"code": 0, "data": {"objects": []}}
def shareSearch(self, keyword, orderBy, order, page):
"""搜索分享 (Cloudreve V4 API)"""
# 使用正确的API端点 - Cloudreve V4使用/share端点获取分享列表
url = "/share"
params = {
"page": page,
"page_size": 50, # 添加默认页面大小以避免"PageSize cannot be empty"错误
"order_by": orderBy,
"order": order,
"keyword": keyword, # 根据实际API支持情况调整
}
r = self.request("GET", url, params=params)
return r
def deleteTag(self, tagId):
"""删除标签"""
url = f"/tag/{tagId}"
r = self.request("DELETE", url)
# 转换响应格式
if r is None or (isinstance(r, dict) and not r.get("error")):
return {"code": 0, "msg": "标签删除成功"}
else:
return {"code": -1, "msg": r.get("error", "标签删除失败")}
def addTag(self, name, expression):
"""添加标签"""
url = "/tag/filter"
jsons = {
"expression": expression,
"name": name,
"color": "#ff9800",
"icon": "Circle",
}
r = self.request("POST", url, json=jsons)
# 转换响应格式
if r.get("id"):
return {"code": 0, "msg": "标签添加成功", "data": r}
else:
return {"code": -1, "msg": r.get("error", "标签添加失败")}
def getShareFileInfo(self, shareId):
"""获取分享文件信息"""
url = f"/share/{shareId}"
r = self.request("GET", url)
# 转换响应格式
if r.get("id"):
return {"code": 0, "data": r}
else:
return {"code": -1, "msg": r.get("error", "获取分享信息失败")}
def updateFileContent(self, fileId, content):
"""更新文件内容"""
url = f"/file/content/{fileId}"
headers = {
"Content-Type": "text/plain"
}
r = self.request("PUT", url, data=content.encode("utf-8"), headers=headers)
# 转换响应格式
if r.get("size") is not None:
return {"code": 0, "msg": "文件内容更新成功"}
else:
return {"code": -1, "msg": r.get("error", "文件内容更新失败")}
def updateUserNickname(self, nickName):
"""更新用户昵称 (Cloudreve V4 API)"""
url = "/user/profile"
data = {
"nick": nickName
}
r = self.request("PUT", url, json=data)
# 转换响应格式
if isinstance(r, dict) and not r.get("error"):
return {"code": 0, "msg": "昵称更新成功"}
else:
return {"code": -1, "msg": r.get("error", "昵称更新失败")}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,166 @@
import base64
from loguru import logger
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import (QColor, QPainter, QPainterPath, QPen, QPixmap)
from ..api import miaoStarsBasicApi
from ...core import cfg, qconfig, userConfig
class CaptchaThread(QThread):
captchaReady = pyqtSignal(QPixmap)
captchaFailed = pyqtSignal(str)
def __init__(self):
super().__init__()
@staticmethod
def _createRoundedPixmap(pixmap, radius=10):
"""创建圆角图片"""
try:
# 获取原始图片尺寸
if pixmap.isNull():
logger.error("原始图片为空,无法创建圆角图片")
return pixmap
size = pixmap.size()
# 创建透明背景的图片
rounded_pixmap = QPixmap(size)
rounded_pixmap.fill(Qt.GlobalColor.transparent)
# 创建对象
painter = QPainter(rounded_pixmap)
painter.setRenderHints(
QPainter.RenderHint.Antialiasing
| QPainter.RenderHint.SmoothPixmapTransform
)
# 创建圆角矩形路径
path = QPainterPath()
path.addRoundedRect(0, 0, size.width(), size.height(), radius, radius)
# 设置裁剪区域
painter.setClipPath(path)
# 绘制原始图片
painter.drawPixmap(0, 0, pixmap)
# 绘制边框
pen = QPen(QColor(200, 200, 200)) # 浅灰色边框
pen.setWidth(1)
painter.setPen(pen)
painter.drawRoundedRect(
0, 0, size.width() - 1, size.height() - 1, radius, radius
)
painter.end()
return rounded_pixmap
except Exception as e:
logger.error(f"创建圆角图片失败:{e}")
return pixmap # 如果出错,返回原始图片
def run(self):
try:
logger.debug("开始获取验证码")
response = miaoStarsBasicApi.getCaptcha()
logger.debug(f"验证码API返回响应: {response}")
if response["code"] == 0:
# 确保data字段存在且为字符串
if "data" in response and isinstance(response["data"], str):
# 分割base64前缀和实际数据
try:
captchaImageData = response["data"].split(",")[1]
logger.debug(f"成功提取base64数据长度: {len(captchaImageData)}")
# 解码base64数据
captchaImage = base64.b64decode(captchaImageData)
logger.debug(f"成功解码base64数据长度: {len(captchaImage)} bytes")
# 加载图片
pixmap = QPixmap()
load_success = pixmap.loadFromData(captchaImage)
if load_success:
logger.debug(f"成功加载图片,尺寸: {pixmap.width()}x{pixmap.height()}")
# 创建圆角图片
pixmap = self._createRoundedPixmap(pixmap, radius=10)
self.captchaReady.emit(pixmap)
else:
logger.error("图片加载失败")
self.captchaFailed.emit("验证码图片加载失败")
self.captchaReady.emit(QPixmap(":app/images/loadFailure.png"))
except (IndexError, ValueError, TypeError) as e:
logger.error(f"验证码数据格式错误: {e}")
self.captchaFailed.emit(f"验证码数据格式错误: {str(e)}")
self.captchaReady.emit(QPixmap(":app/images/loadFailure.png"))
else:
logger.error("验证码响应中缺少有效的data字段")
self.captchaFailed.emit("验证码数据无效")
self.captchaReady.emit(QPixmap(":app/images/loadFailure.png"))
else:
error_msg = response.get("msg", "获取验证码失败")
logger.error(f"获取验证码失败: {error_msg}")
self.captchaFailed.emit(error_msg)
self.captchaReady.emit(QPixmap(":app/images/loadFailure.png"))
except Exception as e:
logger.exception(f"获取验证码过程中发生异常: {e}")
self.captchaFailed.emit(str(e))
self.captchaReady.emit(QPixmap(":app/images/loadFailure.png"))
class LoginThread(QThread):
successLogin = pyqtSignal()
errorLogin = pyqtSignal(str)
def __init__(self, email: str, password: str, captchaCode: str):
super().__init__()
logger.debug(f"初始化许可证服务线程 - 邮箱: {email}")
self.email = email
self.password = password
self.captchaCode = captchaCode
def run(self):
logger.info(f"开始验证用户登录 - 邮箱: {self.email}")
try:
loginResponse = miaoStarsBasicApi.login(
self.email, self.password, self.captchaCode
)
if loginResponse["code"] == 0:
self.successLogin.emit()
qconfig.set(cfg.email, self.email)
qconfig.set(cfg.activationCode, self.password)
userConfig.userData = loginResponse
# 从登录响应中提取token并保存
# 在basicApi的login方法中已经处理了token的设置这里保存到userConfig中以便程序启动时恢复
token = miaoStarsBasicApi.token
if token:
userConfig.setToken(token)
else:
self.errorLogin.emit(loginResponse["msg"])
except Exception as e:
logger.error(f"登录验证过程中发生异常: {e}")
self.errorLogin.emit("系统错误,请稍后重试")
class RegisterThread(QThread):
successRegister = pyqtSignal()
errorRegister = pyqtSignal(str)
def __init__(self, email: str, password: str, captchaCode: str):
super().__init__()
logger.debug(f"初始化许可证服务线程 - 邮箱: {email}")
self.email = email
self.password = password
self.captchaCode = captchaCode
def run(self):
logger.info(f"开始验证用户注册 - 邮箱: {self.email}")
try:
registerRespond = miaoStarsBasicApi.register(
self.email, self.password, self.captchaCode
)
if registerRespond["code"] == 203:
self.successRegister.emit()
else:
logger.error(f"注册失败: {registerRespond['msg']}")
self.errorRegister.emit(registerRespond["msg"])
except Exception as e:
logger.error(f"登录验证过程中发生异常: {e}")
self.errorRegister.emit("系统错误,请稍后重试")

View File

@@ -0,0 +1,246 @@
import os
from urllib.parse import urlparse
import requests
from PyQt6.QtCore import QThread, pyqtSignal
from PyQt6.QtGui import QImage, QPixmap
from app.core import miaoStarsBasicApi
class TextLoaderThread(QThread):
"""文本文件加载线程"""
textLoaded = pyqtSignal(str)
errorOccurred = pyqtSignal(str)
progressUpdated = pyqtSignal(int) # 进度更新信号
def __init__(self, url):
super().__init__()
self.url = url
def run(self):
"""线程执行函数"""
try:
# 1. 设置网络请求参数 - 优化连接参数
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=20,
pool_maxsize=20,
max_retries=5, # 增加重试次数
pool_block=False,
)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 2. 增加超时时间并添加重试机制
response = miaoStarsBasicApi.returnSession().get(
self.url,
stream=True,
timeout=(15, 30), # 增加超时时间连接15秒读取30秒
)
response.raise_for_status()
# 3. 获取文件大小(如果服务器支持)
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
# 4. 分块读取并处理 - 使用二进制读取提高速度
content_chunks = []
for chunk in response.iter_content(chunk_size=16384): # 增大块大小
if chunk:
content_chunks.append(chunk)
downloaded_size += len(chunk)
# 更新进度(如果知道总大小)
if total_size > 0:
progress = int((downloaded_size / total_size) * 100)
self.progressUpdated.emit(progress)
# 5. 合并内容并解码
binary_content = b"".join(content_chunks)
if not binary_content:
self.errorOccurred.emit("下载内容为空")
return
# 6. 智能编码检测和解码
text_content = self._decode_content(binary_content)
# 7. 发射加载完成的信号
self.textLoaded.emit(text_content)
except requests.exceptions.Timeout:
self.errorOccurred.emit("请求超时,请检查网络连接或尝试重新加载")
except requests.exceptions.ConnectionError:
self.errorOccurred.emit("网络连接错误,请检查网络设置")
except requests.exceptions.RequestException as e:
self.errorOccurred.emit(f"网络请求错误: {str(e)}")
except Exception as e:
self.errorOccurred.emit(f"文本处理错误: {str(e)}")
def _decode_content(self, binary_content):
"""智能解码二进制内容"""
# 优先尝试UTF-8
encodings = ["utf-8", "gbk", "gb2312", "latin-1", "iso-8859-1", "cp1252"]
for encoding in encodings:
try:
return binary_content.decode(encoding)
except UnicodeDecodeError:
continue
# 如果所有编码都失败,使用替换错误处理
try:
return binary_content.decode("utf-8", errors="replace")
except:
# 最后尝试忽略错误
return binary_content.decode("utf-8", errors="ignore")
def cancel(self):
"""取消下载"""
if self.isRunning():
self.terminate()
self.wait(1000) # 等待线程结束
class ImageLoaderThread(QThread):
"""优化的图片加载线程"""
imageLoaded = pyqtSignal(QPixmap)
errorOccurred = pyqtSignal(str)
progressUpdated = pyqtSignal(int) # 进度更新信号
def __init__(
self, url, cache_dir="image_cache", max_cache_size=50 * 1024 * 1024
): # 50MB缓存
super().__init__()
self.url = url
self.cache_dir = cache_dir
self.max_cache_size = max_cache_size
self._setup_cache()
def _setup_cache(self):
"""设置图片缓存目录"""
if not os.path.exists(self.cache_dir):
os.makedirs(self.cache_dir)
def _get_cache_filename(self):
"""生成缓存文件名"""
parsed_url = urlparse(self.url)
filename = os.path.basename(parsed_url.path) or "image"
# 添加URL哈希避免重名
import hashlib
url_hash = hashlib.md5(self.url.encode()).hexdigest()[:8]
return f"{url_hash}_{filename}"
def _get_cached_image(self):
"""获取缓存图片"""
cache_file = os.path.join(self.cache_dir, self._get_cache_filename())
if os.path.exists(cache_file):
try:
pixmap = QPixmap(cache_file)
if not pixmap.isNull():
return pixmap
except Exception:
pass
return None
def _save_to_cache(self, pixmap):
"""保存图片到缓存"""
try:
cache_file = os.path.join(self.cache_dir, self._get_cache_filename())
pixmap.save(cache_file, "JPG", 80) # 压缩质量80%
self._cleanup_cache() # 清理过期缓存
except Exception:
pass
def _cleanup_cache(self):
"""清理过期的缓存文件"""
# noinspection PyBroadException
try:
files = []
for f in os.listdir(self.cache_dir):
filepath = os.path.join(self.cache_dir, f)
if os.path.isfile(filepath):
files.append((filepath, os.path.getmtime(filepath)))
# 按修改时间排序
files.sort(key=lambda x: x[1])
# 计算总大小
total_size = sum(os.path.getsize(f[0]) for f in files)
# 如果超过最大缓存大小,删除最旧的文件
while total_size > self.max_cache_size and files:
oldest_file = files.pop(0)
total_size -= os.path.getsize(oldest_file[0])
os.remove(oldest_file[0])
except Exception:
pass
def run(self):
"""线程执行函数"""
try:
# 1. 首先检查缓存
cached_pixmap = self._get_cached_image()
if cached_pixmap:
self.imageLoaded.emit(cached_pixmap)
return
# 2. 设置更短的超时时间
session = requests.Session()
adapter = requests.adapters.HTTPAdapter(
pool_connections=10, pool_maxsize=10, max_retries=5 # 重试2次
)
session.mount("http://", adapter)
session.mount("https://", adapter)
# 3. 流式下载,支持进度显示
response = miaoStarsBasicApi.returnSession().get(
self.url, stream=True, timeout=(20, 30) # 连接超时5秒读取超时10秒
)
response.raise_for_status()
# 获取文件大小(如果服务器支持)
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
# 4. 分块读取并处理
image_data = b""
for chunk in response.iter_content(chunk_size=8192):
if chunk:
image_data += chunk
downloaded_size += len(chunk)
# 更新进度(如果知道总大小)
if total_size > 0:
progress = int((downloaded_size / total_size) * 100)
self.progressUpdated.emit(progress)
# 5. 从数据创建QImage比QPixmap更快
image = QImage()
image.loadFromData(image_data)
if image.isNull():
raise Exception("无法加载图片数据")
# 6. 转换为QPixmap
pixmap = QPixmap.fromImage(image)
# 7. 保存到缓存
self._save_to_cache(pixmap)
# 发射加载完成的信号
self.imageLoaded.emit(pixmap)
except requests.exceptions.Timeout:
self.errorOccurred.emit("请求超时,请检查网络连接")
except requests.exceptions.ConnectionError:
self.errorOccurred.emit("网络连接错误")
except requests.exceptions.RequestException as e:
self.errorOccurred.emit(f"网络请求错误: {str(e)}")
except Exception as e:
self.errorOccurred.emit(f"图片处理错误: {str(e)}")

View File

@@ -0,0 +1,287 @@
# coding: utf-8
import os
import subprocess
import tempfile
from loguru import logger
from PyQt6.QtCore import QEventLoop, QObject, QThread, pyqtSignal
from PyQt6.QtCore import QUrl
from PyQt6.QtMultimedia import QAudioOutput, QMediaPlayer
class LocalTextToSpeechThread(QThread):
"""本地文本转语音播放线程 - Windows优化版"""
# 信号定义
playback_started = pyqtSignal() # 播放开始
playback_finished = pyqtSignal() # 播放完成
playback_error = pyqtSignal(str) # 播放错误
progress_updated = pyqtSignal(int) # 播放进度更新
synthesis_completed = pyqtSignal(str) # 语音合成完成(返回文件路径)
def __init__(self, text, parent=None):
super().__init__(parent)
self.text = text
self.audio_file_path = None
self.media_player = None
self.audio_output = None
self._stop_requested = False
def run(self):
"""线程执行函数"""
try:
# 1. 将文本转换为语音文件
self.audio_file_path = self._text_to_speech(self.text)
if not self.audio_file_path or self._stop_requested:
return
# 发射合成完成信号
self.synthesis_completed.emit(self.audio_file_path)
# 2. 播放语音
self._play_audio(self.audio_file_path)
except Exception as e:
self.playback_error.emit(f"语音播放错误: {str(e)}")
def _text_to_speech(self, text):
"""使用本地TTS引擎将文本转换为语音文件"""
try:
# 检查文本长度
if not text or len(text.strip()) == 0:
self.playback_error.emit("文本内容为空")
return None
# 限制文本长度,避免合成时间过长
max_length = 1000
if len(text) > max_length:
text = text[:max_length] + "。文本过长,已截断。"
self.playback_error.emit(f"文本过长,已截断前{max_length}个字符")
# 优先使用pyttsx3效率最高
try:
import pyttsx3
return self._pyttsx3_tts(text)
except ImportError:
# 备用方案使用Windows内置TTS
return self._windows_tts(text)
except Exception as e:
self.playback_error.emit(f"语音合成失败: {str(e)}")
return None
def _pyttsx3_tts(self, text):
"""使用pyttsx3合成语音 - 优化版"""
try:
import pyttsx3
# 初始化TTS引擎
engine = pyttsx3.init()
# 设置语音属性 - 提高效率
engine.setProperty('rate', 200) # 提高语速
engine.setProperty('volume', 0.9) # 提高音量
# 创建临时文件保存音频
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
temp_path = temp_file.name
# 保存语音到文件
engine.save_to_file(text, temp_path)
engine.runAndWait()
# 检查文件是否成功创建
if os.path.exists(temp_path) and os.path.getsize(temp_path) > 0:
return temp_path
else:
logger.error("语音文件生成失败")
except Exception as e:
# 如果pyttsx3失败尝试Windows TTS
return self._windows_tts(text)
def _windows_tts(self, text):
"""Windows系统TTS - 优化版"""
try:
# 方法1: 使用PowerShell命令 - 最可靠
return self._powershell_tts(text)
except Exception as e:
logger.error(f"Windows TTS失败: {str(e)}")
def _powershell_tts(self, text):
"""使用PowerShell合成语音 - 优化版"""
try:
# 创建临时文件
with tempfile.NamedTemporaryFile(delete=False, suffix='.wav') as temp_file:
temp_path = temp_file.name
# 转义文本中的特殊字符
escaped_text = text.replace('"', '`"').replace("'", "`'")
# 使用PowerShell的SpeechSynthesizer - 简化命令
ps_script = f"""
Add-Type -AssemblyName System.Speech
$speak = New-Object System.Speech.Synthesis.SpeechSynthesizer
$speak.SetOutputToWaveFile("{temp_path}")
$speak.Speak("{escaped_text}")
$speak.Dispose()
"""
# 使用更高效的方式执行PowerShell
process = subprocess.Popen(
["powershell", "-Command", ps_script],
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
shell=True
)
# 等待进程完成,设置超时
try:
stdout, stderr = process.communicate(timeout=30)
if process.returncode != 0:
logger.error(f"PowerShell执行失败: {stderr.decode('gbk', errors='ignore')}")
except subprocess.TimeoutExpired:
process.kill()
logger.error(f"PowerShell超时")
if os.path.exists(temp_path) and os.path.getsize(temp_path) > 0:
return temp_path
else:
logger.error("语音文件生成失败")
except Exception as e:
# raise Exception(f"PowerShell TTS失败: {str(e)}")
logger.error(f"PowerShell TTS失败{e}")
def _play_audio(self, file_path):
"""播放音频文件 - 优化版"""
if self._stop_requested:
return
try:
# 创建媒体播放器和音频输出
self.media_player = QMediaPlayer()
self.audio_output = QAudioOutput()
self.media_player.setAudioOutput(self.audio_output)
# 设置音量
self.audio_output.setVolume(1.0)
# 连接信号
self.media_player.playbackStateChanged.connect(self._on_playback_state_changed)
self.media_player.positionChanged.connect(self._on_position_changed)
self.media_player.durationChanged.connect(self._on_duration_changed)
self.media_player.errorOccurred.connect(self._on_player_error)
# 设置媒体源并开始播放
self.media_player.setSource(QUrl.fromLocalFile(file_path))
self.media_player.play()
# 使用事件循环等待播放完成
loop = QEventLoop()
self.media_player.playbackStateChanged.connect(
lambda state: loop.quit() if state == QMediaPlayer.PlaybackState.StoppedState else None
)
loop.exec()
except Exception as e:
raise Exception(f"音频播放失败: {str(e)}")
finally:
# 清理临时文件
if file_path and os.path.exists(file_path):
try:
os.unlink(file_path)
except:
pass
def _on_playback_state_changed(self, state):
"""处理播放状态变化"""
from PyQt6.QtMultimedia import QMediaPlayer
if state == QMediaPlayer.PlaybackState.StoppedState:
self.playback_finished.emit()
def _on_position_changed(self, position):
"""处理播放位置变化"""
if (self.media_player and
self.media_player.duration() > 0):
progress = int((position / self.media_player.duration()) * 100)
self.progress_updated.emit(progress)
def _on_duration_changed(self, duration):
"""处理时长变化"""
if duration > 0:
self.playback_started.emit()
def _on_player_error(self, error, error_string):
"""处理播放器错误"""
self.playback_error.emit(f"播放器错误: {error_string}")
def stop_playback(self):
"""停止播放"""
self._stop_requested = True
if self.media_player and self.media_player.playbackState() != QMediaPlayer.PlaybackState.StoppedState:
self.media_player.stop()
# 清理临时文件
if self.audio_file_path and os.path.exists(self.audio_file_path):
try:
os.unlink(self.audio_file_path)
except:
pass
class LocalSpeechController(QObject):
"""本地语音播放控制器"""
def __init__(self, parent=None):
super().__init__(parent)
self.speech_thread = None
def play_text(self, text):
"""播放文本语音"""
# 停止当前播放
self.stop_playback()
# 创建新的语音线程
self.speech_thread = LocalTextToSpeechThread(text)
# 连接信号
self.speech_thread.playback_started.connect(self._on_playback_started)
self.speech_thread.playback_finished.connect(self._on_playback_finished)
self.speech_thread.playback_error.connect(self._on_playback_error)
self.speech_thread.progress_updated.connect(self._on_progress_updated)
self.speech_thread.synthesis_completed.connect(self._on_synthesis_completed)
# 开始播放
self.speech_thread.start()
def stop_playback(self):
"""停止播放"""
if self.speech_thread and self.speech_thread.isRunning():
self.speech_thread.stop_playback()
self.speech_thread.wait(1000) # 等待线程结束最多1秒
def is_playing(self):
"""检查是否正在播放"""
return self.speech_thread and self.speech_thread.isRunning()
def _on_playback_started(self):
"""处理播放开始"""
logger.info("语音播放开始")
def _on_playback_finished(self):
"""处理播放完成"""
logger.success("语音播放完成")
def _on_playback_error(self, error_msg):
"""处理播放错误"""
logger.warning(f"语音播放错误: {error_msg}")
def _on_progress_updated(self, progress):
...
def _on_synthesis_completed(self, file_path):
"""处理语音合成完成"""
logger.info(f"语音合成完成,文件路径: {file_path}")

View File

@@ -0,0 +1,199 @@
from loguru import logger
from PyQt6.QtCore import QThread, pyqtSignal
from PyQt6.QtGui import QPixmap
from ..api import miaoStarsBasicApi
class UserNickNameUpdateThread(QThread):
successUpdate = pyqtSignal()
errorUpdate = pyqtSignal(str)
def __init__(self, nickName: str):
super().__init__()
logger.debug(f"初始化用户昵称服务线程 - 昵称: {nickName}")
self.nickName = nickName
def run(self):
logger.info(f"开始更新用户昵称 - 昵称: {self.nickName}")
try:
response = miaoStarsBasicApi.updateUserNickname(self.nickName)
print(response)
if response["code"] == 0:
self.successUpdate.emit()
else:
logger.error("更新失败:", response["msg"])
self.errorUpdate.emit(response["msg"])
except Exception as e:
logger.error(f"更新用户昵称过程中发生异常: {e}")
self.errorUpdate.emit("系统错误,请稍后重试")
class UserAvatarUpdateThread(QThread):
successUpdate = pyqtSignal()
errorUpdate = pyqtSignal(str)
def __init__(self, avatarPath: str):
super().__init__()
logger.debug(f"初始化用户头像服务线程 - 头像路径: {avatarPath}")
self.avatarPath = avatarPath
def run(self):
logger.info(f"开始更新用户头像 - 头像路径: {self.avatarPath}")
try:
response = miaoStarsBasicApi.updateUserAvatar(self.avatarPath)
if response["code"] == 0:
logger.info("头像更新成功")
self.successUpdate.emit()
else:
logger.error(f"更新失败,错误信息: {response['msg']}")
self.errorUpdate.emit(f"更新失败: {response['msg']}")
except Exception as e:
logger.error(f"更新用户头像过程中发生异常: {e}")
self.errorUpdate.emit("系统错误,请稍后重试")
class GetUserAvatarThread(QThread):
avatarPixmap = pyqtSignal(QPixmap)
def __init__(self, size: str):
super().__init__()
logger.debug(f"初始化获取用户头像服务线程 - 头像尺寸: {size}")
self.size = size
def run(self):
logger.info(f"开始获取用户头像 - 头像尺寸: {self.size}")
try:
response = miaoStarsBasicApi.getUserAvatar(self.size)
self.avatarPixmap.emit(response)
except Exception as e:
logger.error(f"获取用户头像过程中发生异常: {e}")
self.avatarPixmap.emit(QPixmap(":app/images/logo.png"))
class GetPackThread(QThread):
storageDictSignal = pyqtSignal(dict)
def __init__(self):
super().__init__()
def run(self):
logger.info("开始请求用户配额包")
try:
response = miaoStarsBasicApi.getUserPack()
self.storageDictSignal.emit(response)
except Exception as e:
logger.error(f"获取用户配额包过程中发生异常: {e}")
self.storageDictSignal.emit(
{
"code": 0,
"data": {
"base": 0,
"pack": 0,
"used": 0,
"total": 0,
"packs": [],
},
"msg": "",
}
)
class GetPoliciesThread(QThread):
"""获取策略列表线程"""
successGetSignal = pyqtSignal(list)
errorSignal = pyqtSignal(str)
def __init__(self):
super().__init__()
def run(self):
try:
response = miaoStarsBasicApi.getPolicy()
if response["code"] == 0:
self.successGetSignal.emit(response["data"])
else:
self.errorSignal.emit(f"API返回错误: {response.get('msg')}")
except Exception as e:
self.errorSignal.emit(f"获取策略列表失败: {str(e)}")
class ChangePolicyThread(QThread):
"""更改策略线程"""
successChangedSignal = pyqtSignal()
errorSignal = pyqtSignal(str)
def __init__(self, path, policy_id):
super().__init__()
self.path = path
self.policy_id = policy_id
def run(self):
try:
response = miaoStarsBasicApi.changePolicy(self.path, self.policy_id)
if response["code"] == 0:
self.successChangedSignal.emit()
else:
self.errorSignal.emit(
f"更改策略失败: {response.get('msg', '未知错误')}"
)
except Exception as e:
self.errorSignal.emit(f"更改策略请求失败: {str(e)}")
class DeleteTagThread(QThread):
"""删除标签线程"""
successDeleteSignal = pyqtSignal()
errorSignal = pyqtSignal(str)
def __init__(self, tagId):
super().__init__()
self.tagId = tagId
def run(self):
try:
response = miaoStarsBasicApi.deleteTag(self.tagId)
if response["code"] == 0:
self.successDeleteSignal.emit()
logger.info(f"删除标签成功: {self.tagId}")
else:
logger.error(f"删除标签失败: {response.get('msg')}")
self.errorSignal.emit(f"删除标签失败: {response.get('msg')}")
except Exception as e:
self.errorSignal.emit(f"{str(e)}")
logger.error(f"删除标签请求失败: {str(e)}")
class AddTagThread(QThread):
"""添加标签的线程类"""
successSignal = pyqtSignal(str, dict) # 标签名称, 响应数据
errorSignal = pyqtSignal(str, str) # 标签名称, 错误信息
def __init__(self, name, expression, parent=None):
super().__init__(parent)
self.name = name
self.expression = expression
def run(self):
"""线程执行的主方法"""
try:
response = miaoStarsBasicApi.addTag(self.name, self.expression)
if response["code"] == 0:
logger.info(f"添加标签成功: {self.name}")
self.successSignal.emit(self.name, response)
else:
logger.error(f"添加标签失败: {self.name} - {response.get('msg')}")
self.errorSignal.emit(self.name, response.get("msg"))
except Exception as e:
logger.error(f"添加标签异常: {self.name} - {str(e)}")
self.errorSignal.emit(self.name, str(e))

View File

@@ -0,0 +1 @@
from .exceptions import getCode

202
app/core/utils/config.py Normal file
View File

@@ -0,0 +1,202 @@
# coding:utf-8
import sys
from datetime import datetime
from PyQt6.QtGui import QPixmap
from qfluentwidgets import (
BoolValidator,
ConfigItem,
ConfigSerializer,
OptionsConfigItem,
OptionsValidator,
qconfig,
QConfig,
FolderValidator,
Theme,
setThemeColor,
)
from .encryption import encrypt
from .setting import CONFIG_FILE, DOWNLOAD_FOLDER
def isWin11():
return sys.platform == "win32" and sys.getwindowsversion().build >= 22000
class EncrpytionSerializer(ConfigSerializer):
"""QColor serializer"""
def serialize(self, value):
return encrypt.encrypt(value)
def deserialize(self, value):
return encrypt.decrypt(value)
class Config(QConfig):
"""Config of application"""
# TODO: ADD YOUR CONFIG GROUP HERE
# register
rememberMe = ConfigItem(
"UmVnaXN0ZXI=", "UmVtZW1iZXJNZQ==", True, serializer=EncrpytionSerializer()
)
email = ConfigItem(
"UmVnaXN0ZXI=", "RW1haWw=", "", serializer=EncrpytionSerializer()
)
activationCode = ConfigItem(
"UmVnaXN0ZXI=", "QWN0aXZhdGlvbkNvZGU=", "", serializer=EncrpytionSerializer()
)
# main window
micaEnabled = ConfigItem("MainWindow", "MicaEnabled", isWin11(), BoolValidator())
dpiScale = OptionsConfigItem(
"MainWindow",
"DpiScale",
"Auto",
OptionsValidator([1, 1.25, 1.5, 1.75, 2, "Auto"]),
restart=True,
)
# software update
checkUpdateAtStartUp = ConfigItem(
"Update", "CheckUpdateAtStartUp", True, BoolValidator()
)
# bg
customBackground = ConfigItem(
"Background",
"CustomBackground",
"app\\resource\\images\\bg0.png",
)
customOpactity = ConfigItem("Background", "Opactity", 0.2)
downloadSavePath = ConfigItem(
"Download", "SavePath", DOWNLOAD_FOLDER, validator=FolderValidator()
)
# language
language = OptionsConfigItem(
"General", "Language", "zh", OptionsValidator(["zh", "en"]), restart=False
)
class UserConfig:
def __init__(self, userData):
self.userData = userData
self.token = None
self.avaterPixmap = None
@property
def userId(self):
if self.userData:
return self.userData["data"].get("id", "")
else:
return None
@property
def userAvatarURL(self):
if self.userData and "avatar" in self.userData["data"]:
return self.userData["data"].get("avatar", "")
else:
return ""
@property
def userName(self):
if self.userData:
return self.userData["data"].get("nickname", "")
else:
return ""
@property
def userEmail(self):
if self.userData:
return self.userData["data"].get("user_name", "")
else:
return ""
@property
def userGroup(self):
if self.userData:
return self.userData.get("data", {}).get("group", {}).get("name", "")
else:
return ""
@property
def userScore(self):
if self.userData:
return str(self.userData["data"].get("score", 0))
@property
def userCreatedTime(self):
if self.userData:
return self.format_date(self.userData["data"].get("created_at", ""))
def setUserAvatarPixmap(self, avaterPixmap):
self.avaterPixmap: QPixmap = avaterPixmap
def returnAvatarPixmap(self):
return self.avaterPixmap
def setToken(self, token):
"""设置JWT token"""
self.token = token
def getToken(self):
"""获取JWT token"""
return self.token
def format_date(self, date_str):
"""格式化日期时间"""
try:
# 处理带小数秒的情况
if "." in date_str:
# 分割日期和小数秒部分
date_part, fractional_part = date_str.split(".", 1)
# 去除末尾的"Z"并截取前6位小数
fractional_sec = fractional_part.rstrip("Z")[:6]
# 重新组合日期字符串
normalized_date_str = f"{date_part}.{fractional_sec}Z"
date_time = datetime.strptime(
normalized_date_str, "%Y-%m-%dT%H:%M:%S.%fZ"
)
else:
# 处理没有小数秒的情况
date_time = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
# 如果所有格式都失败,返回原始字符串
return date_str
return date_time.strftime("%Y-%m-%d %H:%M:%S")
class PolicyConfig:
def __init__(self):
self.currentPolicy = {}
self.currentPath = "/"
def returnPolicy(self):
return self.currentPolicy
def setPolicy(self, policy):
self.currentPolicy = policy
def setCurrentPath(self, path):
self.currentPath = path
def returnCurrentPath(self):
return self.currentPath
cfg = Config()
cfg.themeMode.value = Theme.AUTO
# 设置默认主题色为蓝色 (使用RGB值)
setThemeColor('#2F80ED') # 这是一个标准的蓝色RGB值
qconfig.load(str(CONFIG_FILE.absolute()), cfg)
userConfig = UserConfig(None)
policyConfig = PolicyConfig()

View File

@@ -0,0 +1,77 @@
import json
import base64
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad, unpad
from Crypto.Random import get_random_bytes
# from .setting import ENCRYPTKEY
ENCRYPTKEY = b"lunminalinguaai_"
class AESCipher:
def __init__(self, key):
"""
初始化AES加密器
:param key: 加密密钥16/24/32字节可以是字符串或字节
"""
if isinstance(key, str):
key = key.encode("utf-8")
if len(key) not in [16, 24, 32]:
raise ValueError("密钥长度必须为16、24或32字节")
self.key = key
def encrypt(self, data):
"""
加密数据(支持字符串/列表/字典等JSON可序列化类型
:param data: 要加密的数据
:return: 返回Base64编码的加密字符串
"""
# 生成随机初始化向量
iv = get_random_bytes(AES.block_size)
# 创建AES加密器
cipher = AES.new(self.key, AES.MODE_CBC, iv)
# 序列化数据为JSON字符串并编码为字节
json_data = json.dumps(data)
plain_bytes = json_data.encode("utf-8")
# 填充并加密数据
padded_bytes = pad(plain_bytes, AES.block_size)
cipher_bytes = cipher.encrypt(padded_bytes)
# 组合IV和密文并进行Base64编码
encrypted_data = iv + cipher_bytes
return base64.b64encode(encrypted_data).decode("utf-8")
def decrypt(self, enc_data):
"""
解密数据并恢复原始格式
:param enc_data: Base64编码的加密字符串
:return: 原始数据(保持原始格式)
"""
# Base64解码
encrypted_data = base64.b64decode(enc_data)
# 提取初始化向量
iv = encrypted_data[: AES.block_size]
cipher_bytes = encrypted_data[AES.block_size :]
# 创建AES解密器
cipher = AES.new(self.key, AES.MODE_CBC, iv)
# 解密并去除填充
decrypted_bytes = cipher.decrypt(cipher_bytes)
unpadded_bytes = unpad(decrypted_bytes, AES.block_size)
# 解码JSON并恢复原始数据结构
json_data = unpadded_bytes.decode("utf-8")
return json.loads(json_data)
encrypt = AESCipher(ENCRYPTKEY)
if __name__ == '__main__':
data = "sk-3e47a49bf60e49e8ab08bb1f1550aa86"
enc_data = encrypt.encrypt(data)
print(enc_data)

View File

@@ -0,0 +1,14 @@
# API请求code
stateCodeList = {
"0": "成功",
"40026": "验证码错误",
"40001": "读取用户头像数据失败",
"40020": "邮箱或密码不正确",
"40018": "该账号未激活",
"40033": "用户未激活,已重新发送激活邮件",
}
def getCode(code):
return stateCodeList.get(str(code), "未知错误,请联系技术支持")

67
app/core/utils/format.py Normal file
View File

@@ -0,0 +1,67 @@
from datetime import datetime
def formatSize(size):
"""格式化文件大小"""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size < 1024:
return f"{size:.2f} {unit}"
size /= 1024
return f"{size:.2f} PB"
def formatDate(date_str):
"""格式化日期时间"""
try:
# 处理带小数秒的情况
if "." in date_str:
# 分割日期和小数秒部分
date_part, fractional_part = date_str.split(".", 1)
# 去除末尾的'Z'并截取前6位小数
fractional_sec = fractional_part.rstrip("Z")[:6]
# 重新组合日期字符串
normalized_date_str = f"{date_part}.{fractional_sec}Z"
date_time = datetime.strptime(normalized_date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
else:
# 处理没有小数秒的情况
date_time = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
# 如果所有格式都失败,返回原始字符串
return date_str
return date_time.strftime("%Y-%m-%d %H:%M:%S")
def getFileIcon(fileType, fileName):
if fileType == "file":
suffix = fileName.split(".")[-1].lower()
icon_map = {
"txt": "Txt.svg",
"png": "Image.svg",
"jpg": "Image.svg",
"svg": "Image.svg",
"jpeg": "Image.svg",
"bmp": "Image.svg",
"gif": "Gif.svg",
"xls": "Excel.svg",
"xlsx": "Excel.svg",
"doc": "Word.svg",
"docx": "Word.svg",
"pdf": "Pdf.svg",
"ppt": "PPT.svg",
"mp4": "Video.svg",
"mkv": "Video.svg",
"mp3": "music.svg",
"wav": "music.svg",
"zip": "Zip.svg",
"rar": "Zip.svg",
"csv": "Excel.svg",
"db": "Database.svg",
"py": "Programme.svg",
"c": "Programme.svg",
"cpp": "Programme.svg",
"go": "Programme.svg",
}
return icon_map.get(suffix, "None.svg") # 默认图标
else:
return "Folder.svg"

View File

@@ -0,0 +1,90 @@
# coding: utf-8
import json
import os
from loguru import logger
from pathlib import Path
# 导入配置
from .config import cfg, qconfig
# 当前语言设置,默认为中文
_current_language = "zh"
# 翻译词典
_translations = {}
# 语言文件目录
_LANG_DIR = Path("app/resource/lang").absolute()
# print(Path("app/resource/lang").absolute())
def load_language(lang_code="zh"):
"""
加载指定语言的翻译文件
Args:
lang_code: 语言代码,如 "zh""en"
"""
global _current_language, _translations
try:
# 构建语言文件路径
lang_file = os.path.join(_LANG_DIR, f"{lang_code}.json")
# 检查文件是否存在
if not os.path.exists(lang_file):
logger.warning(f"语言文件不存在: {lang_file},使用默认语言")
lang_code = "zh"
lang_file = os.path.join(_LANG_DIR, "zh.json")
# 读取语言文件
with open(lang_file, "r", encoding="utf-8") as f:
_translations = json.load(f)
_current_language = lang_code
logger.info(f"已加载语言: {lang_code}")
except Exception as e:
logger.error(f"加载语言文件失败: {e}")
# 如果加载失败,使用空字典
_translations = {}
def lang(text):
"""
翻译文本
Args:
text: 要翻译的原文
Returns:
翻译后的文本,如果没有找到翻译,则返回原文
"""
# 如果翻译词典为空,尝试加载默认语言
if not _translations:
load_language(_current_language)
# 返回翻译后的文本,找不到则返回原文
return _translations.get(text, text)
def get_current_language():
"""
获取当前语言代码
Returns:
当前语言代码
"""
return _current_language
# 初始化时从配置加载语言
try:
if hasattr(cfg, "language"):
initial_lang = qconfig.get(cfg.language)
load_language(initial_lang)
else:
# 如果配置中没有语言设置,使用默认值
load_language()
except Exception as e:
logger.error(f"加载语言配置时出错: {e}")
# 出错时使用默认语言
load_language("zh")

16
app/core/utils/setting.py Normal file
View File

@@ -0,0 +1,16 @@
# coding: utf-8
from pathlib import Path
# change DEBUG to False if you want to compile the code to exe
DEBUG = "__compiled__" not in globals()
YEAR = 2025
AUTHOR = "Miao"
VERSION = "v0.0.1"
APP_NAME = "miaostarspan"
CONFIG_FOLDER = Path("config").absolute()
CONFIG_FILE = CONFIG_FOLDER / "config.json"
DOWNLOAD_FOLDER = Path("download").absolute()
# 23

View File

@@ -0,0 +1,34 @@
# coding: utf-8
from PyQt6.QtCore import QObject, pyqtSignal
from PyQt6.QtGui import QPixmap
class SignalBus(QObject):
"""Signal bus"""
checkUpdateSig = pyqtSignal()
micaEnableChanged = pyqtSignal(bool)
dirOpenSignal = pyqtSignal(str)
shareDirOpenSignal = pyqtSignal(str)
avatarUpdated = pyqtSignal(QPixmap) # 头像更新信号
imagePreviewSignal = pyqtSignal(str)
txtPreviewSignal = pyqtSignal(str)
opacityChanged = pyqtSignal() # 透明度变化信号
backgroundChanged = pyqtSignal() # 背景颜色变化信号
refreshFolderListSignal = pyqtSignal()
addUploadFileTask = pyqtSignal(str) # 添加upload任务信号
addDownloadFileTask = pyqtSignal(str, str, str) # 添加download任务信号
shareFolderViewSignal = pyqtSignal(str) # 分享文件夹
shareFileDownloadSignal = pyqtSignal() # 分享文件下载
languageChanged = pyqtSignal() # 语言变更信号
loginSuccessSignal = pyqtSignal() # 登录成功信号
signalBus = SignalBus()

View File

@@ -0,0 +1,6 @@
# coding: utf-8
"""
Version information for the application
"""
version = "0.0.2"