860 lines
35 KiB
Python
860 lines
35 KiB
Python
"""这里存放基本API包括:
|
||
登录、注册、图形验证码获取
|
||
用户配置获取、用户头像获取
|
||
用户存储策略获取、用户仓内文件获取
|
||
"""
|
||
|
||
import time
|
||
from typing import Literal, Optional
|
||
from urllib.parse import quote_plus
|
||
|
||
import requests
|
||
from loguru import logger
|
||
from PyQt6.QtCore import QBuffer, QByteArray, QIODevice
|
||
from PyQt6.QtGui import QPixmap
|
||
|
||
from ..utils import getCode
|
||
from ..utils.config import policyConfig, userConfig
|
||
|
||
|
||
class MiaoStarsBasicApi:
|
||
_publicHeader = {
|
||
"Content-Type": "application/json",
|
||
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
|
||
"Cache-Control": "no-cache, no-store, must-revalidate",
|
||
"Pragma": "no-cache",
|
||
"Expires": "0",
|
||
}
|
||
|
||
def __init__(self, token: Optional[str] = None):
|
||
self.basicApi = "http://leonmmcoset.jjxmm.win:5212/api/v4"
|
||
|
||
self.session = requests.Session()
|
||
self.session.verify = True
|
||
self.session.headers.update(MiaoStarsBasicApi._publicHeader)
|
||
|
||
# Cloudreve V4 使用 JWT 认证
|
||
# 优先使用传入的token,如果没有则尝试从userConfig获取
|
||
self.token = token or userConfig.getToken()
|
||
if self.token:
|
||
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
|
||
|
||
self.userId = None
|
||
|
||
def returnSession(self) -> requests.Session:
|
||
return self.session
|
||
|
||
def setToken(self, token: str):
|
||
"""设置 JWT token 并同步到全局配置"""
|
||
# 确保token是字符串类型
|
||
if isinstance(token, str):
|
||
self.token = token
|
||
self.session.headers.update({"Authorization": f"Bearer {token}"})
|
||
# 同步更新到全局userConfig,确保认证信息持久化
|
||
userConfig.setToken(token)
|
||
elif isinstance(token, dict) and token.get("access_token"):
|
||
# 兼容处理,如果传入的是token对象,提取access_token
|
||
access_token = token["access_token"]
|
||
self.token = access_token
|
||
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
|
||
userConfig.setToken(access_token)
|
||
|
||
def request(
|
||
self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], url, **kwargs
|
||
) -> dict:
|
||
|
||
maxRetries = 3
|
||
timeout = 20
|
||
|
||
for attempt in range(maxRetries):
|
||
try:
|
||
# 发起请求
|
||
response = self.session.request(
|
||
method=method, url=self.basicApi + url, timeout=timeout, **kwargs
|
||
)
|
||
# 检查HTTP状态码
|
||
response.raise_for_status()
|
||
# 解析JSON响应
|
||
try:
|
||
r = response.json()
|
||
# 保留原始响应格式,不要过早转换Cloudreve V4的响应
|
||
# Cloudreve V4的错误响应通常包含{code, msg},而不是{error}
|
||
# 让具体的API方法来处理响应格式转换
|
||
return r
|
||
except:
|
||
return response.content
|
||
except requests.exceptions.RequestException as e:
|
||
# 网络相关错误,进行重试
|
||
if attempt < maxRetries - 1:
|
||
time.sleep(1) # 简单固定延迟
|
||
continue
|
||
else:
|
||
logger.error(f"请求失败: {str(e)}")
|
||
return {"code": -1, "msg": str(e)}
|
||
except Exception as e:
|
||
logger.error(f"处理响应时出错: {str(e)}")
|
||
return {"code": -1, "msg": str(e)}
|
||
|
||
def getCaptcha(self):
|
||
"""获取图形验证码 (Cloudreve V4 API)"""
|
||
import logging
|
||
logger = logging.getLogger(__name__)
|
||
|
||
logger.debug(f"请求验证码API: {self.basicApi}/site/captcha")
|
||
r = self.request(method="GET", url="/site/captcha")
|
||
|
||
logger.debug(f"验证码API响应: {r}")
|
||
|
||
# 根据Cloudreve V4 API文档,响应格式为:
|
||
# {"code": 0, "data": {"image": "data:image/png;base64,...", "ticket": "..."}, "msg": ""}
|
||
# 转换响应格式以保持与前端的兼容性
|
||
if isinstance(r, dict):
|
||
# 检查是否是标准的Cloudreve V4成功响应
|
||
if r.get("code") == 0 and isinstance(r.get("data"), dict):
|
||
data = r["data"]
|
||
# 确保image字段存在且是有效的base64格式
|
||
if "image" in data:
|
||
# 保存ticket到实例变量,供后续登录/注册使用
|
||
self.captcha_ticket = data.get("ticket", "")
|
||
logger.debug(f"成功获取验证码,已保存ticket: {self.captcha_ticket}")
|
||
|
||
# 前端代码在CaptchaThread.run()中使用response["data"].split(",")[1]
|
||
# 所以需要确保返回的data格式符合前端期望
|
||
captcha_image = data["image"]
|
||
# 检查是否已经包含data:image/png;base64,前缀
|
||
if captcha_image.startswith("data:image/png;base64,"):
|
||
# 保持原样,让前端去split
|
||
result = {"code": 0, "data": captcha_image}
|
||
else:
|
||
# 如果没有前缀,加上前缀以确保前端能正确split
|
||
result = {"code": 0, "data": f"data:image/png;base64,{captcha_image}"}
|
||
|
||
logger.debug(f"返回前端验证码数据格式: {result}")
|
||
return result
|
||
# 处理可能的错误响应
|
||
elif "msg" in r:
|
||
logger.error(f"验证码API错误: {r['msg']}")
|
||
return {"code": -1, "msg": r["msg"]}
|
||
elif "error" in r:
|
||
logger.error(f"验证码API错误: {r['error']}")
|
||
return {"code": -1, "msg": r["error"]}
|
||
else:
|
||
logger.error(f"验证码API返回非字典格式: {type(r)}")
|
||
|
||
# 默认返回失败格式
|
||
logger.error("获取验证码失败,返回默认错误格式")
|
||
return {"code": -1, "msg": "获取验证码失败"}
|
||
|
||
def login(self, username, password, captcha):
|
||
"""登录 (Cloudreve V4 JWT认证)"""
|
||
url = "/session/token"
|
||
# Cloudreve V4 API参数规范:使用email, password, captcha和ticket
|
||
payload = {
|
||
"email": username, # 更正参数名:username -> email
|
||
"password": password,
|
||
"captcha": captcha # 更正参数名:captcha_code -> captcha
|
||
}
|
||
# 如果有保存的ticket,则添加到请求中,使用正确的参数名ticket
|
||
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
|
||
payload['ticket'] = self.captcha_ticket # 更正参数名:captcha_ticket -> ticket
|
||
r = self.request(
|
||
"POST",
|
||
url,
|
||
json=payload,
|
||
)
|
||
# 输出服务器返回的原始信息到控制台
|
||
print(f"登录API服务器返回原始信息: {r}")
|
||
|
||
# 处理登录响应
|
||
# Cloudreve V4的响应格式:{code: 0, data: {user: {...}, token: {...}}, msg: ""}
|
||
if isinstance(r, dict):
|
||
# 检查是否是成功响应
|
||
if r.get("code") == 0 and r.get("data"):
|
||
data = r["data"]
|
||
# 设置 JWT token
|
||
if data.get("token") and isinstance(data["token"], dict):
|
||
# 提取access_token字段作为Bearer token值
|
||
access_token = data["token"].get("access_token")
|
||
if access_token:
|
||
# 设置到当前API实例
|
||
self.setToken(access_token)
|
||
# 同时保存到全局userConfig中,确保认证信息持久化
|
||
userConfig.setToken(access_token)
|
||
# 存储用户信息
|
||
user_info = {
|
||
"code": 0,
|
||
"data": {
|
||
"id": data.get("user", {}).get("id"),
|
||
"nick": data.get("user", {}).get("nickname", data.get("user", {}).get("nick")),
|
||
"email": username
|
||
}
|
||
}
|
||
self.userId = data.get("user", {}).get("id")
|
||
return user_info
|
||
# 处理错误响应
|
||
# 优先检查msg字段,因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
|
||
error_msg = r.get("msg", r.get("error", "登录失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
return {"code": -1, "msg": "登录失败:响应格式错误"}
|
||
|
||
def register(self, username, password, captcha):
|
||
"""注册"""
|
||
url = "/user"
|
||
# Cloudreve V4 API参数规范:使用email, password, captcha和ticket
|
||
payload = {
|
||
"email": username,
|
||
"password": password,
|
||
"captcha": captcha # 更正参数名:captcha_code -> captcha
|
||
}
|
||
# 如果有保存的ticket,则添加到请求中,使用正确的参数名ticket
|
||
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
|
||
payload['ticket'] = self.captcha_ticket # 更正参数名:captcha_ticket -> ticket
|
||
r = self.request(
|
||
"POST",
|
||
url,
|
||
json=payload,
|
||
)
|
||
# 输出服务器返回的原始信息到控制台
|
||
print(f"注册API服务器返回原始信息: {r}")
|
||
|
||
# 处理注册响应
|
||
# Cloudreve V4的响应格式:{code: 0, data: {...}, msg: ""} 或 {code: 错误码, msg: "错误信息"}
|
||
if isinstance(r, dict):
|
||
# 检查是否是成功响应
|
||
if r.get("code") == 0 or r.get("id"):
|
||
return {"code": 203, "msg": "注册成功"}
|
||
# 处理错误响应
|
||
# 优先检查msg字段,因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
|
||
error_msg = r.get("msg", r.get("error", "注册失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
return {"code": -1, "msg": "注册失败:响应格式错误"}
|
||
|
||
def updateUserNickname(self, nickname):
|
||
"""更新用户昵称"""
|
||
url = "/user/profile"
|
||
r = self.request(
|
||
"PUT",
|
||
url,
|
||
json={"nick": nickname},
|
||
)
|
||
|
||
# 转换响应格式
|
||
if r.get("id"):
|
||
return {"code": 0, "msg": "更新成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "更新失败")}
|
||
|
||
def updateUserAvatar(self, qimage):
|
||
"""更新用户头像 - 专门处理 QImage 对象"""
|
||
url = "/user/avatar"
|
||
|
||
# 检查 QImage 是否有效
|
||
if qimage.isNull():
|
||
return {"code": -1, "msg": "QImage 对象为空"}
|
||
|
||
# 将 QImage 转换为 JPEG 格式字节数据
|
||
byte_array = QByteArray()
|
||
buffer = QBuffer(byte_array)
|
||
buffer.open(QIODevice.WriteOnly)
|
||
|
||
# 转换为 JPEG 格式,质量设为 90
|
||
success = qimage.save(buffer, "JPEG", 90)
|
||
buffer.close()
|
||
|
||
if not success:
|
||
return {"code": -1, "msg": "图像转换失败"}
|
||
|
||
# 获取字节数据
|
||
file_data = byte_array.data()
|
||
|
||
# 构建 multipart 请求
|
||
files = {
|
||
'avatar': ('avatar.jpg', file_data, 'image/jpeg')
|
||
}
|
||
|
||
# 移除 Content-Type 以便 requests 自动设置
|
||
headers = self.session.headers.copy()
|
||
if 'Content-Type' in headers:
|
||
del headers['Content-Type']
|
||
|
||
r = self.request("POST", url, files=files, headers=headers)
|
||
|
||
# 转换响应格式
|
||
if r.get("url"):
|
||
return {"code": 0, "msg": "头像更新成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "头像更新失败")}
|
||
|
||
def getUserAvatar(self, size: Literal["s", "m", "l"]):
|
||
"""获取用户头像"""
|
||
# Cloudreve V4 获取用户信息以获取头像URL
|
||
user_info = self.getUserInfo()
|
||
logger.info(f"用户信息API返回:{user_info}")
|
||
if user_info.get("code") != 0:
|
||
return QPixmap(":app/images/logo.png")
|
||
|
||
avatar_url = user_info.get("data", {}).get("avatar")
|
||
if not avatar_url:
|
||
return QPixmap(":app/images/logo.png")
|
||
|
||
try:
|
||
# 直接下载头像图片
|
||
response = requests.get(avatar_url, timeout=10)
|
||
response.raise_for_status()
|
||
|
||
pixmap = QPixmap()
|
||
if pixmap.loadFromData(response.content):
|
||
userConfig.setUserAvatarPixmap(pixmap)
|
||
return pixmap
|
||
else:
|
||
return QPixmap(":app/images/logo.png")
|
||
except Exception as e:
|
||
logger.error(f"获取头像失败: {e}")
|
||
return QPixmap(":app/images/logo.png")
|
||
|
||
def getUserInfo(self):
|
||
"""获取用户信息 (Cloudreve V4 API)"""
|
||
# 使用正确的API端点
|
||
if self.userId:
|
||
url = f"/user/info/{self.userId}"
|
||
else:
|
||
# 如果没有userId,尝试获取当前用户信息
|
||
url = "/user/profile" # 这可能需要根据实际情况调整
|
||
|
||
r = self.request("GET", url)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, dict):
|
||
return r
|
||
else:
|
||
return {"code": -1, "msg": "获取用户信息失败"}
|
||
|
||
def getUserPack(self):
|
||
"""获取用户存储详细 (Cloudreve V4 API)"""
|
||
# 使用正确的API端点
|
||
url = "/user/capacity"
|
||
r = self.request("GET", url)
|
||
|
||
# 转换响应格式以保持向后兼容
|
||
if isinstance(r, dict) and r.get("code") == 0 and isinstance(r.get("data"), dict):
|
||
data = r.get("data", {})
|
||
return {
|
||
"code": 0,
|
||
"data": {
|
||
"base": data.get("total", 0) - data.get("storage_pack_total", 0),
|
||
"pack": data.get("storage_pack_total", 0),
|
||
"used": data.get("used", 0),
|
||
"total": data.get("total", 0),
|
||
"packs": []
|
||
}
|
||
}
|
||
else:
|
||
return {
|
||
"code": 0,
|
||
"data": {
|
||
"base": 0,
|
||
"pack": 0,
|
||
"used": 0,
|
||
"total": 0,
|
||
"packs": []
|
||
}
|
||
}
|
||
|
||
def list(self, path="/"):
|
||
"""列出用户仓内文件 (Cloudreve V4 API)"""
|
||
# 使用正确的API端点和必需参数
|
||
url = "/file"
|
||
|
||
# 将path转换为Cloudreve V4要求的URI格式
|
||
# 根目录映射到 cloudreve://my/
|
||
if path == "/" or path == "":
|
||
uri = "cloudreve://my/"
|
||
else:
|
||
# 清理路径,确保不包含cloudreve://my前缀
|
||
normalized_path = path.strip("/").replace("\\", "/").replace("cloudreve://my/", "")
|
||
# 使用quote_plus正确编码路径,确保特殊字符被正确处理
|
||
encoded_path = quote_plus(normalized_path)
|
||
# 由于quote_plus会将斜杠也编码,我们需要恢复它们
|
||
encoded_path = encoded_path.replace("%2F", "/")
|
||
uri = f"cloudreve://my/{encoded_path}"
|
||
|
||
# 确保路径格式正确,移除重复的前缀
|
||
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
|
||
|
||
# 添加必需的分页参数
|
||
params = {
|
||
"uri": uri,
|
||
"page": 0, # 从第一页开始
|
||
"page_size": 100 # 使用合理的默认值
|
||
}
|
||
|
||
logger.debug(f"发送文件列表请求: URI={uri}")
|
||
r = self.request("GET", url, params=params)
|
||
|
||
# 转换响应格式以保持向后兼容
|
||
# 根据API规范,文件列表在data.files中
|
||
if isinstance(r, dict) and "data" in r and "files" in r["data"]:
|
||
return {
|
||
"code": 0,
|
||
"data": r["data"]["files"]
|
||
}
|
||
else:
|
||
logger.warning(f"无效的响应格式: {r}")
|
||
return {"code": 0, "data": []}
|
||
|
||
def getPolicy(self):
|
||
"""获取用户存储策略"""
|
||
url = "/user/setting/policies"
|
||
r = self.request("GET", url)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, list):
|
||
return r
|
||
else:
|
||
return {"code": 0, "data": []}
|
||
|
||
def changePolicy(self, path, policy):
|
||
"""修改用户存储策略"""
|
||
url = "/file/move"
|
||
r = self.request("POST", url, json={"items": [path], "dst": path, "policy": policy})
|
||
|
||
# 转换响应格式
|
||
if r.get("count") == 1:
|
||
return {"code": 0, "msg": "存储策略修改成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "存储策略修改失败")}
|
||
|
||
def createFolder(self, name):
|
||
"""创建文件夹"""
|
||
url = "/file/create"
|
||
currentPath = policyConfig.returnCurrentPath()
|
||
# 根据Cloudreve V4 API规范构建uri参数,确保不会有重复斜杠和前缀
|
||
# 清理currentPath,确保不包含cloudreve://my前缀
|
||
clean_current_path = currentPath.replace("cloudreve://my", "")
|
||
|
||
if clean_current_path == "/":
|
||
uri = f"cloudreve://my/{name}"
|
||
else:
|
||
uri = f"cloudreve://my{clean_current_path}/{name}"
|
||
|
||
# 确保路径格式正确,移除重复的前缀
|
||
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
|
||
|
||
# 更健壮地处理重复文件名的情况
|
||
path_parts = uri.split('/')
|
||
if len(path_parts) > 1:
|
||
# 检查最后一个部分是否是名称
|
||
if path_parts[-1] == name:
|
||
# 检查倒数第二个部分是否也是名称
|
||
if len(path_parts) > 2 and path_parts[-2] == name:
|
||
# 移除重复的名称部分
|
||
path_parts.pop(-2)
|
||
uri = '/'.join(path_parts)
|
||
r = self.request("POST", url, json={"uri": uri, "type": "folder", "err_on_conflict": True})
|
||
|
||
# 转换响应格式
|
||
if r.get("data") and r.get("code") == 0:
|
||
return {"code": 0, "msg": "文件夹创建成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("msg", "文件夹创建失败")}
|
||
|
||
def deleteFile(self, fileUri, fileType: Literal["file", "dir"]):
|
||
"""删除文件 (Cloudreve V4 API)"""
|
||
url = "/file"
|
||
|
||
# 现在调用方已经传入了正确格式的URI
|
||
logger.debug(f"删除文件,URI: {fileUri}")
|
||
|
||
# 根据Cloudreve V4 API规范,使用uris参数列表
|
||
deleteData = {
|
||
"uris": [fileUri]
|
||
}
|
||
r = self.request("DELETE", url, json=deleteData)
|
||
|
||
# 转换响应格式
|
||
# Cloudreve V4 响应格式:{code: 0} 表示成功
|
||
if isinstance(r, dict) and r.get("code") == 0:
|
||
return {"code": 0, "msg": "删除成功"}
|
||
else:
|
||
error_msg = r.get("msg", r.get("error", "删除失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
def wareSearch(
|
||
self,
|
||
searchContent,
|
||
searchType: Literal["keyword", "internalTag", "externalTag"],
|
||
):
|
||
"""搜索文件 (Cloudreve V4 API)"""
|
||
# 根据Cloudreve V4 API,使用/file端点并添加搜索参数
|
||
url = "/file"
|
||
|
||
# 使用Cloudreve V4的URI格式
|
||
uri = "cloudreve://my/"
|
||
|
||
# 构建搜索参数
|
||
params = {
|
||
"uri": uri,
|
||
"keyword": searchContent,
|
||
"page": 0,
|
||
"page_size": 100
|
||
}
|
||
|
||
# 根据搜索类型调整参数
|
||
if searchType == "internalTag":
|
||
params["type"] = "internal"
|
||
elif searchType == "externalTag":
|
||
params["type"] = "tag"
|
||
|
||
logger.debug(f"发送文件搜索请求: 关键词={searchContent}, 类型={searchType}")
|
||
r = self.request("GET", url, params=params)
|
||
|
||
# 转换响应格式以保持向后兼容
|
||
if isinstance(r, dict):
|
||
if "data" in r:
|
||
if "files" in r["data"]:
|
||
# Cloudreve V4 API 格式
|
||
return {"code": 0, "data": {"objects": r["data"]["files"]}}
|
||
elif isinstance(r["data"], list):
|
||
# 如果data是列表,直接使用
|
||
return {"code": 0, "data": {"objects": r["data"]}}
|
||
|
||
logger.warning(f"搜索响应格式不正确: {r}")
|
||
return {"code": 0, "data": {"objects": []}}
|
||
|
||
def shareSearch(self, keyword, orderBy, order, page):
|
||
"""搜索分享 (Cloudreve V4 API)"""
|
||
# 使用正确的API端点 - Cloudreve V4使用/share端点获取分享列表
|
||
url = "/share"
|
||
params = {
|
||
"page": page,
|
||
"page_size": 50, # 添加默认页面大小以避免"PageSize cannot be empty"错误
|
||
"order_by": orderBy,
|
||
"order": order,
|
||
"keyword": keyword, # 根据实际API支持情况调整
|
||
}
|
||
r = self.request("GET", url, params=params)
|
||
return r
|
||
|
||
def deleteTag(self, tagId):
|
||
"""删除标签"""
|
||
url = f"/tag/{tagId}"
|
||
r = self.request("DELETE", url)
|
||
|
||
# 转换响应格式
|
||
if r is None or (isinstance(r, dict) and not r.get("error")):
|
||
return {"code": 0, "msg": "标签删除成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "标签删除失败")}
|
||
|
||
def addTag(self, name, expression):
|
||
"""添加标签"""
|
||
url = "/tag/filter"
|
||
jsons = {
|
||
"expression": expression,
|
||
"name": name,
|
||
"color": "#ff9800",
|
||
"icon": "Circle",
|
||
}
|
||
r = self.request("POST", url, json=jsons)
|
||
|
||
# 转换响应格式
|
||
if r.get("id"):
|
||
return {"code": 0, "msg": "标签添加成功", "data": r}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "标签添加失败")}
|
||
|
||
def getShareFileInfo(self, shareId):
|
||
"""获取分享文件信息"""
|
||
url = f"/share/{shareId}"
|
||
r = self.request("GET", url)
|
||
|
||
# 转换响应格式
|
||
if r.get("id"):
|
||
return {"code": 0, "data": r}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "获取分享信息失败")}
|
||
|
||
def updateFileContent(self, fileId, content):
|
||
"""更新文件内容"""
|
||
url = f"/file/content/{fileId}"
|
||
headers = {
|
||
"Content-Type": "text/plain"
|
||
}
|
||
r = self.request("PUT", url, data=content.encode("utf-8"), headers=headers)
|
||
|
||
# 转换响应格式
|
||
if r.get("size") is not None:
|
||
return {"code": 0, "msg": "文件内容更新成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "文件内容更新失败")}
|
||
|
||
def renameFile(self, fileUri, newName, fileType="file"):
|
||
"""重命名文件 (Cloudreve V4 API)"""
|
||
url = "/file/rename"
|
||
|
||
# 确保传入的是正确格式的URI
|
||
if not fileUri.startswith("cloudreve://"):
|
||
logger.error(f"无效的URI格式: {fileUri},必须以cloudreve://开头")
|
||
return {"code": -1, "msg": "无效的URI格式"}
|
||
|
||
# 验证新名称不为空
|
||
if not newName or not newName.strip():
|
||
logger.error("新文件名不能为空")
|
||
return {"code": -1, "msg": "NewName cannot be empty"}
|
||
|
||
# 根据文件类型设置正确的type值
|
||
type_value = "file" if fileType == "file" else "dir"
|
||
|
||
# 构建重命名请求数据,符合Cloudreve V4 API规范
|
||
data = {
|
||
"type": type_value,
|
||
"uri": fileUri,
|
||
"new_name": newName.strip()
|
||
}
|
||
|
||
# 添加详细日志,记录完整的请求数据
|
||
logger.debug(f"重命名文件请求数据 - URI: {fileUri}, 新名称: {newName.strip()}, 类型: {type_value}, 完整数据: {data}")
|
||
|
||
r = self.request("POST", url, json=data)
|
||
|
||
# 记录响应
|
||
logger.debug(f"重命名文件响应: {r}")
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, dict):
|
||
if r.get("code") == 0:
|
||
return {"code": 0, "msg": "重命名成功"}
|
||
else:
|
||
error_msg = r.get("msg", r.get("error", "重命名失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
logger.error(f"重命名响应格式不正确: {r}")
|
||
return {"code": -1, "msg": "重命名失败,响应格式不正确"}
|
||
|
||
def getFileUrl(self, fileUri, redirect=False):
|
||
"""获取文件临时下载URL (Cloudreve V4 API)"""
|
||
url = "/file/url"
|
||
|
||
# 确保传入的是正确格式的URI
|
||
if not fileUri.startswith("cloudreve://"):
|
||
logger.error(f"无效的URI格式: {fileUri},必须以cloudreve://开头")
|
||
return {"code": -1, "msg": "无效的URI格式"}
|
||
|
||
# 修复文件名重复问题 - 保留URI编码文件名,删除后面的原始文件名
|
||
if fileUri and '/' in fileUri:
|
||
import urllib.parse
|
||
parts = fileUri.split('/')
|
||
|
||
# 检查是否存在潜在的文件名重复情况
|
||
if len(parts) >= 2:
|
||
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
|
||
try:
|
||
decoded_last = urllib.parse.unquote(parts[-1])
|
||
decoded_prev = urllib.parse.unquote(parts[-2])
|
||
|
||
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
|
||
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
|
||
# 保留编码的文件名部分,移除后面的重复部分
|
||
fileUri = '/'.join(parts[:-1])
|
||
logger.debug(f"修复了重复文件名的URI: {fileUri}")
|
||
except Exception as e:
|
||
logger.debug(f"解码URI部分时出错: {str(e)}")
|
||
|
||
data = {
|
||
"uris": [fileUri],
|
||
"redirect": redirect,
|
||
"download": False # 设置为False以获取预览URL
|
||
}
|
||
logger.debug(f"获取文件URL,URI: {fileUri}")
|
||
r = self.request("POST", url, json=data)
|
||
|
||
# 转换响应格式
|
||
# 直接返回API响应,让调用者处理不同的返回格式
|
||
logger.debug(f"getFileUrl API响应: {r}")
|
||
|
||
# 如果是成功的响应格式,确保返回完整的响应对象
|
||
if isinstance(r, dict):
|
||
# 兼容不同的成功响应格式
|
||
if "urls" in r and isinstance(r["urls"], list):
|
||
# 清理URL中的反引号和多余空格
|
||
for url_item in r["urls"]:
|
||
if isinstance(url_item, dict) and "url" in url_item:
|
||
url_item["url"] = url_item["url"].strip('` ')
|
||
return r
|
||
elif r.get("code") == 0 and "data" in r:
|
||
# Cloudreve标准格式
|
||
return r
|
||
elif r.get("code") != 0:
|
||
# 错误响应
|
||
return r
|
||
|
||
# 如果响应不符合预期格式,返回错误
|
||
logger.error(f"getFileUrl响应格式不符合预期: {r}")
|
||
return {"code": -1, "msg": "获取文件URL失败,响应格式错误"}
|
||
|
||
def getFileContent(self, fileUri):
|
||
"""获取文件内容 (Cloudreve V4 API - PUT /file/content)"""
|
||
url = "/file/content"
|
||
|
||
# 确保传入的是正确格式的URI
|
||
if not fileUri.startswith("cloudreve://"):
|
||
logger.error(f"无效的URI格式: {fileUri},必须以cloudreve://开头")
|
||
return {"code": -1, "msg": "无效的URI格式"}
|
||
|
||
# 修复文件名重复问题 - 保留URI编码文件名,删除后面的原始文件名
|
||
if fileUri and '/' in fileUri:
|
||
import urllib.parse
|
||
parts = fileUri.split('/')
|
||
|
||
# 检查是否存在潜在的文件名重复情况
|
||
if len(parts) >= 2:
|
||
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
|
||
try:
|
||
decoded_last = urllib.parse.unquote(parts[-1])
|
||
decoded_prev = urllib.parse.unquote(parts[-2])
|
||
|
||
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
|
||
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
|
||
# 保留编码的文件名部分,移除后面的重复部分
|
||
fileUri = '/'.join(parts[:-1])
|
||
logger.debug(f"getFileContent - 修复了重复文件名的URI: {fileUri}")
|
||
except Exception as e:
|
||
logger.debug(f"解码URI部分时出错: {str(e)}")
|
||
|
||
# 构建查询参数
|
||
params = {"uri": fileUri}
|
||
|
||
logger.debug(f"获取文件内容,URI: {fileUri}")
|
||
|
||
try:
|
||
# 使用PUT请求获取文件内容
|
||
r = self.request("PUT", url, params=params)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, bytes):
|
||
# 直接返回二进制内容
|
||
return {"code": 0, "data": r}
|
||
elif isinstance(r, dict):
|
||
if r.get("code") == 0:
|
||
# 确保返回的数据是二进制格式
|
||
data = r.get("data")
|
||
if isinstance(data, bytes):
|
||
return {"code": 0, "data": data}
|
||
elif isinstance(data, dict) or isinstance(data, list):
|
||
# 如果data是字典或列表,说明可能是API响应错误,使用URL方式获取
|
||
logger.warning(f"获取到的数据是{type(data).__name__}类型,不是二进制数据,将尝试使用URL方式")
|
||
|
||
# 尝试使用getFileUrl获取临时URL,然后下载内容
|
||
url_response = self.getFileUrl(fileUri)
|
||
if url_response.get("code") == 0 and "data" in url_response and "urls" in url_response["data"]:
|
||
urls = url_response["data"]["urls"]
|
||
if urls and isinstance(urls[0], dict) and "url" in urls[0]:
|
||
temp_url = urls[0]["url"]
|
||
# 使用session获取文件内容
|
||
session_response = self.returnSession().get(temp_url, stream=True, timeout=(15, 30))
|
||
session_response.raise_for_status()
|
||
binary_content = session_response.content
|
||
logger.info(f"成功从临时URL获取文件内容,大小: {len(binary_content)} 字节")
|
||
return {"code": 0, "data": binary_content}
|
||
else:
|
||
# 尝试将其他类型转换为二进制
|
||
try:
|
||
if isinstance(data, str):
|
||
return {"code": 0, "data": data.encode('utf-8')}
|
||
else:
|
||
return {"code": 0, "data": str(data).encode('utf-8')}
|
||
except Exception:
|
||
pass
|
||
# 错误响应
|
||
return {"code": r.get("code", -1), "msg": r.get("msg", "获取文件内容失败")}
|
||
else:
|
||
# 未知响应类型,尝试转换为二进制
|
||
try:
|
||
return {"code": 0, "data": str(r).encode('utf-8')}
|
||
except Exception:
|
||
return {"code": -1, "msg": f"未知的响应类型: {type(r).__name__}"}
|
||
except Exception as e:
|
||
logger.error(f"获取文件内容时发生异常: {str(e)}")
|
||
return {"code": -1, "msg": f"获取文件内容失败: {str(e)}"}
|
||
|
||
def updateFileContent(self, fileUri, content, previous_version=None):
|
||
"""更新文件内容 (Cloudreve V4 API - PUT /file/content)"""
|
||
url = "/file/content"
|
||
|
||
# 确保传入的是正确格式的URI
|
||
if not fileUri.startswith("cloudreve://"):
|
||
logger.error(f"无效的URI格式: {fileUri},必须以cloudreve://开头")
|
||
return {"code": -1, "msg": "无效的URI格式"}
|
||
|
||
# 构建查询参数
|
||
params = {"uri": fileUri}
|
||
if previous_version:
|
||
params["previous"] = previous_version
|
||
|
||
# 设置请求头
|
||
headers = {
|
||
"Content-Length": str(len(content))
|
||
}
|
||
|
||
logger.debug(f"更新文件内容,URI: {fileUri}")
|
||
r = self.request("PUT", url, params=params, data=content, headers=headers)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, dict) and r.get("code") == 0:
|
||
return {"code": 0, "data": r.get("data", {})}
|
||
else:
|
||
error_msg = r.get("msg", r.get("error", "更新文件内容失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
def createViewerSession(self, fileUri, viewer_id, preferred_action="view", version=None, parent_uri=None):
|
||
"""创建查看器会话 (Cloudreve V4 API - PUT /file/viewerSession)"""
|
||
url = "/file/viewerSession"
|
||
|
||
# 确保传入的是正确格式的URI
|
||
if not fileUri.startswith("cloudreve://"):
|
||
logger.error(f"无效的URI格式: {fileUri},必须以cloudreve://开头")
|
||
return {"code": -1, "msg": "无效的URI格式"}
|
||
|
||
# 构建请求体
|
||
data = {
|
||
"uri": fileUri,
|
||
"viewer_id": viewer_id,
|
||
"preferred_action": preferred_action
|
||
}
|
||
|
||
# 添加可选参数
|
||
if version:
|
||
data["version"] = version
|
||
if parent_uri:
|
||
data["parent_uri"] = parent_uri
|
||
|
||
logger.debug(f"创建查看器会话,URI: {fileUri}, viewer_id: {viewer_id}")
|
||
r = self.request("PUT", url, json=data)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, dict) and r.get("code") == 0:
|
||
return {"code": 0, "data": r.get("data", {})}
|
||
else:
|
||
error_msg = r.get("msg", r.get("error", "创建查看器会话失败"))
|
||
return {"code": -1, "msg": error_msg}
|
||
|
||
def updateUserNickname(self, nickName):
|
||
"""更新用户昵称 (Cloudreve V4 API)"""
|
||
url = "/user/profile"
|
||
data = {
|
||
"nick": nickName
|
||
}
|
||
r = self.request("PUT", url, json=data)
|
||
|
||
# 转换响应格式
|
||
if isinstance(r, dict) and not r.get("error"):
|
||
return {"code": 0, "msg": "昵称更新成功"}
|
||
else:
|
||
return {"code": -1, "msg": r.get("error", "昵称更新失败")}
|
||
|
||
|
||
# 创建全局实例,方便其他模块直接使用
|
||
basicApi = MiaoStarsBasicApi()
|