Cloudreve/pkg/task/slavetask/transfer.go
AaronLiu 056de22edb
Feat: aria2 download and transfer in slave node (#1040)
* Feat: retrieve nodes from data table

* Feat: master node ping slave node in REST API

* Feat: master send scheduled ping request

* Feat: inactive nodes recover loop

* Modify: remove database operations from aria2 RPC caller implementation

* Feat: init aria2 client in master node

* Feat: Round Robin load balancer

* Feat: create and monitor aria2 task in master node

* Feat: slave receive and handle heartbeat

* Fix: Node ID will be 0 in download record generated in older version

* Feat: sign request headers with all `X-` prefix

* Feat: API call to slave node will carry meta data in headers

* Feat: call slave aria2 rpc method from master

* Feat: get slave aria2 task status
Feat: encode slave response data using gob

* Feat: aria2 callback to master node / cancel or select task to slave node

* Fix: use dummy aria2 client when caller initialize failed in master node

* Feat: slave aria2 status event callback / slave RPC auth

* Feat: prototype for slave driven filesystem

* Feat: retry for init aria2 client in master node

* Feat: init request client with global options

* Feat: slave receive async task from master

* Fix: concurrent write in request header

* Refactor: dependency initialize order

* Feat: generic message queue implementation

* Feat: message queue implementation

* Feat: master waiting slave transfer result

* Feat: slave transfer file in stateless policy

* Feat: slave transfer file in slave policy

* Feat: slave transfer file in local policy

* Feat: slave transfer file in OneDrive policy

* Fix: failed to initialize update checker http client

* Feat: list slave nodes for dashboard

* Feat: test aria2 rpc connection in slave

* Feat: add and save node

* Feat: add and delete node in node pool

* Fix: temp file cannot be removed when aria2 task fails

* Fix: delete node in admin panel

* Feat: edit node and get node info

* Modify: delete unused settings
2021-10-31 09:41:56 +08:00

145 lines
3.4 KiB
Go

package slavetask
import (
"context"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"os"
"path/filepath"
)
// TransferTask is a file relay (transfer) task executed on a slave node on
// behalf of a master node.
type TransferTask struct {
// Err holds the most recent failure recorded through SetError; it is what
// GetError returns.
Err *task.JobError
// Req is the transfer request. This file reads its Src, Dst and Policy
// fields and calls its Hash method to derive the notification key.
Req *serializer.SlaveTransferReq
// MasterID identifies the master node that is looked up via GetMasterInfo
// and that receives the success/failure notification.
MasterID string
}
// Props returns the task properties. A transfer task carries none, so the
// result is always the empty string.
func (job *TransferTask) Props() string {
return ""
}
// Type returns the task type. It is always 0 for this task.
func (job *TransferTask) Type() int {
return 0
}
// Creator returns the ID of the task's creator. It is always 0 for this task.
func (job *TransferTask) Creator() uint {
return 0
}
// Model returns the database model backing the task. It is always nil here,
// so callers must not dereference the result.
func (job *TransferTask) Model() *model.Task {
return nil
}
// SetStatus sets the task status. This implementation is a no-op: the status
// argument is ignored and nothing is stored.
func (job *TransferTask) SetStatus(status int) {
}
// SetError records err as the task's failure information, replacing any
// previously stored value. It does not send a notification by itself.
func (job *TransferTask) SetError(err *task.JobError) {
job.Err = err
}
// SetErrorMsg records a failure on the task and notifies the master node that
// the transfer failed.
//
// msg is a short summary of what went wrong. err is the underlying cause and
// may be nil; in that case msg is also used as the error text sent to the
// master, so the failure notification is never empty and no nil error is
// dereferenced.
//
// A failure to deliver the notification is only logged; it does not change
// the error stored on the task.
func (job *TransferTask) SetErrorMsg(msg string, err error) {
	jobErr := &task.JobError{Msg: msg}

	// err is optional: only call err.Error() when there is an error to read.
	reason := msg
	if err != nil {
		reason = err.Error()
		jobErr.Error = reason
	}
	job.SetError(jobErr)

	notifyMsg := mq.Message{
		TriggeredBy: job.MasterID,
		Event:       serializer.SlaveTransferFailed,
		Content: serializer.SlaveTransferResult{
			Error: reason,
		},
	}

	// The notification is addressed to the master node, keyed by the request hash.
	if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil {
		util.Log().Warning("无法发送转存失败通知到主机, %s", err)
	}
}
// GetError returns the failure information last stored by SetError, or nil if
// no error has been recorded.
func (job *TransferTask) GetError() *task.JobError {
return job.Err
}
// Do executes the transfer task.
//
// It builds an anonymous file system bound to job.Req.Policy, switches it to
// the shadow handler for the master node identified by job.MasterID, uploads
// the local file job.Req.Src to job.Req.Dst, and finally sends a success
// notification to the master. Any failure along the way is reported through
// SetErrorMsg (which also notifies the master) and aborts the task.
//
// Recycle is deferred first, so it runs on every return path, after the
// source file has been closed.
func (job *TransferTask) Do() {
	defer job.Recycle()

	fs, err := filesystem.NewAnonymousFileSystem()
	if err != nil {
		job.SetErrorMsg("无法初始化匿名文件系统", err)
		return
	}

	fs.Policy = job.Req.Policy
	if err := fs.DispatchHandler(); err != nil {
		job.SetErrorMsg("无法分发存储策略", err)
		return
	}

	master, err := slave.DefaultController.GetMasterInfo(job.MasterID)
	if err != nil {
		job.SetErrorMsg("找不到主机节点", err)
		return
	}

	fs.SwitchToShadowHandler(master.Instance, master.URL.String(), master.ID)

	// NOTE(review): presumably makes Put refuse to overwrite an existing
	// destination object — confirm against the fsctx.DisableOverwrite consumers.
	ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)

	file, err := os.Open(util.RelativePath(job.Req.Src))
	if err != nil {
		job.SetErrorMsg("无法读取源文件", err)
		return
	}
	defer file.Close()

	// Determine the size of the source file; Put needs it up front.
	fi, err := file.Stat()
	if err != nil {
		job.SetErrorMsg("无法获取源文件大小", err)
		return
	}
	size := fi.Size()

	err = fs.Handler.Put(ctx, file, job.Req.Dst, uint64(size))
	if err != nil {
		job.SetErrorMsg("文件上传失败", err)
		return
	}

	msg := mq.Message{
		TriggeredBy: job.MasterID,
		Event:       serializer.SlaveTransferSuccess,
		Content:     serializer.SlaveTransferResult{},
	}

	// The notification is addressed to the master node; a delivery failure is
	// only logged because the upload itself has already succeeded.
	if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
		util.Log().Warning("无法发送转存成功通知到主机, %s", err)
	}
}
// Recycle cleans up the temporary relay data once the task is finished.
//
// It removes the whole directory that contains job.Req.Src, including the
// source file itself. A removal failure is logged as a warning and otherwise
// ignored. Note that the log line prints the source file path, not the
// directory that was being removed.
func (job *TransferTask) Recycle() {
	if rmErr := os.RemoveAll(filepath.Dir(job.Req.Src)); rmErr != nil {
		util.Log().Warning("无法删除中转临时目录[%s], %s", job.Req.Src, rmErr)
	}
}