Cloudreve/pkg/filesystem/driver/shadow/slaveinmaster/handler.go

122 lines
3.5 KiB
Go
Raw Normal View History

Feat: aria2 download and transfer in slave node (#1040) * Feat: retrieve nodes from data table * Feat: master node ping slave node in REST API * Feat: master send scheduled ping request * Feat: inactive nodes recover loop * Modify: remove database operations from aria2 RPC caller implementation * Feat: init aria2 client in master node * Feat: Round Robin load balancer * Feat: create and monitor aria2 task in master node * Feat: salve receive and handle heartbeat * Fix: Node ID will be 0 in download record generated in older version * Feat: sign request headers with all `X-` prefix * Feat: API call to slave node will carry meta data in headers * Feat: call slave aria2 rpc method from master * Feat: get slave aria2 task status Feat: encode slave response data using gob * Feat: aria2 callback to master node / cancel or select task to slave node * Fix: use dummy aria2 client when caller initialize failed in master node * Feat: slave aria2 status event callback / salve RPC auth * Feat: prototype for slave driven filesystem * Feat: retry for init aria2 client in master node * Feat: init request client with global options * Feat: slave receive async task from master * Fix: competition write in request header * Refactor: dependency initialize order * Feat: generic message queue implementation * Feat: message queue implementation * Feat: master waiting slave transfer result * Feat: slave transfer file in stateless policy * Feat: slave transfer file in slave policy * Feat: slave transfer file in local policy * Feat: slave transfer file in OneDrive policy * Fix: failed to initialize update checker http client * Feat: list slave nodes for dashboard * Feat: test aria2 rpc connection in slave * Feat: add and save node * Feat: add and delete node in node pool * Fix: temp file cannot be removed when aria2 task fails * Fix: delete node in admin panel * Feat: edit node and get node info * Modify: delete unused settings
2021-10-31 09:41:56 +08:00
package slaveinmaster
import (
"bytes"
"context"
"encoding/json"
"errors"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/driver"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/fsctx"
"github.com/cloudreve/Cloudreve/v3/pkg/filesystem/response"
"github.com/cloudreve/Cloudreve/v3/pkg/mq"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"io"
"net/url"
"time"
)
// Driver 影子存储策略,将上传任务指派给从机节点处理,并等待从机通知上传结果
type Driver struct {
node cluster.Node
handler driver.Handler
policy *model.Policy
client request.Client
}
// NewDriver 返回新的从机指派处理器
func NewDriver(node cluster.Node, handler driver.Handler, policy *model.Policy) driver.Handler {
var endpoint *url.URL
if serverURL, err := url.Parse(node.DBModel().Server); err == nil {
var controller *url.URL
controller, _ = url.Parse("/api/v3/slave")
endpoint = serverURL.ResolveReference(controller)
}
signTTL := model.GetIntSetting("slave_api_timeout", 60)
return &Driver{
node: node,
handler: handler,
policy: policy,
client: request.NewClient(
request.WithMasterMeta(),
request.WithTimeout(time.Duration(signTTL)*time.Second),
request.WithCredential(node.SlaveAuthInstance(), int64(signTTL)),
request.WithEndpoint(endpoint.String()),
),
}
}
// Put 将ctx中指定的从机物理文件由从机上传到目标存储策略
func (d *Driver) Put(ctx context.Context, file io.ReadCloser, dst string, size uint64) error {
src, ok := ctx.Value(fsctx.SlaveSrcPath).(string)
if !ok {
return ErrSlaveSrcPathNotExist
}
req := serializer.SlaveTransferReq{
Src: src,
Dst: dst,
Policy: d.policy,
}
body, err := json.Marshal(req)
if err != nil {
return err
}
// 订阅转存结果
resChan := mq.GlobalMQ.Subscribe(req.Hash(model.GetSettingByName("siteID")), 0)
defer mq.GlobalMQ.Unsubscribe(req.Hash(model.GetSettingByName("siteID")), resChan)
res, err := d.client.Request("PUT", "task/transfer", bytes.NewReader(body)).
CheckHTTPResponse(200).
DecodeResponse()
if err != nil {
return err
}
if res.Code != 0 {
return serializer.NewErrorFromResponse(res)
}
// 等待转存结果或者超时
waitTimeout := model.GetIntSetting("slave_transfer_timeout", 172800)
select {
case <-time.After(time.Duration(waitTimeout) * time.Second):
return ErrWaitResultTimeout
case msg := <-resChan:
if msg.Event != serializer.SlaveTransferSuccess {
return errors.New(msg.Content.(serializer.SlaveTransferResult).Error)
}
}
return nil
}
func (d *Driver) Delete(ctx context.Context, files []string) ([]string, error) {
return d.handler.Delete(ctx, files)
}
func (d *Driver) Get(ctx context.Context, path string) (response.RSCloser, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Thumb(ctx context.Context, path string) (*response.ContentResponse, error) {
return nil, ErrNotImplemented
}
func (d *Driver) Source(ctx context.Context, path string, url url.URL, ttl int64, isDownload bool, speed int) (string, error) {
return "", ErrNotImplemented
}
func (d *Driver) Token(ctx context.Context, ttl int64, uploadSession *serializer.UploadSession) (serializer.UploadCredential, error) {
Feat: aria2 download and transfer in slave node (#1040) * Feat: retrieve nodes from data table * Feat: master node ping slave node in REST API * Feat: master send scheduled ping request * Feat: inactive nodes recover loop * Modify: remove database operations from aria2 RPC caller implementation * Feat: init aria2 client in master node * Feat: Round Robin load balancer * Feat: create and monitor aria2 task in master node * Feat: salve receive and handle heartbeat * Fix: Node ID will be 0 in download record generated in older version * Feat: sign request headers with all `X-` prefix * Feat: API call to slave node will carry meta data in headers * Feat: call slave aria2 rpc method from master * Feat: get slave aria2 task status Feat: encode slave response data using gob * Feat: aria2 callback to master node / cancel or select task to slave node * Fix: use dummy aria2 client when caller initialize failed in master node * Feat: slave aria2 status event callback / salve RPC auth * Feat: prototype for slave driven filesystem * Feat: retry for init aria2 client in master node * Feat: init request client with global options * Feat: slave receive async task from master * Fix: competition write in request header * Refactor: dependency initialize order * Feat: generic message queue implementation * Feat: message queue implementation * Feat: master waiting slave transfer result * Feat: slave transfer file in stateless policy * Feat: slave transfer file in slave policy * Feat: slave transfer file in local policy * Feat: slave transfer file in OneDrive policy * Fix: failed to initialize update checker http client * Feat: list slave nodes for dashboard * Feat: test aria2 rpc connection in slave * Feat: add and save node * Feat: add and delete node in node pool * Fix: temp file cannot be removed when aria2 task fails * Fix: delete node in admin panel * Feat: edit node and get node info * Modify: delete unused settings
2021-10-31 09:41:56 +08:00
return serializer.UploadCredential{}, ErrNotImplemented
}
func (d *Driver) List(ctx context.Context, path string, recursive bool) ([]response.Object, error) {
return nil, ErrNotImplemented
}