From af2dffe110e78d6051f59274c9093a1d0a5ec973 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Tue, 31 Aug 2021 21:47:32 +0800 Subject: [PATCH] Feat: master waiting slave transfer result --- models/migration.go | 1 + pkg/filesystem/driver/shadow/slave/errors.go | 1 + pkg/filesystem/driver/shadow/slave/handler.go | 18 +++++++++++-- pkg/serializer/slave.go | 26 ++++++++++++++++--- pkg/task/slavetask/transfer.go | 23 +++++++++++++--- service/explorer/slave.go | 5 ++-- 6 files changed, 63 insertions(+), 11 deletions(-) diff --git a/models/migration.go b/models/migration.go index 3e869ef..3e493f0 100644 --- a/models/migration.go +++ b/models/migration.go @@ -108,6 +108,7 @@ func addDefaultSettings() { {Name: "slave_node_retry", Value: `3`, Type: "slave"}, {Name: "slave_ping_interval", Value: `300`, Type: "slave"}, {Name: "slave_recover_interval", Value: `600`, Type: "slave"}, + {Name: "slave_transfer_timeout", Value: `172800`, Type: "timeout"}, {Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"}, {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, diff --git a/pkg/filesystem/driver/shadow/slave/errors.go b/pkg/filesystem/driver/shadow/slave/errors.go index f1dd4f1..e126172 100644 --- a/pkg/filesystem/driver/shadow/slave/errors.go +++ b/pkg/filesystem/driver/shadow/slave/errors.go @@ -5,4 +5,5 @@ import "errors" var ( ErrNotImplemented = errors.New("this method of shadowed policy is not implemented") ErrSlaveSrcPathNotExist = errors.New("cannot determine source file path in slave node") + ErrWaitResultTimeout = errors.New("timeout waiting for slave transfer result") ) diff --git a/pkg/filesystem/driver/shadow/slave/handler.go b/pkg/filesystem/driver/shadow/slave/handler.go index d1713dc..3f8a5b8 100644 --- a/pkg/filesystem/driver/shadow/slave/handler.go +++ b/pkg/filesystem/driver/shadow/slave/handler.go @@ -9,6 +9,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" + "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "io" @@ -65,6 +66,10 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui return err } + // 订阅转存结果 + resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0) + defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan) + res, err := d.client.Request("PUT", "task/transfer", bytes.NewReader(body)). CheckHTTPResponse(200). DecodeResponse() @@ -76,9 +81,18 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui return serializer.NewErrorFromResponse(res) } - // TODO: subscribe and wait + // 等待转存结果或者超时 + waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800) + select { + case <-time.After(time.Duration(waitTimeout) * time.Second): + return ErrWaitResultTimeout + case msg := <-resChan: + if msg.Event != serializer.SlaveTransferSuccess { + return msg.Content.(serializer.SlaveTransferResult).Error + } + } - return ErrNotImplemented + return nil } func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) { diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index f4d7adb..0003072 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -1,6 +1,8 @@ package serializer import ( + "crypto/sha1" + "encoding/gob" "fmt" model "github.com/cloudreve/Cloudreve/v3/models" ) @@ -38,12 +40,28 @@ type SlaveAria2Call struct { // SlaveTransferReq 从机中转任务创建请求 type SlaveTransferReq struct { - Src string `json:"src"` - Dst string `json:"dst"` - Policy *model.Policy + Src string `json:"src"` + Dst string `json:"dst"` + Policy *model.Policy `json:"policy"` } // Hash 返回创建请求的唯一标识,保持创建请求幂等 func (s *SlaveTransferReq) Hash(id string) string { - return fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID) + h := sha1.New() + h.Write([]byte(fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID))) + bs := h.Sum(nil) + return fmt.Sprintf("%x", bs) +} + +const ( + SlaveTransferSuccess = "success" + SlaveTransferFailed = "failed" +) + +type SlaveTransferResult struct { + Error error +} + +func init() { + gob.Register(SlaveTransferResult{}) } diff --git a/pkg/task/slavetask/transfer.go b/pkg/task/slavetask/transfer.go index bfdc0e7..5dc1fdd 100644 --- a/pkg/task/slavetask/transfer.go +++ b/pkg/task/slavetask/transfer.go @@ -2,15 +2,19 @@ package slavetask import ( model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/slave" "github.com/cloudreve/Cloudreve/v3/pkg/task" "github.com/cloudreve/Cloudreve/v3/pkg/util" + "time" ) // TransferTask 文件中转任务 type TransferTask struct { - Err *task.JobError - Req *serializer.SlaveTransferReq + Err *task.JobError + Req *serializer.SlaveTransferReq + MasterID string } // Props 获取任务属性 @@ -59,5 +63,18 @@ func (job *TransferTask) GetError() *task.JobError { // Do 开始执行任务 func (job *TransferTask) Do() { - util.Log().Debug("job") + time.Sleep(time.Duration(10) * time.Second) + msg := mq.Message{ + TriggeredBy: job.MasterID, + Event: serializer.SlaveTransferSuccess, + Content: serializer.SlaveTransferResult{ + Error: nil, + }, + } + + if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil { + job.SetErrorMsg("无法发送转存结果通知", err) + } + + util.Log().Debug("job done") } diff --git a/service/explorer/slave.go b/service/explorer/slave.go index 9f6bb94..56fe3bf 100644 --- a/service/explorer/slave.go +++ b/service/explorer/slave.go @@ -148,10 +148,11 @@ func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) seri func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serializer.Response { if id, ok := c.Get("MasterSiteID"); ok { job := &slavetask.TransferTask{ - Req: req, + Req: req, + MasterID: id.(string), } - if err := slave.DefaultController.SubmitTask(id.(string), job, req.Hash(id.(string))); err != nil { + if err := slave.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID)); err != nil { return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err) }