// Package workflows contains resumable background task implementations.
package workflows
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/samber/lo"
)
type (
	// RemoteDownloadTask drives a download on a remote downloader node and
	// then transfers the finished files into the user's file system. It is
	// a resumable queue task persisted through the embedded DBTask.
	RemoteDownloadTask struct {
		*queue.DBTask

		l        logging.Logger           // logger, assigned in Do
		state    *RemoteDownloadTaskState // unmarshaled private state
		node     cluster.Node             // node allocated for this task
		d        downloader.Downloader    // downloader client for the node
		progress queue.Progresses         // local (master-side) transfer progress
	}

	// RemoteDownloadTaskPhase labels the stage a RemoteDownloadTask is in.
	RemoteDownloadTaskPhase string

	// RemoteDownloadTaskState is the JSON-serialized private state of a
	// RemoteDownloadTask, persisted between executions.
	RemoteDownloadTaskState struct {
		// URI of a torrent file in the user's file system; when set,
		// createDownloadTask resolves it to a temporary URL first.
		SrcFileUri string ` json:"src_file_uri,omitempty" `
		// Direct source URL, used as-is when SrcFileUri is empty.
		SrcUri string ` json:"src_uri,omitempty" `
		// Destination folder URI for transferred files.
		Dst string ` json:"dst,omitempty" `
		// Handle of the task on the remote downloader.
		Handle *downloader.TaskHandle ` json:"handle,omitempty" `
		// Last status snapshot fetched from the downloader.
		Status *downloader.TaskStatus ` json:"status,omitempty" `
		// Node allocation state, inlined into the JSON object.
		NodeState ` json:",inline" `
		// Current workflow phase.
		Phase RemoteDownloadTaskPhase ` json:"phase,omitempty" `
		// ID of the upload task created on a slave node, 0 if none.
		// NOTE(review): the double underscore in the JSON key looks like a
		// typo, but changing it would break already-persisted states.
		SlaveUploadTaskID int ` json:"slave__upload_task_id,omitempty" `
		// Last observed private state of the slave upload task.
		SlaveUploadState *SlaveUploadTaskState ` json:"slave_upload_state,omitempty" `
		// Consecutive failed status polls; reset on success.
		GetTaskStatusTried int ` json:"get_task_status_tried,omitempty" `
		// Set of file indices already transferred (used as a set).
		Transferred map[int]interface{} ` json:"transferred,omitempty" `
		// Number of files that failed to transfer on the master node.
		Failed int ` json:"failed,omitempty" `
	}
)
const (
	// Phases of a RemoteDownloadTask.
	RemoteDownloadTaskPhaseNotStarted   RemoteDownloadTaskPhase = "" // download not yet created
	RemoteDownloadTaskPhaseMonitor      = "monitor"                  // polling downloader status
	RemoteDownloadTaskPhaseTransfer     = "transfer"                 // moving files to destination
	RemoteDownloadTaskPhaseAwaitSeeding = "seeding"                  // waiting for seeding to finish

	// GetTaskStatusMaxTries is the number of consecutive status-poll
	// failures tolerated by monitor before the task errors out.
	GetTaskStatusMaxTries = 5

	// Keys used in the Summarize props map and in progress maps.
	SummaryKeyDownloadStatus = "download"
	SummaryKeySrcStr         = "src_str"
	// ProgressTypeRelocateTransferCount is not referenced in this file;
	// presumably consumed elsewhere — verify before removing.
	ProgressTypeRelocateTransferCount = "relocate"
	// ProgressTypeUploadSinglePrefix prefixes per-worker progress keys in
	// masterTransfer ("upload_single_<workerId>").
	ProgressTypeUploadSinglePrefix = "upload_single_"
	SummaryKeySrcMultiple          = "src_multiple"
	SummaryKeySrcDstPolicyID       = "dst_policy_id"
	SummaryKeyFailed               = "failed"
)
// init registers the factory so persisted remote-download tasks can be
// resumed from their database models by the queue.
func init() {
	queue.RegisterResumableTaskFactory(queue.RemoteDownloadTaskType, NewRemoteDownloadTaskFromModel)
}
// NewRemoteDownloadTask creates a new RemoteDownloadTask that downloads src
// (or the torrent file at srcFile, when given) into dst on behalf of the
// user carried by ctx.
func NewRemoteDownloadTask(ctx context.Context, src string, srcFile, dst string) (queue.Task, error) {
	initial := RemoteDownloadTaskState{
		SrcUri:     src,
		SrcFileUri: srcFile,
		Dst:        dst,
		NodeState:  NodeState{},
	}

	raw, err := json.Marshal(&initial)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	return &RemoteDownloadTask{
		DBTask: &queue.DBTask{
			Task: &ent.Task{
				Type:          queue.RemoteDownloadTaskType,
				CorrelationID: logging.CorrelationID(ctx),
				PrivateState:  string(raw),
				PublicState:   &types.TaskPublicState{},
			},
			DirectOwner: inventory.UserFromContext(ctx),
		},
	}, nil
}
// NewRemoteDownloadTaskFromModel resumes a RemoteDownloadTask from its
// persisted database model; state is unmarshaled lazily in Do/Summarize.
func NewRemoteDownloadTaskFromModel(task *ent.Task) queue.Task {
	t := &RemoteDownloadTask{}
	t.DBTask = &queue.DBTask{Task: task}
	return t
}
// Do executes one step of the remote-download workflow, dispatching on the
// persisted phase, and writes the updated private state back to the task
// model before returning the next queue status.
func (m *RemoteDownloadTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	// Restore private state from the persisted task model.
	var state RemoteDownloadTaskState
	if err := json.Unmarshal([]byte(m.State()), &state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = &state

	// Pick a node capable of remote downloads.
	node, err := allocateNode(ctx, dep, &m.state.NodeState, types.NodeCapabilityRemoteDownload)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to allocate node: %w", err)
	}
	m.node = node

	// Lazily construct the downloader client for the selected node.
	if m.d == nil {
		client, err := node.CreateDownloader(ctx, dep.RequestClient(), dep.SettingProvider())
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create downloader: %w", err)
		}
		m.d = client
	}

	next := task.StatusCompleted
	switch m.state.Phase {
	case RemoteDownloadTaskPhaseNotStarted:
		next, err = m.createDownloadTask(ctx, dep)
	case RemoteDownloadTaskPhaseMonitor, RemoteDownloadTaskPhaseAwaitSeeding:
		next, err = m.monitor(ctx, dep)
	case RemoteDownloadTaskPhaseTransfer:
		if m.node.IsMaster() {
			next, err = m.masterTransfer(ctx, dep)
		} else {
			next, err = m.slaveTransfer(ctx, dep)
		}
	}

	// Persist the (possibly mutated) state regardless of the step outcome.
	updated, marshalErr := json.Marshal(m.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}
	m.Lock()
	m.Task.PrivateState = string(updated)
	m.Unlock()

	return next, err
}
// createDownloadTask submits the download to the remote downloader and
// advances the task into the monitor phase. When the source is a torrent
// file stored in the user's file system, a temporary entity URL is resolved
// for it first.
func (m *RemoteDownloadTask) createDownloadTask(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	// A handle already exists when this step is re-entered; just move on.
	if m.state.Handle != nil {
		m.state.Phase = RemoteDownloadTaskPhaseMonitor
		return task.StatusSuspending, nil
	}

	user := inventory.UserFromContext(ctx)

	target := m.state.SrcUri
	if m.state.SrcFileUri != "" {
		// Target is a torrent file; resolve a temporary URL for it.
		uri, err := fs.NewUriFromString(m.state.SrcFileUri)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse src file uri: %s (%w)", err, queue.CriticalErr)
		}

		fm := manager.NewFileManager(dep, user)
		expire := time.Now().Add(dep.SettingProvider().EntityUrlValidDuration(ctx))
		urls, _, err := fm.GetEntityUrls(ctx,
			[]manager.GetEntityUrlArgs{{URI: uri}},
			fs.WithUrlExpire(&expire))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to get torrent entity urls: %w", err)
		}
		if len(urls) == 0 {
			return task.StatusError, fmt.Errorf("no torrent urls found")
		}
		target = urls[0]
	}

	// Create the download task on the remote downloader.
	handle, err := m.d.CreateTask(ctx, target, user.Edges.Group.Settings.RemoteDownloadOptions)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create download task: %w", err)
	}

	m.state.Handle = handle
	m.state.Phase = RemoteDownloadTaskPhaseMonitor
	return task.StatusSuspending, nil
}
func ( m * RemoteDownloadTask ) monitor ( ctx context . Context , dep dependency . Dep ) ( task . Status , error ) {
resumeAfter := time . Duration ( m . node . Settings ( ctx ) . Interval ) * time . Second
// Update task status
status , err := m . d . Info ( ctx , m . state . Handle )
if err != nil {
if errors . Is ( err , downloader . ErrTaskNotFount ) && m . state . Status != nil {
// If task is not found, but it previously existed, consider it as canceled
m . l . Warning ( "task not found, consider it as canceled" )
return task . StatusCanceled , nil
}
m . state . GetTaskStatusTried ++
if m . state . GetTaskStatusTried >= GetTaskStatusMaxTries {
return task . StatusError , fmt . Errorf ( "failed to get task status after %d retry: %w" , m . state . GetTaskStatusTried , err )
}
m . l . Warning ( "failed to get task info: %s, will retry." , err )
m . ResumeAfter ( resumeAfter )
return task . StatusSuspending , nil
}
// Follow to new handle if needed
if status . FollowedBy != nil {
m . l . Info ( "Task handle updated to %v" , status . FollowedBy )
m . state . Handle = status . FollowedBy
m . ResumeAfter ( 0 )
return task . StatusSuspending , nil
}
if m . state . Status == nil || m . state . Status . Total != status . Total {
m . l . Info ( "download size changed, re-validate files." )
// First time to get status / total size changed, check user capacity
if err := m . validateFiles ( ctx , dep , status ) ; err != nil {
m . state . Status = status
return task . StatusError , fmt . Errorf ( "failed to validate files: %s (%w)" , err , queue . CriticalErr )
}
}
m . state . Status = status
m . state . GetTaskStatusTried = 0
m . l . Debug ( "Monitor %q task state: %s" , status . Name , status . State )
switch status . State {
case downloader . StatusSeeding :
m . l . Info ( "Download task seeding" )
if m . state . Phase == RemoteDownloadTaskPhaseMonitor {
// Not transferred
m . state . Phase = RemoteDownloadTaskPhaseTransfer
return task . StatusSuspending , nil
} else if ! m . node . Settings ( ctx ) . WaitForSeeding {
// Skip seeding
m . l . Info ( "Download task seeding skipped." )
return task . StatusCompleted , nil
} else {
// Still seeding
m . ResumeAfter ( resumeAfter )
return task . StatusSuspending , nil
}
case downloader . StatusCompleted :
m . l . Info ( "Download task completed" )
if m . state . Phase == RemoteDownloadTaskPhaseMonitor {
// Not transferred
m . state . Phase = RemoteDownloadTaskPhaseTransfer
return task . StatusSuspending , nil
}
// Seeding complete
m . l . Info ( "Download task seeding completed" )
return task . StatusCompleted , nil
case downloader . StatusDownloading :
m . ResumeAfter ( resumeAfter )
return task . StatusSuspending , nil
case downloader . StatusUnknown , downloader . StatusError :
2025-04-27 10:36:06 +08:00
return task . StatusError , fmt . Errorf ( "download task failed with state %q (%w), errorMsg: %s" , status . State , queue . CriticalErr , status . ErrorMessage )
2025-04-20 17:31:25 +08:00
}
m . ResumeAfter ( resumeAfter )
return task . StatusSuspending , nil
}
// slaveTransfer offloads the transfer of downloaded files to the slave node
// this task is allocated to: it creates a SlaveUpload task covering all
// selected, not-yet-transferred files, then polls it until it finishes,
// canceled, or fails (in which case successfully transferred files are
// remembered and the step errors out so a retry can create a fresh task).
// Fixes: the unmarshal error message said "slave compress state" (copy-paste
// from another task type); the remote progress reset was done without the
// lock that Progress() holds; `for i, _ := range` de-idiomized.
func (m *RemoteDownloadTask) slaveTransfer(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	u := inventory.UserFromContext(ctx)
	if m.state.Transferred == nil {
		m.state.Transferred = make(map[int]interface{})
	}

	if m.state.SlaveUploadTaskID == 0 {
		dstUri, err := fs.NewUriFromString(m.state.Dst)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse dst uri %q: %s (%w)", m.state.Dst, err, queue.CriticalErr)
		}

		// Create slave upload task.
		payload := &SlaveUploadTaskState{
			Files:       []SlaveUploadEntity{},
			MaxParallel: dep.SettingProvider().MaxParallelTransfer(ctx),
			UserID:      u.ID,
		}
		// Construct files to be transferred.
		for _, f := range m.state.Status.Files {
			if !f.Selected {
				continue
			}
			// Skip files already transferred by a previous attempt.
			if _, ok := m.state.Transferred[f.Index]; ok {
				continue
			}
			payload.Files = append(payload.Files, SlaveUploadEntity{
				Src:   filepath.FromSlash(path.Join(m.state.Status.SavePath, f.Name)),
				Uri:   dstUri.JoinRaw(f.Name),
				Size:  f.Size,
				Index: f.Index,
			})
		}

		payloadStr, err := json.Marshal(payload)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
		}

		taskId, err := m.node.CreateTask(ctx, queue.SlaveUploadTaskType, string(payloadStr))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
		}

		// Reset remote progress under lock: Progress() reads it concurrently.
		m.Lock()
		m.state.NodeState.progress = nil
		m.Unlock()
		m.state.SlaveUploadTaskID = taskId
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Checking slave upload task %d...", m.state.SlaveUploadTaskID)
	t, err := m.node.GetTask(ctx, m.state.SlaveUploadTaskID, true)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	m.state.SlaveUploadState = &SlaveUploadTaskState{}
	if err := json.Unmarshal([]byte(t.PrivateState), m.state.SlaveUploadState); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal slave upload state: %s (%w)", err, queue.CriticalErr)
	}

	if t.Status == task.StatusError || t.Status == task.StatusCompleted {
		if len(m.state.SlaveUploadState.Transferred) < len(m.state.SlaveUploadState.Files) {
			// Not all files transferred; remember the ones that made it and
			// error out so the next attempt creates a new slave task.
			slaveTaskId := m.state.SlaveUploadTaskID
			m.state.SlaveUploadTaskID = 0
			for i := range m.state.SlaveUploadState.Transferred {
				m.state.Transferred[m.state.SlaveUploadState.Files[i].Index] = struct{}{}
			}
			remaining := len(m.state.SlaveUploadState.Files) - len(m.state.SlaveUploadState.Transferred)
			m.l.Warning("Slave task %d failed to transfer %d files, retrying...", slaveTaskId, remaining)
			return task.StatusError, fmt.Errorf(
				"slave task failed to transfer %d files, first 5 errors: %s",
				remaining,
				m.state.SlaveUploadState.First5TransferErrors,
			)
		}

		// All files transferred; move on to (optional) seeding.
		m.state.Phase = RemoteDownloadTaskPhaseAwaitSeeding
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}

	m.l.Info("Slave task %d is still uploading, resume after 30s.", m.state.SlaveUploadTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}
// masterTransfer uploads the downloaded files into the destination on the
// master node using a bounded pool of workers (one progress entry each).
// Files recorded in state.Transferred are skipped so the step is resumable.
// Fixes: the "Uploading file" log format had 2 verbs but 3 args; workers
// read the m.progress map without synchronization while other workers wrote
// it under lock (entry pointers are now captured instead); cancellation
// returned while workers were still mutating state (now waits for them);
// the already-transferred check now reads the map under the same lock the
// workers use to write it.
func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	if m.state.Transferred == nil {
		m.state.Transferred = make(map[int]interface{})
	}

	maxParallel := dep.SettingProvider().MaxParallelTransfer(ctx)
	wg := sync.WaitGroup{}
	worker := make(chan int, maxParallel)
	for i := 0; i < maxParallel; i++ {
		worker <- i
	}

	// Sum up total count/size and select files.
	totalCount := 0
	totalSize := int64(0)
	allFiles := make([]downloader.TaskFile, 0, len(m.state.Status.Files))
	for _, f := range m.state.Status.Files {
		if f.Selected {
			allFiles = append(allFiles, f)
			totalSize += f.Size
			totalCount++
		}
	}

	// Capture progress entries as pointers so workers can update them with
	// atomics without reading the map concurrently with locked map writes.
	countProgress := &queue.Progress{Total: int64(totalCount)}
	sizeProgress := &queue.Progress{Total: totalSize}
	m.Lock()
	m.progress = make(queue.Progresses)
	m.progress[ProgressTypeUploadCount] = countProgress
	m.progress[ProgressTypeUpload] = sizeProgress
	m.Unlock()

	dstUri, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse dst uri: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)
	failed := int64(0)
	ae := serializer.NewAggregateError()

	transferFunc := func(workerId int, file downloader.TaskFile) {
		defer func() {
			atomic.AddInt64(&countProgress.Current, 1)
			worker <- workerId
			wg.Done()
		}()

		dst := dstUri.JoinRaw(file.Name)
		src := filepath.FromSlash(path.Join(m.state.Status.SavePath, file.Name))
		m.l.Info("Uploading file %s to %s...", src, dst)

		// Per-worker progress entry: published under lock, then mutated only
		// through the captured pointer.
		singleProgress := &queue.Progress{Identifier: dst.String(), Total: file.Size}
		progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
		m.Lock()
		m.progress[progressKey] = singleProgress
		m.Unlock()

		fileStream, err := os.Open(src)
		if err != nil {
			m.l.Warning("Failed to open file %s: %s", src, err.Error())
			atomic.AddInt64(&sizeProgress.Current, file.Size)
			atomic.AddInt64(&failed, 1)
			ae.Add(file.Name, fmt.Errorf("failed to open file: %w", err))
			return
		}
		defer fileStream.Close()

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  dst,
				Size: file.Size,
			},
			ProgressFunc: func(current, diff int64, total int64) {
				atomic.AddInt64(&singleProgress.Current, diff)
				atomic.AddInt64(&sizeProgress.Current, diff)
			},
			File: fileStream,
		}
		if _, err := fm.Update(ctx, fileData, fs.WithNoEntityType()); err != nil {
			m.l.Warning("Failed to upload file %s: %s", src, err.Error())
			atomic.AddInt64(&failed, 1)
			atomic.AddInt64(&sizeProgress.Current, file.Size)
			ae.Add(file.Name, fmt.Errorf("failed to upload file: %w", err))
			return
		}

		m.Lock()
		m.state.Transferred[file.Index] = nil
		m.Unlock()
	}

	// Start uploading files.
	for _, file := range allFiles {
		// Check under lock whether this file is already transferred, since
		// workers write the map concurrently.
		m.Lock()
		_, done := m.state.Transferred[file.Index]
		m.Unlock()
		if done {
			m.l.Info("File %s already transferred, skipping...", file.Name)
			atomic.AddInt64(&sizeProgress.Current, file.Size)
			atomic.AddInt64(&countProgress.Current, 1)
			continue
		}

		select {
		case <-ctx.Done():
			// Wait for in-flight workers so they do not mutate task state
			// after this step has returned.
			wg.Wait()
			return task.StatusError, ctx.Err()
		case workerId := <-worker:
			wg.Add(1)
			go transferFunc(workerId, file)
		}
	}
	wg.Wait()

	if failed > 0 {
		m.state.Failed = int(failed)
		m.l.Error("Failed to transfer %d file(s).", failed)
		return task.StatusError, fmt.Errorf("failed to transfer %d file(s), first 5 errors: %s", failed, ae.FormatFirstN(5))
	}

	m.l.Info("All files transferred.")
	m.state.Phase = RemoteDownloadTaskPhaseAwaitSeeding
	return task.StatusSuspending, nil
}
// awaitSeeding keeps the task suspended during the seeding phase.
// NOTE(review): this method is not invoked anywhere in this file — Do routes
// the seeding phase to monitor — confirm whether it is dead code.
func (m *RemoteDownloadTask) awaitSeeding(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	return task.StatusSuspending, nil
}
func ( m * RemoteDownloadTask ) validateFiles ( ctx context . Context , dep dependency . Dep , status * downloader . TaskStatus ) error {
// Validate files
user := inventory . UserFromContext ( ctx )
fm := manager . NewFileManager ( dep , user )
dstUri , err := fs . NewUriFromString ( m . state . Dst )
if err != nil {
return fmt . Errorf ( "failed to parse dst uri: %w" , err )
}
selectedFiles := lo . Filter ( status . Files , func ( f downloader . TaskFile , _ int ) bool {
return f . Selected
} )
if len ( selectedFiles ) == 0 {
return fmt . Errorf ( "no selected file found in download task" )
}
2025-04-24 15:26:58 +08:00
validateArgs := lo . Map ( selectedFiles , func ( f downloader . TaskFile , _ int ) fs . PreValidateFile {
return fs . PreValidateFile {
Name : f . Name ,
Size : f . Size ,
OmitName : f . Name == "" ,
2025-04-20 17:31:25 +08:00
}
} )
2025-04-24 15:26:58 +08:00
if err := fm . PreValidateUpload ( ctx , dstUri , validateArgs ... ) ; err != nil {
return fmt . Errorf ( "failed to pre-validate files: %w" , err )
2025-04-20 17:31:25 +08:00
}
return nil
}
// Cleanup cancels the remote download (when one was created) and removes the
// local temporary folder on the master node. Failures are logged but never
// returned. Fix: guard against nil state/downloader/node — Cleanup could be
// invoked before Do ever populated these fields, causing a nil dereference.
func (m *RemoteDownloadTask) Cleanup(ctx context.Context) error {
	// Nothing to clean up if the task never executed a step.
	if m.state == nil {
		return nil
	}
	if m.state.Handle != nil && m.d != nil {
		if err := m.d.Cancel(ctx, m.state.Handle); err != nil {
			m.l.Warning("failed to cancel download task: %s", err)
		}
	}
	if m.state.Status != nil && m.node != nil && m.node.IsMaster() && m.state.Status.SavePath != "" {
		if err := os.RemoveAll(m.state.Status.SavePath); err != nil {
			m.l.Warning("failed to remove download temp folder: %s", err)
		}
	}
	return nil
}
// SetDownloadTarget sets the files to download for the task.
// Fix: also guard m.state itself — it is nil until Do has run, and the
// original would panic instead of returning the "not created" error.
func (m *RemoteDownloadTask) SetDownloadTarget(ctx context.Context, args ...*downloader.SetFileToDownloadArgs) error {
	if m.state == nil || m.state.Handle == nil {
		return fmt.Errorf("download task not created")
	}
	return m.d.SetFilesToDownload(ctx, m.state.Handle, args...)
}
// CancelDownload cancels the download task on the remote downloader; it is a
// no-op when no download has been created yet.
// Fix: also guard m.state itself — it is nil until Do has run, and the
// original would panic dereferencing it.
func (m *RemoteDownloadTask) CancelDownload(ctx context.Context) error {
	if m.state == nil || m.state.Handle == nil {
		return nil
	}
	return m.d.Cancel(ctx, m.state.Handle)
}
// Summarize builds a user-facing summary of the task. The downloader save
// path is redacted from the reported status.
// Fix: the original wrote `status = &*m.state.Status`, but `&*p` is the SAME
// pointer in Go (no copy is made), so clearing SavePath mutated the live
// state — which in turn made Cleanup skip removing the temp folder (it
// checks SavePath != ""). A real struct copy is taken before redacting.
func (m *RemoteDownloadTask) Summarize(hasher hashid.Encoder) *queue.Summary {
	// State may not be unmarshaled yet when the task was only loaded from DB.
	if m.state == nil {
		if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
			return nil
		}
	}

	var status *downloader.TaskStatus
	if m.state.Status != nil {
		// Shallow-copy the status, then redact the save path on the copy.
		redacted := *m.state.Status
		redacted.SavePath = ""
		status = &redacted
	}

	failed := m.state.Failed
	if m.state.SlaveUploadState != nil && m.state.Phase != RemoteDownloadTaskPhaseTransfer {
		failed = len(m.state.SlaveUploadState.Files) - len(m.state.SlaveUploadState.Transferred)
	}

	return &queue.Summary{
		Phase:  string(m.state.Phase),
		NodeID: m.state.NodeID,
		Props: map[string]any{
			SummaryKeySrcStr:         m.state.SrcUri,
			SummaryKeySrc:            m.state.SrcFileUri,
			SummaryKeyDst:            m.state.Dst,
			SummaryKeyFailed:         failed,
			SummaryKeyDownloadStatus: status,
		},
	}
}
// Progress returns the task's progress map; when the slave node has reported
// progress, it is merged over the local entries (remote keys win).
func (m *RemoteDownloadTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	remote := m.state.NodeState.progress
	if remote == nil {
		return m.progress
	}

	merged := make(queue.Progresses, len(m.progress)+len(remote))
	for key, p := range m.progress {
		merged[key] = p
	}
	for key, p := range remote {
		merged[key] = p
	}
	return merged
}