Feat: master waiting slave transfer result

This commit is contained in:
HFO4 2021-08-31 21:47:32 +08:00
parent 50e55c7f20
commit af2dffe110
6 changed files with 63 additions and 11 deletions

View file

@ -108,6 +108,7 @@ func addDefaultSettings() {
{Name: "slave_node_retry", Value: `3`, Type: "slave"},
{Name: "slave_ping_interval", Value: `300`, Type: "slave"},
{Name: "slave_recover_interval", Value: `600`, Type: "slave"},
{Name: "slave_transfer_timeout", Value: `172800`, Type: "timeout"},
{Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"},
{Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"},
{Name: "onedrive_callback_check", Value: `20`, Type: "timeout"},

View file

@ -5,4 +5,5 @@ import "errors"
var (
ErrNotImplemented = errors.New("this method of shadowed policy is not implemented")
ErrSlaveSrcPathNotExist = errors.New("cannot determine source file path in slave node")
ErrWaitResultTimeout = errors.New("timeout waiting for slave transfer result")
)

View file

@ -9,6 +9,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
@ -65,6 +66,10 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui
return err
}
// 订阅转存结果
resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0)
defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan)
res, err := d.client.Request("PUT", "task/transfer", bytes.NewReader(body)).
CheckHTTPResponse(200).
DecodeResponse()
@ -76,9 +81,18 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui
return serializer.NewErrorFromResponse(res)
}
// TODO: subscribe and wait
// 等待转存结果或者超时
waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800)
select {
case <-time.After(time.Duration(waitTimeout) * time.Second):
return ErrWaitResultTimeout
case msg := <-resChan:
if msg.Event != serializer.SlaveTransferSuccess {
return msg.Content.(serializer.SlaveTransferResult).Error
}
}
return ErrNotImplemented
return nil
}
func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) {

View file

@ -1,6 +1,8 @@
package serializer
import (
"crypto/sha1"
"encoding/gob"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
)
@ -38,12 +40,28 @@ type SlaveAria2Call struct {
// SlaveTransferReq 从机中转任务创建请求
type SlaveTransferReq struct {
Src string `json:"src"`
Dst string `json:"dst"`
Policy *model.Policy
Src string `json:"src"`
Dst string `json:"dst"`
Policy *model.Policy `json:"policy"`
}
// Hash 返回创建请求的唯一标识,保持创建请求幂等
func (s *SlaveTransferReq) Hash(id string) string {
return fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID)
h := sha1.New()
h.Write([]byte(fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID)))
bs := h.Sum(nil)
return fmt.Sprintf("%x", bs)
}
const (
SlaveTransferSuccess = "success"
SlaveTransferFailed = "failed"
)
type SlaveTransferResult struct {
Error error
}
func init() {
gob.Register(SlaveTransferResult{})
}

View file

@ -2,15 +2,19 @@ package slavetask
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"time"
)
// TransferTask 文件中转任务
type TransferTask struct {
Err *task.JobError
Req *serializer.SlaveTransferReq
Err *task.JobError
Req *serializer.SlaveTransferReq
MasterID string
}
// Props 获取任务属性
@ -59,5 +63,18 @@ func (job *TransferTask) GetError() *task.JobError {
// Do 开始执行任务
func (job *TransferTask) Do() {
util.Log().Debug("job")
time.Sleep(time.Duration(10) * time.Second)
msg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveTransferSuccess,
Content: serializer.SlaveTransferResult{
Error: nil,
},
}
if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
job.SetErrorMsg("无法发送转存结果通知", err)
}
util.Log().Debug("job done")
}

View file

@ -148,10 +148,11 @@ func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) seri
func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serializer.Response {
if id, ok := c.Get("MasterSiteID"); ok {
job := &slavetask.TransferTask{
Req: req,
Req: req,
MasterID: id.(string),
}
if err := slave.DefaultController.SubmitTask(id.(string), job, req.Hash(id.(string))); err != nil {
if err := slave.DefaultController.SubmitTask(job.MasterID, job, req.Hash(job.MasterID)); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err)
}