From a7448ed0eaa9d156abec493dc3b499bd3f548170 Mon Sep 17 00:00:00 2001 From: HFO4 <912394456@qq.com> Date: Tue, 31 Aug 2021 21:48:08 +0800 Subject: [PATCH] Feat: slave transfer file in stateless policy --- .../driver/shadow/masterinslave/errors.go | 7 ++ .../driver/shadow/masterinslave/handler.go | 55 ++++++++++++++++ .../shadow/{slave => slaveinmaster}/errors.go | 2 +- .../{slave => slaveinmaster}/handler.go | 19 +++--- pkg/filesystem/filesystem.go | 10 ++- pkg/filesystem/upload.go | 12 ++-- pkg/filesystem/upload_test.go | 4 +- pkg/serializer/slave.go | 2 +- pkg/task/compress.go | 2 +- pkg/task/slavetask/transfer.go | 64 ++++++++++++++++--- pkg/task/tranfer.go | 2 +- 11 files changed, 149 insertions(+), 30 deletions(-) create mode 100644 pkg/filesystem/driver/shadow/masterinslave/errors.go create mode 100644 pkg/filesystem/driver/shadow/masterinslave/handler.go rename pkg/filesystem/driver/shadow/{slave => slaveinmaster}/errors.go (92%) rename pkg/filesystem/driver/shadow/{slave => slaveinmaster}/handler.go (77%) diff --git a/pkg/filesystem/driver/shadow/masterinslave/errors.go b/pkg/filesystem/driver/shadow/masterinslave/errors.go new file mode 100644 index 0000000..27d0428 --- /dev/null +++ b/pkg/filesystem/driver/shadow/masterinslave/errors.go @@ -0,0 +1,7 @@ +package masterinslave + +import "errors" + +var ( + ErrNotImplemented = errors.New("this method of shadowed policy is not implemented") +) diff --git a/pkg/filesystem/driver/shadow/masterinslave/handler.go b/pkg/filesystem/driver/shadow/masterinslave/handler.go new file mode 100644 index 0000000..00b119c --- /dev/null +++ b/pkg/filesystem/driver/shadow/masterinslave/handler.go @@ -0,0 +1,55 @@ +package masterinslave + +import ( + "context" + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "io" + "net/url" +) + +// Driver 影子存储策略,用于在从机端上传文件 +type Driver struct { + masterID string + handler driver.Handler + policy *model.Policy +} + +// NewDriver 返回新的处理器 +func NewDriver(masterID string, handler driver.Handler, policy *model.Policy) driver.Handler { + return &Driver{ + masterID: masterID, + handler: handler, + policy: policy, + } +} + +func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { + return d.handler.Put(ctx, file, dst, size) +} + +func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) { + return d.handler.Delete(ctx, files) +} + +func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { + return nil, ErrNotImplemented +} + +func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) { + return nil, ErrNotImplemented +} + +func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) { + return "", ErrNotImplemented +} + +func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) { + return serializer.UploadCredential{}, ErrNotImplemented +} + +func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { + return nil, ErrNotImplemented +} diff --git a/pkg/filesystem/driver/shadow/slave/errors.go b/pkg/filesystem/driver/shadow/slaveinmaster/errors.go similarity index 92% rename from pkg/filesystem/driver/shadow/slave/errors.go rename to pkg/filesystem/driver/shadow/slaveinmaster/errors.go index e126172..6acadc8 100644 --- a/pkg/filesystem/driver/shadow/slave/errors.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/errors.go @@ -1,4 +1,4 @@ -package slave +package slaveinmaster import "errors" diff --git a/pkg/filesystem/driver/shadow/slave/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go similarity index 77% rename from pkg/filesystem/driver/shadow/slave/handler.go rename to pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 3f8a5b8..e9f18ad 100644 --- a/pkg/filesystem/driver/shadow/slave/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -1,9 +1,10 @@ -package slave +package slaveinmaster import ( "bytes" "context" "encoding/json" + "errors" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" @@ -49,7 +50,7 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) } // Put 将ctx中指定的从机物理文件由从机上传到目标存储策略 -func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { +func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { src, ok := ctx.Value(fsctx.SlaveSrcPath).(string) if !ok { return ErrSlaveSrcPathNotExist @@ -88,33 +89,33 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui return ErrWaitResultTimeout case msg := <-resChan: if msg.Event != serializer.SlaveTransferSuccess { - return msg.Content.(serializer.SlaveTransferResult).Error + return errors.New(msg.Content.(serializer.SlaveTransferResult).Error) } } return nil } -func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) { +func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) { return nil, ErrNotImplemented } -func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { +func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { return nil, ErrNotImplemented } -func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) { +func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) { return nil, ErrNotImplemented } -func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) { +func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) { return "", ErrNotImplemented } -func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) { +func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) { return serializer.UploadCredential{}, ErrNotImplemented } -func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { +func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { return nil, ErrNotImplemented } diff --git a/pkg/filesystem/filesystem.go b/pkg/filesystem/filesystem.go index 16eff6c..2546d24 100644 --- a/pkg/filesystem/filesystem.go +++ b/pkg/filesystem/filesystem.go @@ -4,7 +4,8 @@ import ( "errors" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" - "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slave" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/masterinslave" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster" "io" "net/http" "net/url" @@ -243,7 +244,12 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) { // SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点 func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) { - fs.Handler = slave.NewDriver(node, fs.Handler, &fs.User.Policy) + fs.Handler = slaveinmaster.NewDriver(node, fs.Handler, &fs.User.Policy) +} + +// SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器 +func (fs *FileSystem) SwitchToShadowHandler(masterID string) { + fs.Handler = masterinslave.NewDriver(masterID, fs.Handler, fs.Policy) } // SetTargetFile 设置当前处理的目标文件 diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index 2ff8997..e2b092a 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -228,12 +228,14 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, src io.ReadCloser, d } // UploadFromPath 将本机已有文件上传到用户的文件系统 -func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string) error { +func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, resetPolicy bool) error { // 重设存储策略 - fs.Policy = &fs.User.Policy - err := fs.DispatchHandler() - if err != nil { - return err + if resetPolicy { + fs.Policy = &fs.User.Policy + err := fs.DispatchHandler() + if err != nil { + return err + } } file, err := os.Open(util.RelativePath(src)) diff --git a/pkg/filesystem/upload_test.go b/pkg/filesystem/upload_test.go index 2c0d827..8473e4f 100644 --- a/pkg/filesystem/upload_test.go +++ b/pkg/filesystem/upload_test.go @@ -226,13 +226,13 @@ func TestFileSystem_UploadFromPath(t *testing.T) { // 文件不存在 { - err := fs.UploadFromPath(ctx, "test/not_exist", "/") + err := fs.UploadFromPath(ctx, "test/not_exist", "/", true) asserts.Error(err) } // 文存在,上传失败 { - err := fs.UploadFromPath(ctx, "tests/test.zip", "/") + err := fs.UploadFromPath(ctx, "tests/test.zip", "/", true) asserts.Error(err) } } diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index 0003072..245767a 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -59,7 +59,7 @@ const ( ) type SlaveTransferResult struct { - Error error + Error string } func init() { diff --git a/pkg/task/compress.go b/pkg/task/compress.go index b134922..95b06d5 100644 --- a/pkg/task/compress.go +++ b/pkg/task/compress.go @@ -106,7 +106,7 @@ func (job *CompressTask) Do() { job.TaskModel.SetProgress(TransferringProgress) // 上传文件 - err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst) + err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, true) if err != nil { job.SetErrorMsg(err.Error()) return diff --git a/pkg/task/slavetask/transfer.go b/pkg/task/slavetask/transfer.go index 5dc1fdd..c7d6da6 100644 --- a/pkg/task/slavetask/transfer.go +++ b/pkg/task/slavetask/transfer.go @@ -1,13 +1,16 @@ package slavetask import ( + "context" model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/mq" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/cloudreve/Cloudreve/v3/pkg/slave" "github.com/cloudreve/Cloudreve/v3/pkg/task" "github.com/cloudreve/Cloudreve/v3/pkg/util" - "time" + "os" ) // TransferTask 文件中转任务 @@ -53,7 +56,20 @@ func (job *TransferTask) SetErrorMsg(msg string, err error) { if err != nil { jobErr.Error = err.Error() } + job.SetError(jobErr) + + notifyMsg := mq.Message{ + TriggeredBy: job.MasterID, + Event: serializer.SlaveTransferFailed, + Content: serializer.SlaveTransferResult{ + Error: err.Error(), + }, + } + + if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil { + util.Log().Warning("无法发送转存失败通知到从机, ", err) + } } // GetError 返回任务失败信息 @@ -63,18 +79,50 @@ func (job *TransferTask) GetError() *task.JobError { // Do 开始执行任务 func (job *TransferTask) Do() { - time.Sleep(time.Duration(10) * time.Second) + fs, err := filesystem.NewAnonymousFileSystem() + if err != nil { + job.SetErrorMsg("无法初始化匿名文件系统", err) + return + } + + fs.Policy = job.Req.Policy + if err := fs.DispatchHandler(); err != nil { + job.SetErrorMsg("无法分发存储策略", err) + return + } + + fs.SwitchToShadowHandler(job.MasterID) + ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true) + file, err := os.Open(util.RelativePath(job.Req.Src)) + if err != nil { + job.SetErrorMsg("无法读取源文件", err) + return + } + + defer file.Close() + + // 获取源文件大小 + fi, err := file.Stat() + if err != nil { + job.SetErrorMsg("无法获取源文件大小", err) + return + } + + size := fi.Size() + + err = fs.Handler.Put(ctx, file, job.Req.Dst, uint64(size)) + if err != nil { + job.SetErrorMsg("文件上传失败", err) + return + } + msg := mq.Message{ TriggeredBy: job.MasterID, Event: serializer.SlaveTransferSuccess, - Content: serializer.SlaveTransferResult{ - Error: nil, - }, + Content: serializer.SlaveTransferResult{}, } if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil { - job.SetErrorMsg("无法发送转存结果通知", err) + util.Log().Warning("无法发送转存成功通知到从机, ", err) } - - util.Log().Debug("job done") } diff --git a/pkg/task/tranfer.go b/pkg/task/tranfer.go index 2c7de05..aee96fb 100644 --- a/pkg/task/tranfer.go +++ b/pkg/task/tranfer.go @@ -123,7 +123,7 @@ func (job *TransferTask) Do() { err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file]) } else { // 主机节点中转 - err = fs.UploadFromPath(ctx, file, dst) + err = fs.UploadFromPath(ctx, file, dst, true) } if err != nil {