Files
leonpan-pc/app/core/api/basicApi.py
2025-11-01 20:14:35 +08:00

816 lines
33 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""这里存放基本API包括:
登录、注册、图形验证码获取
用户配置获取、用户头像获取
用户存储策略获取、用户仓内文件获取
"""
import time
from typing import Literal, Optional
from urllib.parse import quote_plus
import requests
from loguru import logger
from PyQt6.QtCore import QBuffer, QByteArray, QIODevice
from PyQt6.QtGui import QPixmap
from ..utils import getCode
from ..utils.config import policyConfig, userConfig
class MiaoStarsBasicApi:
_publicHeader = {
"Content-Type": "application/json",
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36",
"Cache-Control": "no-cache, no-store, must-revalidate",
"Pragma": "no-cache",
"Expires": "0",
}
def __init__(self, token: Optional[str] = None):
self.basicApi = "http://leonmmcoset.jjxmm.win:5212/api/v4"
self.session = requests.Session()
self.session.verify = True
self.session.headers.update(MiaoStarsBasicApi._publicHeader)
# Cloudreve V4 使用 JWT 认证
# 优先使用传入的token如果没有则尝试从userConfig获取
self.token = token or userConfig.getToken()
if self.token:
self.session.headers.update({"Authorization": f"Bearer {self.token}"})
self.userId = None
def returnSession(self) -> requests.Session:
return self.session
def setToken(self, token: str):
"""设置 JWT token 并同步到全局配置"""
# 确保token是字符串类型
if isinstance(token, str):
self.token = token
self.session.headers.update({"Authorization": f"Bearer {token}"})
# 同步更新到全局userConfig确保认证信息持久化
userConfig.setToken(token)
elif isinstance(token, dict) and token.get("access_token"):
# 兼容处理如果传入的是token对象提取access_token
access_token = token["access_token"]
self.token = access_token
self.session.headers.update({"Authorization": f"Bearer {access_token}"})
userConfig.setToken(access_token)
def request(
self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], url, **kwargs
) -> dict:
maxRetries = 3
timeout = 20
for attempt in range(maxRetries):
try:
# 发起请求
response = self.session.request(
method=method, url=self.basicApi + url, timeout=timeout, **kwargs
)
# 检查HTTP状态码
response.raise_for_status()
# 解析JSON响应
try:
r = response.json()
# 保留原始响应格式不要过早转换Cloudreve V4的响应
# Cloudreve V4的错误响应通常包含{code, msg},而不是{error}
# 让具体的API方法来处理响应格式转换
return r
except:
return response.content
except requests.exceptions.RequestException as e:
# 网络相关错误,进行重试
if attempt < maxRetries - 1:
time.sleep(1) # 简单固定延迟
continue
else:
logger.error(f"请求失败: {str(e)}")
return {"code": -1, "msg": str(e)}
except Exception as e:
logger.error(f"处理响应时出错: {str(e)}")
return {"code": -1, "msg": str(e)}
def getCaptcha(self):
"""获取图形验证码 (Cloudreve V4 API)"""
import logging
logger = logging.getLogger(__name__)
logger.debug(f"请求验证码API: {self.basicApi}/site/captcha")
r = self.request(method="GET", url="/site/captcha")
logger.debug(f"验证码API响应: {r}")
# 根据Cloudreve V4 API文档响应格式为
# {"code": 0, "data": {"image": "data:image/png;base64,...", "ticket": "..."}, "msg": ""}
# 转换响应格式以保持与前端的兼容性
if isinstance(r, dict):
# 检查是否是标准的Cloudreve V4成功响应
if r.get("code") == 0 and isinstance(r.get("data"), dict):
data = r["data"]
# 确保image字段存在且是有效的base64格式
if "image" in data:
# 保存ticket到实例变量供后续登录/注册使用
self.captcha_ticket = data.get("ticket", "")
logger.debug(f"成功获取验证码已保存ticket: {self.captcha_ticket}")
# 前端代码在CaptchaThread.run()中使用response["data"].split(",")[1]
# 所以需要确保返回的data格式符合前端期望
captcha_image = data["image"]
# 检查是否已经包含data:image/png;base64,前缀
if captcha_image.startswith("data:image/png;base64,"):
# 保持原样让前端去split
result = {"code": 0, "data": captcha_image}
else:
# 如果没有前缀加上前缀以确保前端能正确split
result = {"code": 0, "data": f"data:image/png;base64,{captcha_image}"}
logger.debug(f"返回前端验证码数据格式: {result}")
return result
# 处理可能的错误响应
elif "msg" in r:
logger.error(f"验证码API错误: {r['msg']}")
return {"code": -1, "msg": r["msg"]}
elif "error" in r:
logger.error(f"验证码API错误: {r['error']}")
return {"code": -1, "msg": r["error"]}
else:
logger.error(f"验证码API返回非字典格式: {type(r)}")
# 默认返回失败格式
logger.error("获取验证码失败,返回默认错误格式")
return {"code": -1, "msg": "获取验证码失败"}
def login(self, username, password, captcha):
"""登录 (Cloudreve V4 JWT认证)"""
url = "/session/token"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username, # 更正参数名username -> email
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"登录API服务器返回原始信息: {r}")
# 处理登录响应
# Cloudreve V4的响应格式{code: 0, data: {user: {...}, token: {...}}, msg: ""}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 and r.get("data"):
data = r["data"]
# 设置 JWT token
if data.get("token") and isinstance(data["token"], dict):
# 提取access_token字段作为Bearer token值
access_token = data["token"].get("access_token")
if access_token:
# 设置到当前API实例
self.setToken(access_token)
# 同时保存到全局userConfig中确保认证信息持久化
userConfig.setToken(access_token)
# 存储用户信息
user_info = {
"code": 0,
"data": {
"id": data.get("user", {}).get("id"),
"nick": data.get("user", {}).get("nickname", data.get("user", {}).get("nick")),
"email": username
}
}
self.userId = data.get("user", {}).get("id")
return user_info
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "登录失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "登录失败:响应格式错误"}
def register(self, username, password, captcha):
"""注册"""
url = "/user"
# Cloudreve V4 API参数规范使用email, password, captcha和ticket
payload = {
"email": username,
"password": password,
"captcha": captcha # 更正参数名captcha_code -> captcha
}
# 如果有保存的ticket则添加到请求中使用正确的参数名ticket
if hasattr(self, 'captcha_ticket') and self.captcha_ticket:
payload['ticket'] = self.captcha_ticket # 更正参数名captcha_ticket -> ticket
r = self.request(
"POST",
url,
json=payload,
)
# 输出服务器返回的原始信息到控制台
print(f"注册API服务器返回原始信息: {r}")
# 处理注册响应
# Cloudreve V4的响应格式{code: 0, data: {...}, msg: ""} 或 {code: 错误码, msg: "错误信息"}
if isinstance(r, dict):
# 检查是否是成功响应
if r.get("code") == 0 or r.get("id"):
return {"code": 203, "msg": "注册成功"}
# 处理错误响应
# 优先检查msg字段因为Cloudreve V4返回的错误格式是{code, msg}而不是{error}
error_msg = r.get("msg", r.get("error", "注册失败"))
return {"code": -1, "msg": error_msg}
return {"code": -1, "msg": "注册失败:响应格式错误"}
def updateUserNickname(self, nickname):
"""更新用户昵称"""
url = "/user/profile"
r = self.request(
"PUT",
url,
json={"nick": nickname},
)
# 转换响应格式
if r.get("id"):
return {"code": 0, "msg": "更新成功"}
else:
return {"code": -1, "msg": r.get("error", "更新失败")}
def updateUserAvatar(self, qimage):
"""更新用户头像 - 专门处理 QImage 对象"""
url = "/user/avatar"
# 检查 QImage 是否有效
if qimage.isNull():
return {"code": -1, "msg": "QImage 对象为空"}
# 将 QImage 转换为 JPEG 格式字节数据
byte_array = QByteArray()
buffer = QBuffer(byte_array)
buffer.open(QIODevice.WriteOnly)
# 转换为 JPEG 格式,质量设为 90
success = qimage.save(buffer, "JPEG", 90)
buffer.close()
if not success:
return {"code": -1, "msg": "图像转换失败"}
# 获取字节数据
file_data = byte_array.data()
# 构建 multipart 请求
files = {
'avatar': ('avatar.jpg', file_data, 'image/jpeg')
}
# 移除 Content-Type 以便 requests 自动设置
headers = self.session.headers.copy()
if 'Content-Type' in headers:
del headers['Content-Type']
r = self.request("POST", url, files=files, headers=headers)
# 转换响应格式
if r.get("url"):
return {"code": 0, "msg": "头像更新成功"}
else:
return {"code": -1, "msg": r.get("error", "头像更新失败")}
def getUserAvatar(self, size: Literal["s", "m", "l"]):
"""获取用户头像"""
# Cloudreve V4 获取用户信息以获取头像URL
user_info = self.getUserInfo()
logger.info(f"用户信息API返回{user_info}")
if user_info.get("code") != 0:
return QPixmap(":app/images/logo.png")
avatar_url = user_info.get("data", {}).get("avatar")
if not avatar_url:
return QPixmap(":app/images/logo.png")
try:
# 直接下载头像图片
response = requests.get(avatar_url, timeout=10)
response.raise_for_status()
pixmap = QPixmap()
if pixmap.loadFromData(response.content):
userConfig.setUserAvatarPixmap(pixmap)
return pixmap
else:
return QPixmap(":app/images/logo.png")
except Exception as e:
logger.error(f"获取头像失败: {e}")
return QPixmap(":app/images/logo.png")
def getUserInfo(self):
"""获取用户信息 (Cloudreve V4 API)"""
# 使用正确的API端点
if self.userId:
url = f"/user/info/{self.userId}"
else:
# 如果没有userId尝试获取当前用户信息
url = "/user/profile" # 这可能需要根据实际情况调整
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, dict):
return r
else:
return {"code": -1, "msg": "获取用户信息失败"}
def getUserPack(self):
"""获取用户存储详细 (Cloudreve V4 API)"""
# 使用正确的API端点
url = "/user/capacity"
r = self.request("GET", url)
# 转换响应格式以保持向后兼容
if isinstance(r, dict) and "used" in r:
return {
"code": 0,
"data": {
"base": r.get("total", 0) - r.get("extra", 0),
"pack": r.get("extra", 0),
"used": r.get("used", 0),
"total": r.get("total", 0),
"packs": []
}
}
else:
return {
"code": 0,
"data": {
"base": 0,
"pack": 0,
"used": 0,
"total": 0,
"packs": []
}
}
def list(self, path="/"):
"""列出用户仓内文件 (Cloudreve V4 API)"""
# 使用正确的API端点和必需参数
url = "/file"
# 将path转换为Cloudreve V4要求的URI格式
# 根目录映射到 cloudreve://my/
if path == "/" or path == "":
uri = "cloudreve://my/"
else:
# 清理路径确保不包含cloudreve://my前缀
normalized_path = path.strip("/").replace("\\", "/").replace("cloudreve://my/", "")
# 使用quote_plus正确编码路径确保特殊字符被正确处理
encoded_path = quote_plus(normalized_path)
# 由于quote_plus会将斜杠也编码我们需要恢复它们
encoded_path = encoded_path.replace("%2F", "/")
uri = f"cloudreve://my/{encoded_path}"
# 确保路径格式正确,移除重复的前缀
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
# 添加必需的分页参数
params = {
"uri": uri,
"page": 0, # 从第一页开始
"page_size": 100 # 使用合理的默认值
}
logger.debug(f"发送文件列表请求: URI={uri}")
r = self.request("GET", url, params=params)
# 转换响应格式以保持向后兼容
# 根据API规范文件列表在data.files中
if isinstance(r, dict) and "data" in r and "files" in r["data"]:
return {
"code": 0,
"data": r["data"]["files"]
}
else:
logger.warning(f"无效的响应格式: {r}")
return {"code": 0, "data": []}
def getPolicy(self):
"""获取用户存储策略"""
url = "/user/setting/policies"
r = self.request("GET", url)
# 转换响应格式
if isinstance(r, list):
return r
else:
return {"code": 0, "data": []}
def changePolicy(self, path, policy):
"""修改用户存储策略"""
url = "/file/move"
r = self.request("POST", url, json={"items": [path], "dst": path, "policy": policy})
# 转换响应格式
if r.get("count") == 1:
return {"code": 0, "msg": "存储策略修改成功"}
else:
return {"code": -1, "msg": r.get("error", "存储策略修改失败")}
def createFolder(self, name):
"""创建文件夹"""
url = "/file/create"
currentPath = policyConfig.returnCurrentPath()
# 根据Cloudreve V4 API规范构建uri参数确保不会有重复斜杠和前缀
# 清理currentPath确保不包含cloudreve://my前缀
clean_current_path = currentPath.replace("cloudreve://my", "")
if clean_current_path == "/":
uri = f"cloudreve://my/{name}"
else:
uri = f"cloudreve://my{clean_current_path}/{name}"
# 确保路径格式正确,移除重复的前缀
uri = uri.replace("cloudreve://my/cloudreve://my", "cloudreve://my")
# 更健壮地处理重复文件名的情况
path_parts = uri.split('/')
if len(path_parts) > 1:
# 检查最后一个部分是否是名称
if path_parts[-1] == name:
# 检查倒数第二个部分是否也是名称
if len(path_parts) > 2 and path_parts[-2] == name:
# 移除重复的名称部分
path_parts.pop(-2)
uri = '/'.join(path_parts)
r = self.request("POST", url, json={"uri": uri, "type": "folder", "err_on_conflict": True})
# 转换响应格式
if r.get("data") and r.get("code") == 0:
return {"code": 0, "msg": "文件夹创建成功"}
else:
return {"code": -1, "msg": r.get("msg", "文件夹创建失败")}
def deleteFile(self, fileUri, fileType: Literal["file", "dir"]):
"""删除文件 (Cloudreve V4 API)"""
url = "/file"
# 现在调用方已经传入了正确格式的URI
logger.debug(f"删除文件URI: {fileUri}")
# 根据Cloudreve V4 API规范使用uris参数列表
deleteData = {
"uris": [fileUri]
}
r = self.request("DELETE", url, json=deleteData)
# 转换响应格式
# Cloudreve V4 响应格式:{code: 0} 表示成功
if isinstance(r, dict) and r.get("code") == 0:
return {"code": 0, "msg": "删除成功"}
else:
error_msg = r.get("msg", r.get("error", "删除失败"))
return {"code": -1, "msg": error_msg}
def wareSearch(
self,
searchContent,
searchType: Literal["keyword", "internalTag", "externalTag"],
):
"""搜索文件 (Cloudreve V4 API)"""
# 根据Cloudreve V4 API使用/file端点并添加搜索参数
url = "/file"
# 使用Cloudreve V4的URI格式
uri = "cloudreve://my/"
# 构建搜索参数
params = {
"uri": uri,
"keyword": searchContent,
"page": 0,
"page_size": 100
}
# 根据搜索类型调整参数
if searchType == "internalTag":
params["type"] = "internal"
elif searchType == "externalTag":
params["type"] = "tag"
logger.debug(f"发送文件搜索请求: 关键词={searchContent}, 类型={searchType}")
r = self.request("GET", url, params=params)
# 转换响应格式以保持向后兼容
if isinstance(r, dict):
if "data" in r:
if "files" in r["data"]:
# Cloudreve V4 API 格式
return {"code": 0, "data": {"objects": r["data"]["files"]}}
elif isinstance(r["data"], list):
# 如果data是列表直接使用
return {"code": 0, "data": {"objects": r["data"]}}
logger.warning(f"搜索响应格式不正确: {r}")
return {"code": 0, "data": {"objects": []}}
def shareSearch(self, keyword, orderBy, order, page):
"""搜索分享 (Cloudreve V4 API)"""
# 使用正确的API端点 - Cloudreve V4使用/share端点获取分享列表
url = "/share"
params = {
"page": page,
"page_size": 50, # 添加默认页面大小以避免"PageSize cannot be empty"错误
"order_by": orderBy,
"order": order,
"keyword": keyword, # 根据实际API支持情况调整
}
r = self.request("GET", url, params=params)
return r
def deleteTag(self, tagId):
"""删除标签"""
url = f"/tag/{tagId}"
r = self.request("DELETE", url)
# 转换响应格式
if r is None or (isinstance(r, dict) and not r.get("error")):
return {"code": 0, "msg": "标签删除成功"}
else:
return {"code": -1, "msg": r.get("error", "标签删除失败")}
def addTag(self, name, expression):
"""添加标签"""
url = "/tag/filter"
jsons = {
"expression": expression,
"name": name,
"color": "#ff9800",
"icon": "Circle",
}
r = self.request("POST", url, json=jsons)
# 转换响应格式
if r.get("id"):
return {"code": 0, "msg": "标签添加成功", "data": r}
else:
return {"code": -1, "msg": r.get("error", "标签添加失败")}
def getShareFileInfo(self, shareId):
"""获取分享文件信息"""
url = f"/share/{shareId}"
r = self.request("GET", url)
# 转换响应格式
if r.get("id"):
return {"code": 0, "data": r}
else:
return {"code": -1, "msg": r.get("error", "获取分享信息失败")}
def updateFileContent(self, fileId, content):
"""更新文件内容"""
url = f"/file/content/{fileId}"
headers = {
"Content-Type": "text/plain"
}
r = self.request("PUT", url, data=content.encode("utf-8"), headers=headers)
# 转换响应格式
if r.get("size") is not None:
return {"code": 0, "msg": "文件内容更新成功"}
else:
return {"code": -1, "msg": r.get("error", "文件内容更新失败")}
def getFileUrl(self, fileUri, redirect=False):
"""获取文件临时下载URL (Cloudreve V4 API)"""
url = "/file/url"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 修复文件名重复问题 - 保留URI编码文件名删除后面的原始文件名
if fileUri and '/' in fileUri:
import urllib.parse
parts = fileUri.split('/')
# 检查是否存在潜在的文件名重复情况
if len(parts) >= 2:
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
try:
decoded_last = urllib.parse.unquote(parts[-1])
decoded_prev = urllib.parse.unquote(parts[-2])
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
# 保留编码的文件名部分,移除后面的重复部分
fileUri = '/'.join(parts[:-1])
logger.debug(f"修复了重复文件名的URI: {fileUri}")
except Exception as e:
logger.debug(f"解码URI部分时出错: {str(e)}")
data = {
"uris": [fileUri],
"redirect": redirect,
"download": False # 设置为False以获取预览URL
}
logger.debug(f"获取文件URLURI: {fileUri}")
r = self.request("POST", url, json=data)
# 转换响应格式
# 直接返回API响应让调用者处理不同的返回格式
logger.debug(f"getFileUrl API响应: {r}")
# 如果是成功的响应格式,确保返回完整的响应对象
if isinstance(r, dict):
# 兼容不同的成功响应格式
if "urls" in r and isinstance(r["urls"], list):
# 清理URL中的反引号和多余空格
for url_item in r["urls"]:
if isinstance(url_item, dict) and "url" in url_item:
url_item["url"] = url_item["url"].strip('` ')
return r
elif r.get("code") == 0 and "data" in r:
# Cloudreve标准格式
return r
elif r.get("code") != 0:
# 错误响应
return r
# 如果响应不符合预期格式,返回错误
logger.error(f"getFileUrl响应格式不符合预期: {r}")
return {"code": -1, "msg": "获取文件URL失败响应格式错误"}
def getFileContent(self, fileUri):
"""获取文件内容 (Cloudreve V4 API - PUT /file/content)"""
url = "/file/content"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 修复文件名重复问题 - 保留URI编码文件名删除后面的原始文件名
if fileUri and '/' in fileUri:
import urllib.parse
parts = fileUri.split('/')
# 检查是否存在潜在的文件名重复情况
if len(parts) >= 2:
# 解码最后一个部分,检查是否与倒数第二个部分解码后相同
try:
decoded_last = urllib.parse.unquote(parts[-1])
decoded_prev = urllib.parse.unquote(parts[-2])
# 如果解码后相同,或者最后一个部分是原始文件名(没有编码)
if decoded_last == decoded_prev or parts[-1] == decoded_prev:
# 保留编码的文件名部分,移除后面的重复部分
fileUri = '/'.join(parts[:-1])
logger.debug(f"getFileContent - 修复了重复文件名的URI: {fileUri}")
except Exception as e:
logger.debug(f"解码URI部分时出错: {str(e)}")
# 构建查询参数
params = {"uri": fileUri}
logger.debug(f"获取文件内容URI: {fileUri}")
try:
# 使用PUT请求获取文件内容
r = self.request("PUT", url, params=params)
# 转换响应格式
if isinstance(r, bytes):
# 直接返回二进制内容
return {"code": 0, "data": r}
elif isinstance(r, dict):
if r.get("code") == 0:
# 确保返回的数据是二进制格式
data = r.get("data")
if isinstance(data, bytes):
return {"code": 0, "data": data}
elif isinstance(data, dict) or isinstance(data, list):
# 如果data是字典或列表说明可能是API响应错误使用URL方式获取
logger.warning(f"获取到的数据是{type(data).__name__}类型不是二进制数据将尝试使用URL方式")
# 尝试使用getFileUrl获取临时URL然后下载内容
url_response = self.getFileUrl(fileUri)
if url_response.get("code") == 0 and "data" in url_response and "urls" in url_response["data"]:
urls = url_response["data"]["urls"]
if urls and isinstance(urls[0], dict) and "url" in urls[0]:
temp_url = urls[0]["url"]
# 使用session获取文件内容
session_response = self.returnSession().get(temp_url, stream=True, timeout=(15, 30))
session_response.raise_for_status()
binary_content = session_response.content
logger.info(f"成功从临时URL获取文件内容大小: {len(binary_content)} 字节")
return {"code": 0, "data": binary_content}
else:
# 尝试将其他类型转换为二进制
try:
if isinstance(data, str):
return {"code": 0, "data": data.encode('utf-8')}
else:
return {"code": 0, "data": str(data).encode('utf-8')}
except Exception:
pass
# 错误响应
return {"code": r.get("code", -1), "msg": r.get("msg", "获取文件内容失败")}
else:
# 未知响应类型,尝试转换为二进制
try:
return {"code": 0, "data": str(r).encode('utf-8')}
except Exception:
return {"code": -1, "msg": f"未知的响应类型: {type(r).__name__}"}
except Exception as e:
logger.error(f"获取文件内容时发生异常: {str(e)}")
return {"code": -1, "msg": f"获取文件内容失败: {str(e)}"}
def updateFileContent(self, fileUri, content, previous_version=None):
"""更新文件内容 (Cloudreve V4 API - PUT /file/content)"""
url = "/file/content"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 构建查询参数
params = {"uri": fileUri}
if previous_version:
params["previous"] = previous_version
# 设置请求头
headers = {
"Content-Length": str(len(content))
}
logger.debug(f"更新文件内容URI: {fileUri}")
r = self.request("PUT", url, params=params, data=content, headers=headers)
# 转换响应格式
if isinstance(r, dict) and r.get("code") == 0:
return {"code": 0, "data": r.get("data", {})}
else:
error_msg = r.get("msg", r.get("error", "更新文件内容失败"))
return {"code": -1, "msg": error_msg}
def createViewerSession(self, fileUri, viewer_id, preferred_action="view", version=None, parent_uri=None):
"""创建查看器会话 (Cloudreve V4 API - PUT /file/viewerSession)"""
url = "/file/viewerSession"
# 确保传入的是正确格式的URI
if not fileUri.startswith("cloudreve://"):
logger.error(f"无效的URI格式: {fileUri}必须以cloudreve://开头")
return {"code": -1, "msg": "无效的URI格式"}
# 构建请求体
data = {
"uri": fileUri,
"viewer_id": viewer_id,
"preferred_action": preferred_action
}
# 添加可选参数
if version:
data["version"] = version
if parent_uri:
data["parent_uri"] = parent_uri
logger.debug(f"创建查看器会话URI: {fileUri}, viewer_id: {viewer_id}")
r = self.request("PUT", url, json=data)
# 转换响应格式
if isinstance(r, dict) and r.get("code") == 0:
return {"code": 0, "data": r.get("data", {})}
else:
error_msg = r.get("msg", r.get("error", "创建查看器会话失败"))
return {"code": -1, "msg": error_msg}
def updateUserNickname(self, nickName):
"""更新用户昵称 (Cloudreve V4 API)"""
url = "/user/profile"
data = {
"nick": nickName
}
r = self.request("PUT", url, json=data)
# 转换响应格式
if isinstance(r, dict) and not r.get("error"):
return {"code": 0, "msg": "昵称更新成功"}
else:
return {"code": -1, "msg": r.get("error", "昵称更新失败")}
# 创建全局实例,方便其他模块直接使用
basicApi = MiaoStarsBasicApi()