Feat: test aria2 rpc connection in slave
This commit is contained in:
parent
a129eb36ae
commit
0dc573253f
7 changed files with 97 additions and 31 deletions
|
@ -140,11 +140,6 @@ Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; verti
|
|||
{Name: "gravatar_server", Value: `https://www.gravatar.com/`, Type: "avatar"},
|
||||
{Name: "defaultTheme", Value: `#3f51b5`, Type: "basic"},
|
||||
{Name: "themes", Value: `{"#3f51b5":{"palette":{"primary":{"main":"#3f51b5"},"secondary":{"main":"#f50057"}}},"#2196f3":{"palette":{"primary":{"main":"#2196f3"},"secondary":{"main":"#FFC107"}}},"#673AB7":{"palette":{"primary":{"main":"#673AB7"},"secondary":{"main":"#2196F3"}}},"#E91E63":{"palette":{"primary":{"main":"#E91E63"},"secondary":{"main":"#42A5F5","contrastText":"#fff"}}},"#FF5722":{"palette":{"primary":{"main":"#FF5722"},"secondary":{"main":"#3F51B5"}}},"#FFC107":{"palette":{"primary":{"main":"#FFC107"},"secondary":{"main":"#26C6DA"}}},"#8BC34A":{"palette":{"primary":{"main":"#8BC34A","contrastText":"#fff"},"secondary":{"main":"#FF8A65","contrastText":"#fff"}}},"#009688":{"palette":{"primary":{"main":"#009688"},"secondary":{"main":"#4DD0E1","contrastText":"#fff"}}},"#607D8B":{"palette":{"primary":{"main":"#607D8B"},"secondary":{"main":"#F06292"}}},"#795548":{"palette":{"primary":{"main":"#795548"},"secondary":{"main":"#4CAF50","contrastText":"#fff"}}}}`, Type: "basic"},
|
||||
{Name: "aria2_token", Value: ``, Type: "aria2"},
|
||||
{Name: "aria2_rpcurl", Value: ``, Type: "aria2"},
|
||||
{Name: "aria2_temp_path", Value: ``, Type: "aria2"},
|
||||
{Name: "aria2_options", Value: `{}`, Type: "aria2"},
|
||||
{Name: "aria2_interval", Value: `60`, Type: "aria2"},
|
||||
{Name: "max_worker_num", Value: `10`, Type: "task"},
|
||||
{Name: "max_parallel_transfer", Value: `4`, Type: "task"},
|
||||
{Name: "secret_key", Value: util.RandStringRunes(256), Type: "auth"},
|
||||
|
|
|
@ -1,11 +1,16 @@
|
|||
package aria2
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"net/url"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/common"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/monitor"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/aria2/rpc"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/balancer"
|
||||
)
|
||||
|
||||
|
@ -41,3 +46,20 @@ func Init(isReload bool) {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TestRPCConnection 发送测试用的 RPC 请求,测试服务连通性
|
||||
func TestRPCConnection(server, secret string, timeout int) (rpc.VersionInfo, error) {
|
||||
// 解析RPC服务地址
|
||||
rpcServer, err := url.Parse(server)
|
||||
if err != nil {
|
||||
return rpc.VersionInfo{}, fmt.Errorf("cannot parse RPC server: %w", err)
|
||||
}
|
||||
|
||||
rpcServer.Path = "/jsonrpc"
|
||||
caller, err := rpc.New(context.Background(), rpcServer.String(), secret, time.Duration(timeout)*time.Second, nil)
|
||||
if err != nil {
|
||||
return rpc.VersionInfo{}, fmt.Errorf("cannot initialize rpc connection: %w", err)
|
||||
}
|
||||
|
||||
return caller.GetVersion()
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ package controllers
|
|||
import (
|
||||
"io"
|
||||
|
||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/aria2"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/email"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
||||
|
@ -92,7 +93,13 @@ func AdminSendTestMail(c *gin.Context) {
|
|||
func AdminTestAria2(c *gin.Context) {
|
||||
var service admin.Aria2TestService
|
||||
if err := c.ShouldBindJSON(&service); err == nil {
|
||||
res := service.Test()
|
||||
var res serializer.Response
|
||||
if service.Type == model.MasterNodeType {
|
||||
res = service.TestMaster()
|
||||
} else {
|
||||
res = service.TestSlave()
|
||||
}
|
||||
|
||||
c.JSON(200, res)
|
||||
} else {
|
||||
c.JSON(200, ErrorResponse(err))
|
||||
|
|
|
@ -40,6 +40,8 @@ func InitSlaveRouter() *gin.Engine {
|
|||
{
|
||||
// Ping
|
||||
v3.POST("ping", controllers.SlavePing)
|
||||
// 测试 Aria2 RPC 连接
|
||||
v3.POST("ping/aria2", controllers.AdminTestAria2)
|
||||
// 接收主机心跳包
|
||||
v3.POST("heartbeat", controllers.SlaveHeartbeat)
|
||||
// 上传
|
||||
|
@ -446,6 +448,9 @@ func InitMasterRouter() *gin.Engine {
|
|||
{
|
||||
// 列出从机节点
|
||||
node.POST("list", controllers.AdminListNodes)
|
||||
|
||||
// 列出从机节点
|
||||
node.POST("aria2/test", controllers.AdminTestAria2)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -1,43 +1,71 @@
|
|||
package admin
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"encoding/json"
|
||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||
"net/url"
|
||||
"time"
|
||||
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/aria2"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/request"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||
)
|
||||
|
||||
// Aria2TestService aria2连接测试服务
|
||||
type Aria2TestService struct {
|
||||
Server string `json:"server" binding:"required"`
|
||||
Token string `json:"token"`
|
||||
Server string `json:"server" binding:"required"`
|
||||
RPC string `json:"rpc" binding:"required"`
|
||||
Secret string `json:"secret" binding:"required"`
|
||||
Token string `json:"token"`
|
||||
Type model.ModelType `json:"type"`
|
||||
}
|
||||
|
||||
// Test 测试aria2连接
|
||||
func (service *Aria2TestService) Test() serializer.Response {
|
||||
testRPC := aria2.RPCService{}
|
||||
|
||||
// 解析RPC服务地址
|
||||
server, err := url.Parse(service.Server)
|
||||
func (service *Aria2TestService) TestMaster() serializer.Response {
|
||||
res, err := aria2.TestRPCConnection(service.RPC, service.Token, 5)
|
||||
if err != nil {
|
||||
return serializer.ParamErr("无法解析 aria2 RPC 服务地址, "+err.Error(), nil)
|
||||
}
|
||||
server.Path = "/jsonrpc"
|
||||
|
||||
if err := testRPC.Init(server.String(), service.Token, 5, map[string]interface{}{}); err != nil {
|
||||
return serializer.ParamErr("无法初始化连接, "+err.Error(), nil)
|
||||
return serializer.ParamErr(err.Error(), err)
|
||||
}
|
||||
|
||||
defer testRPC.Caller.Close()
|
||||
|
||||
info, err := testRPC.Caller.GetVersion()
|
||||
if err != nil {
|
||||
return serializer.ParamErr("无法请求 RPC 服务, "+err.Error(), nil)
|
||||
}
|
||||
|
||||
if info.Version == "" {
|
||||
if res.Version == "" {
|
||||
return serializer.ParamErr("RPC 服务返回非预期响应", nil)
|
||||
}
|
||||
|
||||
return serializer.Response{Data: info.Version}
|
||||
return serializer.Response{Data: res.Version}
|
||||
}
|
||||
|
||||
func (service *Aria2TestService) TestSlave() serializer.Response {
|
||||
slave, err := url.Parse(service.Server)
|
||||
if err != nil {
|
||||
return serializer.ParamErr("无法解析从机端地址,"+err.Error(), nil)
|
||||
}
|
||||
|
||||
controller, _ := url.Parse("/api/v3/slave/ping/aria2")
|
||||
|
||||
// 请求正文
|
||||
service.Type = model.MasterNodeType
|
||||
bodyByte, _ := json.Marshal(service)
|
||||
|
||||
r := request.NewClient()
|
||||
res, err := r.Request(
|
||||
"POST",
|
||||
slave.ResolveReference(controller).String(),
|
||||
bytes.NewReader(bodyByte),
|
||||
request.WithTimeout(time.Duration(10)*time.Second),
|
||||
request.WithCredential(
|
||||
auth.HMACAuth{SecretKey: []byte(service.Secret)},
|
||||
int64(model.GetIntSetting("slave_api_timeout", 60)),
|
||||
),
|
||||
).DecodeResponse()
|
||||
if err != nil {
|
||||
return serializer.ParamErr("无连接到从机,"+err.Error(), nil)
|
||||
}
|
||||
|
||||
if res.Code != 0 {
|
||||
return serializer.ParamErr("成功接到从机,但是从机返回:"+res.Msg, nil)
|
||||
}
|
||||
|
||||
return serializer.Response{Data: res.Data.(string)}
|
||||
}
|
||||
|
|
|
@ -2,6 +2,7 @@ package admin
|
|||
|
||||
import (
|
||||
model "github.com/cloudreve/Cloudreve/v3/models"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/cluster"
|
||||
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
|
||||
"strings"
|
||||
)
|
||||
|
@ -35,8 +36,16 @@ func (service *AdminListService) Nodes() serializer.Response {
|
|||
// 查询记录
|
||||
tx.Limit(service.PageSize).Offset((service.Page - 1) * service.PageSize).Find(&res)
|
||||
|
||||
isActive := make(map[uint]bool)
|
||||
for i := 0; i < len(res); i++ {
|
||||
if node := cluster.Default.GetNodeByID(res[i].ID); node != nil {
|
||||
isActive[res[i].ID] = node.IsActive()
|
||||
}
|
||||
}
|
||||
|
||||
return serializer.Response{Data: map[string]interface{}{
|
||||
"total": total,
|
||||
"items": res,
|
||||
"total": total,
|
||||
"items": res,
|
||||
"active": isActive,
|
||||
}}
|
||||
}
|
||||
|
|
|
@ -245,7 +245,7 @@ func (service *SlaveTestService) Test() serializer.Response {
|
|||
}
|
||||
|
||||
if res.Code != 0 {
|
||||
return serializer.ParamErr("成功接到从机,但是"+res.Msg, nil)
|
||||
return serializer.ParamErr("成功接到从机,但是从机返回:"+res.Msg, nil)
|
||||
}
|
||||
|
||||
return serializer.Response{}
|
||||
|
|
Loading…
Add table
Reference in a new issue