Feat: slave receive async task from master

This commit is contained in:
HFO4 2021-08-29 20:37:58 +08:00
parent 23d1839b29
commit 5662daa593
15 changed files with 460 additions and 215 deletions

View file

@ -56,7 +56,7 @@ func SlaveRPCSignRequired() gin.HandlerFunc {
return
}
SignRequired(slaveNode.GetAuthInstance())(c)
SignRequired(slaveNode.MasterAuthInstance())(c)
}
}

View file

@ -76,13 +76,20 @@ func (node *MasterNode) IsFeatureEnabled(feature string) bool {
}
}
func (node *MasterNode) GetAuthInstance() auth.Auth {
func (node *MasterNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *MasterNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
// SubscribeStatusChange 订阅节点状态更改
func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) {
}
@ -122,6 +129,13 @@ func (node *MasterNode) IsMater() bool {
return true
}
func (node *MasterNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
func (r *rpcService) Init() error {
r.parent.lock.Lock()
defer r.parent.lock.Unlock()

View file

@ -36,7 +36,13 @@ type Node interface {
IsMater() bool
// Get auth instance used to check RPC call from slave to master
GetAuthInstance() auth.Auth
MasterAuthInstance() auth.Auth
// Get auth instance used to check RPC call from master to slave
SlaveAuthInstance() auth.Auth
// Get node DB model
DBModel() *model.Node
}
// Create new node from DB model

View file

@ -11,16 +11,14 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"io"
"net/url"
"path"
"strings"
"sync"
"time"
)
type SlaveNode struct {
Model *model.Node
AuthInstance auth.Auth
Active bool
Model *model.Node
Active bool
caller slaveCaller
callback func(bool, uint)
@ -36,15 +34,30 @@ type slaveCaller struct {
// Init 初始化节点
func (node *SlaveNode) Init(nodeModel *model.Node) {
node.lock.Lock()
defer node.lock.Unlock()
node.Model = nodeModel
node.AuthInstance = auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}
node.caller.Client = request.NewClient()
// Init http request client
var endpoint *url.URL
if serverURL, err := url.Parse(node.Model.Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
node.caller.Client = request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(auth.HMACAuth{SecretKey: []byte(nodeModel.SlaveKey)}, int64(signTTL)),
request.WithEndpoint(endpoint.String()),
)
node.caller.parent = node
node.Active = true
if node.close != nil {
node.close <- true
}
node.lock.Unlock()
go node.StartPingLoop()
}
@ -77,15 +90,11 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe
}
bodyReader := strings.NewReader(string(reqBodyEncoded))
signTTL := model.GetIntSetting("slave_api_timeout", 60)
resp, err := node.caller.Client.Request(
"POST",
node.getAPIUrl("heartbeat"),
"heartbeat",
bodyReader,
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(node.AuthInstance, int64(signTTL)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return nil, err
@ -145,36 +154,6 @@ func (node *SlaveNode) ID() uint {
return node.Model.ID
}
// getAPIUrl 获取接口请求地址
func (node *SlaveNode) getAPIUrl(scope string) string {
node.lock.RLock()
serverURL, err := url.Parse(node.Model.Server)
node.lock.RUnlock()
if err != nil {
return ""
}
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
controller.Path = path.Join(controller.Path, scope)
return serverURL.ResolveReference(controller).String()
}
func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock()
id := node.Model.ID
if isActive != node.Active {
node.lock.RUnlock()
node.lock.Lock()
node.Active = isActive
node.lock.Unlock()
node.callback(isActive, id)
} else {
node.lock.RUnlock()
}
}
func (node *SlaveNode) StartPingLoop() {
node.lock.Lock()
node.close = make(chan bool)
@ -235,6 +214,31 @@ loop:
}
}
func (node *SlaveNode) IsMater() bool {
return false
}
func (node *SlaveNode) MasterAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (node *SlaveNode) SlaveAuthInstance() auth.Auth {
node.lock.RLock()
defer node.lock.RUnlock()
return auth.HMACAuth{SecretKey: []byte(node.Model.SlaveKey)}
}
func (node *SlaveNode) DBModel() *model.Node {
node.lock.RLock()
defer node.lock.RUnlock()
return node.Model
}
// getHeartbeatContent gets serializer.NodePingReq used to send heartbeat to slave
func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingReq {
return &serializer.NodePingReq{
@ -246,15 +250,19 @@ func (node *SlaveNode) getHeartbeatContent(isUpdate bool) *serializer.NodePingRe
}
}
func (node *SlaveNode) IsMater() bool {
return false
}
func (node *SlaveNode) GetAuthInstance() auth.Auth {
func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock()
defer node.lock.RUnlock()
id := node.Model.ID
if isActive != node.Active {
node.lock.RUnlock()
node.lock.Lock()
node.Active = isActive
node.lock.Unlock()
node.callback(isActive, id)
} else {
node.lock.RUnlock()
}
return auth.HMACAuth{SecretKey: []byte(node.Model.MasterKey)}
}
func (s *slaveCaller) Init() error {
@ -268,14 +276,10 @@ func (s *slaveCaller) SendAria2Call(body *serializer.SlaveAria2Call, scope strin
return nil, err
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
return s.Client.Request(
"POST",
s.parent.getAPIUrl("aria2/"+scope),
"aria2/"+scope,
reqReader,
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(s.parent.AuthInstance, int64(signTTL)),
).CheckHTTPResponse(200).DecodeResponse()
}

View file

@ -1,7 +1,9 @@
package slave
import (
"bytes"
"context"
"encoding/json"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
@ -11,6 +13,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
"time"
)
// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
@ -23,42 +26,81 @@ type Driver struct {
// NewDriver 返回新的从机指派处理器
func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
var endpoint *url.URL
if serverURL, err := url.Parse(node.DBModel().Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
return &Driver{
node: node,
handler: handler,
policy: policy,
client: request.NewClient(request.WithMasterMeta()),
client: request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(node.SlaveAuthInstance(), int64(signTTL)),
request.WithEndpoint(endpoint.String()),
),
}
}
// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略
func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
realBase, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
if !ok {
return ErrSlaveSrcPathNotExist
}
req := serializer.SlaveTransferReq{
Src: src,
Dst: dst,
Policy: d.policy,
}
body, err := json.Marshal(req)
if err != nil {
return err
}
res, err := d.client.Request("PUT", "task、transfer", bytes.NewReader(body)).
CheckHTTPResponse(200).
DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
// TODO: subscribe and wait
return ErrNotImplemented
}
func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) {
panic("implement me")
return nil, ErrNotImplemented
}
func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
panic("implement me")
return nil, ErrNotImplemented
}
func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
panic("implement me")
return nil, ErrNotImplemented
}
func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
panic("implement me")
return "", ErrNotImplemented
}
func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
panic("implement me")
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
panic("implement me")
return nil, ErrNotImplemented
}

View file

@ -4,6 +4,7 @@ import (
"context"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"net/http"
"net/url"
"time"
)
@ -20,6 +21,7 @@ type options struct {
ctx context.Context
contentLength int64
masterMeta bool
endpoint *url.URL
}
type optionFunc func(*options)
@ -90,3 +92,11 @@ func WithMasterMeta() Option {
o.masterMeta = true
})
}
// Endpoint 使用同一的请求Endpoint
func WithEndpoint(endpoint string) Option {
endpointURL, _ := url.Parse(endpoint)
return optionFunc(func(o *options) {
o.endpoint = endpointURL
})
}

View file

@ -7,6 +7,8 @@ import (
"io"
"io/ioutil"
"net/http"
"path"
"sync"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
@ -31,6 +33,7 @@ type Client interface {
// HTTPClient 实现 Client 接口
type HTTPClient struct {
mu sync.Mutex
options *options
}
@ -49,25 +52,35 @@ func NewClient(opts ...Option) Client {
// Request 发送HTTP请求
func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Option) *Response {
// 应用额外设置
c.mu.Lock()
options := *c.options
c.mu.Unlock()
for _, o := range opts {
o.apply(c.options)
o.apply(&options)
}
// 创建请求客户端
client := &http.Client{Timeout: c.options.timeout}
client := &http.Client{Timeout: options.timeout}
// size为0时将body设为nil
if c.options.contentLength == 0 {
if options.contentLength == 0 {
body = nil
}
// 确定请求URL
if options.endpoint != nil {
targetURL := *options.endpoint
targetURL.Path = path.Join(targetURL.Path, target)
target = targetURL.String()
}
// 创建请求
var (
req *http.Request
err error
)
if c.options.ctx != nil {
req, err = http.NewRequestWithContext(c.options.ctx, method, target, body)
if options.ctx != nil {
req, err = http.NewRequestWithContext(options.ctx, method, target, body)
} else {
req, err = http.NewRequest(method, target, body)
}
@ -76,21 +89,21 @@ func (c HTTPClient) Request(method, target string, body io.Reader, opts ...Optio
}
// 添加请求相关设置
req.Header = c.options.header
req.Header = options.header
if c.options.masterMeta {
if options.masterMeta {
req.Header.Add("X-Site-Url", model.GetSiteURL().String())
req.Header.Add("X-Site-Id", model.GetSettingByName("siteID"))
req.Header.Add("X-Cloudreve-Version", conf.BackendVersion)
}
if c.options.contentLength != -1 {
req.ContentLength = c.options.contentLength
if options.contentLength != -1 {
req.ContentLength = options.contentLength
}
// 签名请求
if c.options.sign != nil {
auth.SignRequest(c.options.sign, req, c.options.signTTL)
if options.sign != nil {
auth.SignRequest(options.sign, req, options.signTTL)
}
// 发送请求

View file

@ -1,6 +1,9 @@
package serializer
import model "github.com/cloudreve/Cloudreve/v3/models"
import (
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
)
// RemoteDeleteRequest 远程策略删除接口请求正文
type RemoteDeleteRequest struct {
@ -32,3 +35,15 @@ type SlaveAria2Call struct {
GroupOptions map[string]interface{} `json:"group_options"`
Files []int `json:"files"`
}
// SlaveTransferReq 从机中转任务创建请求
type SlaveTransferReq struct {
Src string `json:"src"`
Dst string `json:"dst"`
Policy *model.Policy
}
// Hash 返回创建请求的唯一标识,保持创建请求幂等
func (s *SlaveTransferReq) Hash(id string) string {
return fmt.Sprintf("transfer-%s-%s-%s-%d", id, s.Src, s.Dst, s.Policy.ID)
}

View file

@ -9,6 +9,7 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"net/http"
"net/url"
"sync"
@ -26,6 +27,9 @@ type Controller interface {
// Send event change message to master node
SendAria2Notification(string, common.StatusEvent) error
// Submit async task into task pool
SubmitTask(string, task.Job, string) error
}
type slaveController struct {
@ -36,10 +40,11 @@ type slaveController struct {
// info of master node
type masterInfo struct {
slaveID uint
id string
ttl int
url *url.URL
slaveID uint
id string
ttl int
url *url.URL
jobTracker map[string]bool
// used to invoke aria2 rpc calls
instance cluster.Node
}
@ -72,10 +77,11 @@ func (c *slaveController) HandleHeartBeat(req *serializer.NodePingReq) (serializ
}
c.masters[req.SiteID] = masterInfo{
slaveID: req.Node.ID,
id: req.SiteID,
url: masterUrl,
ttl: req.CredentialTTL,
slaveID: req.Node.ID,
id: req.SiteID,
url: masterUrl,
ttl: req.CredentialTTL,
jobTracker: make(map[string]bool),
instance: cluster.NewNodeFromDBModel(&model.Node{
MasterKey: req.Node.MasterKey,
Type: model.MasterNodeType,
@ -112,7 +118,7 @@ func (c *slaveController) SendAria2Notification(id string, msg common.StatusEven
node.url.ResolveReference(apiPath).String(),
nil,
request.WithHeader(http.Header{"X-Node-Id": []string{fmt.Sprintf("%d", node.slaveID)}}),
request.WithCredential(node.instance.GetAuthInstance(), int64(node.ttl)),
request.WithCredential(node.instance.MasterAuthInstance(), int64(node.ttl)),
).CheckHTTPResponse(200).DecodeResponse()
if err != nil {
return err
@ -128,3 +134,21 @@ func (c *slaveController) SendAria2Notification(id string, msg common.StatusEven
c.lock.RUnlock()
return ErrMasterNotFound
}
// SubmitTask 提交异步任务
func (c *slaveController) SubmitTask(id string, job task.Job, hash string) error {
c.lock.RLock()
defer c.lock.RUnlock()
if node, ok := c.masters[id]; ok {
if _, ok := node.jobTracker[hash]; ok {
// 任务已存在,直接返回
return nil
}
task.TaskPoll.Submit(job)
return nil
}
return ErrMasterNotFound
}

View file

@ -2,6 +2,7 @@ package task
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/conf"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
@ -56,5 +57,7 @@ func Init() {
TaskPoll.Add(maxWorker)
util.Log().Info("初始化任务队列WorkerNum = %d", maxWorker)
Resume()
if conf.SystemConfig.Mode == "master" {
Resume()
}
}

View file

@ -0,0 +1,63 @@
package slavetask
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/task"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
)
// TransferTask 文件中转任务
type TransferTask struct {
Err *task.JobError
Req *serializer.SlaveTransferReq
}
// Props 获取任务属性
func (job *TransferTask) Props() string {
return ""
}
// Type 获取任务类型
func (job *TransferTask) Type() int {
return 0
}
// Creator 获取创建者ID
func (job *TransferTask) Creator() uint {
return 0
}
// Model 获取任务的数据库模型
func (job *TransferTask) Model() *model.Task {
return nil
}
// SetStatus 设定状态
func (job *TransferTask) SetStatus(status int) {
}
// SetError 设定任务失败信息
func (job *TransferTask) SetError(err *task.JobError) {
job.Err = err
}
// SetErrorMsg 设定任务失败信息
func (job *TransferTask) SetErrorMsg(msg string, err error) {
jobErr := &task.JobError{Msg: msg}
if err != nil {
jobErr.Error = err.Error()
}
job.SetError(jobErr)
}
// GetError 返回任务失败信息
func (job *TransferTask) GetError() *task.JobError {
return job.Err
}
// Do 开始执行任务
func (job *TransferTask) Do() {
util.Log().Println("job", "")
}

View file

@ -232,3 +232,14 @@ func SlaveSelectTask(c *gin.Context) {
c.JSON(200, ErrorResponse(err))
}
}
// SlaveCreateTransferTask 从机创建中转任务
func SlaveCreateTransferTask(c *gin.Context) {
var service serializer.SlaveTransferReq
if err := c.ShouldBindJSON(&service); err == nil {
res := explorer.CreateTransferTask(c, &service)
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}

View file

@ -68,6 +68,12 @@ func InitSlaveRouter() *gin.Engine {
// 选取任务文件
aria2.POST("select", controllers.SlaveSelectTask)
}
// 异步任务
task := v3.Group("task")
{
task.PUT("transfer", controllers.SlaveCreateTransferTask)
}
}
return r
}

View file

@ -2,7 +2,6 @@ package explorer
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
@ -20,7 +19,6 @@ import (
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/gin-gonic/gin"
"github.com/jinzhu/gorm"
)
// SingleFileService 对单文件进行操作的五福path为文件完整路径
@ -43,29 +41,6 @@ type DownloadService struct {
ID string `uri:"id" binding:"required"`
}
// SlaveDownloadService 从机文件下載服务
type SlaveDownloadService struct {
PathEncoded string `uri:"path" binding:"required"`
Name string `uri:"name" binding:"required"`
Speed int `uri:"speed" binding:"min=0"`
}
// SlaveFileService 从机单文件文件相关服务
type SlaveFileService struct {
PathEncoded string `uri:"path" binding:"required"`
}
// SlaveFilesService 从机多文件相关服务
type SlaveFilesService struct {
Files []string `json:"files" binding:"required,gt=0"`
}
// SlaveListService 从机列表服务
type SlaveListService struct {
Path string `json:"path" binding:"required,min=1,max=65535"`
Recursive bool `json:"recursive"`
}
// New 创建新文件
func (service *SingleFileService) Create(c *gin.Context) serializer.Response {
// 创建文件系统
@ -449,106 +424,3 @@ func (service *FileIDService) PutContent(ctx context.Context, c *gin.Context) se
Code: 0,
}
}
// ServeFile 通过签名的URL下载从机文件
func (service *SlaveDownloadService) ServeFile(ctx context.Context, c *gin.Context, isDownload bool) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 解码文件路径
fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded)
if err != nil {
return serializer.ParamErr("无法解析的文件地址", err)
}
// 根据URL里的信息创建一个文件对象和用户对象
file := model.File{
Name: service.Name,
SourceName: string(fileSource),
Policy: model.Policy{
Model: gorm.Model{ID: 1},
Type: "local",
},
}
fs.User = &model.User{
Group: model.Group{SpeedLimit: service.Speed},
}
fs.FileTarget = []model.File{file}
// 开始处理下载
ctx = context.WithValue(ctx, fsctx.GinCtx, c)
rs, err := fs.GetDownloadContent(ctx, 0)
if err != nil {
return serializer.Err(serializer.CodeNotSet, err.Error(), err)
}
defer rs.Close()
// 设置下载文件名
if isDownload {
c.Header("Content-Disposition", "attachment; filename=\""+url.PathEscape(fs.FileTarget[0].Name)+"\"")
}
// 发送文件
http.ServeContent(c.Writer, c.Request, fs.FileTarget[0].Name, time.Now(), rs)
return serializer.Response{
Code: 0,
}
}
// Delete 通过签名的URL删除从机文件
func (service *SlaveFilesService) Delete(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 删除文件
failed, err := fs.Handler.Delete(ctx, service.Files)
if err != nil {
// 将Data字段写为字符串方便主控端解析
data, _ := json.Marshal(serializer.RemoteDeleteRequest{Files: failed})
return serializer.Response{
Code: serializer.CodeNotFullySuccess,
Data: string(data),
Msg: fmt.Sprintf("有 %d 个文件未能成功删除", len(failed)),
Error: err.Error(),
}
}
return serializer.Response{Code: 0}
}
// Thumb 通过签名URL获取从机文件缩略图
func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 解码文件路径
fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded)
if err != nil {
return serializer.ParamErr("无法解析的文件地址", err)
}
fs.FileTarget = []model.File{{SourceName: string(fileSource), PicInfo: "1,1"}}
// 获取缩略图
resp, err := fs.GetThumb(ctx, 0)
if err != nil {
return serializer.Err(serializer.CodeNotSet, "无法获取缩略图", err)
}
defer resp.Content.Close()
http.ServeContent(c.Writer, c.Request, "thumb.png", time.Now(), resp.Content)
return serializer.Response{Code: 0}
}

162
service/explorer/slave.go Normal file
View file

@ -0,0 +1,162 @@
package explorer
import (
"context"
"encoding/base64"
"encoding/json"
"fmt"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/slave"
"github.com/cloudreve/Cloudreve/v3/pkg/task/slavetask"
"github.com/gin-gonic/gin"
"github.com/jinzhu/gorm"
"net/http"
"net/url"
"time"
)
// SlaveDownloadService 从机文件下載服务
type SlaveDownloadService struct {
PathEncoded string `uri:"path" binding:"required"`
Name string `uri:"name" binding:"required"`
Speed int `uri:"speed" binding:"min=0"`
}
// SlaveFileService 从机单文件文件相关服务
type SlaveFileService struct {
PathEncoded string `uri:"path" binding:"required"`
}
// SlaveFilesService 从机多文件相关服务
type SlaveFilesService struct {
Files []string `json:"files" binding:"required,gt=0"`
}
// SlaveListService 从机列表服务
type SlaveListService struct {
Path string `json:"path" binding:"required,min=1,max=65535"`
Recursive bool `json:"recursive"`
}
// ServeFile 通过签名的URL下载从机文件
func (service *SlaveDownloadService) ServeFile(ctx context.Context, c *gin.Context, isDownload bool) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 解码文件路径
fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded)
if err != nil {
return serializer.ParamErr("无法解析的文件地址", err)
}
// 根据URL里的信息创建一个文件对象和用户对象
file := model.File{
Name: service.Name,
SourceName: string(fileSource),
Policy: model.Policy{
Model: gorm.Model{ID: 1},
Type: "local",
},
}
fs.User = &model.User{
Group: model.Group{SpeedLimit: service.Speed},
}
fs.FileTarget = []model.File{file}
// 开始处理下载
ctx = context.WithValue(ctx, fsctx.GinCtx, c)
rs, err := fs.GetDownloadContent(ctx, 0)
if err != nil {
return serializer.Err(serializer.CodeNotSet, err.Error(), err)
}
defer rs.Close()
// 设置下载文件名
if isDownload {
c.Header("Content-Disposition", "attachment; filename=\""+url.PathEscape(fs.FileTarget[0].Name)+"\"")
}
// 发送文件
http.ServeContent(c.Writer, c.Request, fs.FileTarget[0].Name, time.Now(), rs)
return serializer.Response{
Code: 0,
}
}
// Delete 通过签名的URL删除从机文件
func (service *SlaveFilesService) Delete(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 删除文件
failed, err := fs.Handler.Delete(ctx, service.Files)
if err != nil {
// 将Data字段写为字符串方便主控端解析
data, _ := json.Marshal(serializer.RemoteDeleteRequest{Files: failed})
return serializer.Response{
Code: serializer.CodeNotFullySuccess,
Data: string(data),
Msg: fmt.Sprintf("有 %d 个文件未能成功删除", len(failed)),
Error: err.Error(),
}
}
return serializer.Response{Code: 0}
}
// Thumb 通过签名URL获取从机文件缩略图
func (service *SlaveFileService) Thumb(ctx context.Context, c *gin.Context) serializer.Response {
// 创建文件系统
fs, err := filesystem.NewAnonymousFileSystem()
if err != nil {
return serializer.Err(serializer.CodePolicyNotAllowed, err.Error(), err)
}
defer fs.Recycle()
// 解码文件路径
fileSource, err := base64.RawURLEncoding.DecodeString(service.PathEncoded)
if err != nil {
return serializer.ParamErr("无法解析的文件地址", err)
}
fs.FileTarget = []model.File{{SourceName: string(fileSource), PicInfo: "1,1"}}
// 获取缩略图
resp, err := fs.GetThumb(ctx, 0)
if err != nil {
return serializer.Err(serializer.CodeNotSet, "无法获取缩略图", err)
}
defer resp.Content.Close()
http.ServeContent(c.Writer, c.Request, "thumb.png", time.Now(), resp.Content)
return serializer.Response{Code: 0}
}
// CreateTransferTask 创建从机文件转存任务
func CreateTransferTask(c *gin.Context, req *serializer.SlaveTransferReq) serializer.Response {
if id, ok := c.Get("MasterSiteID"); ok {
job := &slavetask.TransferTask{
Req: req,
}
if err := slave.DefaultController.SubmitTask(id.(string), job, req.Hash(id.(string))); err != nil {
return serializer.Err(serializer.CodeInternalSetting, "任务创建失败", err)
}
return serializer.Response{}
}
return serializer.ParamErr("未知的主机节点ID", nil)
}