diff --git a/middleware/cluster.go b/middleware/cluster.go index 8a59c36..079a4f4 100644 --- a/middleware/cluster.go +++ b/middleware/cluster.go @@ -56,7 +56,7 @@ func SlaveRPCSignRequired() gin.HandlerFunc { return } - SignRequired(slaveNode.GetAuthInstance())(c) + SignRequired(slaveNode.MasterAuthInstance())(c) } } diff --git a/pkg/cluster/master.go b/pkg/cluster/master.go index 7307a6c..28f1a3f 100644 --- a/pkg/cluster/master.go +++ b/pkg/cluster/master.go @@ -76,13 +76,20 @@ func (node *MasterNode) IsFeatureEnabled(feature string) bool { } } -func (node *MasterNode) GetAuthInstance() auth.Auth { +func (node *MasterNode) MasterAuthInstance() auth.Auth { node.lock.RLock() defer node.lock.RUnlock() return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)} } +func (node *MasterNode) SlaveAuthInstance() auth.Auth { + node.lock.RLock() + defer node.lock.RUnlock() + + return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)} +} + // SubscribeStatusChange 订阅节点状态更改 func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) { } @@ -122,6 +129,13 @@ func (node *MasterNode) IsMater() bool { return true } +func (node *MasterNode) DBModel() *model.Node { + node.lock.RLock() + defer node.lock.RUnlock() + + return node.Model +} + func (r *rpcService) Init() error { r.parent.lock.Lock() defer r.parent.lock.Unlock() diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go index 9f34f50..745dd25 100644 --- a/pkg/cluster/node.go +++ b/pkg/cluster/node.go @@ -36,7 +36,13 @@ type Node interface { IsMater() bool // Get auth instance used to check RPC call from slave to master - GetAuthInstance() auth.Auth + MasterAuthInstance() auth.Auth + + // Get auth instance used to check RPC call from master to slave + SlaveAuthInstance() auth.Auth + + // Get node DB model + DBModel() *model.Node } // Create new node from DB model diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index ca78f65..be92ba5 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -11,16 +11,14 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/util" "io" "net/url" - "path" "strings" "sync" "time" ) type SlaveNode struct { - Model *model.Node - AuthInstance auth.Auth - Active bool + Model *model.Node + Active bool caller slaveCaller callback func(bool, uint) @@ -36,15 +34,30 @@ type slaveCaller struct { // Init 初始化节点 func (node *SlaveNode) Init(nodeModel *model.Node) { node.lock.Lock() + defer node.lock.Unlock() node.Model = nodeModel - node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)} - node.caller.Client = request.NewClient() + + // Init http request client + var endpoint *url.URL + if serverURL, err := url.Parse(node.Model.Server); err == nil { + var controller *url.URL + controller, _ = url.Parse("/api/v3/slave") + endpoint = serverURL.ResolveReference(controller) + } + + signTTL := model.GetIntSetting("slave_api_timeout", 60) + node.caller.Client = request.NewClient( + request.WithMasterMeta(), + request.WithTimeout(time.Duration(signTTL)*time.Second), + request.WithCredential(auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}, int64(signTTL)), + request.WithEndpoint(endpoint.String()), + ) + node.caller.parent = node node.Active = true if node.close != nil { node.close <- true } - node.lock.Unlock() go node.StartPingLoop() } @@ -77,15 +90,11 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe } bodyReader := strings.NewReader(string(reqBodyEncoded)) - signTTL := model.GetIntSetting("slave_api_timeout", 60) resp, err := node.caller.Client.Request( "POST", - node.getAPIUrl("heartbeat"), + "heartbeat", bodyReader, - request.WithMasterMeta(), - request.WithTimeout(time.Duration(signTTL)*time.Second), - request.WithCredential(node.AuthInstance, int64(signTTL)), ).CheckHTTPResponse(200).DecodeResponse() if err != nil { return nil, err @@ -145,36 +154,6 @@ func (node *SlaveNode) ID() uint { return node.Model.ID } -// getAPIUrl 获取接口请求地址 -func (node *SlaveNode) getAPIUrl(scope string) string { - node.lock.RLock() - serverURL, err := url.Parse(node.Model.Server) - node.lock.RUnlock() - if err != nil { - return "" - } - - var controller *url.URL - controller, _ = url.Parse("/api/v3/slave") - controller.Path = path.Join(controller.Path, scope) - return serverURL.ResolveReference(controller).String() -} - -func (node *SlaveNode) changeStatus(isActive bool) { - node.lock.RLock() - id := node.Model.ID - if isActive != node.Active { - node.lock.RUnlock() - node.lock.Lock() - node.Active = isActive - node.lock.Unlock() - node.callback(isActive, id) - } else { - node.lock.RUnlock() - } - -} - func (node *SlaveNode) StartPingLoop() { node.lock.Lock() node.close = make(chan bool) @@ -235,6 +214,31 @@ loop: } } +func (node *SlaveNode) IsMater() bool { + return false +} + +func (node *SlaveNode) MasterAuthInstance() auth.Auth { + node.lock.RLock() + defer node.lock.RUnlock() + + return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)} +} + +func (node *SlaveNode) SlaveAuthInstance() auth.Auth { + node.lock.RLock() + defer node.lock.RUnlock() + + return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)} +} + +func (node *SlaveNode) DBModel() *model.Node { + node.lock.RLock() + defer node.lock.RUnlock() + + return node.Model +} + // getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq { return &serializer.NodePingReq{ @@ -246,15 +250,19 @@ func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingRe } } -func (node *SlaveNode) IsMater() bool { - return false -} - -func (node *SlaveNode) GetAuthInstance() auth.Auth { +func (node *SlaveNode) changeStatus(isActive bool) { node.lock.RLock() - defer node.lock.RUnlock() + id := node.Model.ID + if isActive != node.Active { + node.lock.RUnlock() + node.lock.Lock() + node.Active = isActive + node.lock.Unlock() + node.callback(isActive, id) + } else { + node.lock.RUnlock() + } - return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)} } func (s *slaveCaller) Init() error { @@ -268,14 +276,10 @@ func (s *slaveCaller) SendAria2Call(body *serializer.SlaveAria2Call, scope strin return nil, err } - signTTL := model.GetIntSetting("slave_api_timeout", 60) return s.Client.Request( "POST", - s.parent.getAPIUrl("aria2/"+scope), + "aria2/"+scope, reqReader, - request.WithMasterMeta(), - request.WithTimeout(time.Duration(signTTL)*time.Second), - request.WithCredential(s.parent.AuthInstance, int64(signTTL)), ).CheckHTTPResponse(200).DecodeResponse() } diff --git a/pkg/filesystem/driver/shadow/slave/handler.go b/pkg/filesystem/driver/shadow/slave/handler.go index dbd8e07..515d9e2 100644 --- a/pkg/filesystem/driver/shadow/slave/handler.go +++ b/pkg/filesystem/driver/shadow/slave/handler.go @@ -1,7 +1,9 @@ package slave import ( + "bytes" "context" + "encoding/json" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver" @@ -11,6 +13,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "io" "net/url" + "time" ) // Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果 @@ -23,42 +26,81 @@ type Driver struct { // NewDriver 返回新的从机指派处理器 func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler { + var endpoint *url.URL + if serverURL, err := url.Parse(node.DBModel().Server); err == nil { + var controller *url.URL + controller, _ = url.Parse("/api/v3/slave") + endpoint = serverURL.ResolveReference(controller) + } + + signTTL := model.GetIntSetting("slave_api_timeout", 60) return &Driver{ node: node, handler: handler, policy: policy, - client: request.NewClient(request.WithMasterMeta()), + client: request.NewClient( + request.WithMasterMeta(), + request.WithTimeout(time.Duration(signTTL)*time.Second), + request.WithCredential(node.SlaveAuthInstance(), int64(signTTL)), + request.WithEndpoint(endpoint.String()), + ), } } +// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略 func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error { - realBase, ok := ctx.Value(fsctx.SlaveSrcPath).(string) + src, ok := ctx.Value(fsctx.SlaveSrcPath).(string) if !ok { return ErrSlaveSrcPathNotExist } + req := serializer.SlaveTransferReq{ + Src: src, + Dst: dst, + Policy: d.policy, + } + + body, err := json.Marshal(req) + if err != nil { + return err + } + + res, err := d.client.Request("PUT", "task、transfer", bytes.NewReader(body)). + CheckHTTPResponse(200). + DecodeResponse() + if err != nil { + return err + } + + if res.Code != 0 { + return serializer.NewErrorFromResponse(res) + } + + // TODO: subscribe and wait + + return ErrNotImplemented } func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) { - panic("implement me") + return nil, ErrNotImplemented } func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) { - panic("implement me") + return nil, ErrNotImplemented } func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) { - panic("implement me") + return nil, ErrNotImplemented } func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) { - panic("implement me") + return "", ErrNotImplemented } func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) { - panic("implement me") + return serializer.UploadCredential{}, ErrNotImplemented } func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) { - panic("implement me") + return nil, ErrNotImplemented } diff --git a/pkg/request/options.go b/pkg/request/options.go index 0e3511e..ed913e0 100644 --- a/pkg/request/options.go +++ b/pkg/request/options.go @@ -4,6 +4,7 @@ import ( "context" "github.com/cloudreve/Cloudreve/v3/pkg/auth" "net/http" + "net/url" "time" ) @@ -20,6 +21,7 @@ type options struct { ctx context.Context contentLength int64 masterMeta bool + endpoint *url.URL } type optionFunc func(*options) @@ -90,3 +92,11 @@ func WithMasterMeta() Option { o.masterMeta = true }) } + +// Endpoint 使用同一的请求Endpoint +func WithEndpoint(endpoint string) Option { + endpointURL, _ := url.Parse(endpoint) + return optionFunc(func(o *options) { + o.endpoint = endpointURL + }) +} diff --git a/pkg/request/request.go b/pkg/request/request.go index eef19ee..c8e6054 100644 --- a/pkg/request/request.go +++ b/pkg/request/request.go @@ -7,6 +7,8 @@ import ( "io" "io/ioutil" "net/http" + "path" + "sync" model "github.com/cloudreve/Cloudreve/v3/models" "github.com/cloudreve/Cloudreve/v3/pkg/auth" @@ -31,6 +33,7 @@ type Client interface { // HTTPClient 实现 Client 接口 type HTTPClient struct { + mu sync.Mutex options *options } @@ -49,25 +52,35 @@ func NewClient(opts ...Option) Client { // Request 发送HTTP请求 func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response { // 应用额外设置 + c.mu.Lock() + options := *c.options + c.mu.Unlock() for _, o := range opts { - o.apply(c.options) + o.apply(&options) } // 创建请求客户端 - client := &http.Client{Timeout: c.options.timeout} + client := &http.Client{Timeout: options.timeout} // size为0时将body设为nil - if c.options.contentLength == 0 { + if options.contentLength == 0 { body = nil } + // 确定请求URL + if options.endpoint != nil { + targetURL := *options.endpoint + targetURL.Path = path.Join(targetURL.Path, target) + target = targetURL.String() + } + // 创建请求 var ( req *http.Request err error ) - if c.options.ctx != nil { - req, err = http.NewRequestWithContext(c.options.ctx, method, target, body) + if options.ctx != nil { + req, err = http.NewRequestWithContext(options.ctx, method, target, body) } else { req, err = http.NewRequest(method, target, body) } @@ -76,21 +89,21 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio } // 添加请求相关设置 - req.Header = c.options.header + req.Header = options.header - if c.options.masterMeta { + if options.masterMeta { req.Header.Add("X-Site-Url", model.GetSiteURL().String()) req.Header.Add("X-Site-Id", model.GetSettingByName("siteID")) req.Header.Add("X-Cloudreve-Version", conf.BackendVersion) } - if c.options.contentLength != -1 { - req.ContentLength = c.options.contentLength + if options.contentLength != -1 { + req.ContentLength = options.contentLength } // 签名请求 - if c.options.sign != nil { - auth.SignRequest(c.options.sign, req, c.options.signTTL) + if options.sign != nil { + auth.SignRequest(options.sign, req, options.signTTL) } // 发送请求 diff --git a/pkg/serializer/slave.go b/pkg/serializer/slave.go index af65ac3..f4d7adb 100644 --- a/pkg/serializer/slave.go +++ b/pkg/serializer/slave.go @@ -1,6 +1,9 @@ package serializer -import model "github.com/cloudreve/Cloudreve/v3/models" +import ( + "fmt" + model "github.com/cloudreve/Cloudreve/v3/models" +) // RemoteDeleteRequest 远程策略删除接口请求正文 type RemoteDeleteRequest struct { @@ -32,3 +35,15 @@ type SlaveAria2Call struct { GroupOptions map[string]interface{} `json:"group_options"` Files []int `json:"files"` } + +// SlaveTransferReq 从机中转任务创建请求 +type SlaveTransferReq struct { + Src string `json:"src"` + Dst string `json:"dst"` + Policy *model.Policy +} + +// Hash 返回创建请求的唯一标识,保持创建请求幂等 +func (s *SlaveTransferReq) Hash(id string) string { + return fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID) +} diff --git a/pkg/slave/slave.go b/pkg/slave/slave.go index f3cd9d6..8f7f7b9 100644 --- a/pkg/slave/slave.go +++ b/pkg/slave/slave.go @@ -9,6 +9,7 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/cluster" "github.com/cloudreve/Cloudreve/v3/pkg/request" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/task" "net/http" "net/url" "sync" @@ -26,6 +27,9 @@ type Controller interface { // Send event change message to master node SendAria2Notification(string, common.StatusEvent) error + + // Submit async task into task pool + SubmitTask(string, task.Job, string) error } type slaveController struct { @@ -36,10 +40,11 @@ type slaveController struct { // info of master node type masterInfo struct { - slaveID uint - id string - ttl int - url *url.URL + slaveID uint + id string + ttl int + url *url.URL + jobTracker map[string]bool // used to invoke aria2 rpc calls instance cluster.Node } @@ -72,10 +77,11 @@ func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializ } c.masters[req.SiteID] = masterInfo{ - slaveID: req.Node.ID, - id: req.SiteID, - url: masterUrl, - ttl: req.CredentialTTL, + slaveID: req.Node.ID, + id: req.SiteID, + url: masterUrl, + ttl: req.CredentialTTL, + jobTracker: make(map[string]bool), instance: cluster.NewNodeFromDBModel(&model.Node{ MasterKey: req.Node.MasterKey, Type: model.MasterNodeType, @@ -112,7 +118,7 @@ func (c *slaveController) SendAria2Notification(id string, msg common.StatusEven node.url.ResolveReference(apiPath).String(), nil, request.WithHeader(http.Header{"X-Node-Id": []string{fmt.Sprintf("%d", node.slaveID)}}), - request.WithCredential(node.instance.GetAuthInstance(), int64(node.ttl)), + request.WithCredential(node.instance.MasterAuthInstance(), int64(node.ttl)), ).CheckHTTPResponse(200).DecodeResponse() if err != nil { return err @@ -128,3 +134,21 @@ func (c *slaveController) SendAria2Notification(id string, msg common.StatusEven c.lock.RUnlock() return ErrMasterNotFound } + +// SubmitTask 提交异步任务 +func (c *slaveController) SubmitTask(id string, job task.Job, hash string) error { + c.lock.RLock() + defer c.lock.RUnlock() + + if node, ok := c.masters[id]; ok { + if _, ok := node.jobTracker[hash]; ok { + // 任务已存在,直接返回 + return nil + } + + task.TaskPoll.Submit(job) + return nil + } + + return ErrMasterNotFound +} diff --git a/pkg/task/pool.go b/pkg/task/pool.go index d44877d..4fe550f 100644 --- a/pkg/task/pool.go +++ b/pkg/task/pool.go @@ -2,6 +2,7 @@ package task import ( model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/conf" "github.com/cloudreve/Cloudreve/v3/pkg/util" ) @@ -56,5 +57,7 @@ func Init() { TaskPoll.Add(maxWorker) util.Log().Info("初始化任务队列,WorkerNum = %d", maxWorker) - Resume() + if conf.SystemConfig.Mode == "master" { + Resume() + } } diff --git a/pkg/task/slavetask/transfer.go b/pkg/task/slavetask/transfer.go new file mode 100644 index 0000000..0210fe9 --- /dev/null +++ b/pkg/task/slavetask/transfer.go @@ -0,0 +1,63 @@ +package slavetask + +import ( + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/task" + "github.com/cloudreve/Cloudreve/v3/pkg/util" +) + +// TransferTask 文件中转任务 +type TransferTask struct { + Err *task.JobError + Req *serializer.SlaveTransferReq +} + +// Props 获取任务属性 +func (job *TransferTask) Props() string { + return "" +} + +// Type 获取任务类型 +func (job *TransferTask) Type() int { + return 0 +} + +// Creator 获取创建者ID +func (job *TransferTask) Creator() uint { + return 0 +} + +// Model 获取任务的数据库模型 +func (job *TransferTask) Model() *model.Task { + return nil +} + +// SetStatus 设定状态 +func (job *TransferTask) SetStatus(status int) { +} + +// SetError 设定任务失败信息 +func (job *TransferTask) SetError(err *task.JobError) { + job.Err = err + +} + +// SetErrorMsg 设定任务失败信息 +func (job *TransferTask) SetErrorMsg(msg string, err error) { + jobErr := &task.JobError{Msg: msg} + if err != nil { + jobErr.Error = err.Error() + } + job.SetError(jobErr) +} + +// GetError 返回任务失败信息 +func (job *TransferTask) GetError() *task.JobError { + return job.Err +} + +// Do 开始执行任务 +func (job *TransferTask) Do() { + util.Log().Println("job", "") +} diff --git a/routers/controllers/slave.go b/routers/controllers/slave.go index 6b3100a..c766fa2 100644 --- a/routers/controllers/slave.go +++ b/routers/controllers/slave.go @@ -232,3 +232,14 @@ func SlaveSelectTask(c *gin.Context) { c.JSON(200, ErrorResponse(err)) } } + +// SlaveCreateTransferTask 从机创建中转任务 +func SlaveCreateTransferTask(c *gin.Context) { + var service serializer.SlaveTransferReq + if err := c.ShouldBindJSON(&service); err == nil { + res := explorer.CreateTransferTask(c, &service) + c.JSON(200, res) + } else { + c.JSON(200, ErrorResponse(err)) + } +} diff --git a/routers/router.go b/routers/router.go index 0735f4c..f1ed77b 100644 --- a/routers/router.go +++ b/routers/router.go @@ -68,6 +68,12 @@ func InitSlaveRouter() *gin.Engine { // 选取任务文件 aria2.POST("select", controllers.SlaveSelectTask) } + + // 异步任务 + task := v3.Group("task") + { + task.PUT("transfer", controllers.SlaveCreateTransferTask) + } } return r } diff --git a/service/explorer/file.go b/service/explorer/file.go index d128c86..8b56871 100644 --- a/service/explorer/file.go +++ b/service/explorer/file.go @@ -2,7 +2,6 @@ package explorer import ( "context" - "encoding/base64" "encoding/json" "fmt" "io/ioutil" @@ -20,7 +19,6 @@ import ( "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" "github.com/cloudreve/Cloudreve/v3/pkg/serializer" "github.com/gin-gonic/gin" - "github.com/jinzhu/gorm" ) // SingleFileService 对单文件进行操作的五福,path为文件完整路径 @@ -43,29 +41,6 @@ type DownloadService struct { ID string `uri:"id" binding:"required"` } -// SlaveDownloadService 从机文件下載服务 -type SlaveDownloadService struct { - PathEncoded string `uri:"path" binding:"required"` - Name string `uri:"name" binding:"required"` - Speed int `uri:"speed" binding:"min=0"` -} - -// SlaveFileService 从机单文件文件相关服务 -type SlaveFileService struct { - PathEncoded string `uri:"path" binding:"required"` -} - -// SlaveFilesService 从机多文件相关服务 -type SlaveFilesService struct { - Files []string `json:"files" binding:"required,gt=0"` -} - -// SlaveListService 从机列表服务 -type SlaveListService struct { - Path string `json:"path" binding:"required,min=1,max=65535"` - Recursive bool `json:"recursive"` -} - // New 创建新文件 func (service *SingleFileService) Create(c *gin.Context) serializer.Response { // 创建文件系统 @@ -449,106 +424,3 @@ func (service *FileIDService) PutContent(ctx context.Context, c *gin.Context) se Code: 0, } } - -// ServeFile 通过签名的URL下载从机文件 -func (service *SlaveDownloadService) ServeFile(ctx context.Context, c *gin.Context, isDownload bool) serializer.Response { - // 创建文件系统 - fs, err := filesystem.NewAnonymousFileSystem() - if err != nil { - return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) - } - defer fs.Recycle() - - // 解码文件路径 - fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded) - if err != nil { - return serializer.ParamErr("无法解析的文件地址", err) - } - - // 根据URL里的信息创建一个文件对象和用户对象 - file := model.File{ - Name: service.Name, - SourceName: string(fileSource), - Policy: model.Policy{ - Model: gorm.Model{ID: 1}, - Type: "local", - }, - } - fs.User = &model.User{ - Group: model.Group{SpeedLimit: service.Speed}, - } - fs.FileTarget = []model.File{file} - - // 开始处理下载 - ctx = context.WithValue(ctx, fsctx.GinCtx, c) - rs, err := fs.GetDownloadContent(ctx, 0) - if err != nil { - return serializer.Err(serializer.CodeNotSet, err.Error(), err) - } - defer rs.Close() - - // 设置下载文件名 - if isDownload { - c.Header("Content-Disposition", "attachment; filename=\""+url.PathEscape(fs.FileTarget[0].Name)+"\"") - } - - // 发送文件 - http.ServeContent(c.Writer, c.Request, fs.FileTarget[0].Name, time.Now(), rs) - - return serializer.Response{ - Code: 0, - } -} - -// Delete 通过签名的URL删除从机文件 -func (service *SlaveFilesService) Delete(ctx context.Context, c *gin.Context) serializer.Response { - // 创建文件系统 - fs, err := filesystem.NewAnonymousFileSystem() - if err != nil { - return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) - } - defer fs.Recycle() - - // 删除文件 - failed, err := fs.Handler.Delete(ctx, service.Files) - if err != nil { - // 将Data字段写为字符串方便主控端解析 - data, _ := json.Marshal(serializer.RemoteDeleteRequest{Files: failed}) - - return serializer.Response{ - Code: serializer.CodeNotFullySuccess, - Data: string(data), - Msg: fmt.Sprintf("有 %d 个文件未能成功删除", len(failed)), - Error: err.Error(), - } - } - return serializer.Response{Code: 0} -} - -// Thumb 通过签名URL获取从机文件缩略图 -func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) serializer.Response { - // 创建文件系统 - fs, err := filesystem.NewAnonymousFileSystem() - if err != nil { - return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) - } - defer fs.Recycle() - - // 解码文件路径 - fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded) - if err != nil { - return serializer.ParamErr("无法解析的文件地址", err) - } - fs.FileTarget = []model.File{{SourceName: string(fileSource), PicInfo: "1,1"}} - - // 获取缩略图 - resp, err := fs.GetThumb(ctx, 0) - if err != nil { - return serializer.Err(serializer.CodeNotSet, "无法获取缩略图", err) - } - - defer resp.Content.Close() - http.ServeContent(c.Writer, c.Request, "thumb.png", time.Now(), resp.Content) - - return serializer.Response{Code: 0} -} diff --git a/service/explorer/slave.go b/service/explorer/slave.go new file mode 100644 index 0000000..9f6bb94 --- /dev/null +++ b/service/explorer/slave.go @@ -0,0 +1,162 @@ +package explorer + +import ( + "context" + "encoding/base64" + "encoding/json" + "fmt" + model "github.com/cloudreve/Cloudreve/v3/models" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem" + "github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx" + "github.com/cloudreve/Cloudreve/v3/pkg/serializer" + "github.com/cloudreve/Cloudreve/v3/pkg/slave" + "github.com/cloudreve/Cloudreve/v3/pkg/task/slavetask" + "github.com/gin-gonic/gin" + "github.com/jinzhu/gorm" + "net/http" + "net/url" + "time" +) + +// SlaveDownloadService 从机文件下載服务 +type SlaveDownloadService struct { + PathEncoded string `uri:"path" binding:"required"` + Name string `uri:"name" binding:"required"` + Speed int `uri:"speed" binding:"min=0"` +} + +// SlaveFileService 从机单文件文件相关服务 +type SlaveFileService struct { + PathEncoded string `uri:"path" binding:"required"` +} + +// SlaveFilesService 从机多文件相关服务 +type SlaveFilesService struct { + Files []string `json:"files" binding:"required,gt=0"` +} + +// SlaveListService 从机列表服务 +type SlaveListService struct { + Path string `json:"path" binding:"required,min=1,max=65535"` + Recursive bool `json:"recursive"` +} + +// ServeFile 通过签名的URL下载从机文件 +func (service *SlaveDownloadService) ServeFile(ctx context.Context, c *gin.Context, isDownload bool) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewAnonymousFileSystem() + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + // 解码文件路径 + fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded) + if err != nil { + return serializer.ParamErr("无法解析的文件地址", err) + } + + // 根据URL里的信息创建一个文件对象和用户对象 + file := model.File{ + Name: service.Name, + SourceName: string(fileSource), + Policy: model.Policy{ + Model: gorm.Model{ID: 1}, + Type: "local", + }, + } + fs.User = &model.User{ + Group: model.Group{SpeedLimit: service.Speed}, + } + fs.FileTarget = []model.File{file} + + // 开始处理下载 + ctx = context.WithValue(ctx, fsctx.GinCtx, c) + rs, err := fs.GetDownloadContent(ctx, 0) + if err != nil { + return serializer.Err(serializer.CodeNotSet, err.Error(), err) + } + defer rs.Close() + + // 设置下载文件名 + if isDownload { + c.Header("Content-Disposition", "attachment; filename=\""+url.PathEscape(fs.FileTarget[0].Name)+"\"") + } + + // 发送文件 + http.ServeContent(c.Writer, c.Request, fs.FileTarget[0].Name, time.Now(), rs) + + return serializer.Response{ + Code: 0, + } +} + +// Delete 通过签名的URL删除从机文件 +func (service *SlaveFilesService) Delete(ctx context.Context, c *gin.Context) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewAnonymousFileSystem() + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + // 删除文件 + failed, err := fs.Handler.Delete(ctx, service.Files) + if err != nil { + // 将Data字段写为字符串方便主控端解析 + data, _ := json.Marshal(serializer.RemoteDeleteRequest{Files: failed}) + + return serializer.Response{ + Code: serializer.CodeNotFullySuccess, + Data: string(data), + Msg: fmt.Sprintf("有 %d 个文件未能成功删除", len(failed)), + Error: err.Error(), + } + } + return serializer.Response{Code: 0} +} + +// Thumb 通过签名URL获取从机文件缩略图 +func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) serializer.Response { + // 创建文件系统 + fs, err := filesystem.NewAnonymousFileSystem() + if err != nil { + return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err) + } + defer fs.Recycle() + + // 解码文件路径 + fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded) + if err != nil { + return serializer.ParamErr("无法解析的文件地址", err) + } + fs.FileTarget = []model.File{{SourceName: string(fileSource), PicInfo: "1,1"}} + + // 获取缩略图 + resp, err := fs.GetThumb(ctx, 0) + if err != nil { + return serializer.Err(serializer.CodeNotSet, "无法获取缩略图", err) + } + + defer resp.Content.Close() + http.ServeContent(c.Writer, c.Request, "thumb.png", time.Now(), resp.Content) + + return serializer.Response{Code: 0} +} + +// CreateTransferTask 创建从机文件转存任务 +func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serializer.Response { + if id, ok := c.Get("MasterSiteID"); ok { + job := &slavetask.TransferTask{ + Req: req, + } + + if err := slave.DefaultController.SubmitTask(id.(string), job, req.Hash(id.(string))); err != nil { + return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err) + } + + return serializer.Response{} + } + + return serializer.ParamErr("未知的主机节点ID", nil) +}