diff --git a/pkg/filesystem/archive.go b/pkg/filesystem/archive.go index ac0a2fc..cca49eb 100644 --- a/pkg/filesystem/archive.go +++ b/pkg/filesystem/archive.go @@ -306,9 +306,9 @@ func (fs *FileSystem) Decompress(ctx context.Context, src, dst string) error { err = fs.UploadFromStream(ctx, &fsctx.FileStream{ File: fileStream, Size: uint64(size), - Name: path.Base(dst), - VirtualPath: path.Dir(dst), - }) + Name: path.Base(savePath), + VirtualPath: path.Dir(savePath), + }, true) fileStream.Close() if err != nil { util.Log().Debug("无法上传压缩包内的文件%s , %s , 跳过", rawPath, err) diff --git a/pkg/filesystem/driver/remote/client.go b/pkg/filesystem/driver/remote/client.go index a0a5642..ab764f5 100644 --- a/pkg/filesystem/driver/remote/client.go +++ b/pkg/filesystem/driver/remote/client.go @@ -57,6 +57,7 @@ func NewClient(policy *model.Policy) (Client, error) { request.WithEndpoint(serverURL.ResolveReference(base).String()), request.WithCredential(authInstance, int64(signTTL)), request.WithMasterMeta(), + request.WithSlaveMeta(policy.AccessKey), ), }, nil } diff --git a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go index 11e5f9a..7fc7b09 100644 --- a/pkg/filesystem/driver/shadow/slaveinmaster/handler.go +++ b/pkg/filesystem/driver/shadow/slaveinmaster/handler.go @@ -52,14 +52,10 @@ func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) func (d *Driver) Put(ctx context.Context, file fsctx.FileHeader) error { defer file.Close() - src, ok := ctx.Value(fsctx.SlaveSrcPath).(string) - if !ok { - return ErrSlaveSrcPathNotExist - } - + fileInfo := file.Info() req := serializer.SlaveTransferReq{ - Src: src, - Dst: file.Info().SavePath, + Src: fileInfo.Src, + Dst: fileInfo.SavePath, Policy: d.policy, } diff --git a/pkg/filesystem/fsctx/stream.go b/pkg/filesystem/fsctx/stream.go index 122ead5..c51d28c 100644 --- a/pkg/filesystem/fsctx/stream.go +++ b/pkg/filesystem/fsctx/stream.go @@ -26,6 +26,7 @@ type UploadTaskInfo struct { UploadSessionID *string AppendStart uint64 Model interface{} + Src string } // FileHeader 上传来的文件数据处理器 @@ -54,14 +55,23 @@ type FileStream struct { UploadSessionID *string AppendStart uint64 Model interface{} + Src string } func (file *FileStream) Read(p []byte) (n int, err error) { - return file.File.Read(p) + if file.File != nil { + return file.File.Read(p) + } + + return 0, io.EOF } func (file *FileStream) Close() error { - return file.File.Close() + if file.File != nil { + return file.File.Close() + } + + return nil } func (file *FileStream) Seek(offset int64, whence int) (int64, error) { @@ -85,6 +95,7 @@ func (file *FileStream) Info() *UploadTaskInfo { UploadSessionID: file.UploadSessionID, AppendStart: file.AppendStart, Model: file.Model, + Src: file.Src, } } diff --git a/pkg/filesystem/upload.go b/pkg/filesystem/upload.go index 7ebbd4b..38d42ef 100644 --- a/pkg/filesystem/upload.go +++ b/pkg/filesystem/upload.go @@ -224,7 +224,16 @@ func (fs *FileSystem) CreateUploadSession(ctx context.Context, file *fsctx.FileS } // UploadFromStream 从文件流上传文件 -func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStream) error { +func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStream, resetPolicy bool) error { + if resetPolicy { + // 重设存储策略 + fs.Policy = &fs.User.Policy + err := fs.DispatchHandler() + if err != nil { + return err + } + } + // 给文件系统分配钩子 fs.Lock.Lock() if fs.Hooks == nil { @@ -242,16 +251,7 @@ func (fs *FileSystem) UploadFromStream(ctx context.Context, file *fsctx.FileStre } // UploadFromPath 将本机已有文件上传到用户的文件系统 -func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, resetPolicy bool, mode fsctx.WriteMode) error { - // 重设存储策略 - if resetPolicy { - fs.Policy = &fs.User.Policy - err := fs.DispatchHandler() - if err != nil { - return err - } - } - +func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, mode fsctx.WriteMode) error { file, err := os.Open(util.RelativePath(src)) if err != nil { return err @@ -273,5 +273,5 @@ func (fs *FileSystem) UploadFromPath(ctx context.Context, src, dst string, reset Name: path.Base(dst), VirtualPath: path.Dir(dst), Mode: mode, - }) + }, true) } diff --git a/pkg/task/compress.go b/pkg/task/compress.go index d4a3400..f7314ec 100644 --- a/pkg/task/compress.go +++ b/pkg/task/compress.go @@ -106,7 +106,7 @@ func (job *CompressTask) Do() { job.TaskModel.SetProgress(TransferringProgress) // 上传文件 - err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, true, 0) + err = fs.UploadFromPath(ctx, zipFile, job.TaskProps.Dst, 0) if err != nil { job.SetErrorMsg(err.Error()) return diff --git a/pkg/task/tranfer.go b/pkg/task/tranfer.go index 4983c4f..b802324 100644 --- a/pkg/task/tranfer.go +++ b/pkg/task/tranfer.go @@ -117,16 +117,18 @@ func (job *TransferTask) Do() { } // 切换为从机节点处理上传 + fs.SetPolicyFromPath(path.Dir(dst)) fs.SwitchToSlaveHandler(node) err = fs.UploadFromStream(context.Background(), &fsctx.FileStream{ File: nil, Size: job.TaskProps.SrcSizes[file], Name: path.Base(dst), VirtualPath: path.Dir(dst), - }) + Src: file, + }, false) } else { // 主机节点中转 - err = fs.UploadFromPath(context.Background(), file, dst, true, 0) + err = fs.UploadFromPath(context.Background(), file, dst, 0) } if err != nil { diff --git a/pkg/task/worker.go b/pkg/task/worker.go index 07ef36b..3e01f17 100644 --- a/pkg/task/worker.go +++ b/pkg/task/worker.go @@ -1,6 +1,9 @@ package task -import "github.com/cloudreve/Cloudreve/v3/pkg/util" +import ( + "fmt" + "github.com/cloudreve/Cloudreve/v3/pkg/util" +) // Worker 处理任务的对象 type Worker interface { @@ -20,7 +23,7 @@ func (worker *GeneralWorker) Do(job Job) { // 致命错误捕获 if err := recover(); err != nil { util.Log().Debug("任务执行出错,%s", err) - job.SetError(&JobError{Msg: "致命错误"}) + job.SetError(&JobError{Msg: "致命错误", Error: fmt.Sprintf("%s", err)}) job.SetStatus(Error) } }() diff --git a/routers/router.go b/routers/router.go index 99c6025..21f42ae 100644 --- a/routers/router.go +++ b/routers/router.go @@ -219,7 +219,15 @@ func InitMasterRouter() *gin.Engine { // 事件通知 slave.PUT("notification/:subject", controllers.SlaveNotificationPush) // 上传 - slave.POST("upload", controllers.SlaveUpload) + upload := slave.Group("upload") + { + // 上传分片 + upload.POST(":sessionId", controllers.SlaveUpload) + // 创建上传会话上传 + upload.PUT("", controllers.SlaveGetUploadSession) + // 删除上传会话 + upload.DELETE(":sessionId", controllers.SlaveDeleteUploadSession) + } // OneDrive 存储策略凭证 slave.GET("credential/onedrive/:id", controllers.SlaveGetOneDriveCredential) } diff --git a/service/explorer/upload.go b/service/explorer/upload.go index 5753c2b..ced4841 100644 --- a/service/explorer/upload.go +++ b/service/explorer/upload.go @@ -7,6 +7,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/auth" "github.com/cloudreve/Cloudreve/v3/pkg/cache" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/local" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/hashid" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" @@ -137,6 +138,8 @@ func (service *UploadService) SlaveUpload(ctx context.Context, c *gin.Context) s return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) } + fs.Handler = local.Driver{} + // 解析需要的参数 service.Index, _ = strconv.Atoi(c.Query("chunk")) mode := fsctx.Append