Feat: task queue / compression task

This commit is contained in:
HFO4
2020-02-02 14:40:07 +08:00
parent b1490a665c
commit e722c33cd5
25 changed files with 573 additions and 338 deletions

155
pkg/task/compress.go Normal file
View File

@@ -0,0 +1,155 @@
package task
import (
"context"
"encoding/json"
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/filesystem"
"github.com/HFO4/cloudreve/pkg/util"
"os"
)
// CompressTask 文件压缩任务
type CompressTask struct {
User *model.User
TaskModel *model.Task
TaskProps CompressProps
Err *JobError
zipPath string
}
// CompressProps 压缩任务属性
type CompressProps struct {
Dirs []uint `json:"dirs"`
Files []uint `json:"files"`
Dst string `json:"dst"`
}
// Props 获取任务属性
func (job *CompressTask) Props() string {
res, _ := json.Marshal(job.TaskProps)
return string(res)
}
// Type 获取任务状态
func (job *CompressTask) Type() int {
return CompressTaskType
}
// Creator 获取创建者ID
func (job *CompressTask) Creator() uint {
return job.User.ID
}
// Model 获取任务的数据库模型
func (job *CompressTask) Model() *model.Task {
return job.TaskModel
}
// SetStatus 设定状态
func (job *CompressTask) SetStatus(status int) {
job.TaskModel.SetStatus(status)
}
// SetError 设定任务失败信息
func (job *CompressTask) SetError(err *JobError) {
job.Err = err
res, _ := json.Marshal(job.Err)
job.TaskModel.SetError(string(res))
// 删除压缩文件
job.removeZipFile()
}
func (job *CompressTask) removeZipFile() {
if job.zipPath != "" {
if err := os.Remove(job.zipPath); err != nil {
util.Log().Warning("无法删除临时压缩文件 %s , %s", job.zipPath, err)
}
}
}
// SetErrorMsg 设定任务失败信息
func (job *CompressTask) SetErrorMsg(msg string) {
job.SetError(&JobError{Msg: msg})
}
// GetError 返回任务失败信息
func (job *CompressTask) GetError() *JobError {
return job.Err
}
// Do 开始执行任务
func (job *CompressTask) Do() {
// 创建文件系统
fs, err := filesystem.NewFileSystem(job.User)
if err != nil {
job.SetErrorMsg(err.Error())
return
}
defer fs.Recycle()
util.Log().Debug("开始压缩文件")
job.TaskModel.SetProgress(CompressingProgress)
// 开始压缩
ctx := context.Background()
zipFile, err := fs.Compress(ctx, job.TaskProps.Dirs, job.TaskProps.Files, false)
if err != nil {
job.SetErrorMsg(err.Error())
return
}
job.zipPath = zipFile
util.Log().Debug("压缩文件存放至%s开始上传", zipFile)
job.TaskModel.SetProgress(TransferringProgress)
// 上传文件
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst)
if err != nil {
job.SetErrorMsg(err.Error())
return
}
job.removeZipFile()
}
// NewCompressTask 新建压缩任务
func NewCompressTask(user *model.User, dst string, dirs, files []uint) (Job, error) {
newTask := &CompressTask{
User: user,
TaskProps: CompressProps{
Dirs: dirs,
Files: files,
Dst: dst,
},
}
record, err := Record(newTask)
if err != nil {
return nil, err
}
newTask.TaskModel = record
return newTask, nil
}
// NewCompressTaskFromModel 从数据库记录中恢复压缩任务
func NewCompressTaskFromModel(task *model.Task) (Job, error) {
user, err := model.GetUserByID(task.UserID)
if err != nil {
return nil, err
}
newTask := &CompressTask{
User: &user,
TaskModel: task,
}
err = json.Unmarshal([]byte(task.Props), &newTask.TaskProps)
if err != nil {
return nil, err
}
return newTask, nil
}

8
pkg/task/errors.go Normal file
View File

@@ -0,0 +1,8 @@
package task
import "errors"
var (
// ErrUnknownTaskType 未知任务类型
ErrUnknownTaskType = errors.New("未知任务类型")
)

98
pkg/task/job.go Normal file
View File

@@ -0,0 +1,98 @@
package task
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/util"
)
// 任务类型
const (
// CompressTaskType 压缩任务
CompressTaskType = iota
)
// 任务状态
const (
// Queued 排队中
Queued = iota
// Processing 处理中
Processing
// Error 失败
Error
// Canceled 取消
Canceled
// Complete 完成
Complete
)
// 任务进度
const (
// Compressing 压缩中
CompressingProgress = iota
// Decompressing 解压缩中
DecompressingProgress
// Downloading 下载中
DownloadingProgress
// Transferring 转存中
TransferringProgress
)
// Job 任务接口
type Job interface {
Type() int // 返回任务类型
Creator() uint // 返回创建者ID
Props() string // 返回序列化后的任务属性
Model() *model.Task // 返回对应的数据库模型
SetStatus(int) // 设定任务状态
Do() // 开始执行任务
SetError(*JobError) // 设定任务失败信息
GetError() *JobError // 获取任务执行结果返回nil表示成功完成执行
}
// JobError 任务失败信息
type JobError struct {
Msg string
}
// Record 将任务记录到数据库中
func Record(job Job) (*model.Task, error) {
record := model.Task{
Status: Queued,
Type: job.Type(),
UserID: job.Creator(),
Progress: 0,
Error: "",
Props: job.Props(),
}
_, err := record.Create()
return &record, err
}
// Resume 从数据库中恢复未完成任务
func Resume() {
tasks := model.GetTasksByStatus(Queued)
if len(tasks) == 0 {
return
}
util.Log().Info("从数据库中恢复 %d 个未完成任务", len(tasks))
for i := 0; i < len(tasks); i++ {
job, err := GetJobFromModel(&tasks[i])
if err != nil {
util.Log().Warning("无法恢复任务,%s", err)
continue
}
TaskPoll.Submit(job)
}
}
// GetJobFromModel 从数据库给定模型获取任务
func GetJobFromModel(task *model.Task) (Job, error) {
switch task.Type {
case CompressTaskType:
return NewCompressTaskFromModel(task)
default:
return nil, ErrUnknownTaskType
}
}

View File

@@ -1,124 +1,60 @@
package task
import "sync"
import (
model "github.com/HFO4/cloudreve/models"
"github.com/HFO4/cloudreve/pkg/util"
)
// Pool 带有最大配额的goroutines任务池
// TaskPoll 要使用的任务池
var TaskPoll *Pool
// Pool 带有最大配额的任务池
type Pool struct {
// 容量
capacity int
// 初始容量
initialCapacity int
// 终止信号
terminateSignal chan error
// 全部任务完成的信号
finishSignal chan bool
// 有空余位置的信号
freeSignal chan bool
// 是否已关闭
closed bool
// 是否正在等待任务结束
waiting bool
// 互斥锁
lock sync.Mutex
// 等待队列
pending []Job
idleWorker chan int
}
// Job 任务
type Job interface {
// 任务处理方法如果error不为nil
// 任务池会关闭并中止接受新任务
Do() error
}
// NewGoroutinePool 创建一个容量为capacity的任务池
func NewGoroutinePool(capacity int) *Pool {
pool := &Pool{
capacity: capacity,
initialCapacity: capacity,
terminateSignal: make(chan error),
finishSignal: make(chan bool),
freeSignal: make(chan bool),
}
go pool.Schedule()
return pool
}
// Schedule 为等待队列的任务分配Worker以及检测错误状态、所有任务完成
func (pool *Pool) Schedule() {
for {
select {
case <-pool.freeSignal:
// 有新的空余名额
pool.lock.Lock()
if len(pool.pending) > 0 {
// 有待处理的任务,开始处理
var job Job
job, pool.pending = pool.pending[0], pool.pending[1:]
go pool.start(job)
} else {
if pool.waiting && pool.capacity == pool.initialCapacity {
// 所有任务已结束
pool.lock.Unlock()
pool.finishSignal <- true
return
}
pool.lock.Unlock()
}
case <-pool.terminateSignal:
// 有任务意外中止,则发送完成信号
pool.finishSignal <- true
return
}
// Add 增加可用Worker数量
func (pool *Pool) Add(num int) {
for i := 0; i < num; i++ {
pool.idleWorker <- 1
}
}
// Wait 等待队列中所有任务完成或有Job返回错误中止
func (pool *Pool) Wait() chan bool {
pool.lock.Lock()
pool.waiting = true
pool.lock.Unlock()
return pool.finishSignal
// ObtainWorker 阻塞直到获取新的Worker
func (pool *Pool) ObtainWorker() Worker {
select {
case <-pool.idleWorker:
// 有空闲Worker名额时返回新Worker
return &GeneralWorker{}
}
}
// Submit 提交新任务
// FreeWorker 添加空闲Worker
func (pool *Pool) FreeWorker() {
pool.Add(1)
}
// Submit 开始提交任务
func (pool *Pool) Submit(job Job) {
if pool.closed {
return
}
pool.lock.Lock()
if pool.capacity < 1 {
// 容量为空时,加入等待队列
pool.pending = append(pool.pending, job)
pool.lock.Unlock()
return
}
// 还有空闲容量时,开始执行任务
go pool.start(job)
go func() {
util.Log().Debug("等待获取Worker")
worker := pool.ObtainWorker()
util.Log().Debug("获取到Worker")
worker.Do(job)
util.Log().Debug("释放Worker")
pool.FreeWorker()
}()
}
// 开始执行任务
func (pool *Pool) start(job Job) {
pool.capacity--
pool.lock.Unlock()
err := job.Do()
if err != nil {
pool.closed = true
select {
case <-pool.terminateSignal:
default:
close(pool.terminateSignal)
}
// Init 初始化任务
func Init() {
maxWorker := model.GetIntSetting("max_worker_num", 10)
TaskPoll = &Pool{
idleWorker: make(chan int, maxWorker),
}
TaskPoll.Add(maxWorker)
util.Log().Info("初始化任务队列WorkerNum = %d", maxWorker)
pool.lock.Lock()
pool.capacity++
pool.lock.Unlock()
pool.freeSignal <- true
Resume()
}

41
pkg/task/worker.go Normal file
View File

@@ -0,0 +1,41 @@
package task
import "github.com/HFO4/cloudreve/pkg/util"
// Worker 处理任务的对象
type Worker interface {
Do(Job) // 执行任务
}
// GeneralWorker 通用Worker
type GeneralWorker struct {
}
// Do 执行任务
func (worker *GeneralWorker) Do(job Job) {
util.Log().Debug("开始执行任务")
job.SetStatus(Processing)
defer func() {
// 致命错误捕获
if err := recover(); err != nil {
util.Log().Debug("任务执行出错panic")
job.SetError(&JobError{Msg: "致命错误"})
job.SetStatus(Error)
}
}()
// 开始执行任务
job.Do()
// 任务执行失败
if err := job.GetError(); err != nil {
util.Log().Debug("任务执行出错")
job.SetStatus(Error)
return
}
util.Log().Debug("任务执行完成")
// 执行完成
job.SetStatus(Complete)
}