Feat: retry for init aria2 client in master node
This commit is contained in:
parent
99953825ff
commit
3b47e314e9
7 changed files with 115 additions and 35 deletions
|
@ -102,16 +102,19 @@ func (node *MasterNode) Kill() {
|
|||
// GetAria2Instance 获取主机Aria2实例
|
||||
func (node *MasterNode) GetAria2Instance() common.Aria2 {
|
||||
node.lock.RLock()
|
||||
defer node.lock.RUnlock()
|
||||
|
||||
if !node.Model.Aria2Enabled {
|
||||
node.lock.RUnlock()
|
||||
return &common.DummyAria2{}
|
||||
}
|
||||
|
||||
if !node.aria2RPC.Initialized {
|
||||
node.lock.RUnlock()
|
||||
node.aria2RPC.Init()
|
||||
return &common.DummyAria2{}
|
||||
}
|
||||
|
||||
defer node.lock.RUnlock()
|
||||
return &node.aria2RPC
|
||||
}
|
||||
|
||||
|
|
39
pkg/filesystem/driver/handler.go
Normal file
39
pkg/filesystem/driver/handler.go
Normal file
|
@ -0,0 +1,39 @@
|
|||
package driver
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||
"io"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Handler 存储策略适配器
|
||||
type Handler interface {
|
||||
// 上传文件, dst为文件存储路径,size 为文件大小。上下文关闭
|
||||
// 时,应取消上传并清理临时文件
|
||||
Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error
|
||||
|
||||
// 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误
|
||||
Delete(ctx context.Context, files []string) ([]string, error)
|
||||
|
||||
// 获取文件内容
|
||||
Get(ctx context.Context, path string) (response.RSCloser, error)
|
||||
|
||||
// 获取缩略图,可直接在ContentResponse中返回文件数据流,也可指
|
||||
// 定为重定向
|
||||
Thumb(ctx context.Context, path string) (*response.ContentResponse, error)
|
||||
|
||||
// 获取外链/下载地址,
|
||||
// url - 站点本身地址,
|
||||
// isDownload - 是否直接下载
|
||||
Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error)
|
||||
|
||||
// Token 获取有效期为ttl的上传凭证和签名,同时回调会话有效期为sessionTTL
|
||||
Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error)
|
||||
|
||||
// List 递归列取远程端path路径下文件、目录,不包含path本身,
|
||||
// 返回的对象路径以path作为起始根目录.
|
||||
// recursive - 是否递归列出
|
||||
List(ctx context.Context, path string, recursive bool) ([]response.Object, error)
|
||||
}
|
7
pkg/filesystem/driver/shadow/slave/errors.go
Normal file
7
pkg/filesystem/driver/shadow/slave/errors.go
Normal file
|
@ -0,0 +1,7 @@
|
|||
package slave
|
||||
|
||||
import "errors"
|
||||
|
||||
var (
|
||||
ErrNotImplemented = errors.New("This method of shadowed policy is not implemented")
|
||||
)
|
57
pkg/filesystem/driver/shadow/slave/handler.go
Normal file
57
pkg/filesystem/driver/shadow/slave/handler.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package slave
|
||||
|
||||
import (
|
||||
"context"
|
||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||
"io"
|
||||
"net/url"
|
||||
)
|
||||
|
||||
// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
|
||||
type Driver struct {
|
||||
node cluster.Node
|
||||
handler driver.Handler
|
||||
policy *model.Policy
|
||||
}
|
||||
|
||||
// NewDriver 返回新的从机指派处理器
|
||||
func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
|
||||
return &Driver{
|
||||
node: node,
|
||||
handler: handler,
|
||||
policy: policy,
|
||||
}
|
||||
}
|
||||
|
||||
func (d Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
|
||||
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) Delete(ctx context.Context, files []string) ([]string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error) {
|
||||
panic("implement me")
|
||||
}
|
||||
|
||||
func (d Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
|
||||
panic("implement me")
|
||||
}
|
|
@ -1,9 +1,10 @@
|
|||
package filesystem
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/shadow/slave"
|
||||
"io"
|
||||
"net/http"
|
||||
"net/url"
|
||||
|
@ -20,7 +21,6 @@ import (
|
|||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/remote"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/s3"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver/upyun"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||
"github.com/gin-gonic/gin"
|
||||
|
@ -44,36 +44,6 @@ type FileHeader interface {
|
|||
GetVirtualPath() string
|
||||
}
|
||||
|
||||
// Handler 存储策略适配器
|
||||
type Handler interface {
|
||||
// 上传文件, dst为文件存储路径,size 为文件大小。上下文关闭
|
||||
// 时,应取消上传并清理临时文件
|
||||
Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error
|
||||
|
||||
// 删除一个或多个给定路径的文件,返回删除失败的文件路径列表及错误
|
||||
Delete(ctx context.Context, files []string) ([]string, error)
|
||||
|
||||
// 获取文件内容
|
||||
Get(ctx context.Context, path string) (response.RSCloser, error)
|
||||
|
||||
// 获取缩略图,可直接在ContentResponse中返回文件数据流,也可指
|
||||
// 定为重定向
|
||||
Thumb(ctx context.Context, path string) (*response.ContentResponse, error)
|
||||
|
||||
// 获取外链/下载地址,
|
||||
// url - 站点本身地址,
|
||||
// isDownload - 是否直接下载
|
||||
Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error)
|
||||
|
||||
// Token 获取有效期为ttl的上传凭证和签名,同时回调会话有效期为sessionTTL
|
||||
Token(ctx context.Context, ttl int64, callbackKey string) (serializer.UploadCredential, error)
|
||||
|
||||
// List 递归列取远程端path路径下文件、目录,不包含path本身,
|
||||
// 返回的对象路径以path作为起始根目录.
|
||||
// recursive - 是否递归列出
|
||||
List(ctx context.Context, path string, recursive bool) ([]response.Object, error)
|
||||
}
|
||||
|
||||
// FileSystem 管理文件的文件系统
|
||||
type FileSystem struct {
|
||||
// 文件系统所有者
|
||||
|
@ -97,7 +67,7 @@ type FileSystem struct {
|
|||
/*
|
||||
文件系统处理适配器
|
||||
*/
|
||||
Handler Handler
|
||||
Handler driver.Handler
|
||||
|
||||
// 回收锁
|
||||
recycleLock sync.Mutex
|
||||
|
@ -273,7 +243,7 @@ func NewFileSystemFromCallback(c *gin.Context) (*FileSystem, error) {
|
|||
|
||||
// SwitchToSlaveHandler 将负责上传的 Handler 切换为从机节点
|
||||
func (fs *FileSystem) SwitchToSlaveHandler(node cluster.Node) {
|
||||
|
||||
fs.Handler = slave.NewDriver(node, fs.Handler, &fs.User.Policy)
|
||||
}
|
||||
|
||||
// SetTargetFile 设置当前处理的目标文件
|
||||
|
|
|
@ -41,4 +41,6 @@ const (
|
|||
ValidateCapacityOnceCtx
|
||||
// 禁止上传时同名覆盖操作
|
||||
DisableOverwrite
|
||||
// 文件在从机节点中的路径
|
||||
SlaveSrcPath
|
||||
)
|
||||
|
|
|
@ -108,6 +108,7 @@ func (job *TransferTask) Do() {
|
|||
}
|
||||
|
||||
ctx := context.WithValue(context.Background(), fsctx.DisableOverwrite, true)
|
||||
ctx = context.WithValue(ctx, fsctx.SlaveSrcPath, file)
|
||||
if job.TaskProps.NodeID > 1 {
|
||||
// 指定为从机中转
|
||||
|
||||
|
@ -121,6 +122,7 @@ func (job *TransferTask) Do() {
|
|||
fs.SwitchToSlaveHandler(node)
|
||||
err = fs.UploadFromStream(ctx, nil, dst, job.TaskProps.SrcSizes[file])
|
||||
} else {
|
||||
// 主机节点中转
|
||||
err = fs.UploadFromPath(ctx, file, dst)
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue