# coding: utf-8
"""Dialog for browsing the contents of a shared folder (Cloudreve V4 API)."""
from datetime import datetime

# Import the loguru logging library
from loguru import logger
from PyQt6.QtCore import Qt, QThread, QUrl, pyqtSignal
from PyQt6.QtGui import QPixmap
from PyQt6.QtNetwork import QNetworkAccessManager, QNetworkReply, QNetworkRequest
from PyQt6.QtWidgets import (
    QHBoxLayout,
    QVBoxLayout,
    QWidget,
)
from qfluentwidgets import (
    Action,
    BodyLabel,
    BreadcrumbBar,
    HorizontalSeparator,
    ImageLabel,
    InfoBar,
    InfoBarPosition,
    MenuAnimationType,
    MessageBoxBase,
    RoundMenu,
    ScrollArea,
    setFont,
    SubtitleLabel,
)
from qfluentwidgets import FluentIcon as FIF

from app.core import lang
from app.core import miaoStarsBasicApi
from app.core import signalBus
from app.view.components.file_card import SharedFolderFileCard

# Use the V4 API session that miaoStarsBasicApi has already configured.
UserSession = miaoStarsBasicApi.returnSession()


class APIWorker(QThread):
    """Worker thread that performs one API request off the GUI thread.

    Emits ``finished(response, error_message)``: on success ``response`` is
    the response dict and ``error_message`` is ``""``; on failure
    ``response`` is ``None`` and ``error_message`` describes the problem.
    """

    finished = pyqtSignal(object, str)  # (response data, error message)

    def __init__(self, url_path, method="GET", data=None):
        """Store the request parameters.

        Args:
            url_path: Relative API path, e.g. "/share/list/{id}?path=/".
            method: HTTP method name.
            data: JSON payload sent for non-GET requests.
        """
        super().__init__()
        self.url_path = url_path
        self.method = method
        self.data = data
        logger.debug(f"初始化API工作线程,路径: {url_path}, 方法: {method}")

    def run(self):
        """Execute the request through miaoStarsBasicApi and emit the result."""
        try:
            logger.info("开始API请求")
            if self.method == "GET":
                response = miaoStarsBasicApi.request(method="GET", url=self.url_path)
            else:
                response = miaoStarsBasicApi.request(
                    method=self.method, url=self.url_path, json=self.data
                )

            # A response counts as successful only when it is a dict with code == 0.
            if isinstance(response, dict) and response.get("code") == 0:
                # Log success only after the code check so a failed request
                # does not produce both a success and an error log entry.
                logger.success("API请求成功")
                self.finished.emit(response, "")
            else:
                error_msg = (
                    response.get("msg", "请求失败")
                    if isinstance(response, dict)
                    else "请求失败"
                )
                logger.error(f"API请求失败, 错误: {error_msg}")
                self.finished.emit(None, error_msg)
        except Exception as e:
            # Thread boundary: report every failure through the signal
            # instead of letting it escape the worker thread.
            error_msg = f"请求异常: {str(e)}"
            logger.exception(f"API请求异常, 异常信息: {error_msg}")
            self.finished.emit(None, error_msg)


class LinkageSwitching(ScrollArea):
    """Scroll area that lists the file cards of one shared directory."""

    # Emitted for file-card actions: (actionName, fileId).
    fileActionRequested = pyqtSignal(str, str)

    def __init__(self, _id, paths, breadcrumbBar, parent=None):
        """Create the scroll area and start loading the root directory.

        Args:
            _id: Share identifier used to build the listing URL.
            paths: Initial path; stored as ``paths`` and ``currentPath``.
            breadcrumbBar: Breadcrumb bar that is disabled while loading.
            parent: Optional parent widget.
        """
        super().__init__(parent)
        self.paths = paths
        self._id = _id
        self.currentPath = paths
        self.breadcrumbBar = breadcrumbBar
        self.fileCardsDict = {}  # fileId -> file card widget
        logger.debug(f"初始化文件卡片滚动区域,路径: {paths}")
        self.setupUi()

    def setupUi(self):
        """Build the container widget and layout, then load the root listing."""
        self.widgets = QWidget()
        self.layouts = QVBoxLayout(self.widgets)
        self.layouts.setAlignment(Qt.AlignmentFlag.AlignTop)
        self.setWidget(self.widgets)
        self.setWidgetResizable(True)

        # Layout properties
        self.layouts.setContentsMargins(5, 5, 5, 0)
        self.layouts.setSpacing(5)

        # Styling
        self.widgets.setStyleSheet("background-color: transparent; border: none;")
        self.setStyleSheet("background-color: transparent; border: none;")

        # Scroll-bar policy
        self.setHorizontalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAlwaysOff)
        self.setVerticalScrollBarPolicy(Qt.ScrollBarPolicy.ScrollBarAsNeeded)

        # NOTE(review): always loads "/" rather than self.paths — confirm
        # whether a non-root initial path is ever expected.
        self._onLoadDict("/")

    def addFileCard(self, fileId, key, obj):
        """Add a file card, or return the existing one for ``fileId``.

        Args:
            fileId: Unique identifier used as the dictionary key.
            key: First argument forwarded to ``SharedFolderFileCard``.
            obj: Mapping providing "id", "name", "type", "path", "date"
                and "size".

        Returns:
            The created (or already existing) file card.
        """
        if fileId in self.fileCardsDict:
            logger.warning(f"文件卡片已存在: {fileId}")
            return self.fileCardsDict[fileId]

        fileCard = SharedFolderFileCard(
            key,
            obj["id"],
            obj["name"],
            obj["type"],
            obj["path"],
            obj["date"],
            obj["size"],
            self,
        )
        fileCard.setObjectName(f"fileCard_{fileId}")
        self.fileCardsDict[fileId] = fileCard
        self.layouts.addWidget(fileCard)
        return fileCard

    def removeFileCard(self, fileId):
        """Remove the card registered under ``fileId`` and schedule its deletion."""
        if fileId in self.fileCardsDict:
            fileCard = self.fileCardsDict[fileId]
            self.layouts.removeWidget(fileCard)
            fileCard.deleteLater()
            del self.fileCardsDict[fileId]
        else:
            logger.warning(f"尝试移除不存在的文件卡片: {fileId}")

    def clearFileCards(self):
        """Remove every file card."""
        logger.debug("清除所有文件卡片")
        # Snapshot the keys: removeFileCard mutates the dictionary.
        for fileId in list(self.fileCardsDict):
            self.removeFileCard(fileId)

    def contextMenuEvent(self, e):
        """Show the context menu with a "refresh" action."""
        logger.debug("触发上下文菜单事件")
        menu = RoundMenu(parent=self)
        menu.addAction(
            Action(FIF.SYNC, lang("刷新当前"), triggered=self._refreshFolderList)
        )
        menu.addSeparator()
        menu.exec(e.globalPos(), aniType=MenuAnimationType.DROP_DOWN)

    def _refreshFolderList(self):
        """Reload the current directory and show a confirmation info bar."""
        logger.debug("刷新文件夹列表")
        # Actually reload the listing; previously only the info bar was shown.
        self._onLoadDict(self.currentPath)
        InfoBar.success(
            "成功",
            "刷新成功",
            Qt.Orientation.Horizontal,
            True,
            1000,
            InfoBarPosition.TOP_RIGHT,
            self.parent(),
        )

    def _onLoadDict(self, paths):
        """Start an asynchronous load of the directory listing for ``paths``.

        The breadcrumb bar is disabled until ``_dealDatas`` handles the result.
        """
        logger.info(f"加载目录数据: {paths}")
        self.currentPath = paths

        # The original author assumed the V4 share listing lives at
        # /share/list/{share_id}?path={path}.
        # NOTE(review): `paths` is not URL-encoded here — confirm whether
        # miaoStarsBasicApi.request encodes query strings.
        url_path = f"/share/list/{self._id}?path={paths}"
        # NOTE(review): a previous worker may still be running when this
        # reference is replaced — confirm that cannot happen in practice.
        self.loadDataThread = APIWorker(url_path)
        self.loadDataThread.finished.connect(self._dealDatas)
        self.loadDataThread.start()
        self.breadcrumbBar.setEnabled(False)

    def _dealDatas(self, data, msg):
        """Handle a finished directory request and rebuild the file cards.

        Args:
            data: Response dict, or ``None`` on failure.
            msg: Error message; empty on success.
        """
        logger.info("设置当前页策略")
        # Re-enable navigation on every outcome; otherwise one failed load
        # would leave the breadcrumb bar disabled permanently.
        self.breadcrumbBar.setEnabled(True)

        if msg:
            logger.error(f"加载目录数据失败: {msg}")
            return

        payload = data.get("data") if isinstance(data, dict) else None
        if not isinstance(payload, dict) or "objects" not in payload:
            logger.error("目录数据格式错误")
            return

        # Treat a null "objects" value as an empty directory.
        objects = payload["objects"] or []
        logger.info(f"成功加载目录数据,对象数量: {len(objects)}")
        self.objects = objects
        self.clearFileCards()

        for obj in self.objects:
            try:
                # Fall back to the object id when the entry has no "key".
                self.addFileCard(obj["id"], obj.get("key", obj["id"]), obj)
            except (KeyError, TypeError, AttributeError) as exc:
                # Skip a malformed entry instead of aborting the whole listing.
                logger.warning(f"跳过格式错误的文件对象: {exc!r}")


class BasicInfoThread(QThread):
    """Worker thread that fetches the basic information of one share.

    Emits ``finished(response, error_message)`` with the same convention as
    ``APIWorker``.
    """

    finished = pyqtSignal(object, str)  # (response data, error message)

    def __init__(self, _id):
        """Store the share identifier to query."""
        super().__init__()
        self._id = _id
        logger.debug("初始化API工作线程")

    def run(self):
        """Request ``/share/{id}`` and emit the result."""
        try:
            url_path = f"/share/{self._id}"
            logger.info("开始API请求")
            response = miaoStarsBasicApi.request(method="GET", url=url_path)

            # A response counts as successful only when it is a dict with code == 0.
            if isinstance(response, dict) and response.get("code") == 0:
                logger.success("API请求成功")
                self.finished.emit(response, "")
            else:
                error_msg = (
                    response.get("msg", "请求失败")
                    if isinstance(response, dict)
                    else "请求失败"
                )
                logger.error(f"API请求失败, 错误: {error_msg}")
                self.finished.emit(None, error_msg)
        except Exception as e:
            # Thread boundary: report every failure through the signal.
            error_msg = f"请求异常: {str(e)}"
            logger.exception(f"API请求异常, 异常信息: {error_msg}")
            self.finished.emit(None, error_msg)


class ShareFolderMessageBox(MessageBoxBase):
    """Dialog showing a share's metadata, a breadcrumb bar and its file list."""

    # Declared for breadcrumb clicks (carries a path); not emitted in this class.
    breadcrumbItemClicked = pyqtSignal(str)

    def __init__(self, _id, parent=None):
        """Build the dialog and start loading the share information.

        Args:
            _id: Share identifier.
            parent: Optional parent widget.
        """
        super().__init__(parent=parent)
        self.setObjectName("ShareFolderMessageBox")
        self.widget.setMinimumWidth(900)
        self.widget.setMinimumHeight(600)
        self.currentPath = "/"
        self._id = _id
        logger.debug("初始化分析文件管理")

        self.folderTitleLabel = SubtitleLabel(self)
        self.infomationLabel = BodyLabel(self)
        self.authorAvatar = ImageLabel(":app/images/logo.png", self)
        self.authorAvatar.setBorderRadius(20, 20, 20, 20)
        self.authorAvatar.scaledToHeight(20)
        self.authorAvatar.scaledToWidth(20)
        self.authorNameLabel = BodyLabel(self)
        self.breadcrumbBar = BreadcrumbBar(self)

        self.basicInfoThread = BasicInfoThread(self._id)
        self.basicInfoThread.finished.connect(self.handleApiResponse)
        self.basicInfoThread.start()

        self.networkManager = QNetworkAccessManager(self)
        self.networkManager.finished.connect(self.onAvatarDownloaded)

        self.setupUi()
        self.connectSignals()

    def setupUi(self):
        """Initialise the breadcrumb bar and the file list, then lay them out."""
        logger.debug("设置分析文件管理界面UI")

        # The root item is added before the change signal is connected.
        self.breadcrumbBar.addItem("/", "/")
        setFont(self.breadcrumbBar, 18)
        self.breadcrumbBar.setSpacing(15)
        self.breadcrumbBar.currentItemChanged.connect(self.clickChangeDir)

        # File-card scroll area for the share root.
        self.linkageSwitching = LinkageSwitching(
            self._id, "/", self.breadcrumbBar, self
        )

        self.initLayout()

    def initLayout(self):
        """Arrange title, info, author row, breadcrumb bar and file list."""
        self.viewLayout.setContentsMargins(10, 20, 10, 5)
        self.viewLayout.addWidget(self.folderTitleLabel, 0, Qt.AlignmentFlag.AlignTop)
        self.viewLayout.addWidget(self.infomationLabel, 0, Qt.AlignmentFlag.AlignTop)
        self.viewLayout.addWidget(HorizontalSeparator(self))

        self.authorLayout = QHBoxLayout()
        self.authorLayout.setAlignment(Qt.AlignmentFlag.AlignLeft)
        self.authorLayout.addWidget(self.authorAvatar, 0, Qt.AlignmentFlag.AlignLeft)
        self.authorLayout.addWidget(
            self.authorNameLabel, 0, Qt.AlignmentFlag.AlignLeft
        )
        self.viewLayout.addLayout(self.authorLayout)
        self.viewLayout.addWidget(HorizontalSeparator(self))

        self.viewLayout.addWidget(self.breadcrumbBar, 0, Qt.AlignmentFlag.AlignTop)
        self.viewLayout.addWidget(self.linkageSwitching)

    def handleApiResponse(self, response_data, error_msg=""):
        """Populate the header widgets from the share-info response.

        Args:
            response_data: Response dict, or ``None`` when the request failed.
            error_msg: Error message emitted with the response; empty on
                success.
        """
        if error_msg or not isinstance(response_data, dict):
            # The worker emits None on failure; indexing it would raise.
            logger.error(f"加载分享信息失败: {error_msg or '响应为空'}")
            return

        try:
            info = response_data["data"]
            title = info["source"]["name"]
            summary = (
                f"创建时间: {self.format_date(info['create_date'])} | "
                f"浏览次数: {info['views']} | 下载次数: {info['downloads']}"
            )
            nick = info["creator"]["nick"]
            creator_key = info["creator"]["key"]
        except (KeyError, TypeError) as exc:
            logger.error(f"分享信息格式错误: {exc!r}")
            return

        self.folderTitleLabel.setText(title)
        self.infomationLabel.setText(summary)
        self.authorAvatar.setText(nick)
        self.authorNameLabel.setText(nick)
        self.loadAvatarFromId(creator_key)

    def loadAvatarFromId(self, _id):
        """Request the avatar image of user ``_id`` via the network manager."""
        # NOTE(review): this URL has no scheme or host — confirm how the API
        # base URL is meant to be applied before the request is sent.
        url = f"/user/avatar/{_id}/l"
        logger.info("开始从网络加载头像")
        request = QNetworkRequest(QUrl(url))
        self.networkManager.get(request)

    def onAvatarDownloaded(self, reply):
        """Apply the downloaded avatar, or the bundled logo on a network error."""
        if reply.error() == QNetworkReply.NetworkError.NoError:
            data = reply.readAll()
            pixmap = QPixmap()
            if pixmap.loadFromData(data):
                self.authorAvatar.setImage(pixmap)
                logger.info("网络头像加载成功")
            else:
                logger.error("头像数据格式不支持")
        else:
            logger.error(f"头像下载失败: {reply.errorString()}")
            pixmap = QPixmap(":app/images/logo.png")
            self.authorAvatar.setImage(pixmap)
            logger.info("使用默认头像")

        # Restore the 20px size after the image has been replaced.
        self.authorAvatar.scaledToHeight(20)
        self.authorAvatar.scaledToWidth(20)
        reply.deleteLater()

    def format_size(self, size):
        """Format a byte count with two decimals and a unit from B up to PB."""
        for unit in ["B", "KB", "MB", "GB", "TB"]:
            if size < 1024:
                return f"{size:.2f} {unit}"
            size /= 1024
        return f"{size:.2f} PB"

    def format_date(self, date_str):
        """Convert "YYYY-MM-DDTHH:MM:SS[.fff]Z" to "YYYY-MM-DD HH:MM:SS".

        Returns ``date_str`` unchanged when it matches neither format.
        """
        try:
            if "." in date_str:
                # strptime's %f accepts at most six fractional digits, so
                # drop the trailing "Z" and truncate before parsing.
                date_part, fractional_part = date_str.split(".", 1)
                fractional_sec = fractional_part.rstrip("Z")[:6]
                normalized_date_str = f"{date_part}.{fractional_sec}Z"
                date_time = datetime.strptime(
                    normalized_date_str, "%Y-%m-%dT%H:%M:%S.%fZ"
                )
            else:
                date_time = datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%SZ")
        except ValueError:
            return date_str

        return date_time.strftime("%Y-%m-%d %H:%M:%S")

    def refreshCurrentDirectory(self):
        """Reload the file cards of the current directory."""
        logger.info(f"刷新当前目录: {self.currentPath}")
        self.linkageSwitching._onLoadDict(self.currentPath)

    def clickChangeDir(self, name):
        """Handle a breadcrumb change by navigating to the selected item.

        Args:
            name: Value emitted by ``currentItemChanged``; it is matched
                against the breadcrumb items' text.
        """
        logger.info(f"面包屑导航项点击: {name}")
        if name == "":
            name = "/"

        # Collect item texts up to and including the selected item.
        pathList = []
        for item in self.breadcrumbBar.items:
            pathList.append(item.text)
            if item.text == name:
                break

        # Drop empty segments; build a new list rather than removing
        # elements while iterating.
        pathList = [segment for segment in pathList if segment != ""]

        # The root item's text is "/", so the joined string starts with a
        # doubled slash; strip one character to get an absolute path.
        path = "/".join(pathList) if pathList else "/"
        path = path[1:]
        if path == "":
            path = "/"

        if path == self.currentPath:
            logger.debug("路径未变化,跳过导航")
            return

        self.onChangeDir(path)

    def connectSignals(self):
        """Connect the global signal-bus signals to this dialog."""
        logger.debug("连接主文件管理界面信号")
        # Connect the bound method rather than a lambda capturing self, so
        # the global bus does not keep calling into a closed dialog.
        signalBus.shareDirOpenSignal.connect(self.onChangeDir)
        # NOTE: signalBus.refreshFolderListSignal is intentionally left
        # unconnected, as in the original code.
        signalBus.shareFileDownloadSignal.connect(self.accept)

    def onChangeDir(self, path):
        """Navigate to ``path``: load its listing and update the breadcrumb."""
        logger.info(f"变更目录: {path}")
        # Normalise an empty path to the root before it is used anywhere.
        if path == "":
            path = "/"

        self.linkageSwitching._onLoadDict(path)
        # currentPath is set before addItem so clickChangeDir sees the new
        # path if the breadcrumb bar signals a change for the added item.
        self.currentPath = path

        # NOTE(review): the folder name is used as the route key, so a path
        # that repeats a folder name may collide — verify against BreadcrumbBar.
        display_name = path.split("/")[-1] if path != "/" else "/"
        self.breadcrumbBar.addItem(display_name, display_name)