Init V4 community edition (#2265)

* Init V4 community edition

* Init V4 community edition
This commit is contained in:
AaronLiu
2025-04-20 17:31:25 +08:00
committed by GitHub
parent da4e44b77a
commit 21d158db07
597 changed files with 119415 additions and 41692 deletions

View File

@@ -0,0 +1,682 @@
package workflows
import (
"archive/zip"
"context"
"encoding/json"
"fmt"
"io"
"os"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager/entitysource"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/gofrs/uuid"
)
type (
	// CreateArchiveTask is a resumable task that compresses a set of files
	// into a zip archive and uploads the result to a destination URI. The
	// work runs on the master node or is delegated to a slave node,
	// depending on the node allocated in Do.
	CreateArchiveTask struct {
		*queue.DBTask

		l        logging.Logger
		state    *CreateArchiveTaskState
		progress queue.Progresses
		node     cluster.Node
	}

	// CreateArchiveTaskPhase identifies the current step of a CreateArchiveTask.
	CreateArchiveTaskPhase string

	// CreateArchiveTaskState is the JSON-serialized private state persisted
	// between resumptions of a CreateArchiveTask.
	CreateArchiveTaskState struct {
		Uris        []string               `json:"uris,omitempty"`         // source URIs to archive
		Dst         string                 `json:"dst,omitempty"`          // destination URI for the final archive
		TempPath    string                 `json:"temp_path,omitempty"`    // master-side temp working folder
		ArchiveFile string                 `json:"archive_file,omitempty"` // name of the temp zip file inside TempPath
		Phase       CreateArchiveTaskPhase `json:"phase,omitempty"`
		// NOTE(review): the double underscore in the two tags below looks
		// like a typo, but changing it would break already-persisted task
		// states — leave as-is.
		SlaveUploadTaskID  int                          `json:"slave__upload_task_id,omitempty"`
		SlaveArchiveTaskID int                          `json:"slave__archive_task_id,omitempty"`
		SlaveCompressState *SlaveCreateArchiveTaskState `json:"slave_compress_state,omitempty"`
		Failed             int                          `json:"failed,omitempty"` // count of entries that failed to compress
		NodeState          `json:",inline"`
	}
)
const (
	// Phases executed when the task runs on the master node.
	CreateArchiveTaskPhaseNotStarted    CreateArchiveTaskPhase = "not_started"
	CreateArchiveTaskPhaseCompressFiles CreateArchiveTaskPhase = "compress_files"
	CreateArchiveTaskPhaseUploadArchive CreateArchiveTaskPhase = "upload_archive"

	// Phases executed when the work is delegated to a slave node.
	CreateArchiveTaskPhaseAwaitSlaveCompressing        CreateArchiveTaskPhase = "await_slave_compressing"
	CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading CreateArchiveTaskPhase = "await_slave_uploading"
	CreateArchiveTaskPhaseCompleteUpload               CreateArchiveTaskPhase = "complete_upload"

	// Keys of the entries reported by Progress.
	ProgressTypeArchiveCount = "archive_count"
	ProgressTypeArchiveSize  = "archive_size"
	ProgressTypeUpload       = "upload"
	ProgressTypeUploadCount  = "upload_count"
)
// init registers the factory so persisted CreateArchiveTask rows can be
// resumed from the database after a restart.
func init() {
	queue.RegisterResumableTaskFactory(queue.CreateArchiveTaskType, NewCreateArchiveTaskFromModel)
}
// NewCreateArchiveTask creates a new CreateArchiveTask that will compress
// the given source URIs into an archive stored at dst. The initial state is
// serialized into the task's private state; the task owner is taken from ctx.
func NewCreateArchiveTask(ctx context.Context, src []string, dst string) (queue.Task, error) {
	initial := &CreateArchiveTaskState{
		Uris:      src,
		Dst:       dst,
		NodeState: NodeState{},
	}

	raw, err := json.Marshal(initial)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	return &CreateArchiveTask{
		DBTask: &queue.DBTask{
			Task: &ent.Task{
				Type:          queue.CreateArchiveTaskType,
				CorrelationID: logging.CorrelationID(ctx),
				PrivateState:  string(raw),
				PublicState:   &types.TaskPublicState{},
			},
			DirectOwner: inventory.UserFromContext(ctx),
		},
	}, nil
}
// NewCreateArchiveTaskFromModel restores a CreateArchiveTask from its
// persisted database model. The parameter is named t rather than task so it
// does not shadow the imported task package.
func NewCreateArchiveTaskFromModel(t *ent.Task) queue.Task {
	return &CreateArchiveTask{
		DBTask: &queue.DBTask{
			Task: t,
		},
	}
}
// Do advances the task one step according to its persisted phase, then
// re-persists the (possibly mutated) state. Each step either completes,
// errors, or suspends with a scheduled resume.
func (m *CreateArchiveTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	// Lazily create the progress map; Do is re-entered on every resume.
	m.Lock()
	if m.progress == nil {
		m.progress = make(queue.Progresses)
	}
	m.Unlock()

	// unmarshal state
	state := &CreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = state

	// select node
	node, err := allocateNode(ctx, dep, &m.state.NodeState, types.NodeCapabilityCreateArchive)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to allocate node: %w", err)
	}
	m.node = node

	next := task.StatusCompleted
	if m.node.IsMaster() {
		// Master flow:
		//   1. Initialize temp folder
		//   2. Compress files
		//   3. Upload archive to dst
		switch m.state.Phase {
		case CreateArchiveTaskPhaseNotStarted, "":
			next, err = m.initializeTempFolder(ctx, dep)
		case CreateArchiveTaskPhaseCompressFiles:
			next, err = m.createArchiveFile(ctx, dep)
		case CreateArchiveTaskPhaseUploadArchive:
			next, err = m.uploadArchive(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	} else {
		// Slave flow:
		//   1. List all files and send them to the slave for compressing
		//   2. Await compressing, then send to the slave for uploading
		//   3. Await uploading and complete
		switch m.state.Phase {
		case CreateArchiveTaskPhaseNotStarted, "":
			next, err = m.listEntitiesAndSendToSlave(ctx, dep)
		case CreateArchiveTaskPhaseAwaitSlaveCompressing:
			next, err = m.awaitSlaveCompressing(ctx, dep)
		case CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading:
			next, err = m.createAndAwaitSlaveUploading(ctx, dep)
		case CreateArchiveTaskPhaseCompleteUpload:
			next, err = m.completeUpload(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	}

	// Persist state regardless of the step's outcome so resumes see the
	// latest phase/cursor values; the step's own (next, err) is returned.
	newStateStr, marshalErr := json.Marshal(m.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}
	m.Lock()
	m.Task.PrivateState = string(newStateStr)
	m.Unlock()
	return next, err
}
// Cleanup removes temporary artifacts left behind by the task: the temp
// folder on the slave node (if any), then the local temp folder.
func (m *CreateArchiveTask) Cleanup(ctx context.Context) error {
	if m.state.SlaveCompressState != nil && m.state.SlaveCompressState.TempPath != "" && m.node != nil {
		slaveTemp := m.state.SlaveCompressState.TempPath
		if err := m.node.CleanupFolders(context.Background(), slaveTemp); err != nil {
			m.l.Warning("Failed to cleanup slave temp folder %s: %s", slaveTemp, err)
		}
	}

	if m.state.TempPath == "" {
		return nil
	}

	// Brief delay before removal — presumably to let pending file handles
	// close; TODO confirm the reason.
	time.Sleep(time.Second)
	return os.RemoveAll(m.state.TempPath)
}
// initializeTempFolder allocates a temporary working folder on the master
// node, then advances the task to the compress phase and resumes immediately.
func (m *CreateArchiveTask) initializeTempFolder(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	folder, err := prepareTempFolder(ctx, dep, m)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
	}

	m.state.TempPath = folder
	m.state.Phase = CreateArchiveTaskPhaseCompressFiles
	m.ResumeAfter(0)
	return task.StatusSuspending, nil
}
// listEntitiesAndSendToSlave performs a dry-run of archive creation to
// enumerate every entity (and its storage policy) that would be archived,
// then submits the list to the slave node as a SlaveCreateArchiveTask and
// suspends until the next poll.
func (m *CreateArchiveTask) listEntitiesAndSendToSlave(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uris, err := fs.NewUriFromStrings(m.state.Uris...)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create uri from strings: %s (%w)", err, queue.CriticalErr)
	}

	payload := &SlaveCreateArchiveTaskState{
		Entities: make([]SlaveCreateArchiveEntity, 0, len(uris)),
		Policies: make(map[int]*ent.StoragePolicy),
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)
	storagePolicyClient := dep.StoragePolicyClient()

	// Dry run: output goes to io.Discard, nothing is compressed. The
	// callback records each would-be archive member and caches its storage
	// policy (policy lookup failures are logged and the entry is sent
	// without a cached policy — the slave counts it as failed).
	failed, err := fm.CreateArchive(ctx, uris, io.Discard,
		fs.WithDryRun(func(name string, e fs.Entity) {
			payload.Entities = append(payload.Entities, SlaveCreateArchiveEntity{
				Entity: e.Model(),
				Path:   name,
			})
			if _, ok := payload.Policies[e.PolicyID()]; !ok {
				policy, err := storagePolicyClient.GetPolicyByID(ctx, e.PolicyID())
				if err != nil {
					m.l.Warning("Failed to get policy %d: %s", e.PolicyID(), err)
				} else {
					payload.Policies[e.PolicyID()] = policy
				}
			}
		}),
		fs.WithMaxArchiveSize(user.Edges.Group.Settings.CompressSize),
	)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to compress files: %w", err)
	}
	m.state.Failed = failed

	payloadStr, err := json.Marshal(payload)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
	}

	taskId, err := m.node.CreateTask(ctx, queue.SlaveCreateArchiveTaskType, string(payloadStr))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
	}

	m.state.Phase = CreateArchiveTaskPhaseAwaitSlaveCompressing
	m.state.SlaveArchiveTaskID = taskId
	m.ResumeAfter((10 * time.Second))
	return task.StatusSuspending, nil
}
// awaitSlaveCompressing polls the slave compress task, mirrors its progress
// and private state into this task, and advances to the upload phase once
// the slave reports completion.
//
// Terminal failure statuses are checked before unmarshaling the slave's
// private state: previously a failed slave task whose PrivateState was
// empty or malformed returned an unmarshal error that masked the slave's
// actual error message.
func (m *CreateArchiveTask) awaitSlaveCompressing(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	t, err := m.node.GetTask(ctx, m.state.SlaveArchiveTaskID, false)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	// Mirror the slave's progress so Progress() can merge it in.
	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	// Surface the slave's own failure before trying to decode its state.
	if t.Status == task.StatusError {
		return task.StatusError, fmt.Errorf("slave task failed: %s (%w)", t.Error, queue.CriticalErr)
	}
	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}

	m.state.SlaveCompressState = &SlaveCreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(t.PrivateState), m.state.SlaveCompressState); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal slave compress state: %s (%w)", err, queue.CriticalErr)
	}

	if t.Status == task.StatusCompleted {
		m.state.Phase = CreateArchiveTaskPhaseCreateAndAwaitSlaveUploading
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	// Still running — poll again later.
	m.l.Info("Slave task %d is still compressing, resume after 30s.", m.state.SlaveArchiveTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}
// createAndAwaitSlaveUploading creates the slave upload task on its first
// invocation (SlaveUploadTaskID == 0), then on subsequent invocations polls
// it until it reaches a terminal status.
func (m *CreateArchiveTask) createAndAwaitSlaveUploading(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	u := inventory.UserFromContext(ctx)
	if m.state.SlaveUploadTaskID == 0 {
		dst, err := fs.NewUriFromString(m.state.Dst)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse dst uri %q: %s (%w)", m.state.Dst, err, queue.CriticalErr)
		}

		// Create slave upload task for the single compressed archive the
		// slave produced in the previous phase.
		payload := &SlaveUploadTaskState{
			Files: []SlaveUploadEntity{
				{
					Size: m.state.SlaveCompressState.CompressedSize,
					Uri:  dst,
					Src:  m.state.SlaveCompressState.ZipFilePath,
				},
			},
			MaxParallel: dep.SettingProvider().MaxParallelTransfer(ctx),
			UserID:      u.ID,
		}
		payloadStr, err := json.Marshal(payload)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
		}

		taskId, err := m.node.CreateTask(ctx, queue.SlaveUploadTaskType, string(payloadStr))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
		}

		// Reset mirrored progress: entries from the compress task no longer apply.
		m.state.NodeState.progress = nil
		m.state.SlaveUploadTaskID = taskId
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Checking slave upload task %d...", m.state.SlaveUploadTaskID)
	t, err := m.node.GetTask(ctx, m.state.SlaveUploadTaskID, true)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	// Mirror the slave's progress so Progress() can merge it in.
	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	if t.Status == task.StatusError {
		return task.StatusError, fmt.Errorf("slave task failed: %s (%w)", t.Error, queue.CriticalErr)
	}
	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}
	if t.Status == task.StatusCompleted {
		m.state.Phase = CreateArchiveTaskPhaseCompleteUpload
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	// Still running — poll again later.
	m.l.Info("Slave task %d is still uploading, resume after 30s.", m.state.SlaveUploadTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}
// completeUpload is the terminal phase of the slave-delegated flow. The
// slave upload task has already placed the archive at the destination, so
// nothing remains to do on the master.
func (m *CreateArchiveTask) completeUpload(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	return task.StatusCompleted, nil
}
// createArchiveFile compresses the source URIs into a randomly-named zip
// file inside the task's temp folder, then advances to the upload phase.
func (m *CreateArchiveTask) createArchiveFile(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uris, err := fs.NewUriFromStrings(m.state.Uris...)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create uri from strings: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)

	// Create temp zip file with a random (uuid) name to avoid collisions.
	fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
	zipFilePath := filepath.Join(
		m.state.TempPath,
		fileName,
	)
	zipFile, err := util.CreatNestedFile(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
	}
	defer zipFile.Close()

	// Start compressing; publish progress entries before any callbacks fire.
	m.Lock()
	m.progress[ProgressTypeArchiveCount] = &queue.Progress{}
	m.progress[ProgressTypeArchiveSize] = &queue.Progress{}
	m.Unlock()
	failed, err := fm.CreateArchive(ctx, uris, zipFile,
		fs.WithArchiveCompression(true),
		fs.WithMaxArchiveSize(user.Edges.Group.Settings.CompressSize),
		fs.WithProgressFunc(func(current, diff int64, total int64) {
			atomic.AddInt64(&m.progress[ProgressTypeArchiveSize].Current, diff)
			atomic.AddInt64(&m.progress[ProgressTypeArchiveCount].Current, 1)
		}),
	)
	if err != nil {
		// Remove the partial zip; the explicit Close makes the deferred
		// Close a harmless no-op error that is ignored.
		zipFile.Close()
		_ = os.Remove(zipFilePath)
		return task.StatusError, fmt.Errorf("failed to compress files: %w", err)
	}
	m.state.Failed = failed

	// Compression finished — retire the per-compress progress entries.
	m.Lock()
	delete(m.progress, ProgressTypeArchiveSize)
	delete(m.progress, ProgressTypeArchiveCount)
	m.Unlock()

	m.state.Phase = CreateArchiveTaskPhaseUploadArchive
	m.state.ArchiveFile = fileName
	m.ResumeAfter(0)
	return task.StatusSuspending, nil
}
// uploadArchive uploads the previously created temp zip file to the
// destination URI, reporting upload progress.
//
// The *queue.Progress entry is hoisted into a local before the upload
// starts: the original ProgressFunc performed an unsynchronized
// m.progress[ProgressTypeUpload] map lookup on every callback while other
// goroutines may call Progress() concurrently; holding the pointer avoids
// touching the map outside the lock.
func (m *CreateArchiveTask) uploadArchive(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	fm := manager.NewFileManager(dep, inventory.UserFromContext(ctx))
	zipFilePath := filepath.Join(
		m.state.TempPath,
		m.state.ArchiveFile,
	)
	m.l.Info("Uploading archive file %s to %s...", zipFilePath, m.state.Dst)

	uri, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf(
			"failed to parse dst uri %q: %s (%w)",
			m.state.Dst,
			err,
			queue.CriticalErr,
		)
	}

	file, err := os.Open(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to open compressed archive %q: %s", m.state.ArchiveFile, err)
	}
	defer file.Close()

	fi, err := file.Stat()
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get file info: %w", err)
	}
	size := fi.Size()

	// Register the upload progress entry and keep the pointer for the callback.
	uploadProgress := &queue.Progress{}
	m.Lock()
	m.progress[ProgressTypeUpload] = uploadProgress
	m.Unlock()

	fileData := &fs.UploadRequest{
		Props: &fs.UploadProps{
			Uri:  uri,
			Size: size,
		},
		ProgressFunc: func(current, diff int64, total int64) {
			atomic.StoreInt64(&uploadProgress.Current, current)
			atomic.StoreInt64(&uploadProgress.Total, total)
		},
		File:   file,
		Seeker: file,
	}
	_, err = fm.Update(ctx, fileData)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to upload archive file: %w", err)
	}

	return task.StatusCompleted, nil
}
// Progress returns the task's progress entries. When a slave node has
// reported progress, local and remote entries are merged into a fresh map
// (remote keys win) so callers never observe internal maps being mutated.
func (m *CreateArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	remote := m.state.NodeState.progress
	if remote == nil {
		return m.progress
	}

	combined := make(queue.Progresses, len(m.progress)+len(remote))
	for key, p := range m.progress {
		combined[key] = p
	}
	for key, p := range remote {
		combined[key] = p
	}
	return combined
}
// Summarize builds a user-facing summary from the task state. When the
// slave performed the compression, its failure count takes precedence over
// the master-side dry-run count.
func (m *CreateArchiveTask) Summarize(hasher hashid.Encoder) *queue.Summary {
	// Lazily decode the state when Summarize runs before Do has.
	if m.state == nil {
		if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
			return nil
		}
	}

	failedCount := m.state.Failed
	if slave := m.state.SlaveCompressState; slave != nil {
		failedCount = slave.Failed
	}

	return &queue.Summary{
		NodeID: m.state.NodeID,
		Phase:  string(m.state.Phase),
		Props: map[string]any{
			SummaryKeySrcMultiple: m.state.Uris,
			SummaryKeyDst:         m.state.Dst,
			SummaryKeyFailed:      failedCount,
		},
	}
}
type (
	// SlaveCreateArchiveEntity pairs a file entity with its destination
	// path inside the archive to be created.
	SlaveCreateArchiveEntity struct {
		Entity *ent.Entity `json:"entity"`
		Path   string      `json:"path"`
	}

	// SlaveCreateArchiveTaskState is the payload the master sends to the
	// slave, and the slave's persisted private state. On completion the
	// slave clears Entities/Policies and fills in the compressed-file
	// fields so the master can schedule the upload.
	SlaveCreateArchiveTaskState struct {
		Entities       []SlaveCreateArchiveEntity `json:"entities"`
		Policies       map[int]*ent.StoragePolicy `json:"policies"` // keyed by policy ID
		CompressedSize int64                      `json:"compressed_size"`
		TempPath       string                     `json:"temp_path"`
		ZipFilePath    string                     `json:"zip_file_path"`
		Failed         int                        `json:"failed"` // entries skipped due to errors
	}

	// SlaveCreateArchiveTask compresses the given entities into a zip file
	// on a slave node.
	SlaveCreateArchiveTask struct {
		*queue.InMemoryTask

		// NOTE(review): mu appears unused in this file — Lock/Unlock calls
		// below go through the embedded InMemoryTask. Confirm before removing.
		mu       sync.RWMutex
		progress queue.Progresses
		l        logging.Logger
		state    *SlaveCreateArchiveTaskState
	}
)
// NewSlaveCreateArchiveTask creates a new SlaveCreateArchiveTask from raw private state
func NewSlaveCreateArchiveTask(ctx context.Context, props *types.SlaveTaskProps, id int, state string) queue.Task {
	model := &ent.Task{
		ID:            id,
		CorrelationID: logging.CorrelationID(ctx),
		PublicState: &types.TaskPublicState{
			SlaveTaskProps: props,
		},
		PrivateState: state,
	}

	return &SlaveCreateArchiveTask{
		InMemoryTask: &queue.InMemoryTask{
			DBTask: &queue.DBTask{Task: model},
		},
		progress: make(queue.Progresses),
	}
}
// Do executes the slave-side archive creation: it downloads every entity
// listed in the task state and streams it into a zip file in a temp
// workspace, recording per-entity failures instead of aborting. On success
// the state is rewritten with the resulting zip's path and size so the
// master can schedule the upload.
func (t *SlaveCreateArchiveTask) Do(ctx context.Context) (task.Status, error) {
	ctx = prepareSlaveTaskCtx(ctx, t.Model().PublicState.SlaveTaskProps)
	dep := dependency.FromContext(ctx)
	t.l = dep.Logger()
	fm := manager.NewFileManager(dep, nil)

	// unmarshal state
	state := &SlaveCreateArchiveTaskState{}
	if err := json.Unmarshal([]byte(t.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	t.state = state

	// Pre-compute progress totals from the entity list.
	totalFiles := int64(0)
	totalFileSize := int64(0)
	for _, e := range t.state.Entities {
		totalFiles++
		totalFileSize += e.Entity.Size
	}
	t.Lock()
	t.progress[ProgressTypeArchiveCount] = &queue.Progress{Total: totalFiles}
	t.progress[ProgressTypeArchiveSize] = &queue.Progress{Total: totalFileSize}
	t.Unlock()

	// 1. Create temp workspace
	tempPath, err := prepareTempFolder(ctx, dep, t)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
	}
	t.state.TempPath = tempPath

	// 2. Create archive file with a random (uuid) name
	fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
	zipFilePath := filepath.Join(
		t.state.TempPath,
		fileName,
	)
	zipFile, err := util.CreatNestedFile(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
	}
	defer zipFile.Close()

	zipWriter := zip.NewWriter(zipFile)
	defer zipWriter.Close()

	// 3. Download each entity and write it into the zip file. Failures are
	// counted in state.Failed and the entity is skipped.
	for _, e := range t.state.Entities {
		policy, ok := t.state.Policies[e.Entity.StoragePolicyEntities]
		if !ok {
			state.Failed++
			t.l.Warning("Policy not found for entity %d, skipping...", e.Entity.ID)
			continue
		}

		entity := fs.NewEntity(e.Entity)
		es, err := fm.GetEntitySource(ctx, 0,
			fs.WithEntity(entity),
			fs.WithPolicy(fm.CastStoragePolicyOnSlave(ctx, policy)),
		)
		if err != nil {
			state.Failed++
			t.l.Warning("Failed to get entity source for entity %d: %s, skipping...", e.Entity.ID, err)
			continue
		}

		// Write to zip file
		header := &zip.FileHeader{
			Name:               e.Path,
			Modified:           entity.UpdatedAt(),
			UncompressedSize64: uint64(entity.Size()),
			Method:             zip.Deflate,
		}
		writer, err := zipWriter.CreateHeader(header)
		if err != nil {
			es.Close()
			state.Failed++
			t.l.Warning("Failed to create zip header for %s: %s, skipping...", e.Path, err)
			continue
		}

		es.Apply(entitysource.WithContext(ctx))
		_, err = io.Copy(writer, es)
		es.Close()
		if err != nil {
			state.Failed++
			t.l.Warning("Failed to write entity %d to zip file: %s, skipping...", e.Entity.ID, err)
		}

		// Progress counts the entity even when the copy failed partway.
		atomic.AddInt64(&t.progress[ProgressTypeArchiveSize].Current, entity.Size())
		atomic.AddInt64(&t.progress[ProgressTypeArchiveCount].Current, 1)
	}

	// Close explicitly so the zip central directory is flushed to disk
	// before Stat; the deferred Close on the already-closed writer is a
	// no-op whose error is ignored.
	zipWriter.Close()
	stat, err := zipFile.Stat()
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get compressed file info: %w", err)
	}
	t.state.CompressedSize = stat.Size()
	t.state.ZipFilePath = zipFilePath

	// Clear unused fields to save space
	t.state.Entities = nil
	t.state.Policies = nil

	// Persist the rewritten state for the master to read back.
	newStateStr, marshalErr := json.Marshal(t.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}
	t.Lock()
	t.Task.PrivateState = string(newStateStr)
	t.Unlock()
	return task.StatusCompleted, nil
}
// Progress returns the in-memory progress entries of the slave task.
func (m *SlaveCreateArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	current := m.progress
	m.Unlock()
	return current
}

View File

@@ -0,0 +1,766 @@
package workflows
import (
"context"
"encoding/json"
"fmt"
"io"
"os"
"path"
"path/filepath"
"strings"
"sync/atomic"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs/dbfs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
"github.com/gofrs/uuid"
"github.com/mholt/archiver/v4"
)
type (
	// ExtractArchiveTask is a resumable task that extracts an archive file
	// and uploads its entries to a destination folder. The work runs on the
	// master node or is delegated to a slave node chosen in Do.
	ExtractArchiveTask struct {
		*queue.DBTask

		l        logging.Logger
		state    *ExtractArchiveTaskState
		progress queue.Progresses
		node     cluster.Node
	}

	// ExtractArchiveTaskPhase identifies the current step of an ExtractArchiveTask.
	ExtractArchiveTaskPhase string

	// ExtractArchiveTaskState is the JSON-serialized private state persisted
	// between resumptions of an ExtractArchiveTask.
	ExtractArchiveTaskState struct {
		Uri             string `json:"uri,omitempty"`      // source archive URI
		Encoding        string `json:"encoding,omitempty"` // optional filename text encoding for zip archives
		Dst             string `json:"dst,omitempty"`      // destination folder URI
		TempPath        string `json:"temp_path,omitempty"`
		TempZipFilePath string `json:"temp_zip_file_path,omitempty"` // local, seekable copy of a remote zip
		ProcessedCursor string `json:"processed_cursor,omitempty"`   // last fully-processed entry name; used to resume mid-archive
		SlaveTaskID     int    `json:"slave_task_id,omitempty"`
		NodeState       `json:",inline"`
		Phase           ExtractArchiveTaskPhase `json:"phase,omitempty"`
	}
)
const (
	// Master-side phases (the empty string doubles as "not started").
	ExtractArchivePhaseNotStarted  ExtractArchiveTaskPhase = ""
	ExtractArchivePhaseDownloadZip ExtractArchiveTaskPhase = "download_zip"
	// Slave-delegated phase.
	ExtractArchivePhaseAwaitSlaveComplete ExtractArchiveTaskPhase = "await_slave_complete"

	// Keys of the entries reported by Progress.
	ProgressTypeExtractCount = "extract_count"
	ProgressTypeExtractSize  = "extract_size"
	ProgressTypeDownload     = "download"

	// Summary property keys.
	SummaryKeySrc = "src"
	SummaryKeyDst = "dst"
)
// init registers the factory so persisted ExtractArchiveTask rows can be
// resumed from the database after a restart.
func init() {
	queue.RegisterResumableTaskFactory(queue.ExtractArchiveTaskType, NewExtractArchiveTaskFromModel)
}
// NewExtractArchiveTask creates a new ExtractArchiveTask that extracts the
// archive at src into dst, optionally decoding zip entry names with the
// given text encoding. The task owner is taken from ctx.
func NewExtractArchiveTask(ctx context.Context, src, dst, encoding string) (queue.Task, error) {
	initial := &ExtractArchiveTaskState{
		Uri:       src,
		Dst:       dst,
		Encoding:  encoding,
		NodeState: NodeState{},
	}

	raw, err := json.Marshal(initial)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	return &ExtractArchiveTask{
		DBTask: &queue.DBTask{
			Task: &ent.Task{
				Type:          queue.ExtractArchiveTaskType,
				CorrelationID: logging.CorrelationID(ctx),
				PrivateState:  string(raw),
				PublicState:   &types.TaskPublicState{},
			},
			DirectOwner: inventory.UserFromContext(ctx),
		},
	}, nil
}
// NewExtractArchiveTaskFromModel restores an ExtractArchiveTask from its
// persisted database model. The parameter is named t rather than task so it
// does not shadow the imported task package.
func NewExtractArchiveTaskFromModel(t *ent.Task) queue.Task {
	return &ExtractArchiveTask{
		DBTask: &queue.DBTask{
			Task: t,
		},
	}
}
// Do advances the task one step according to its persisted phase, then
// re-persists the (possibly mutated) state. Each step either completes,
// errors, or suspends with a scheduled resume.
func (m *ExtractArchiveTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	// Lazily create the progress map; Do is re-entered on every resume.
	m.Lock()
	if m.progress == nil {
		m.progress = make(queue.Progresses)
	}
	m.Unlock()

	// unmarshal state
	state := &ExtractArchiveTaskState{}
	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = state

	// select node
	node, err := allocateNode(ctx, dep, &m.state.NodeState, types.NodeCapabilityExtractArchive)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to allocate node: %w", err)
	}
	m.node = node

	next := task.StatusCompleted
	if node.IsMaster() {
		// Master flow: extract in-process, optionally downloading a remote
		// zip to a local temp file first (zip requires a seekable source).
		switch m.state.Phase {
		case ExtractArchivePhaseNotStarted:
			next, err = m.masterExtractArchive(ctx, dep)
		case ExtractArchivePhaseDownloadZip:
			next, err = m.masterDownloadZip(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	} else {
		// Slave flow: create the slave extract task, then poll it.
		switch m.state.Phase {
		case ExtractArchivePhaseNotStarted:
			next, err = m.createSlaveExtractTask(ctx, dep)
		case ExtractArchivePhaseAwaitSlaveComplete:
			next, err = m.awaitSlaveExtractComplete(ctx, dep)
		default:
			next, err = task.StatusError, fmt.Errorf("unknown phase %q: %w", m.state.Phase, queue.CriticalErr)
		}
	}

	// Persist state regardless of the step's outcome so resumes see the
	// latest phase/cursor values; the step's own (next, err) is returned.
	newStateStr, marshalErr := json.Marshal(m.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}
	m.Lock()
	m.Task.PrivateState = string(newStateStr)
	m.Unlock()
	return next, err
}
// createSlaveExtractTask validates the archive against the user's size
// limit, then submits a SlaveExtractArchiveTask (carrying the primary
// entity, its storage policy, encoding and destination) to the slave node
// and suspends until the next poll.
func (m *ExtractArchiveTask) createSlaveExtractTask(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uri, err := fs.NewUriFromString(m.state.Uri)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse src uri: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)

	// Get entity source to extract
	archiveFile, err := fm.Get(ctx, uri, dbfs.WithFileEntities(), dbfs.WithRequiredCapabilities(dbfs.NavigatorCapabilityDownloadFile))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get archive file: %s (%w)", err, queue.CriticalErr)
	}

	// Validate file size against the group's decompress limit (0 = unlimited).
	if user.Edges.Group.Settings.DecompressSize > 0 && archiveFile.Size() > user.Edges.Group.Settings.DecompressSize {
		return task.StatusError,
			fmt.Errorf("file size %d exceeds the limit %d (%w)", archiveFile.Size(), user.Edges.Group.Settings.DecompressSize, queue.CriticalErr)
	}

	// Create slave task
	storagePolicyClient := dep.StoragePolicyClient()
	policy, err := storagePolicyClient.GetPolicyByID(ctx, archiveFile.PrimaryEntity().PolicyID())
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get policy: %w", err)
	}

	payload := &SlaveExtractArchiveTaskState{
		FileName: archiveFile.DisplayName(),
		Entity:   archiveFile.PrimaryEntity().Model(),
		Policy:   policy,
		Encoding: m.state.Encoding,
		Dst:      m.state.Dst,
		UserID:   user.ID,
	}
	payloadStr, err := json.Marshal(payload)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
	}

	taskId, err := m.node.CreateTask(ctx, queue.SlaveExtractArchiveType, string(payloadStr))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
	}

	m.state.Phase = ExtractArchivePhaseAwaitSlaveComplete
	m.state.SlaveTaskID = taskId
	m.ResumeAfter((10 * time.Second))
	return task.StatusSuspending, nil
}
// awaitSlaveExtractComplete polls the slave extract task, mirrors its
// progress into this task, and completes once the slave reports success.
//
// The "still running" log message previously said "compressing" — a
// copy-paste from the create-archive workflow; this is the extract workflow.
func (m *ExtractArchiveTask) awaitSlaveExtractComplete(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	t, err := m.node.GetTask(ctx, m.state.SlaveTaskID, true)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	// Mirror the slave's progress so Progress() can merge it in.
	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	if t.Status == task.StatusError {
		return task.StatusError, fmt.Errorf("slave task failed: %s (%w)", t.Error, queue.CriticalErr)
	}
	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}
	if t.Status == task.StatusCompleted {
		return task.StatusCompleted, nil
	}

	// Still running — poll again later.
	m.l.Info("Slave task %d is still extracting, resume after 30s.", m.state.SlaveTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}
// masterExtractArchive extracts the archive at state.Uri directly on the
// master node, uploading every extracted entry to state.Dst. Processing is
// resumable: the name of the last fully-processed entry is persisted in
// state.ProcessedCursor and skipped past on re-entry.
//
// Fixes vs. the original:
//   - the "not an extractor" error used a %s verb with no argument
//     (go vet error; printed "%!s(MISSING)");
//   - the per-entry stream returned by f.Open() was never closed, leaking
//     a handle per extracted file.
func (m *ExtractArchiveTask) masterExtractArchive(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uri, err := fs.NewUriFromString(m.state.Uri)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse src uri: %s (%w)", err, queue.CriticalErr)
	}
	dst, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse dst uri: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)

	// Get entity source to extract
	archiveFile, err := fm.Get(ctx, uri, dbfs.WithFileEntities(), dbfs.WithRequiredCapabilities(dbfs.NavigatorCapabilityDownloadFile))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get archive file: %s (%w)", err, queue.CriticalErr)
	}

	// Validate file size against the group's decompress limit (0 = unlimited).
	if user.Edges.Group.Settings.DecompressSize > 0 && archiveFile.Size() > user.Edges.Group.Settings.DecompressSize {
		return task.StatusError,
			fmt.Errorf("file size %d exceeds the limit %d (%w)", archiveFile.Size(), user.Edges.Group.Settings.DecompressSize, queue.CriticalErr)
	}

	es, err := fm.GetEntitySource(ctx, 0, fs.WithEntity(archiveFile.PrimaryEntity()))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
	}
	defer es.Close()

	m.l.Info("Extracting archive %q to %q", uri, m.state.Dst)

	// Identify file format
	format, readStream, err := archiver.Identify(archiveFile.DisplayName(), es)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to identify archive format: %w", err)
	}
	m.l.Info("Archive file %q format identified as %q", uri, format.Name())

	extractor, ok := format.(archiver.Extractor)
	if !ok {
		// Fixed: the original format string had a %s verb with no argument.
		return task.StatusError, fmt.Errorf("format %s is not an extractor", format.Name())
	}

	if format.Name() == ".zip" {
		// Zip extractor requires a Seeker+ReadAt
		if m.state.TempZipFilePath == "" && !es.IsLocal() {
			// Remote entity: download the whole zip to a local temp file first.
			m.state.Phase = ExtractArchivePhaseDownloadZip
			m.ResumeAfter(0)
			return task.StatusSuspending, nil
		}
		if m.state.TempZipFilePath != "" {
			// Use temp zip file path
			zipFile, err := os.Open(m.state.TempZipFilePath)
			if err != nil {
				return task.StatusError, fmt.Errorf("failed to open temp zip file: %w", err)
			}
			defer zipFile.Close()
			readStream = zipFile
		}
		if es.IsLocal() {
			if _, err = es.Seek(0, 0); err != nil {
				return task.StatusError, fmt.Errorf("failed to seek entity source: %w", err)
			}
			readStream = es
		}
		if m.state.Encoding != "" {
			m.l.Info("Using encoding %q for zip archive", m.state.Encoding)
			extractor = archiver.Zip{TextEncoding: m.state.Encoding}
		}
	}

	// When resuming, skip entries until the persisted cursor is seen again.
	needSkipToCursor := false
	if m.state.ProcessedCursor != "" {
		needSkipToCursor = true
	}

	m.Lock()
	m.progress[ProgressTypeExtractCount] = &queue.Progress{}
	m.progress[ProgressTypeExtractSize] = &queue.Progress{}
	m.Unlock()

	// extract and upload
	err = extractor.Extract(ctx, readStream, nil, func(ctx context.Context, f archiver.File) error {
		if needSkipToCursor && f.NameInArchive != m.state.ProcessedCursor {
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			m.l.Info("File %q already processed, skipping...", f.NameInArchive)
			return nil
		}
		// Found cursor, start from cursor +1
		if m.state.ProcessedCursor == f.NameInArchive {
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			needSkipToCursor = false
			return nil
		}

		rawPath := util.FormSlash(f.NameInArchive)
		savePath := dst.JoinRaw(rawPath)
		// Check if path is legit (rejects entries escaping dst, zip-slip style)
		if !strings.HasPrefix(savePath.Path(), util.FillSlash(path.Clean(dst.Path()))) {
			m.l.Warning("Path %q is not legit, skipping...", f.NameInArchive)
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			return nil
		}

		if f.FileInfo.IsDir() {
			_, err := fm.Create(ctx, savePath, types.FileTypeFolder)
			if err != nil {
				m.l.Warning("Failed to create directory %q: %s, skipping...", rawPath, err)
			}
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			m.state.ProcessedCursor = f.NameInArchive
			return nil
		}

		fileStream, err := f.Open()
		if err != nil {
			m.l.Warning("Failed to open file %q in archive file: %s, skipping...", rawPath, err)
			return nil
		}
		// Fixed: close the per-entry stream (leaked in the original).
		defer fileStream.Close()

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  savePath,
				Size: f.Size(),
			},
			ProgressFunc: func(current, diff int64, total int64) {
				atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, diff)
			},
			File: fileStream,
		}
		_, err = fm.Update(ctx, fileData, fs.WithNoEntityType())
		if err != nil {
			return fmt.Errorf("failed to upload file %q in archive file: %w", rawPath, err)
		}

		atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
		m.state.ProcessedCursor = f.NameInArchive
		return nil
	})
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to extract archive: %w", err)
	}

	return task.StatusCompleted, nil
}
// masterDownloadZip copies a remote (non-local) zip entity into a local
// temp file so the zip extractor gets the seekable source it requires, then
// loops back to the extract phase with TempZipFilePath set.
func (m *ExtractArchiveTask) masterDownloadZip(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	uri, err := fs.NewUriFromString(m.state.Uri)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse src uri: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)

	// Get entity source to extract
	archiveFile, err := fm.Get(ctx, uri, dbfs.WithFileEntities(), dbfs.WithRequiredCapabilities(dbfs.NavigatorCapabilityDownloadFile))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get archive file: %s (%w)", err, queue.CriticalErr)
	}

	es, err := fm.GetEntitySource(ctx, 0, fs.WithEntity(archiveFile.PrimaryEntity()))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
	}
	defer es.Close()

	// For non-local entity, we need to download the whole zip file first
	tempPath, err := prepareTempFolder(ctx, dep, m)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
	}
	m.state.TempPath = tempPath

	// Random (uuid) name avoids collisions inside the temp folder.
	fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
	zipFilePath := filepath.Join(
		m.state.TempPath,
		fileName,
	)
	zipFile, err := util.CreatNestedFile(zipFilePath)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
	}

	m.Lock()
	m.progress[ProgressTypeDownload] = &queue.Progress{Total: es.Entity().Size()}
	m.Unlock()

	defer zipFile.Close()
	// Stream the remote entity into the temp file, reporting download progress.
	if _, err := io.Copy(zipFile, util.NewCallbackReader(es, func(i int64) {
		atomic.AddInt64(&m.progress[ProgressTypeDownload].Current, i)
	})); err != nil {
		// Remove the partial file; the explicit Close makes the deferred
		// Close a harmless no-op error that is ignored.
		zipFile.Close()
		if err := os.Remove(zipFilePath); err != nil {
			m.l.Warning("Failed to remove temp zip file %q: %s", zipFilePath, err)
		}
		return task.StatusError, fmt.Errorf("failed to copy zip file to local temp: %w", err)
	}

	// Download finished — retire the download progress entry.
	m.Lock()
	delete(m.progress, ProgressTypeDownload)
	m.Unlock()

	// Re-enter the extract phase, now with a seekable local copy.
	m.state.TempZipFilePath = zipFilePath
	m.state.Phase = ExtractArchivePhaseNotStarted
	m.ResumeAfter(0)
	return task.StatusSuspending, nil
}
// Summarize returns a read-only snapshot of this task for API consumers.
func (m *ExtractArchiveTask) Summarize(hasher hashid.Encoder) *queue.Summary {
	// Lazily decode the persisted state when the task was loaded from the DB
	// and Do has not run yet.
	if m.state == nil {
		if unmarshalErr := json.Unmarshal([]byte(m.State()), &m.state); unmarshalErr != nil {
			return nil
		}
	}

	props := map[string]any{
		SummaryKeySrc: m.state.Uri,
		SummaryKeyDst: m.state.Dst,
	}
	return &queue.Summary{
		NodeID: m.state.NodeID,
		Phase:  string(m.state.Phase),
		Props:  props,
	}
}
// Progress returns live progress, merging in progress mirrored from a slave
// node when the work was delegated.
func (m *ExtractArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	remote := m.state.NodeState.progress
	if remote == nil {
		return m.progress
	}

	merged := make(queue.Progresses, len(m.progress)+len(remote))
	for key, p := range m.progress {
		merged[key] = p
	}
	for key, p := range remote {
		merged[key] = p
	}
	return merged
}
// Cleanup removes the temp working folder, if one was created.
func (m *ExtractArchiveTask) Cleanup(ctx context.Context) error {
	if m.state.TempPath == "" {
		return nil
	}

	// Brief grace period before deletion (same one-second pause as before).
	time.Sleep(time.Second)
	return os.RemoveAll(m.state.TempPath)
}
type (
	// SlaveExtractArchiveTask extracts an archive on a slave node and uploads
	// the entries back through the master. It exists only in memory on the
	// slave; its state is supplied by the master as a JSON payload.
	SlaveExtractArchiveTask struct {
		*queue.InMemoryTask

		l        logging.Logger
		state    *SlaveExtractArchiveTaskState
		progress queue.Progresses
		node     cluster.Node
	}

	// SlaveExtractArchiveTaskState is the JSON payload handed over by the master.
	SlaveExtractArchiveTaskState struct {
		// FileName is the original archive name, used for format detection.
		FileName string `json:"file_name"`
		// Entity is the archive blob to extract.
		Entity *ent.Entity `json:"entity"`
		// Policy is the storage policy used to read the archive entity.
		Policy *ent.StoragePolicy `json:"policy"`
		// Encoding optionally overrides the text encoding of zip entry names.
		Encoding string `json:"encoding,omitempty"`
		// Dst is the destination folder URI for extracted entries.
		Dst string `json:"dst,omitempty"`
		// UserID is the owner on whose behalf entries are created.
		UserID int `json:"user_id"`
		// TempPath is the slave-local temp working folder, if any.
		TempPath string `json:"temp_path,omitempty"`
		// TempZipFilePath is a local copy of the zip used for random access.
		TempZipFilePath string `json:"temp_zip_file_path,omitempty"`
		// ProcessedCursor is the last archive entry fully processed (resume point).
		ProcessedCursor string `json:"processed_cursor,omitempty"`
	}
)
// NewSlaveExtractArchiveTask creates a new SlaveExtractArchiveTask from raw private state
func NewSlaveExtractArchiveTask(ctx context.Context, props *types.SlaveTaskProps, id int, state string) queue.Task {
	model := &ent.Task{
		ID:            id,
		CorrelationID: logging.CorrelationID(ctx),
		PublicState: &types.TaskPublicState{
			SlaveTaskProps: props,
		},
		PrivateState: state,
	}
	return &SlaveExtractArchiveTask{
		InMemoryTask: &queue.InMemoryTask{
			DBTask: &queue.DBTask{Task: model},
		},
		progress: make(queue.Progresses),
	}
}
// Do executes archive extraction on a slave node: it opens the archive entity
// through the master's storage policy, identifies the format, and uploads each
// extracted entry to the destination URI on behalf of the stateless user.
//
// The task is resumable: state.ProcessedCursor records the last entry that was
// fully uploaded, and entries up to (and including) the cursor are skipped on
// a re-run.
//
// Fixes over the previous revision: the "not an extractor" error had a %s verb
// with no argument; the `es.IsLocal()` re-assignment of readStream was
// duplicated; per-entry streams returned by f.Open() were never closed.
func (m *SlaveExtractArchiveTask) Do(ctx context.Context) (task.Status, error) {
	ctx = prepareSlaveTaskCtx(ctx, m.Model().PublicState.SlaveTaskProps)
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	np, err := dep.NodePool(ctx)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get node pool: %w", err)
	}

	m.node, err = np.Get(ctx, types.NodeCapabilityNone, 0)
	if err != nil || !m.node.IsMaster() {
		return task.StatusError, fmt.Errorf("failed to get master node: %w", err)
	}

	fm := manager.NewFileManager(dep, nil)

	// Unmarshal the state payload supplied by the master.
	state := &SlaveExtractArchiveTaskState{}
	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = state

	m.Lock()
	if m.progress == nil {
		m.progress = make(queue.Progresses)
	}
	m.progress[ProgressTypeExtractCount] = &queue.Progress{}
	m.progress[ProgressTypeExtractSize] = &queue.Progress{}
	m.Unlock()

	dst, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse dst uri: %s (%w)", err, queue.CriticalErr)
	}

	// 1. Get entity source
	entity := fs.NewEntity(m.state.Entity)
	es, err := fm.GetEntitySource(ctx, 0, fs.WithEntity(entity), fs.WithPolicy(fm.CastStoragePolicyOnSlave(ctx, m.state.Policy)))
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get entity source: %w", err)
	}
	defer es.Close()

	// 2. Identify file format
	format, readStream, err := archiver.Identify(m.state.FileName, es)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to identify archive format: %w", err)
	}

	m.l.Info("Archive file %q format identified as %q", m.state.FileName, format.Name())
	extractor, ok := format.(archiver.Extractor)
	if !ok {
		// Fixed: previous format string had a %s verb without an argument.
		return task.StatusError, fmt.Errorf("format %q is not an extractor", format.Name())
	}

	if format.Name() == ".zip" {
		// Zip requires random access; rewind to the beginning first.
		if _, err = es.Seek(0, 0); err != nil {
			return task.StatusError, fmt.Errorf("failed to seek entity source: %w", err)
		}

		if m.state.TempZipFilePath == "" && !es.IsLocal() {
			// Non-local entity: download the whole zip into a temp file so
			// the extractor can seek within it.
			tempPath, err := prepareTempFolder(ctx, dep, m)
			if err != nil {
				return task.StatusError, fmt.Errorf("failed to prepare temp folder: %w", err)
			}
			m.state.TempPath = tempPath

			fileName := fmt.Sprintf("%s.zip", uuid.Must(uuid.NewV4()))
			zipFilePath := filepath.Join(
				m.state.TempPath,
				fileName,
			)
			zipFile, err := util.CreatNestedFile(zipFilePath)
			if err != nil {
				return task.StatusError, fmt.Errorf("failed to create zip file: %w", err)
			}

			m.Lock()
			m.progress[ProgressTypeDownload] = &queue.Progress{Total: es.Entity().Size()}
			m.Unlock()

			defer zipFile.Close()
			if _, err := io.Copy(zipFile, util.NewCallbackReader(es, func(i int64) {
				atomic.AddInt64(&m.progress[ProgressTypeDownload].Current, i)
			})); err != nil {
				return task.StatusError, fmt.Errorf("failed to copy zip file to local temp: %w", err)
			}

			zipFile.Close()
			m.state.TempZipFilePath = zipFilePath
		}

		if es.IsLocal() {
			// Local entity files are seekable directly.
			readStream = es
		} else if m.state.TempZipFilePath != "" {
			// Use the locally downloaded temp copy.
			zipFile, err := os.Open(m.state.TempZipFilePath)
			if err != nil {
				return task.StatusError, fmt.Errorf("failed to open temp zip file: %w", err)
			}

			defer zipFile.Close()
			readStream = zipFile
		}

		if m.state.Encoding != "" {
			m.l.Info("Using encoding %q for zip archive", m.state.Encoding)
			extractor = archiver.Zip{TextEncoding: m.state.Encoding}
		}
	}

	// Resume support: skip entries until the recorded cursor is found again.
	needSkipToCursor := false
	if m.state.ProcessedCursor != "" {
		needSkipToCursor = true
	}

	// 3. Extract and upload
	err = extractor.Extract(ctx, readStream, nil, func(ctx context.Context, f archiver.File) error {
		if needSkipToCursor && f.NameInArchive != m.state.ProcessedCursor {
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			m.l.Info("File %q already processed, skipping...", f.NameInArchive)
			return nil
		}

		// Found cursor, start from cursor +1
		if m.state.ProcessedCursor == f.NameInArchive {
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			needSkipToCursor = false
			return nil
		}

		rawPath := util.FormSlash(f.NameInArchive)
		savePath := dst.JoinRaw(rawPath)

		// Check if path is legit (guards against zip-slip style entries).
		if !strings.HasPrefix(savePath.Path(), util.FillSlash(path.Clean(dst.Path()))) {
			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, f.Size())
			m.l.Warning("Path %q is not legit, skipping...", f.NameInArchive)
			return nil
		}

		if f.FileInfo.IsDir() {
			// Directory creation failures are tolerated; children may still upload.
			_, err := fm.Create(ctx, savePath, types.FileTypeFolder, fs.WithNode(m.node), fs.WithStatelessUserID(m.state.UserID))
			if err != nil {
				m.l.Warning("Failed to create directory %q: %s, skipping...", rawPath, err)
			}

			atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
			m.state.ProcessedCursor = f.NameInArchive
			return nil
		}

		fileStream, err := f.Open()
		if err != nil {
			m.l.Warning("Failed to open file %q in archive file: %s, skipping...", rawPath, err)
			return nil
		}
		// Fixed: release the per-entry stream once it has been consumed.
		defer fileStream.Close()

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  savePath,
				Size: f.Size(),
			},
			ProgressFunc: func(current, diff int64, total int64) {
				atomic.AddInt64(&m.progress[ProgressTypeExtractSize].Current, diff)
			},
			File: fileStream,
		}

		_, err = fm.Update(ctx, fileData, fs.WithNode(m.node), fs.WithStatelessUserID(m.state.UserID), fs.WithNoEntityType())
		if err != nil {
			return fmt.Errorf("failed to upload file %q in archive file: %w", rawPath, err)
		}

		atomic.AddInt64(&m.progress[ProgressTypeExtractCount].Current, 1)
		m.state.ProcessedCursor = f.NameInArchive
		return nil
	})
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to extract archive: %w", err)
	}

	return task.StatusCompleted, nil
}
// Cleanup removes the slave-local temp folder, if one was created.
func (m *SlaveExtractArchiveTask) Cleanup(ctx context.Context) error {
	if m.state.TempPath == "" {
		return nil
	}

	// Brief grace period before deletion (same one-second pause as before).
	time.Sleep(time.Second)
	return os.RemoveAll(m.state.TempPath)
}
// Progress returns the in-memory progress map of this task.
func (m *SlaveExtractArchiveTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	p := m.progress
	m.Unlock()
	return p
}

View File

@@ -0,0 +1,657 @@
package workflows
import (
"context"
"encoding/json"
"errors"
"fmt"
"os"
"path"
"path/filepath"
"sync"
"sync/atomic"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/downloader"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/hashid"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
"github.com/gofrs/uuid"
"github.com/samber/lo"
)
type (
	// RemoteDownloadTask drives a remote (offline) download: it creates the
	// download on a node's downloader, monitors it until completion, then
	// transfers the finished files into the user's filesystem.
	RemoteDownloadTask struct {
		*queue.DBTask

		l        logging.Logger
		state    *RemoteDownloadTaskState
		node     cluster.Node
		d        downloader.Downloader
		progress queue.Progresses
	}

	// RemoteDownloadTaskPhase labels the task's state-machine phase.
	RemoteDownloadTaskPhase string

	// RemoteDownloadTaskState is the JSON-persisted private state of the task.
	RemoteDownloadTaskState struct {
		// SrcFileUri is the filesystem URI of a torrent file, when the source
		// is a stored torrent instead of a plain URL.
		SrcFileUri string `json:"src_file_uri,omitempty"`
		// SrcUri is the direct source passed to the downloader.
		SrcUri string `json:"src_uri,omitempty"`
		// Dst is the destination folder URI.
		Dst string `json:"dst,omitempty"`
		// Handle identifies the task inside the downloader.
		Handle *downloader.TaskHandle `json:"handle,omitempty"`
		// Status is the last status snapshot fetched from the downloader.
		Status *downloader.TaskStatus `json:"status,omitempty"`
		NodeState `json:",inline"`
		Phase     RemoteDownloadTaskPhase `json:"phase,omitempty"`
		// SlaveUploadTaskID is the ID of the delegated slave upload task, if any.
		SlaveUploadTaskID int `json:"slave__upload_task_id,omitempty"`
		// SlaveUploadState mirrors the delegated slave task's private state.
		SlaveUploadState *SlaveUploadTaskState `json:"slave_upload_state,omitempty"`
		// GetTaskStatusTried counts consecutive failed status polls.
		GetTaskStatusTried int `json:"get_task_status_tried,omitempty"`
		// Transferred records downloader file indexes already uploaded.
		Transferred map[int]interface{} `json:"transferred,omitempty"`
		// Failed is the number of files that failed to transfer on master.
		Failed int `json:"failed,omitempty"`
	}
)
const (
	// Phases of the remote download state machine. Explicitly typed as
	// RemoteDownloadTaskPhase for consistency with the other phase constant
	// groups in this package (previously only NotStarted carried the type).
	RemoteDownloadTaskPhaseNotStarted   RemoteDownloadTaskPhase = ""
	RemoteDownloadTaskPhaseMonitor      RemoteDownloadTaskPhase = "monitor"
	RemoteDownloadTaskPhaseTransfer     RemoteDownloadTaskPhase = "transfer"
	RemoteDownloadTaskPhaseAwaitSeeding RemoteDownloadTaskPhase = "seeding"

	// GetTaskStatusMaxTries is the number of consecutive failed status polls
	// tolerated before the task is failed.
	GetTaskStatusMaxTries = 5

	// Keys used in queue.Summary props and progress maps.
	SummaryKeyDownloadStatus          = "download"
	SummaryKeySrcStr                  = "src_str"
	ProgressTypeRelocateTransferCount = "relocate"
	ProgressTypeUploadSinglePrefix    = "upload_single_"
	SummaryKeySrcMultiple             = "src_multiple"
	SummaryKeySrcDstPolicyID          = "dst_policy_id"
	SummaryKeyFailed                  = "failed"
)
// init registers the resumable-task factory so remote download tasks loaded
// from the database can be reconstructed after a restart.
func init() {
	queue.RegisterResumableTaskFactory(queue.RemoteDownloadTaskType, NewRemoteDownloadTaskFromModel)
}
// NewRemoteDownloadTask creates a new RemoteDownloadTask
func NewRemoteDownloadTask(ctx context.Context, src string, srcFile, dst string) (queue.Task, error) {
	initial := &RemoteDownloadTaskState{
		SrcUri:     src,
		SrcFileUri: srcFile,
		Dst:        dst,
		NodeState:  NodeState{},
	}

	raw, err := json.Marshal(initial)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal state: %w", err)
	}

	model := &ent.Task{
		Type:          queue.RemoteDownloadTaskType,
		CorrelationID: logging.CorrelationID(ctx),
		PrivateState:  string(raw),
		PublicState:   &types.TaskPublicState{},
	}
	return &RemoteDownloadTask{
		DBTask: &queue.DBTask{
			Task:        model,
			DirectOwner: inventory.UserFromContext(ctx),
		},
	}, nil
}
// NewRemoteDownloadTaskFromModel resumes a RemoteDownloadTask from its DB
// model. The parameter is named t to avoid shadowing the ent/task package.
func NewRemoteDownloadTaskFromModel(t *ent.Task) queue.Task {
	return &RemoteDownloadTask{
		DBTask: &queue.DBTask{Task: t},
	}
}
// Do is the resumable entry point of the task. It restores the persisted
// state, binds the task to a node capable of remote downloads, builds the
// downloader client, and dispatches to the handler for the current phase.
// The (possibly mutated) state is re-marshaled into PrivateState before
// returning so progress survives suspension.
func (m *RemoteDownloadTask) Do(ctx context.Context) (task.Status, error) {
	dep := dependency.FromContext(ctx)
	m.l = dep.Logger()

	// unmarshal state
	state := &RemoteDownloadTaskState{}
	if err := json.Unmarshal([]byte(m.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	m.state = state

	// select node
	node, err := allocateNode(ctx, dep, &m.state.NodeState, types.NodeCapabilityRemoteDownload)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to allocate node: %w", err)
	}
	m.node = node

	// create downloader instance
	if m.d == nil {
		d, err := node.CreateDownloader(ctx, dep.RequestClient(), dep.SettingProvider())
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create downloader: %w", err)
		}
		m.d = d
	}

	next := task.StatusCompleted
	switch m.state.Phase {
	case RemoteDownloadTaskPhaseNotStarted:
		next, err = m.createDownloadTask(ctx, dep)
	case RemoteDownloadTaskPhaseMonitor, RemoteDownloadTaskPhaseAwaitSeeding:
		// The seeding wait is also observed through the monitor loop.
		next, err = m.monitor(ctx, dep)
	case RemoteDownloadTaskPhaseTransfer:
		// Transfer happens locally on master, or is delegated to a slave task.
		if m.node.IsMaster() {
			next, err = m.masterTransfer(ctx, dep)
		} else {
			next, err = m.slaveTransfer(ctx, dep)
		}
	}

	// Persist state even when the phase handler failed, so retries resume
	// from the latest recorded cursor.
	newStateStr, marshalErr := json.Marshal(m.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}

	m.Lock()
	m.Task.PrivateState = string(newStateStr)
	m.Unlock()
	return next, err
}
// createDownloadTask submits the download to the node's downloader. The
// source is either SrcUri directly, or — when SrcFileUri is set — a torrent
// file stored in the filesystem, for which a signed entity URL is generated.
func (m *RemoteDownloadTask) createDownloadTask(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	// Idempotency guard: an existing handle means the task was already created.
	if m.state.Handle != nil {
		m.state.Phase = RemoteDownloadTaskPhaseMonitor
		return task.StatusSuspending, nil
	}

	user := inventory.UserFromContext(ctx)
	torrentUrl := m.state.SrcUri
	if m.state.SrcFileUri != "" {
		// Target is a torrent file
		uri, err := fs.NewUriFromString(m.state.SrcFileUri)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse src file uri: %s (%w)", err, queue.CriticalErr)
		}

		fm := manager.NewFileManager(dep, user)
		expire := time.Now().Add(dep.SettingProvider().EntityUrlValidDuration(ctx))
		torrentUrls, _, err := fm.GetEntityUrls(ctx, []manager.GetEntityUrlArgs{
			{URI: uri},
		}, fs.WithUrlExpire(&expire))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to get torrent entity urls: %w", err)
		}

		if len(torrentUrls) == 0 {
			return task.StatusError, fmt.Errorf("no torrent urls found")
		}

		torrentUrl = torrentUrls[0]
	}

	// Create download task
	handle, err := m.d.CreateTask(ctx, torrentUrl, user.Edges.Group.Settings.RemoteDownloadOptions)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to create download task: %w", err)
	}

	m.state.Handle = handle
	m.state.Phase = RemoteDownloadTaskPhaseMonitor
	return task.StatusSuspending, nil
}
// monitor polls the downloader for the task's status and advances the state
// machine: it retries transient poll failures, follows replacement handles,
// re-validates files when the total size changes, moves to the transfer phase
// on completion/seeding, and optionally waits out seeding afterwards.
func (m *RemoteDownloadTask) monitor(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	// Poll cadence comes from the node settings.
	resumeAfter := time.Duration(m.node.Settings(ctx).Interval) * time.Second

	// Update task status
	status, err := m.d.Info(ctx, m.state.Handle)
	if err != nil {
		if errors.Is(err, downloader.ErrTaskNotFount) && m.state.Status != nil {
			// If task is not found, but it previously existed, consider it as canceled
			m.l.Warning("task not found, consider it as canceled")
			return task.StatusCanceled, nil
		}

		// Transient poll failure: retry up to GetTaskStatusMaxTries times.
		m.state.GetTaskStatusTried++
		if m.state.GetTaskStatusTried >= GetTaskStatusMaxTries {
			return task.StatusError, fmt.Errorf("failed to get task status after %d retry: %w", m.state.GetTaskStatusTried, err)
		}

		m.l.Warning("failed to get task info: %s, will retry.", err)
		m.ResumeAfter(resumeAfter)
		return task.StatusSuspending, nil
	}

	// Follow to new handle if needed (the downloader replaced the task).
	if status.FollowedBy != nil {
		m.l.Info("Task handle updated to %v", status.FollowedBy)
		m.state.Handle = status.FollowedBy
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	if m.state.Status == nil || m.state.Status.Total != status.Total {
		m.l.Info("download size changed, re-validate files.")
		// First time to get status / total size changed, check user capacity
		if err := m.validateFiles(ctx, dep, status); err != nil {
			m.state.Status = status
			return task.StatusError, fmt.Errorf("failed to validate files: %s (%w)", err, queue.CriticalErr)
		}
	}

	// Poll succeeded: snapshot the status and reset the retry counter.
	m.state.Status = status
	m.state.GetTaskStatusTried = 0
	m.l.Debug("Monitor %q task state: %s", status.Name, status.State)
	switch status.State {
	case downloader.StatusSeeding:
		m.l.Info("Download task seeding")
		if m.state.Phase == RemoteDownloadTaskPhaseMonitor {
			// Not transferred
			m.state.Phase = RemoteDownloadTaskPhaseTransfer
			return task.StatusSuspending, nil
		} else if !m.node.Settings(ctx).WaitForSeeding {
			// Skip seeding
			m.l.Info("Download task seeding skipped.")
			return task.StatusCompleted, nil
		} else {
			// Still seeding
			m.ResumeAfter(resumeAfter)
			return task.StatusSuspending, nil
		}
	case downloader.StatusCompleted:
		m.l.Info("Download task completed")
		if m.state.Phase == RemoteDownloadTaskPhaseMonitor {
			// Not transferred
			m.state.Phase = RemoteDownloadTaskPhaseTransfer
			return task.StatusSuspending, nil
		}

		// Seeding complete
		m.l.Info("Download task seeding completed")
		return task.StatusCompleted, nil
	case downloader.StatusDownloading:
		m.ResumeAfter(resumeAfter)
		return task.StatusSuspending, nil
	case downloader.StatusUnknown, downloader.StatusError:
		return task.StatusError, fmt.Errorf("download task failed with state %q (%w)", status.State, queue.CriticalErr)
	}

	// Unrecognized state: keep polling.
	m.ResumeAfter(resumeAfter)
	return task.StatusSuspending, nil
}
// slaveTransfer delegates the transfer of downloaded files to a slave-side
// upload task and then polls it until it finishes. Successfully transferred
// file indexes are recorded in state.Transferred so a retry only re-sends the
// remainder.
//
// Fixes over the previous revision: `for i, _ := range` replaced with the
// idiomatic `for i := range`, and the unmarshal error message no longer
// wrongly refers to "compress" state.
func (m *RemoteDownloadTask) slaveTransfer(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	u := inventory.UserFromContext(ctx)
	if m.state.Transferred == nil {
		m.state.Transferred = make(map[int]interface{})
	}

	if m.state.SlaveUploadTaskID == 0 {
		// No slave task yet: build the payload and create one.
		dstUri, err := fs.NewUriFromString(m.state.Dst)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to parse dst uri %q: %s (%w)", m.state.Dst, err, queue.CriticalErr)
		}

		// Create slave upload task
		payload := &SlaveUploadTaskState{
			Files:       []SlaveUploadEntity{},
			MaxParallel: dep.SettingProvider().MaxParallelTransfer(ctx),
			UserID:      u.ID,
		}

		// Construct files to be transferred
		for _, f := range m.state.Status.Files {
			if !f.Selected {
				continue
			}

			// Skip already transferred
			if _, ok := m.state.Transferred[f.Index]; ok {
				continue
			}

			dst := dstUri.JoinRaw(f.Name)
			src := filepath.FromSlash(path.Join(m.state.Status.SavePath, f.Name))
			payload.Files = append(payload.Files, SlaveUploadEntity{
				Src:   src,
				Uri:   dst,
				Size:  f.Size,
				Index: f.Index,
			})
		}

		payloadStr, err := json.Marshal(payload)
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to marshal payload: %w", err)
		}

		taskId, err := m.node.CreateTask(ctx, queue.SlaveUploadTaskType, string(payloadStr))
		if err != nil {
			return task.StatusError, fmt.Errorf("failed to create slave task: %w", err)
		}

		m.state.NodeState.progress = nil
		m.state.SlaveUploadTaskID = taskId
		m.ResumeAfter(0)
		return task.StatusSuspending, nil
	}

	m.l.Info("Checking slave upload task %d...", m.state.SlaveUploadTaskID)
	t, err := m.node.GetTask(ctx, m.state.SlaveUploadTaskID, true)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get slave task: %w", err)
	}

	// Mirror slave progress so Progress() can merge it for API consumers.
	m.Lock()
	m.state.NodeState.progress = t.Progress
	m.Unlock()

	m.state.SlaveUploadState = &SlaveUploadTaskState{}
	if err := json.Unmarshal([]byte(t.PrivateState), m.state.SlaveUploadState); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal slave upload state: %s (%w)", err, queue.CriticalErr)
	}

	if t.Status == task.StatusError || t.Status == task.StatusCompleted {
		if len(m.state.SlaveUploadState.Transferred) < len(m.state.SlaveUploadState.Files) {
			// Not all files transferred: record the successes and reset the
			// slave task ID so the next attempt only re-sends the remainder.
			slaveTaskId := m.state.SlaveUploadTaskID
			m.state.SlaveUploadTaskID = 0
			for i := range m.state.SlaveUploadState.Transferred {
				m.state.Transferred[m.state.SlaveUploadState.Files[i].Index] = struct{}{}
			}

			m.l.Warning("Slave task %d failed to transfer %d files, retrying...", slaveTaskId, len(m.state.SlaveUploadState.Files)-len(m.state.SlaveUploadState.Transferred))
			return task.StatusError, fmt.Errorf(
				"slave task failed to transfer %d files, first 5 errors: %s",
				len(m.state.SlaveUploadState.Files)-len(m.state.SlaveUploadState.Transferred),
				m.state.SlaveUploadState.First5TransferErrors,
			)
		} else {
			m.state.Phase = RemoteDownloadTaskPhaseAwaitSeeding
			m.ResumeAfter(0)
			return task.StatusSuspending, nil
		}
	}

	if t.Status == task.StatusCanceled {
		return task.StatusError, fmt.Errorf("slave task canceled (%w)", queue.CriticalErr)
	}

	m.l.Info("Slave task %d is still uploading, resume after 30s.", m.state.SlaveUploadTaskID)
	m.ResumeAfter(time.Second * 30)
	return task.StatusSuspending, nil
}
// masterTransfer uploads the downloaded files from the master node's local
// disk into the destination directory using a bounded worker pool. Worker IDs
// circulate through a token channel, and per-worker progress entries are keyed
// by ProgressTypeUploadSinglePrefix + worker ID.
//
// Fixes over the previous revision: the "Uploading file" log call passed three
// arguments to a two-verb format string, and the already-transferred check
// read state.Transferred without the lock while worker goroutines write it.
func (m *RemoteDownloadTask) masterTransfer(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	if m.state.Transferred == nil {
		m.state.Transferred = make(map[int]interface{})
	}

	// Token channel bounding concurrent uploads; tokens double as worker IDs.
	maxParallel := dep.SettingProvider().MaxParallelTransfer(ctx)
	wg := sync.WaitGroup{}
	worker := make(chan int, maxParallel)
	for i := 0; i < maxParallel; i++ {
		worker <- i
	}

	// Sum up total count and select files
	totalCount := 0
	totalSize := int64(0)
	allFiles := make([]downloader.TaskFile, 0, len(m.state.Status.Files))
	for _, f := range m.state.Status.Files {
		if f.Selected {
			allFiles = append(allFiles, f)
			totalSize += f.Size
			totalCount++
		}
	}

	m.Lock()
	m.progress = make(queue.Progresses)
	m.progress[ProgressTypeUploadCount] = &queue.Progress{Total: int64(totalCount)}
	m.progress[ProgressTypeUpload] = &queue.Progress{Total: totalSize}
	m.Unlock()

	dstUri, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to parse dst uri: %s (%w)", err, queue.CriticalErr)
	}

	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)
	failed := int64(0)
	ae := serializer.NewAggregateError()
	transferFunc := func(workerId int, file downloader.TaskFile) {
		defer func() {
			atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1)
			worker <- workerId
			wg.Done()
		}()

		dst := dstUri.JoinRaw(file.Name)
		src := filepath.FromSlash(path.Join(m.state.Status.SavePath, file.Name))
		// Fixed: the format string previously had two verbs but three args.
		m.l.Info("Uploading file %s to %s...", src, dst)
		progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
		m.Lock()
		m.progress[progressKey] = &queue.Progress{Identifier: dst.String(), Total: file.Size}
		m.Unlock()

		fileStream, err := os.Open(src)
		if err != nil {
			m.l.Warning("Failed to open file %s: %s", src, err.Error())
			atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size)
			atomic.AddInt64(&failed, 1)
			ae.Add(file.Name, fmt.Errorf("failed to open file: %w", err))
			return
		}
		defer fileStream.Close()

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  dst,
				Size: file.Size,
			},
			ProgressFunc: func(current, diff int64, total int64) {
				atomic.AddInt64(&m.progress[progressKey].Current, diff)
				atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, diff)
			},
			File: fileStream,
		}

		if _, err := fm.Update(ctx, fileData, fs.WithNoEntityType()); err != nil {
			m.l.Warning("Failed to upload file %s: %s", src, err.Error())
			atomic.AddInt64(&failed, 1)
			atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size)
			ae.Add(file.Name, fmt.Errorf("failed to upload file: %w", err))
			return
		}

		// Record success under the task lock; the dispatch loop reads the
		// same map concurrently.
		m.Lock()
		m.state.Transferred[file.Index] = nil
		m.Unlock()
	}

	// Start upload files
	for _, file := range allFiles {
		// Check if file is already transferred. Read under lock: in-flight
		// workers insert into the same map.
		m.Lock()
		_, transferred := m.state.Transferred[file.Index]
		m.Unlock()
		if transferred {
			m.l.Info("File %s already transferred, skipping...", file.Name)
			atomic.AddInt64(&m.progress[ProgressTypeUpload].Current, file.Size)
			atomic.AddInt64(&m.progress[ProgressTypeUploadCount].Current, 1)
			continue
		}

		select {
		case <-ctx.Done():
			return task.StatusError, ctx.Err()
		case workerId := <-worker:
			wg.Add(1)
			go transferFunc(workerId, file)
		}
	}

	wg.Wait()
	if failed > 0 {
		m.state.Failed = int(failed)
		m.l.Error("Failed to transfer %d file(s).", failed)
		return task.StatusError, fmt.Errorf("failed to transfer %d file(s), first 5 errors: %s", failed, ae.FormatFirstN(5))
	}

	m.l.Info("All files transferred.")
	m.state.Phase = RemoteDownloadTaskPhaseAwaitSeeding
	return task.StatusSuspending, nil
}
// awaitSeeding always suspends the task. NOTE(review): this helper is not
// referenced by the visible phase dispatch in Do — the seeding phase is routed
// to monitor — so it appears to be an unused placeholder; confirm before removal.
func (m *RemoteDownloadTask) awaitSeeding(ctx context.Context, dep dependency.Dep) (task.Status, error) {
	return task.StatusSuspending, nil
}
// validateFiles checks that the download's selected files can legally be
// placed at the destination by preparing (and immediately discarding) a
// placeholder upload session of the task's total size.
func (m *RemoteDownloadTask) validateFiles(ctx context.Context, dep dependency.Dep, status *downloader.TaskStatus) error {
	// Validate files
	user := inventory.UserFromContext(ctx)
	fm := manager.NewFileManager(dep, user)
	dstUri, err := fs.NewUriFromString(m.state.Dst)
	if err != nil {
		return fmt.Errorf("failed to parse dst uri: %w", err)
	}

	selectedFiles := lo.Filter(status.Files, func(f downloader.TaskFile, _ int) bool {
		return f.Selected
	})
	if len(selectedFiles) == 0 {
		return fmt.Errorf("no selected file found in download task")
	}

	// find the first valid file
	var placeholderFileName string
	for _, f := range selectedFiles {
		if f.Name != "" {
			placeholderFileName = f.Name
			break
		}
	}

	if placeholderFileName == "" {
		// File name not available yet, generate one
		m.l.Debug("File name not available yet, generate one to validate the destination")
		placeholderFileName = uuid.Must(uuid.NewV4()).String()
	}

	// Create a placeholder file then delete it to validate the destination
	session, err := fm.PrepareUpload(ctx, &fs.UploadRequest{
		Props: &fs.UploadProps{
			Uri:             dstUri.Join(path.Base(placeholderFileName)),
			Size:            status.Total,
			UploadSessionID: uuid.Must(uuid.NewV4()).String(),
			ExpireAt:        time.Now().Add(time.Second * 3600),
		},
	})
	if err != nil {
		return err
	}

	// Discard the probe session right away; only the validation side effects matter.
	fm.OnUploadFailed(ctx, session)
	return nil
}
// Cleanup cancels any outstanding download task and removes the local save
// folder on the master node. It is defensive against partially-initialized
// tasks: Do may have failed before creating the downloader or resolving the
// node, so m.d and m.node are nil-checked before use. (m.state is assigned
// right after the logger in Do, so a non-nil state implies a usable m.l.)
func (m *RemoteDownloadTask) Cleanup(ctx context.Context) error {
	if m.state == nil {
		// Do never restored state; nothing to clean up.
		return nil
	}

	if m.state.Handle != nil && m.d != nil {
		if err := m.d.Cancel(ctx, m.state.Handle); err != nil {
			m.l.Warning("failed to cancel download task: %s", err)
		}
	}

	if m.state.Status != nil && m.node != nil && m.node.IsMaster() && m.state.Status.SavePath != "" {
		if err := os.RemoveAll(m.state.Status.SavePath); err != nil {
			m.l.Warning("failed to remove download temp folder: %s", err)
		}
	}

	return nil
}
// SetDownloadTarget sets the files to download for the task
func (m *RemoteDownloadTask) SetDownloadTarget(ctx context.Context, args ...*downloader.SetFileToDownloadArgs) error {
	handle := m.state.Handle
	if handle == nil {
		return fmt.Errorf("download task not created")
	}

	return m.d.SetFilesToDownload(ctx, handle, args...)
}
// CancelDownload cancels the download task
func (m *RemoteDownloadTask) CancelDownload(ctx context.Context) error {
	handle := m.state.Handle
	if handle == nil {
		return nil
	}

	return m.d.Cancel(ctx, handle)
}
// Summarize returns a read-only snapshot of this task for API consumers,
// with the local save path redacted from the download status.
func (m *RemoteDownloadTask) Summarize(hasher hashid.Encoder) *queue.Summary {
	// unmarshal state
	if m.state == nil {
		if err := json.Unmarshal([]byte(m.State()), &m.state); err != nil {
			return nil
		}
	}

	var status *downloader.TaskStatus
	if m.state.Status != nil {
		// Fixed: the previous `status = &*m.state.Status` yields the SAME
		// pointer in Go (no copy), so redacting SavePath below clobbered the
		// live task state and defeated Cleanup's SavePath check. Copy the
		// struct before mutating it.
		statusCopy := *m.state.Status
		statusCopy.SavePath = ""
		status = &statusCopy
	}

	// When the transfer was delegated to a slave task, derive the failure
	// count from the slave's reported state instead.
	failed := m.state.Failed
	if m.state.SlaveUploadState != nil && m.state.Phase != RemoteDownloadTaskPhaseTransfer {
		failed = len(m.state.SlaveUploadState.Files) - len(m.state.SlaveUploadState.Transferred)
	}

	return &queue.Summary{
		Phase:  string(m.state.Phase),
		NodeID: m.state.NodeID,
		Props: map[string]any{
			SummaryKeySrcStr:         m.state.SrcUri,
			SummaryKeySrc:            m.state.SrcFileUri,
			SummaryKeyDst:            m.state.Dst,
			SummaryKeyFailed:         failed,
			SummaryKeyDownloadStatus: status,
		},
	}
}
// Progress returns live progress, merging in progress mirrored from a slave
// node when the transfer was delegated.
func (m *RemoteDownloadTask) Progress(ctx context.Context) queue.Progresses {
	m.Lock()
	defer m.Unlock()

	remote := m.state.NodeState.progress
	if remote == nil {
		return m.progress
	}

	merged := make(queue.Progresses, len(m.progress)+len(remote))
	for key, p := range m.progress {
		merged[key] = p
	}
	for key, p := range remote {
		merged[key] = p
	}
	return merged
}

View File

@@ -0,0 +1,224 @@
package workflows
import (
"context"
"encoding/json"
"fmt"
"os"
"path/filepath"
"sync"
"sync/atomic"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/ent"
"github.com/cloudreve/Cloudreve/v4/ent/task"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/fs"
"github.com/cloudreve/Cloudreve/v4/pkg/filemanager/manager"
"github.com/cloudreve/Cloudreve/v4/pkg/logging"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/serializer"
)
type (
	// SlaveUploadEntity describes one slave-local file to upload and its
	// destination in the user's filesystem.
	SlaveUploadEntity struct {
		// Uri is the destination URI.
		Uri *fs.URI `json:"uri"`
		// Src is the slave-local source path.
		Src string `json:"src"`
		// Size is the expected file size in bytes, used for progress totals.
		Size int64 `json:"size"`
		// Index is the file's index within the originating download task.
		Index int `json:"index"`
	}

	// SlaveUploadTaskState is the JSON payload for a batch upload delegated
	// to a slave node, and the state reported back to the master.
	SlaveUploadTaskState struct {
		// MaxParallel bounds the number of concurrent uploads.
		MaxParallel int `json:"max_parallel"`
		// Files is the batch of files to upload.
		Files []SlaveUploadEntity `json:"files"`
		// Transferred records indexes into Files that finished successfully.
		Transferred map[int]interface{} `json:"transferred"`
		// UserID is the owner on whose behalf files are uploaded.
		UserID int `json:"user_id"`
		// First5TransferErrors aggregates up to five formatted upload errors.
		First5TransferErrors string `json:"first_5_transfer_errors,omitempty"`
	}

	// SlaveUploadTask performs the batch upload on a slave node; it exists
	// only in memory on the slave.
	SlaveUploadTask struct {
		*queue.InMemoryTask

		progress queue.Progresses
		l        logging.Logger
		state    *SlaveUploadTaskState
		node     cluster.Node
	}
)
// NewSlaveUploadTask creates a new SlaveUploadTask from raw private state
func NewSlaveUploadTask(ctx context.Context, props *types.SlaveTaskProps, id int, state string) queue.Task {
	model := &ent.Task{
		ID:            id,
		CorrelationID: logging.CorrelationID(ctx),
		PublicState: &types.TaskPublicState{
			SlaveTaskProps: props,
		},
		PrivateState: state,
	}
	return &SlaveUploadTask{
		InMemoryTask: &queue.InMemoryTask{
			DBTask: &queue.DBTask{Task: model},
		},
		progress: make(queue.Progresses),
	}
}
// Do uploads a batch of slave-local files back through the master, bounded by
// MaxParallel concurrent workers. Successful indexes are recorded in
// state.Transferred, which is re-marshaled into PrivateState so the master can
// see partial progress and retry only the remainder.
//
// Fixes over the previous revision: the dispatch loop read state.Transferred
// without the lock while worker goroutines write it under t.Lock (data race);
// progress totals are now set in the locked initialization instead of a
// post-hoc atomic store; file handles are released via defer on all paths.
func (t *SlaveUploadTask) Do(ctx context.Context) (task.Status, error) {
	ctx = prepareSlaveTaskCtx(ctx, t.Model().PublicState.SlaveTaskProps)
	dep := dependency.FromContext(ctx)
	t.l = dep.Logger()

	np, err := dep.NodePool(ctx)
	if err != nil {
		return task.StatusError, fmt.Errorf("failed to get node pool: %w", err)
	}

	t.node, err = np.Get(ctx, types.NodeCapabilityNone, 0)
	if err != nil || !t.node.IsMaster() {
		return task.StatusError, fmt.Errorf("failed to get master node: %w", err)
	}

	fm := manager.NewFileManager(dep, nil)

	// unmarshal state
	state := &SlaveUploadTaskState{}
	if err := json.Unmarshal([]byte(t.State()), state); err != nil {
		return task.StatusError, fmt.Errorf("failed to unmarshal state: %w", err)
	}
	t.state = state
	if t.state.Transferred == nil {
		t.state.Transferred = make(map[int]interface{})
	}

	// Token channel bounding concurrent uploads; tokens double as worker IDs.
	wg := sync.WaitGroup{}
	worker := make(chan int, t.state.MaxParallel)
	for i := 0; i < t.state.MaxParallel; i++ {
		worker <- i
	}

	// Sum up total count and size for the overall progress entries.
	totalCount := 0
	totalSize := int64(0)
	for _, res := range state.Files {
		totalSize += res.Size
		totalCount++
	}

	t.Lock()
	t.progress[ProgressTypeUploadCount] = &queue.Progress{Total: int64(totalCount)}
	t.progress[ProgressTypeUpload] = &queue.Progress{Total: totalSize}
	t.Unlock()

	ae := serializer.NewAggregateError()
	transferFunc := func(workerId, fileId int, file SlaveUploadEntity) {
		defer func() {
			atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1)
			worker <- workerId
			wg.Done()
		}()

		t.l.Info("Uploading file %s to %s...", file.Src, file.Uri.String())
		progressKey := fmt.Sprintf("%s%d", ProgressTypeUploadSinglePrefix, workerId)
		t.Lock()
		t.progress[progressKey] = &queue.Progress{Identifier: file.Uri.String(), Total: file.Size}
		t.Unlock()

		handle, err := os.Open(file.Src)
		if err != nil {
			t.l.Warning("Failed to open file %s: %s", file.Src, err.Error())
			atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
			ae.Add(filepath.Base(file.Src), fmt.Errorf("failed to open file: %w", err))
			return
		}
		defer handle.Close()

		stat, err := handle.Stat()
		if err != nil {
			t.l.Warning("Failed to get file stat for %s: %s", file.Src, err.Error())
			atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
			ae.Add(filepath.Base(file.Src), fmt.Errorf("failed to get file stat: %w", err))
			return
		}

		fileData := &fs.UploadRequest{
			Props: &fs.UploadProps{
				Uri:  file.Uri,
				Size: stat.Size(),
			},
			ProgressFunc: func(current, diff int64, total int64) {
				atomic.AddInt64(&t.progress[progressKey].Current, diff)
				atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, diff)
				atomic.StoreInt64(&t.progress[progressKey].Total, total)
			},
			File:   handle,
			Seeker: handle,
		}

		if _, err := fm.Update(ctx, fileData, fs.WithNode(t.node), fs.WithStatelessUserID(t.state.UserID), fs.WithNoEntityType()); err != nil {
			t.l.Warning("Failed to upload file %s: %s", file.Src, err.Error())
			atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
			ae.Add(filepath.Base(file.Src), fmt.Errorf("failed to upload file: %w", err))
			return
		}

		// Record success under the task lock; the dispatch loop reads the
		// same map concurrently.
		t.Lock()
		t.state.Transferred[fileId] = nil
		t.Unlock()
	}

	// Start upload files
	for fileId, file := range t.state.Files {
		// Check if file is already transferred. Read under lock: in-flight
		// workers insert into the same map.
		t.Lock()
		_, done := t.state.Transferred[fileId]
		t.Unlock()
		if done {
			t.l.Info("File %s already transferred, skipping...", file.Src)
			atomic.AddInt64(&t.progress[ProgressTypeUpload].Current, file.Size)
			atomic.AddInt64(&t.progress[ProgressTypeUploadCount].Current, 1)
			continue
		}

		select {
		case <-ctx.Done():
			return task.StatusError, ctx.Err()
		case workerId := <-worker:
			wg.Add(1)
			go transferFunc(workerId, fileId, file)
		}
	}

	wg.Wait()
	// Persist the outcome so the master can see partial progress.
	t.state.First5TransferErrors = ae.FormatFirstN(5)
	newStateStr, marshalErr := json.Marshal(t.state)
	if marshalErr != nil {
		return task.StatusError, fmt.Errorf("failed to marshal state: %w", marshalErr)
	}

	t.Lock()
	t.Task.PrivateState = string(newStateStr)
	t.Unlock()

	// If all files are failed to transfer, return error
	if len(t.state.Transferred) != len(t.state.Files) {
		t.l.Warning("%d files not transferred", len(t.state.Files)-len(t.state.Transferred))
		if len(t.state.Transferred) == 0 {
			return task.StatusError, fmt.Errorf("all file failed to transfer")
		}
	}

	return task.StatusCompleted, nil
}
// Progress returns the task's progress map. The map reference is read under
// the task lock; individual counters inside it are updated concurrently via
// atomic operations, so callers should treat the returned value as read-only.
func (t *SlaveUploadTask) Progress(ctx context.Context) queue.Progresses {
	// Receiver renamed from "m" to "t" for consistency with the other
	// methods of SlaveUploadTask in this file.
	t.Lock()
	defer t.Unlock()
	return t.progress
}

View File

@@ -0,0 +1,62 @@
package workflows
import (
"context"
"fmt"
"path"
"strconv"
"time"
"github.com/cloudreve/Cloudreve/v4/application/dependency"
"github.com/cloudreve/Cloudreve/v4/inventory/types"
"github.com/cloudreve/Cloudreve/v4/pkg/cluster"
"github.com/cloudreve/Cloudreve/v4/pkg/queue"
"github.com/cloudreve/Cloudreve/v4/pkg/util"
)
const (
	// TaskTempPath is the sub-directory (under the configured temp path)
	// where workflow tasks keep their intermediate files; see prepareTempFolder.
	TaskTempPath = "fm_workflows"
	// slaveProgressRefreshInterval is the interval between progress refreshes
	// for slave tasks. NOTE(review): usage is outside this chunk — confirm.
	slaveProgressRefreshInterval = 5 * time.Second
)
// NodeState is the shared, serializable node-assignment portion of a workflow
// task's state. allocateNode records the chosen node's ID here, presumably so
// a resumed task prefers the same node — TODO confirm against pool semantics.
type NodeState struct {
	// NodeID is the ID of the node allocated for this task; it is passed to
	// the node pool as the preferred node and overwritten after allocation.
	NodeID int `json:"node_id"`
	// progress holds in-memory progress counters; unexported, so it is not
	// serialized with the persisted task state.
	progress queue.Progresses
}
// allocateNode picks a node with the given capability from the dependency's
// node pool and remembers its ID in state, passing the previously recorded
// ID to the pool as the preferred node.
func allocateNode(ctx context.Context, dep dependency.Dep, state *NodeState, capability types.NodeCapability) (cluster.Node, error) {
	pool, poolErr := dep.NodePool(ctx)
	if poolErr != nil {
		return nil, fmt.Errorf("failed to get node pool: %w", poolErr)
	}

	selected, getErr := pool.Get(ctx, capability, state.NodeID)
	if getErr != nil {
		return nil, fmt.Errorf("failed to get node: %w", getErr)
	}

	// Record the chosen node so subsequent allocations prefer it.
	state.NodeID = selected.ID()
	return selected, nil
}
// prepareSlaveTaskCtx derives a context carrying the master-site metadata a
// slave task needs: node ID (as a string), master site URL, version, and ID.
func prepareSlaveTaskCtx(ctx context.Context, props *types.SlaveTaskProps) context.Context {
	withNode := context.WithValue(ctx, cluster.SlaveNodeIDCtx{}, strconv.Itoa(props.NodeID))
	withUrl := context.WithValue(withNode, cluster.MasterSiteUrlCtx{}, props.MasterSiteURl)
	withVersion := context.WithValue(withUrl, cluster.MasterSiteVersionCtx{}, props.MasterSiteVersion)
	return context.WithValue(withVersion, cluster.MasterSiteIDCtx{}, props.MasterSiteID)
}
// prepareTempFolder creates (if needed) and returns this task's private temp
// folder, located at <configured temp path>/fm_workflows/<task ID> under the
// application data path.
func prepareTempFolder(ctx context.Context, dep dependency.Dep, t queue.Task) (string, error) {
	base := dep.SettingProvider().TempPath(ctx)
	folder := util.DataPath(path.Join(base, TaskTempPath, strconv.Itoa(t.ID())))

	if err := util.CreatNestedFolder(folder); err != nil {
		return "", fmt.Errorf("failed to create temp folder: %w", err)
	}

	dep.Logger().Info("Temp folder created: %s", folder)
	return folder, nil
}