Feat: slave transfer file in stateless policy

This commit is contained in:
HFO4 2021-08-31 21:48:08 +08:00
parent af2dffe110
commit a7448ed0ea
11 changed files with 149 additions and 30 deletions

View file

@ -0,0 +1,7 @@
package masterinslave
import "errors"
var (
ErrNotImplemented = errors.New("this method of shadowed policy is not implemented")
)

View file

@ -0,0 +1,55 @@
package masterinslave
import (
"context"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
)
// Driver 影子存储策略,用于在从机端上传文件
type Driver struct {
masterID string
handler driver.Handler
policy *model.Policy
}
// NewDriver 返回新的处理器
func NewDriver(masterID string, handler driver.Handler, policy *model.Policy) driver.Handler {
return &Driver{
masterID: masterID,
handler: handler,
policy: policy,
}
}
func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
return d.handler.Put(ctx, file, dst, size)
}
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return d.handler.Delete(ctx, files)
}
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
return "", ErrNotImplemented
}
func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}

View file

@ -1,4 +1,4 @@
package slave
package slaveinmaster
import "errors"

View file

@ -1,9 +1,10 @@
package slave
package slaveinmaster
import (
"bytes"
"context"
"encoding/json"
"errors"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
@ -49,7 +50,7 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy)
}
// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略
func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
if !ok {
return ErrSlaveSrcPathNotExist
@ -88,33 +89,33 @@ func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size ui
return ErrWaitResultTimeout
case msg := <-resChan:
if msg.Event != serializer.SlaveTransferSuccess {
return msg.Content.(serializer.SlaveTransferResult).Error
return errors.New(msg.Content.(serializer.SlaveTransferResult).Error)
}
}
return nil
}
func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) {
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return nil, ErrNotImplemented
}
func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, ErrNotImplemented
}
func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
return nil, ErrNotImplemented
}
func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
return "", ErrNotImplemented
}
func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
func (d *Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}

View file

@ -4,7 +4,8 @@ import (
"errors"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/masterinslave"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slaveinmaster"
"io"
"net/http"
"net/url"
@ -243,7 +244,12 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
// SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点
func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
fs.Handler = slave.NewDriver(node, fs.Handler, &fs.User.Policy)
fs.Handler = slaveinmaster.NewDriver(node, fs.Handler, &fs.User.Policy)
}
// SwitchToShadowHandler 将负责上传的 Handler 切换为从机节点转存使用的影子处理器
func (fs *FileSystem) SwitchToShadowHandler(masterID string) {
fs.Handler = masterinslave.NewDriver(masterID, fs.Handler, fs.Policy)
}
// SetTargetFile 设置当前处理的目标文件

View file

@ -228,12 +228,14 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, src io.ReadCloser, d
}
// UploadFromPath 将本机已有文件上传到用户的文件系统
func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string) error {
func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, resetPolicy bool) error {
// 重设存储策略
fs.Policy = &fs.User.Policy
err := fs.DispatchHandler()
if err != nil {
return err
if resetPolicy {
fs.Policy = &fs.User.Policy
err := fs.DispatchHandler()
if err != nil {
return err
}
}
file, err := os.Open(util.RelativePath(src))

View file

@ -226,13 +226,13 @@ func TestFileSystem_UploadFromPath(t *testing.T) {
// 文件不存在
{
err := fs.UploadFromPath(ctx, "test/not_exist", "/")
err := fs.UploadFromPath(ctx, "test/not_exist", "/", true)
asserts.Error(err)
}
// 文存在,上传失败
{
err := fs.UploadFromPath(ctx, "tests/test.zip", "/")
err := fs.UploadFromPath(ctx, "tests/test.zip", "/", true)
asserts.Error(err)
}
}

View file

@ -59,7 +59,7 @@ const (
)
type SlaveTransferResult struct {
Error error
Error string
}
func init() {

View file

@ -106,7 +106,7 @@ func (job *CompressTask) Do() {
job.TaskModel.SetProgress(TransferringProgress)
// 上传文件
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst)
err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, true)
if err != nil {
job.SetErrorMsg(err.Error())
return

View file

@ -1,13 +1,16 @@
package slavetask
import (
"context"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"time"
"os"
)
// TransferTask 文件中转任务
@ -53,7 +56,20 @@ func (job *TransferTask) SetErrorMsg(msg string, err error) {
if err != nil {
jobErr.Error = err.Error()
}
job.SetError(jobErr)
notifyMsg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveTransferFailed,
Content: serializer.SlaveTransferResult{
Error: err.Error(),
},
}
if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), notifyMsg); err != nil {
util.Log().Warning("无法发送转存失败通知到从机, ", err)
}
}
// GetError 返回任务失败信息
@ -63,18 +79,50 @@ func (job *TransferTask) GetError() *task.JobError {
// Do 开始执行任务
func (job *TransferTask) Do() {
time.Sleep(time.Duration(10) * time.Second)
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
job.SetErrorMsg("无法初始化匿名文件系统", err)
return
}
fs.Policy = job.Req.Policy
if err := fs.DispatchHandler(); err != nil {
job.SetErrorMsg("无法分发存储策略", err)
return
}
fs.SwitchToShadowHandler(job.MasterID)
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
file, err := os.Open(util.RelativePath(job.Req.Src))
if err != nil {
job.SetErrorMsg("无法读取源文件", err)
return
}
defer file.Close()
// 获取源文件大小
fi, err := file.Stat()
if err != nil {
job.SetErrorMsg("无法获取源文件大小", err)
return
}
size := fi.Size()
err = fs.Handler.Put(ctx, file, job.Req.Dst, uint64(size))
if err != nil {
job.SetErrorMsg("文件上传失败", err)
return
}
msg := mq.Message{
TriggeredBy: job.MasterID,
Event: serializer.SlaveTransferSuccess,
Content: serializer.SlaveTransferResult{
Error: nil,
},
Content: serializer.SlaveTransferResult{},
}
if err := slave.DefaultController.SendNotification(job.MasterID, job.Req.Hash(job.MasterID), msg); err != nil {
job.SetErrorMsg("无法发送转存结果通知", err)
util.Log().Warning("无法发送转存成功通知到从机, ", err)
}
util.Log().Debug("job done")
}

View file

@ -123,7 +123,7 @@ func (job *TransferTask) Do() {
err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file])
} else {
// 主机节点中转
err = fs.UploadFromPath(ctx, file, dst)
err = fs.UploadFromPath(ctx, file, dst, true)
}
if err != nil {