"""这里存放基本API包括: 登录、注册、图形验证码获取 用户配置获取、用户头像获取 用户存储策略获取、用户仓内文件获取 """ import time from typing import Literal, Optional from urllib.parse import quote_plus import requests from loguru import logger from PyQt6.QtCore import QBuffer, QByteArray, QIODevice from PyQt6.QtGui import QPixmap from ..utils import getCode from ..utils.config import policyConfig, userConfig class MiaoStarsBasicApi: _publicHeader = { "Content-Type": "application/json", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/140.0.0.0 Safari/537.36", "Cache-Control": "no-cache, no-store, must-revalidate", "Pragma": "no-cache", "Expires": "0", } def __init__(self, token: Optional[str] = None): self.basicApi = "http://leonmmcoset.jjxmm.win:5212/api/v4" self.session = requests.Session() self.session.verify = True self.session.headers.update(MiaoStarsBasicApi._publicHeader) # Cloudreve V4 使用 JWT 认证 # 优先使用传入的token,如果没有则尝试从userConfig获取 self.token = token or userConfig.getToken() if self.token: self.session.headers.update({"Authorization": f"Bearer {self.token}"}) self.userId = None def returnSession(self) -> requests.Session: return self.session def setToken(self, token: str): """设置 JWT token 并同步到全局配置""" # 确保token是字符串类型 if isinstance(token, str): self.token = token self.session.headers.update({"Authorization": f"Bearer {token}"}) # 同步更新到全局userConfig,确保认证信息持久化 userConfig.setToken(token) elif isinstance(token, dict) and token.get("access_token"): # 兼容处理,如果传入的是token对象,提取access_token access_token = token["access_token"] self.token = access_token self.session.headers.update({"Authorization": f"Bearer {access_token}"}) userConfig.setToken(access_token) def request( self, method: Literal["GET", "POST", "PUT", "DELETE", "PATCH"], url, **kwargs ) -> dict: maxRetries = 3 timeout = 20 for attempt in range(maxRetries): try: # 发起请求 response = self.session.request( method=method, url=self.basicApi + url, timeout=timeout, **kwargs ) # 检查HTTP状态码 response.raise_for_status() # 解析JSON响应 try: r = response.json() # 保留原始响应格式,不要过早转换Cloudreve V4的响应 # Cloudreve V4的错误响应通常包含{code, msg},而不是{error} # 让具体的API方法来处理响应格式转换 return r except: return response.content except requests.exceptions.RequestException as e: # 网络相关错误,进行重试 if attempt < maxRetries - 1: time.sleep(1) # 简单固定延迟 continue else: logger.error(f"请求失败: {str(e)}") return {"code": -1, "msg": str(e)} except Exception as e: logger.error(f"处理响应时出错: {str(e)}") return {"code": -1, "msg": str(e)} def getCaptcha(self): """获取图形验证码 (Cloudreve V4 API)""" import logging logger = logging.getLogger(__name__) logger.debug(f"请求验证码API: {self.basicApi}/site/captcha") r = self.request(method="GET", url="/site/captcha") logger.debug(f"验证码API响应: {r}") # 根据Cloudreve V4 API文档,响应格式为: # {"code": 0, "data": {"image": "data:image/png;base64,...", "ticket": "..."}, "msg": ""} # 转换响应格式以保持与前端的兼容性 if isinstance(r, dict): # 检查是否是标准的Cloudreve V4成功响应 if r.get("code") == 0 and isinstance(r.get("data"), dict): data = r["data"] # 确保image字段存在且是有效的base64格式 if "image" in data: # 保存ticket到实例变量,供后续登录/注册使用 self.captcha_ticket = data.get("ticket", "") logger.debug(f"成功获取验证码,已保存ticket: {self.captcha_ticket}") # 前端代码在CaptchaThread.run()中使用response["data"].split(",")[1] # 所以需要确保返回的data格式符合前端期望 captcha_image = data["image"] # 检查是否已经包含data:image/png;base64,前缀 if captcha_image.startswith("data:image/png;base64,"): # 保持原样,让前端去split result = {"code": 0, "data": captcha_image} else: # 如果没有前缀,加上前缀以确保前端能正确split result = {"code": 0, "data": f"data:image/png;base64,{captcha_image}"} logger.debug(f"返回前端验证码数据格式: {result}") return result # 处理可能的错误响应 elif "msg" in r: logger.error(f"验证码API错误: {r['msg']}") return {"code": -1, "msg": r["msg"]} elif "error" in r: logger.error(f"验证码API错误: {r['error']}") return {"code": -1, "msg": r["error"]} else: logger.error(f"验证码API返回非字典格式: {type(r)}") # 默认返回失败格式 logger.error("获取验证码失败,返回默认错误格式") return {"code": -1, "msg": "获取验证码失败"} def login(self, username, password, captcha): """登录 (Cloudreve V4 JWT认证)""" url = "/session/token" # Cloudreve V4 API参数规范:使用email, password, captcha和ticket payload = { "email": username, # 更正参数名:username -> email "password": password, "captcha": captcha # 更正参数名:captcha_code -> captcha } # 如果有保存的ticket,则添加到请求中,使用正确的参数名ticket if hasattr(self, 'captcha_ticket') and self.captcha_ticket: payload['ticket'] = self.captcha_ticket # 更正参数名:captcha_ticket -> ticket r = self.request( "POST", url, json=payload, ) # 输出服务器返回的原始信息到控制台 print(f"登录API服务器返回原始信息: {r}") # 处理登录响应 # Cloudreve V4的响应格式:{code: 0, data: {user: {...}, token: {...}}, msg: ""} if isinstance(r, dict): # 检查是否是成功响应 if r.get("code") == 0 and r.get("data"): data = r["data"] # 设置 JWT token if data.get("token") and isinstance(data["token"], dict): # 提取access_token字段作为Bearer token值 access_token = data["token"].get("access_token") if access_token: # 设置到当前API实例 self.setToken(access_token) # 同时保存到全局userConfig中,确保认证信息持久化 userConfig.setToken(access_token) # 存储用户信息 user_info = { "code": 0, "data": { "id": data.get("user", {}).get("id"), "nick": data.get("user", {}).get("nickname", data.get("user", {}).get("nick")), "email": username } } self.userId = data.get("user", {}).get("id") return user_info # 处理错误响应 # 优先检查msg字段,因为Cloudreve V4返回的错误格式是{code, msg}而不是{error} error_msg = r.get("msg", r.get("error", "登录失败")) return {"code": -1, "msg": error_msg} return {"code": -1, "msg": "登录失败:响应格式错误"} def register(self, username, password, captcha): """注册""" url = "/user" # Cloudreve V4 API参数规范:使用email, password, captcha和ticket payload = { "email": username, "password": password, "captcha": captcha # 更正参数名:captcha_code -> captcha } # 如果有保存的ticket,则添加到请求中,使用正确的参数名ticket if hasattr(self, 'captcha_ticket') and self.captcha_ticket: payload['ticket'] = self.captcha_ticket # 更正参数名:captcha_ticket -> ticket r = self.request( "POST", url, json=payload, ) # 输出服务器返回的原始信息到控制台 print(f"注册API服务器返回原始信息: {r}") # 处理注册响应 # Cloudreve V4的响应格式:{code: 0, data: {...}, msg: ""} 或 {code: 错误码, msg: "错误信息"} if isinstance(r, dict): # 检查是否是成功响应 if r.get("code") == 0 or r.get("id"): return {"code": 203, "msg": "注册成功"} # 处理错误响应 # 优先检查msg字段,因为Cloudreve V4返回的错误格式是{code, msg}而不是{error} error_msg = r.get("msg", r.get("error", "注册失败")) return {"code": -1, "msg": error_msg} return {"code": -1, "msg": "注册失败:响应格式错误"} def updateUserNickname(self, nickname): """更新用户昵称""" url = "/user/profile" r = self.request( "PUT", url, json={"nick": nickname}, ) # 转换响应格式 if r.get("id"): return {"code": 0, "msg": "更新成功"} else: return {"code": -1, "msg": r.get("error", "更新失败")} def updateUserAvatar(self, qimage): """更新用户头像 - 专门处理 QImage 对象""" url = "/user/avatar" # 检查 QImage 是否有效 if qimage.isNull(): return {"code": -1, "msg": "QImage 对象为空"} # 将 QImage 转换为 JPEG 格式字节数据 byte_array = QByteArray() buffer = QBuffer(byte_array) buffer.open(QIODevice.WriteOnly) # 转换为 JPEG 格式,质量设为 90 success = qimage.save(buffer, "JPEG", 90) buffer.close() if not success: return {"code": -1, "msg": "图像转换失败"} # 获取字节数据 file_data = byte_array.data() # 构建 multipart 请求 files = { 'avatar': ('avatar.jpg', file_data, 'image/jpeg') } # 移除 Content-Type 以便 requests 自动设置 headers = self.session.headers.copy() if 'Content-Type' in headers: del headers['Content-Type'] r = self.request("POST", url, files=files, headers=headers) # 转换响应格式 if r.get("url"): return {"code": 0, "msg": "头像更新成功"} else: return {"code": -1, "msg": r.get("error", "头像更新失败")} def getUserAvatar(self, size: Literal["s", "m", "l"]): """获取用户头像""" # Cloudreve V4 获取用户信息以获取头像URL user_info = self.getUserInfo() logger.info(f"用户信息API返回:{user_info}") if user_info.get("code") != 0: return QPixmap(":app/images/logo.png") avatar_url = user_info.get("data", {}).get("avatar") if not avatar_url: return QPixmap(":app/images/logo.png") try: # 直接下载头像图片 response = requests.get(avatar_url, timeout=10) response.raise_for_status() pixmap = QPixmap() if pixmap.loadFromData(response.content): userConfig.setUserAvatarPixmap(pixmap) return pixmap else: return QPixmap(":app/images/logo.png") except Exception as e: logger.error(f"获取头像失败: {e}") return QPixmap(":app/images/logo.png") def getUserInfo(self): """获取用户信息 (Cloudreve V4 API)""" # 使用正确的API端点 if self.userId: url = f"/user/info/{self.userId}" else: # 如果没有userId,尝试获取当前用户信息 url = "/user/profile" # 这可能需要根据实际情况调整 r = self.request("GET", url) # 转换响应格式 if isinstance(r, dict): return r else: return {"code": -1, "msg": "获取用户信息失败"} def getUserPack(self): """获取用户存储详细 (Cloudreve V4 API)""" # 使用正确的API端点 url = "/user/capacity" r = self.request("GET", url) # 转换响应格式以保持向后兼容 if isinstance(r, dict) and "used" in r: return { "code": 0, "data": { "base": r.get("total", 0) - r.get("extra", 0), "pack": r.get("extra", 0), "used": r.get("used", 0), "total": r.get("total", 0), "packs": [] } } else: return { "code": 0, "data": { "base": 0, "pack": 0, "used": 0, "total": 0, "packs": [] } } def list(self, path="/"): """列出用户仓内文件 (Cloudreve V4 API)""" # 使用正确的API端点和必需参数 url = "/file" # 将path转换为Cloudreve V4要求的URI格式 # 根目录映射到 cloudreve://my/ if path == "/" or path == "": uri = "cloudreve://my/" else: # 使用quote_plus正确编码路径,确保特殊字符被正确处理 # 首先规范化路径分隔符 normalized_path = path.strip("/").replace("\\", "/") # 对路径部分进行URL编码 encoded_path = quote_plus(normalized_path) # 由于quote_plus会将斜杠也编码,我们需要恢复它们 encoded_path = encoded_path.replace("%2F", "/") uri = f"cloudreve://my/{encoded_path}" # 添加必需的分页参数 params = { "uri": uri, "page": 0, # 从第一页开始 "page_size": 100 # 使用合理的默认值 } logger.debug(f"发送文件列表请求: URI={uri}") r = self.request("GET", url, params=params) # 转换响应格式以保持向后兼容 # 根据API规范,文件列表在data.files中 if isinstance(r, dict) and "data" in r and "files" in r["data"]: return { "code": 0, "data": r["data"]["files"] } else: logger.warning(f"无效的响应格式: {r}") return {"code": 0, "data": []} def getPolicy(self): """获取用户存储策略""" url = "/user/policies" r = self.request("GET", url) # 转换响应格式 if isinstance(r, list): return {"code": 0, "data": r} else: return {"code": 0, "data": []} def changePolicy(self, path, policy): """修改用户存储策略""" url = "/file/move" r = self.request("POST", url, json={"items": [path], "dst": path, "policy": policy}) # 转换响应格式 if r.get("count") == 1: return {"code": 0, "msg": "存储策略修改成功"} else: return {"code": -1, "msg": r.get("error", "存储策略修改失败")} def createFolder(self, name): """创建文件夹""" url = "/file/create" currentPath = policyConfig.returnCurrentPath() # 根据Cloudreve V4 API规范构建uri参数,确保不会有重复斜杠 if currentPath == "/": uri = f"cloudreve://my/{name}" else: uri = f"cloudreve://my{currentPath}/{name}" r = self.request("POST", url, json={"uri": uri, "type": "folder", "err_on_conflict": True}) # 转换响应格式 if r.get("data") and r.get("code") == 0: return {"code": 0, "msg": "文件夹创建成功"} else: return {"code": -1, "msg": r.get("msg", "文件夹创建失败")} def deleteFile(self, fileId, fileType: Literal["file", "dir"]): """删除文件""" url = "/file/delete" deleteData = { "items": [fileId] if fileType == "file" else [], "dirs": [fileId] if fileType == "dir" else [] } r = self.request("POST", url, json=deleteData) # 转换响应格式 if r.get("count") > 0: return {"code": 0, "msg": "删除成功"} else: return {"code": -1, "msg": r.get("error", "删除失败")} def wareSearch( self, searchContent, searchType: Literal["keyword", "internalTag", "externalTag"], ): """搜索文件 (Cloudreve V4 API)""" # 根据Cloudreve V4 API,使用/file端点并添加搜索参数 url = "/file" # 使用Cloudreve V4的URI格式 uri = "cloudreve://my/" # 构建搜索参数 params = { "uri": uri, "keyword": searchContent, "page": 0, "page_size": 100 } # 根据搜索类型调整参数 if searchType == "internalTag": params["type"] = "internal" elif searchType == "externalTag": params["type"] = "tag" logger.debug(f"发送文件搜索请求: 关键词={searchContent}, 类型={searchType}") r = self.request("GET", url, params=params) # 转换响应格式以保持向后兼容 if isinstance(r, dict): if "data" in r: if "files" in r["data"]: # Cloudreve V4 API 格式 return {"code": 0, "data": {"objects": r["data"]["files"]}} elif isinstance(r["data"], list): # 如果data是列表,直接使用 return {"code": 0, "data": {"objects": r["data"]}} logger.warning(f"搜索响应格式不正确: {r}") return {"code": 0, "data": {"objects": []}} def shareSearch(self, keyword, orderBy, order, page): """搜索分享 (Cloudreve V4 API)""" # 使用正确的API端点 - Cloudreve V4使用/share端点获取分享列表 url = "/share" params = { "page": page, "page_size": 50, # 添加默认页面大小以避免"PageSize cannot be empty"错误 "order_by": orderBy, "order": order, "keyword": keyword, # 根据实际API支持情况调整 } r = self.request("GET", url, params=params) return r def deleteTag(self, tagId): """删除标签""" url = f"/tag/{tagId}" r = self.request("DELETE", url) # 转换响应格式 if r is None or (isinstance(r, dict) and not r.get("error")): return {"code": 0, "msg": "标签删除成功"} else: return {"code": -1, "msg": r.get("error", "标签删除失败")} def addTag(self, name, expression): """添加标签""" url = "/tag/filter" jsons = { "expression": expression, "name": name, "color": "#ff9800", "icon": "Circle", } r = self.request("POST", url, json=jsons) # 转换响应格式 if r.get("id"): return {"code": 0, "msg": "标签添加成功", "data": r} else: return {"code": -1, "msg": r.get("error", "标签添加失败")} def getShareFileInfo(self, shareId): """获取分享文件信息""" url = f"/share/{shareId}" r = self.request("GET", url) # 转换响应格式 if r.get("id"): return {"code": 0, "data": r} else: return {"code": -1, "msg": r.get("error", "获取分享信息失败")} def updateFileContent(self, fileId, content): """更新文件内容""" url = f"/file/content/{fileId}" headers = { "Content-Type": "text/plain" } r = self.request("PUT", url, data=content.encode("utf-8"), headers=headers) # 转换响应格式 if r.get("size") is not None: return {"code": 0, "msg": "文件内容更新成功"} else: return {"code": -1, "msg": r.get("error", "文件内容更新失败")} def updateUserNickname(self, nickName): """更新用户昵称 (Cloudreve V4 API)""" url = "/user/profile" data = { "nick": nickName } r = self.request("PUT", url, json=data) # 转换响应格式 if isinstance(r, dict) and not r.get("error"): return {"code": 0, "msg": "昵称更新成功"} else: return {"code": -1, "msg": r.get("error", "昵称更新失败")}