Feat: prototype for slave driven filesystem

This commit is contained in:
HFO4 2021-08-29 20:32:03 +08:00
parent 870df708bf
commit 99953825ff
4 changed files with 44 additions and 10 deletions

View file

@ -239,17 +239,24 @@ func (monitor *Monitor) RemoveTempFolder() {
func (monitor *Monitor) Complete(status rpc.StatusInfo) bool {
// 创建中转任务
file := make([]string, 0, len(monitor.Task.StatusInfo.Files))
sizes := make(map[string]uint64, len(monitor.Task.StatusInfo.Files))
for i := 0; i < len(monitor.Task.StatusInfo.Files); i++ {
if monitor.Task.StatusInfo.Files[i].Selected == "true" {
file = append(file, monitor.Task.StatusInfo.Files[i].Path)
fileInfo := monitor.Task.StatusInfo.Files[i]
if fileInfo.Selected == "true" {
file = append(file, fileInfo.Path)
size, _ := strconv.ParseUint(fileInfo.Length, 10, 64)
sizes[fileInfo.Path] = size
}
}
job, err := task.NewTransferTask(
monitor.Task.UserID,
file,
monitor.Task.Dst,
monitor.Task.Parent,
true,
monitor.node.ID(),
sizes,
)
if err != nil {
monitor.setErrorStatus(err)

View file

@ -3,6 +3,7 @@ package filesystem
import (
"context"
"errors"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"io"
"net/http"
"net/url"
@ -134,7 +135,6 @@ func NewFileSystem(user *model.User) (*FileSystem, error) {
// 分配存储策略适配器
err := fs.DispatchHandler()
// TODO 分配默认钩子
return fs, err
}
@ -159,7 +159,6 @@ func NewAnonymousFileSystem() (*FileSystem, error) {
}
// DispatchHandler 根据存储策略分配文件适配器
// TODO 完善测试
func (fs *FileSystem) DispatchHandler() error {
var policyType string
var currentPolicy *model.Policy
@ -272,6 +271,11 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
return fs, err
}
// SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点
func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
}
// SetTargetFile 设置当前处理的目标文件
func (fs *FileSystem) SetTargetFile(files *[]model.File) {
if len(fs.FileTarget) == 0 {

View file

@ -96,7 +96,9 @@ func Resume() {
continue
}
TaskPoll.Submit(job)
if job != nil {
TaskPoll.Submit(job)
}
}
}

View file

@ -9,6 +9,7 @@ import (
"strings"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
@ -26,11 +27,14 @@ type TransferTask struct {
// TransferProps 中转任务属性
type TransferProps struct {
Src []string `json:"src"` // 原始文件
Parent string `json:"parent"` // 父目录
Dst string `json:"dst"` // 目的目录ID
Src []string `json:"src"` // 原始文件
SrcSizes map[string]uint64 `json:"src_size"` // 原始文件的大小信息,从机转存时使用
Parent string `json:"parent"` // 父目录
Dst string `json:"dst"` // 目的目录ID
// 将会保留原始文件的目录结构Src 除去 Parent 开头作为最终路径
TrimPath bool `json:"trim_path"`
// 负责处理中专任务的节点ID
NodeID uint `json:"node_id"`
}
// Props 获取任务属性
@ -104,7 +108,22 @@ func (job *TransferTask) Do() {
}
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
err = fs.UploadFromPath(ctx, file, dst)
if job.TaskProps.NodeID > 1 {
// 指定为从机中转
// 获取从机节点
node := cluster.Default.GetNodeByID(job.TaskProps.NodeID)
if node == nil {
job.SetErrorMsg("从机节点不可用", nil)
}
// 切换为从机节点处理上传
fs.SwitchToSlaveHandler(node)
err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file])
} else {
err = fs.UploadFromPath(ctx, file, dst)
}
if err != nil {
job.SetErrorMsg("文件转存失败", err)
}
@ -122,7 +141,7 @@ func (job *TransferTask) Recycle() {
}
// NewTransferTask 新建中转任务
func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Job, error) {
func NewTransferTask(user uint, src []string, dst, parent string, trim bool, node uint, sizes map[string]uint64) (Job, error) {
creator, err := model.GetActiveUserByID(user)
if err != nil {
return nil, err
@ -135,6 +154,8 @@ func NewTransferTask(user uint, src []string, dst, parent string, trim bool) (Jo
Parent: parent,
Dst: dst,
TrimPath: trim,
NodeID: node,
SrcSizes: sizes,
},
}