Files
leonpan-pc/app/core/api/basicApi.py

929 lines
38 KiB
Python
Raw Normal View History

2025-10-29 22:20:21 +08:00
"""这里存放基本API包括:
登录注册图形验证码获取
用户配置获取用户头像获取
用户存储策略获取用户仓内文件获取
"""
import time
from typing import Literal, Optional
from urllib.parse import quote_plus
import requests
from loguru import logger
from PyQt6.QtCore import QBuffer, QByteArray, QIODevice
from PyQt6.QtGui import QPixmap
from ..utils import getCode
from ..utils.config import policyConfig, userConfig
class MiaoStarsBasicApi:
_publicHeader = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
}
def __init__(self, token: Optional[str] = None):
self.basicApi = "http://leonmmcoset.jjxmm.win:5212/api/v4"
self.session = requests.Session()
self.session.verify = True
self.session.headers.update(MiaoStarsBasicApi._publicHeader)
# Cloudreve V4 使用 JWT 认证
# 优先使用传入的token如果没有则尝试从userConfig获取
self.token = token or userConfig.getToken()
if self.token:
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
self.userId = None
def returnSession(self) -> requests.Session:
return self.session
def setToken(self, token: str):
"""设置 JWT token 并同步到全局配置"""
# 确保token是字符串类型
if isinstance(token, str):
self.token = token
self.session.headers.update({"Authorization": f"Bearer {token}"})
# 同步更新到全局userConfig确保认证信息持久化
userConfig.setToken(token)
elif isinstance(token, dict) and token.get("access_token"):
# 兼容处理如果传入的是token对象提取access_token
access_token = token["access_token"]
self.token = access_token
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
userConfig.setToken(access_token)
def request(
self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], url, **kwargs
) -> dict:
maxRetries = 3
timeout = 20
for attempt in range(maxRetries):
try:
# 发起请求
response = self.session.request(
method=method, url=self.basicApi + url, timeout=timeout, **kwargs
)
# 检查HTTP状态码
response.raise_for_status()
# 解析JSON响应
try:
r = response.json()
# 保留原始响应格式不要过早转换Cloudreve V4的响应
# Cloudreve V4的错误响应通常包含{code, msg},而不是{error}
# 让具体的API方法来处理响应格式转换
return r
except:
return response.content
except requests.exceptions.RequestException as e:
# 网络相关错误,进行重试
if attempt < maxRetries - 1:
time.sleep(1) # 简单固定延迟
continue
else:
logger.error(f"请求失败: {str(e)}")
return {"code": -1, "msg": str(e)}
except Exception as e:
logger.error(f"处理响应时出错: {str(e)}")
return {"code": -1, "msg": str(e)}
def getCaptcha(self):
"""获取图形验证码 (Cloudreve V4 API)"""
import logging
logger = logging.getLogger(__name__)
logger.debug(f"请求验证码API: {self.basicApi}/site/captcha")
r = self.request(method="GET", url="/site/captcha")
logger.debug(f"验证码API响应: {r}")
# 根据Cloudreve V4 API文档响应格式为
# {"code": 0, "data": {"image": "data:image/png;base64,...", "ticket": "..."}, "msg": ""}
# 转换响应格式以保持与前端的兼容性
if isinstance(r, dict):
# 检查是否是标准的Cloudreve V4成功响应
if r.get("code") == 0 and isinstance(r.get("data"), dict):
data = r["data"]
# 确保image字段存在且是有效的base64格式
if "image" in data:
# 保存ticket到实例变量供后续登录/注册使用
self.captcha_ticket = data.get("ticket", "")
logger.debug(f"成功获取验证码已保存ticket: {self.captcha_ticket}")
# 前端代码在CaptchaThread.run()中使用response["data"].split(",")[1]
# 所以需要确保返回的data格式符合前端期望
captcha_image = data["image"]
# 检查是否已经包含data:image/png;base64,前缀
if captcha_image.startswith("data:image/png;base64,"):
# 保持原样让前端去split
result = {"code": 0, "data": captcha_image}
else:
# 如果没有前缀加上前缀以确保前端能正确split
result = {"code": 0, "data": f"data:image/png;base64,{captcha_image}"}
logger.debug(f"返回前端验证码数据格式: {result}")
return result
# 处理可能的错误响应
elif "msg" in r:
logger.error(f"验证码API错误: {r['msg']}")
return {"code": -1, "msg": r["msg"]}
elif "error" in r:
logger.error(f"验证码API错误: {r['error']}")
return {"code": -1, "msg": r["error"]}
else:
logger.error(f"验证码API返回非字典格式: {type(r)}")
# 默认返回失败格式
logger.error("获取验证码失败,返回默认错误格式")
return {"code": -1, "msg": "获取验证码失败"}
def login(self, username, password, captcha):
"""登录 (Cloudreve V4 JWT认证)"""
url = "/session/token"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username, # 更正参数名username -> email
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"登录API服务器返回原始信息: {r}")
# 处理登录响应
# Cloudreve V4的响应格式{code: 0, data: {user: {...}, token: {...}}, msg: ""}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 and r.get("data"):
data = r["data"]
# 设置 JWT token
if data.get("token") and isinstance(data["token"], dict):
# 提取access_token字段作为Bearer token值
access_token = data["token"].get("access_token")
if access_token:
# 设置到当前API实例
self.setToken(access_token)
# 同时保存到全局userConfig中确保认证信息持久化
userConfig.setToken(access_token)
# 存储用户信息
user_info = {
"code": 0,
"data": {
"id": data.get("user", {}).get("id"),
"nick": data.get("user", {}).get("nickname", data.get("user", {}).get("nick")),
"email": username
}
}
self.userId = data.get("user", {}).get("id")
return user_info
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "登录失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "登录失败:响应格式错误"}
def register(self, username, password, captcha):
"""注册"""
url = "/user"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username,
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"注册API服务器返回原始信息: {r}")
# 处理注册响应
# Cloudreve V4的响应格式{code: 0, data: {...}, msg: ""} 或 {code: 错误码, msg: "错误信息"}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 or r.get("id"):
return {"code": 203, "msg": "注册成功"}
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "注册失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "注册失败:响应格式错误"}
def updateUserNickname(self, nickname):
"""更新用户昵称"""
url = "/user/profile"
r = self.request(
"PUT",
url,
json={"nick": nickname},
)
# 转换响应格式
if r.get("id"):
return {"code": 0, "msg": "更新成功"}
else:
return {"code": -1, "msg": r.get("error", "更新失败")}
def updateUserAvatar(self, qimage):
"""更新用户头像 - 专门处理 QImage 对象"""
url = "/user/avatar"
# 检查 QImage 是否有效
if qimage.isNull():
return {"code": -1, "msg": "QImage 对象为空"}
# 将 QImage 转换为 JPEG 格式字节数据
byte_array = QByteArray()
buffer = QBuffer(byte_array)
buffer.open(QIODevice.WriteOnly)
# 转换为 JPEG 格式,质量设为 90
success = qimage.save(buffer, "JPEG", 90)
buffer.close()
if not success:
return {"code": -1, "msg": "图像转换失败"}
# 获取字节数据
file_data = byte_array.data()
# 构建 multipart 请求
files = {
'avatar': ('avatar.jpg', file_data, 'image/jpeg')
}
# 移除 Content-Type 以便 requests 自动设置
headers = self.session.headers.copy()
if 'Content-Type' in headers:
del headers['Content-Type']
r = self.request("POST", url, files=files, headers=headers)
# 转换响应格式
if r.get("url"):
return {"code": 0, "msg": "头像更新成功"}
else:
return {"code": -1, "msg": r.get("error", "头像更新失败")}
def getUserAvatar(self, size: Literal["s", "m", "l"]):
"""获取用户头像"""
# Cloudreve V4 获取用户信息以获取头像URL
user_info = self.getUserInfo()
2025-10-30 15:50:40 +08:00
logger.info(f"用户信息API返回{user_info}")
2025-10-29 22:20:21 +08:00
if user_info.get("code") != 0:
return QPixmap(":app/images/logo.png")
avatar_url = user_info.get("data", {}).get("avatar")
if not avatar_url:
return QPixmap(":app/images/logo.png")
try:
# 直接下载头像图片
response = requests.get(avatar_url, timeout=10)
response.raise_for_status()
pixmap = QPixmap()
if pixmap.loadFromData(response.content):
userConfig.setUserAvatarPixmap(pixmap)
return pixmap
else:
return QPixmap(":app/images/logo.png")
except Exception as e:
logger.error(f"获取头像失败: {e}")
return QPixmap(":app/images/logo.png")
def getUserInfo(self):
"""获取用户信息 (Cloudreve V4 API)"""
# 使用正确的API端点
if self.userId:
url = f"/user/info/{self.userId}"
else:
# 如果没有userId尝试获取当前用户信息
url = "/user/profile" # 这可能需要根据实际情况调整
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, dict):
2025-10-30 15:50:40 +08:00
return r
2025-10-29 22:20:21 +08:00
else:
return {"code": -1, "msg": "获取用户信息失败"}
def getUserPack(self):
"""获取用户存储详细 (Cloudreve V4 API)"""
# 使用正确的API端点
url = "/user/capacity"
r = self.request("GET", url)
# 转换响应格式以保持向后兼容
2025-11-02 19:17:20 +08:00
if isinstance(r, dict) and r.get("code") == 0 and isinstance(r.get("data"), dict):
data = r.get("data", {})
2025-10-29 22:20:21 +08:00
return {
"code": 0,
"data": {
2025-11-02 19:17:20 +08:00
"base": data.get("total", 0) - data.get("storage_pack_total", 0),
"pack": data.get("storage_pack_total", 0),
"used": data.get("used", 0),
"total": data.get("total", 0),
2025-10-29 22:20:21 +08:00
"packs": []
}
}
else:
return {
"code": 0,
"data": {
"base": 0,
"pack": 0,
"used": 0,
"total": 0,
"packs": []
}
}
def list(self, path="/"):
"""列出用户仓内文件 (Cloudreve V4 API)"""
# 使用正确的API端点和必需参数
url = "/file"
# 将path转换为Cloudreve V4要求的URI格式
# 根目录映射到 cloudreve://my/
if path == "/" or path == "":
uri = "cloudreve://my/"
else:
2025-11-01 20:14:35 +08:00
# 清理路径确保不包含cloudreve://my前缀
normalized_path = path.strip("/").replace("\\", "/").replace("cloudreve://my/", "")
2025-10-29 22:20:21 +08:00
# 使用quote_plus正确编码路径确保特殊字符被正确处理
encoded_path = quote_plus(normalized_path)
# 由于quote_plus会将斜杠也编码我们需要恢复它们
encoded_path = encoded_path.replace("%2F", "/")
uri = f"cloudreve://my/{encoded_path}"
2025-11-01 20:14:35 +08:00
# 确保路径格式正确,移除重复的前缀
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
2025-10-29 22:20:21 +08:00
# 添加必需的分页参数
params = {
"uri": uri,
"page": 0, # 从第一页开始
"page_size": 100 # 使用合理的默认值
}
logger.debug(f"发送文件列表请求: URI={uri}")
r = self.request("GET", url, params=params)
# 转换响应格式以保持向后兼容
# 根据API规范文件列表在data.files中
if isinstance(r, dict) and "data" in r and "files" in r["data"]:
return {
"code": 0,
"data": r["data"]["files"]
}
else:
logger.warning(f"无效的响应格式: {r}")
return {"code": 0, "data": []}
def getPolicy(self):
"""获取用户存储策略"""
2025-11-01 20:14:35 +08:00
url = "/user/setting/policies"
2025-10-29 22:20:21 +08:00
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, list):
2025-11-01 20:14:35 +08:00
return r
2025-10-29 22:20:21 +08:00
else:
return {"code": 0, "data": []}
def changePolicy(self, path, policy):
"""修改用户存储策略"""
url = "/file/move"
r = self.request("POST", url, json={"items": [path], "dst": path, "policy": policy})
# 转换响应格式
if r.get("count") == 1:
return {"code": 0, "msg": "存储策略修改成功"}
else:
return {"code": -1, "msg": r.get("error", "存储策略修改失败")}
def createFolder(self, name):
"""创建文件夹"""
url = "/file/create"
currentPath = policyConfig.returnCurrentPath()
2025-11-01 20:14:35 +08:00
# 根据Cloudreve V4 API规范构建uri参数确保不会有重复斜杠和前缀
# 清理currentPath确保不包含cloudreve://my前缀
clean_current_path = currentPath.replace("cloudreve://my", "")
if clean_current_path == "/":
2025-10-29 22:20:21 +08:00
uri = f"cloudreve://my/{name}"
else:
2025-11-01 20:14:35 +08:00
uri = f"cloudreve://my{clean_current_path}/{name}"
# 确保路径格式正确,移除重复的前缀
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
# 更健壮地处理重复文件名的情况
path_parts = uri.split('/')
if len(path_parts) > 1:
# 检查最后一个部分是否是名称
if path_parts[-1] == name:
# 检查倒数第二个部分是否也是名称
if len(path_parts) > 2 and path_parts[-2] == name:
# 移除重复的名称部分
path_parts.pop(-2)
uri = '/'.join(path_parts)
2025-10-29 22:20:21 +08:00
r = self.request("POST", url, json={"uri": uri, "type": "folder", "err_on_conflict": True})
# 转换响应格式
if r.get("data") and r.get("code") == 0:
return {"code": 0, "msg": "文件夹创建成功"}
else:
return {"code": -1, "msg": r.get("msg", "文件夹创建失败")}
2025-11-01 20:14:35 +08:00
def deleteFile(self, fileUri, fileType: Literal["file", "dir"]):
"""删除文件 (Cloudreve V4 API)"""
url = "/file"
# 现在调用方已经传入了正确格式的URI
logger.debug(f"删除文件URI: {fileUri}")
# 根据Cloudreve V4 API规范使用uris参数列表
2025-10-29 22:20:21 +08:00
deleteData = {
2025-11-01 20:14:35 +08:00
"uris": [fileUri]
2025-10-29 22:20:21 +08:00
}
2025-11-01 20:14:35 +08:00
r = self.request("DELETE", url, json=deleteData)
2025-10-29 22:20:21 +08:00
# 转换响应格式
2025-11-01 20:14:35 +08:00
# Cloudreve V4 响应格式:{code: 0} 表示成功
if isinstance(r, dict) and r.get("code") == 0:
2025-10-29 22:20:21 +08:00
return {"code": 0, "msg": "删除成功"}
else:
2025-11-01 20:14:35 +08:00
error_msg = r.get("msg", r.get("error", "删除失败"))
return {"code": -1, "msg": error_msg}
2025-10-29 22:20:21 +08:00
def wareSearch(
self,
searchContent,
searchType: Literal["keyword", "internalTag", "externalTag"],
):
"""搜索文件 (Cloudreve V4 API)"""
2025-11-03 22:28:58 +08:00
# 根据Cloudreve V4 API使用/file端点并在URI中添加搜索条件
2025-10-29 22:20:21 +08:00
url = "/file"
2025-11-03 22:28:58 +08:00
# 构建搜索URI根据Cloudreve V4的搜索规范
# 基础URI指向用户根目录
base_uri = "cloudreve://my/"
# 根据搜索类型和内容构建查询参数
# 注意Cloudreve V4将搜索条件作为URI的查询部分
search_query = []
if searchType == "keyword" and searchContent:
# 文件名搜索
search_query.append(f"name={quote_plus(searchContent)}")
elif searchType == "internalTag" and searchContent:
# 内部标签搜索
search_query.append(f"tag:internal={quote_plus(searchContent)}")
elif searchType == "externalTag" and searchContent:
# 外部标签搜索
search_query.append(f"tag:external={quote_plus(searchContent)}")
# 构建完整URI
if search_query:
uri = f"{base_uri}?{ '&'.join(search_query) }"
else:
uri = base_uri
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
# 构建请求参数
2025-10-29 22:20:21 +08:00
params = {
"uri": uri,
2025-11-03 22:28:58 +08:00
"page": 0, # 从第一页开始
"page_size": 100, # 使用合理的默认值
"order_by": "updated_at", # 默认按更新时间排序
"order_direction": "desc" # 默认降序
2025-10-29 22:20:21 +08:00
}
2025-11-03 22:28:58 +08:00
logger.debug(f"发送文件搜索请求: URI={uri}, 搜索类型={searchType}, 搜索内容={searchContent}")
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
try:
r = self.request("GET", url, params=params)
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
# 转换响应格式以保持向后兼容
if isinstance(r, dict):
if r.get("code") == 0 and "data" in r:
if "files" in r["data"]:
# Cloudreve V4 API 标准格式
logger.success(f"搜索成功,找到 {len(r['data']['files'])} 个文件")
return {"code": 0, "data": {"objects": r["data"]["files"]}}
elif isinstance(r["data"], list):
# 兼容列表格式响应
return {"code": 0, "data": {"objects": r["data"]}}
else:
logger.warning(f"搜索响应缺少files字段: {r}")
return {"code": 0, "data": {"objects": []}}
else:
# API返回错误
error_msg = r.get("msg", r.get("error", "搜索失败"))
logger.error(f"搜索API返回错误: {error_msg}")
return {"code": -1, "msg": error_msg}
else:
logger.error(f"搜索响应格式无效: {type(r).__name__}")
return {"code": -1, "msg": "搜索响应格式无效"}
except Exception as e:
logger.exception(f"搜索请求异常: {str(e)}")
return {"code": -1, "msg": f"搜索请求异常: {str(e)}"}
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
# def shareSearch(self, keyword, orderBy, order, page):
# """搜索分享功能已移除"""
# return {"code": 0, "data": {"objects": []}}
2025-10-29 22:20:21 +08:00
def deleteTag(self, tagId):
2025-11-03 22:28:58 +08:00
"""删除标签 (Cloudreve V4 API)"""
2025-10-29 22:20:21 +08:00
url = f"/tag/{tagId}"
r = self.request("DELETE", url)
# 转换响应格式
2025-11-03 22:28:58 +08:00
# Cloudreve V4 响应格式:{code: 0} 表示成功
if isinstance(r, dict):
if r.get("code") == 0:
return {"code": 0, "msg": "标签删除成功"}
else:
error_msg = r.get("msg", "标签删除失败")
return {"code": -1, "msg": error_msg}
2025-10-29 22:20:21 +08:00
else:
2025-11-03 22:28:58 +08:00
return {"code": -1, "msg": "标签删除失败:响应格式错误"}
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
# 重写为不支持添加标签功能
2025-10-29 22:20:21 +08:00
def addTag(self, name, expression):
2025-11-03 22:28:58 +08:00
"""添加标签功能已禁用"""
logger.warning("添加标签功能已禁用")
return {"code": -1, "msg": "添加标签功能已禁用"}
def getMyShares(self, page=1, pageSize=10):
"""获取我的分享列表 (Cloudreve V4 API - GET /user/shares/{user-id})
Args:
page: 页码
pageSize: 每页数量
Returns:
分享列表数据
"""
# 使用"me"作为user-id表示当前用户
url = f"/user/shares/me"
params = {
"page": page,
"page_size": pageSize
2025-10-29 22:20:21 +08:00
}
2025-11-03 22:28:58 +08:00
r = self.request("GET", url, params=params)
2025-10-29 22:20:21 +08:00
2025-11-03 22:28:58 +08:00
# Cloudreve V4 响应格式处理
if isinstance(r, dict):
if r.get("code") == 0:
return {
"code": 0,
"msg": "获取分享列表成功",
"data": r.get("data", {})
}
else:
error_msg = r.get("msg", "获取分享列表失败")
return {"code": -1, "msg": error_msg}
2025-10-29 22:20:21 +08:00
else:
2025-11-03 22:28:58 +08:00
return {"code": -1, "msg": "获取分享列表失败:响应格式错误"}
2025-10-29 22:20:21 +08:00
def getShareFileInfo(self, shareId):
"""获取分享文件信息"""
url = f"/share/{shareId}"
r = self.request("GET", url)
# 转换响应格式
if r.get("id"):
return {"code": 0, "data": r}
else:
return {"code": -1, "msg": r.get("error", "获取分享信息失败")}
def updateFileContent(self, fileId, content):
"""更新文件内容"""
url = f"/file/content/{fileId}"
headers = {
"Content-Type": "text/plain"
}
r = self.request("PUT", url, data=content.encode("utf-8"), headers=headers)
# 转换响应格式
if r.get("size") is not None:
return {"code": 0, "msg": "文件内容更新成功"}
else:
return {"code": -1, "msg": r.get("error", "文件内容更新失败")}
2025-11-03 22:28:58 +08:00
def deleteShare(self, shareId):
"""删除分享链接 (Cloudreve V4 API)
Args:
shareId: 分享链接ID
Returns:
操作结果
"""
url = f"/share/{shareId}"
r = self.request("DELETE", url)
# Cloudreve V4 响应格式处理
if isinstance(r, dict):
if r.get("code") == 0:
return {"code": 0, "msg": "删除分享成功"}
else:
error_msg = r.get("msg", "删除分享失败")
return {"code": -1, "msg": error_msg}
else:
return {"code": -1, "msg": "删除分享失败:响应格式错误"}
2025-10-29 22:20:21 +08:00
2025-11-02 19:17:20 +08:00
def renameFile(self, fileUri, newName, fileType="file"):
"""重命名文件 (Cloudreve V4 API)"""
url = "/file/rename"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 验证新名称不为空
if not newName or not newName.strip():
logger.error("新文件名不能为空")
return {"code": -1, "msg": "NewName cannot be empty"}
# 根据文件类型设置正确的type值
type_value = "file" if fileType == "file" else "dir"
# 构建重命名请求数据符合Cloudreve V4 API规范
data = {
"type": type_value,
"uri": fileUri,
"new_name": newName.strip()
}
# 添加详细日志,记录完整的请求数据
logger.debug(f"重命名文件请求数据 - URI: {fileUri}, 新名称: {newName.strip()}, 类型: {type_value}, 完整数据: {data}")
r = self.request("POST", url, json=data)
# 记录响应
logger.debug(f"重命名文件响应: {r}")
# 转换响应格式
if isinstance(r, dict):
if r.get("code") == 0:
return {"code": 0, "msg": "重命名成功"}
else:
error_msg = r.get("msg", r.get("error", "重命名失败"))
return {"code": -1, "msg": error_msg}
logger.error(f"重命名响应格式不正确: {r}")
return {"code": -1, "msg": "重命名失败,响应格式不正确"}
2025-11-01 20:14:35 +08:00
def getFileUrl(self, fileUri, redirect=False):
"""获取文件临时下载URL (Cloudreve V4 API)"""
url = "/file/url"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 修复文件名重复问题 - 保留URI编码文件名删除后面的原始文件名
if fileUri and '/' in fileUri:
import urllib.parse
parts = fileUri.split('/')
# 检查是否存在潜在的文件名重复情况
if len(parts) >= 2:
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
try:
decoded_last = urllib.parse.unquote(parts[-1])
decoded_prev = urllib.parse.unquote(parts[-2])
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
# 保留编码的文件名部分,移除后面的重复部分
fileUri = '/'.join(parts[:-1])
logger.debug(f"修复了重复文件名的URI: {fileUri}")
except Exception as e:
logger.debug(f"解码URI部分时出错: {str(e)}")
data = {
"uris": [fileUri],
"redirect": redirect,
"download": False # 设置为False以获取预览URL
}
logger.debug(f"获取文件URLURI: {fileUri}")
r = self.request("POST", url, json=data)
# 转换响应格式
# 直接返回API响应让调用者处理不同的返回格式
logger.debug(f"getFileUrl API响应: {r}")
# 如果是成功的响应格式,确保返回完整的响应对象
if isinstance(r, dict):
# 兼容不同的成功响应格式
if "urls" in r and isinstance(r["urls"], list):
# 清理URL中的反引号和多余空格
for url_item in r["urls"]:
if isinstance(url_item, dict) and "url" in url_item:
url_item["url"] = url_item["url"].strip('` ')
return r
elif r.get("code") == 0 and "data" in r:
# Cloudreve标准格式
return r
elif r.get("code") != 0:
# 错误响应
return r
# 如果响应不符合预期格式,返回错误
logger.error(f"getFileUrl响应格式不符合预期: {r}")
return {"code": -1, "msg": "获取文件URL失败响应格式错误"}
def getFileContent(self, fileUri):
"""获取文件内容 (Cloudreve V4 API - PUT /file/content)"""
url = "/file/content"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 修复文件名重复问题 - 保留URI编码文件名删除后面的原始文件名
if fileUri and '/' in fileUri:
import urllib.parse
parts = fileUri.split('/')
# 检查是否存在潜在的文件名重复情况
if len(parts) >= 2:
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
try:
decoded_last = urllib.parse.unquote(parts[-1])
decoded_prev = urllib.parse.unquote(parts[-2])
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
# 保留编码的文件名部分,移除后面的重复部分
fileUri = '/'.join(parts[:-1])
logger.debug(f"getFileContent - 修复了重复文件名的URI: {fileUri}")
except Exception as e:
logger.debug(f"解码URI部分时出错: {str(e)}")
# 构建查询参数
params = {"uri": fileUri}
logger.debug(f"获取文件内容URI: {fileUri}")
try:
# 使用PUT请求获取文件内容
r = self.request("PUT", url, params=params)
# 转换响应格式
if isinstance(r, bytes):
# 直接返回二进制内容
return {"code": 0, "data": r}
elif isinstance(r, dict):
if r.get("code") == 0:
# 确保返回的数据是二进制格式
data = r.get("data")
if isinstance(data, bytes):
return {"code": 0, "data": data}
elif isinstance(data, dict) or isinstance(data, list):
# 如果data是字典或列表说明可能是API响应错误使用URL方式获取
logger.warning(f"获取到的数据是{type(data).__name__}类型不是二进制数据将尝试使用URL方式")
# 尝试使用getFileUrl获取临时URL然后下载内容
url_response = self.getFileUrl(fileUri)
if url_response.get("code") == 0 and "data" in url_response and "urls" in url_response["data"]:
urls = url_response["data"]["urls"]
if urls and isinstance(urls[0], dict) and "url" in urls[0]:
temp_url = urls[0]["url"]
# 使用session获取文件内容
session_response = self.returnSession().get(temp_url, stream=True, timeout=(15, 30))
session_response.raise_for_status()
binary_content = session_response.content
logger.info(f"成功从临时URL获取文件内容大小: {len(binary_content)} 字节")
return {"code": 0, "data": binary_content}
else:
# 尝试将其他类型转换为二进制
try:
if isinstance(data, str):
return {"code": 0, "data": data.encode('utf-8')}
else:
return {"code": 0, "data": str(data).encode('utf-8')}
except Exception:
pass
# 错误响应
return {"code": r.get("code", -1), "msg": r.get("msg", "获取文件内容失败")}
else:
# 未知响应类型,尝试转换为二进制
try:
return {"code": 0, "data": str(r).encode('utf-8')}
except Exception:
return {"code": -1, "msg": f"未知的响应类型: {type(r).__name__}"}
except Exception as e:
logger.error(f"获取文件内容时发生异常: {str(e)}")
return {"code": -1, "msg": f"获取文件内容失败: {str(e)}"}
def updateFileContent(self, fileUri, content, previous_version=None):
"""更新文件内容 (Cloudreve V4 API - PUT /file/content)"""
url = "/file/content"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 构建查询参数
params = {"uri": fileUri}
if previous_version:
params["previous"] = previous_version
# 设置请求头
headers = {
"Content-Length": str(len(content))
}
logger.debug(f"更新文件内容URI: {fileUri}")
r = self.request("PUT", url, params=params, data=content, headers=headers)
# 转换响应格式
if isinstance(r, dict) and r.get("code") == 0:
return {"code": 0, "data": r.get("data", {})}
else:
error_msg = r.get("msg", r.get("error", "更新文件内容失败"))
return {"code": -1, "msg": error_msg}
def createViewerSession(self, fileUri, viewer_id, preferred_action="view", version=None, parent_uri=None):
"""创建查看器会话 (Cloudreve V4 API - PUT /file/viewerSession)"""
url = "/file/viewerSession"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 构建请求体
data = {
"uri": fileUri,
"viewer_id": viewer_id,
"preferred_action": preferred_action
}
# 添加可选参数
if version:
data["version"] = version
if parent_uri:
data["parent_uri"] = parent_uri
logger.debug(f"创建查看器会话URI: {fileUri}, viewer_id: {viewer_id}")
r = self.request("PUT", url, json=data)
# 转换响应格式
if isinstance(r, dict) and r.get("code") == 0:
return {"code": 0, "data": r.get("data", {})}
else:
error_msg = r.get("msg", r.get("error", "创建查看器会话失败"))
return {"code": -1, "msg": error_msg}
2025-10-29 22:20:21 +08:00
def updateUserNickname(self, nickName):
"""更新用户昵称 (Cloudreve V4 API)"""
url = "/user/profile"
data = {
"nick": nickName
}
r = self.request("PUT", url, json=data)
# 转换响应格式
if isinstance(r, dict) and not r.get("error"):
return {"code": 0, "msg": "昵称更新成功"}
else:
return {"code": -1, "msg": r.get("error", "昵称更新失败")}
2025-11-01 20:14:35 +08:00
# 创建全局实例,方便其他模块直接使用
basicApi = MiaoStarsBasicApi()