Feat: master send scheduled ping request

This commit is contained in:
HFO4 2021-08-19 20:18:23 +08:00
parent 08704fffb0
commit edbc3db7f4
6 changed files with 118 additions and 28 deletions

View file

@ -101,6 +101,8 @@ func addDefaultSettings() {
{Name: "upload_credential_timeout", Value: `1800`, Type: "timeout"},
{Name: "upload_session_timeout", Value: `86400`, Type: "timeout"},
{Name: "slave_api_timeout", Value: `60`, Type: "timeout"},
{Name: "slave_node_retry", Value: `3`, Type: "slave"},
{Name: "slave_ping_interval", Value: `300`, Type: "slave"},
{Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"},
{Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"},
{Name: "onedrive_callback_check", Value: `20`, Type: "timeout"},

View file

@ -6,6 +6,7 @@ import "github.com/jinzhu/gorm"
type Node struct {
gorm.Model
Status NodeStatus // 节点状态
Name string // 节点别名
Type ModelType // 节点状态
Server string // 服务器地址
SecretKey string `gorm:"type:text"` // 通信密钥

View file

@ -22,5 +22,10 @@ func (node *MasterNode) IsFeatureEnabled(feature string) bool {
}
// SubscribeStatusChange 订阅节点状态更改
func (node *MasterNode) SubscribeStatusChange(callback func()) {
func (node *MasterNode) SubscribeStatusChange(callback func(isActive bool, id uint)) {
}
// IsActive 返回节点是否在线
func (node *MasterNode) IsActive() bool {
return true
}

View file

@ -9,18 +9,21 @@ import (
type Node interface {
IsFeatureEnabled(feature string) bool
SubscribeStatusChange(callback func())
SubscribeStatusChange(callback func(isActive bool, id uint))
Ping(req *serializer.NodePingReq) (*serializer.NodePingResp, error)
IsActive() bool
}
func getNodeFromDBModel(node *model.Node) Node {
switch node.Type {
case model.SlaveNodeType:
return &SlaveNode{
slave := &SlaveNode{
Model: node,
AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)},
Client: request.HTTPClient{},
}
go slave.StartPingLoop()
return slave
default:
return &MasterNode{
Model: node,

View file

@ -18,9 +18,12 @@ type Pool interface {
// NodePool 通用节点池
type NodePool struct {
nodes []Node
active map[uint]Node
inactive map[uint]Node
featureMap map[string][]Node
lock sync.RWMutex
lock sync.RWMutex
}
func (pool *NodePool) Select() {
@ -37,6 +40,39 @@ func Init() {
}
}
func (pool *NodePool) buildIndexMap() {
pool.lock.Lock()
for _, feature := range featureGroup {
pool.featureMap[feature] = make([]Node, 0)
}
for _, v := range pool.active {
for _, feature := range featureGroup {
if v.IsFeatureEnabled(feature) {
pool.featureMap[feature] = append(pool.featureMap[feature], v)
}
}
}
pool.lock.Unlock()
}
func (pool *NodePool) nodeStatusChange(isActive bool, id uint) {
util.Log().Debug("从机节点 [ID=%d] 状态变更 [active=%t]", id, isActive)
pool.lock.Lock()
if isActive {
node := pool.inactive[id]
delete(pool.inactive, id)
pool.active[id] = node
} else {
node := pool.active[id]
delete(pool.active, id)
pool.inactive[id] = node
}
pool.lock.Unlock()
pool.buildIndexMap()
}
func (pool *NodePool) initFromDB() error {
nodes, err := model.GetNodesByStatus(model.NodeActive)
if err != nil {
@ -44,25 +80,23 @@ func (pool *NodePool) initFromDB() error {
}
pool.lock.Lock()
for _, feature := range featureGroup {
pool.featureMap[feature] = make([]Node, 0)
}
pool.active = make(map[uint]Node)
pool.inactive = make(map[uint]Node)
for i := 0; i < len(nodes); i++ {
newNode := getNodeFromDBModel(&nodes[i])
pool.nodes = append(pool.nodes, newNode)
for _, feature := range featureGroup {
if newNode.IsFeatureEnabled(feature) {
pool.featureMap[feature] = append(pool.featureMap[feature], newNode)
}
if newNode.IsActive() {
pool.active[nodes[i].ID] = newNode
} else {
pool.inactive[nodes[i].ID] = newNode
}
newNode.SubscribeStatusChange(func() {
// 订阅节点状态变更
newNode.SubscribeStatusChange(func(isActive bool, id uint) {
pool.nodeStatusChange(isActive, id)
})
}
pool.lock.Unlock()
pool.buildIndexMap()
return nil
}

View file

@ -1,13 +1,13 @@
package cluster
import (
"context"
"encoding/json"
"errors"
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/auth"
"github.com/cloudreve/Cloudreve/v3/pkg/request"
"github.com/cloudreve/Cloudreve/v3/pkg/serializer"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
"net/url"
"path"
"strings"
@ -20,8 +20,9 @@ type SlaveNode struct {
AuthInstance auth.Auth
Client request.Client
callback func()
ctx context.Context
active bool
callback func(bool, uint)
close chan bool
lock sync.RWMutex
}
@ -34,7 +35,7 @@ func (node *SlaveNode) IsFeatureEnabled(feature string) bool {
}
// SubscribeStatusChange 订阅节点状态更改
func (node *SlaveNode) SubscribeStatusChange(callback func()) {
func (node *SlaveNode) SubscribeStatusChange(callback func(bool, uint)) {
node.lock.Lock()
node.callback = callback
node.lock.Unlock()
@ -77,6 +78,11 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe
return &res, nil
}
// IsActive 返回节点是否在线
func (node *SlaveNode) IsActive() bool {
return node.active
}
// getAPIUrl 获取接口请求地址
func (node *SlaveNode) getAPIUrl(scope string) string {
node.lock.RLock()
@ -92,16 +98,55 @@ func (node *SlaveNode) getAPIUrl(scope string) string {
return serverURL.ResolveReference(controller).String()
}
func (node *SlaveNode) pingLoop() {
t := time.NewTicker(time.Second)
func (node *SlaveNode) changeStatus(isActive bool) {
node.lock.RLock()
id := node.Model.ID
if isActive != node.active {
node.lock.RUnlock()
node.lock.Lock()
node.active = isActive
node.lock.Unlock()
node.callback(isActive, id)
} else {
node.lock.RUnlock()
}
}
func (node *SlaveNode) StartPingLoop() {
node.lock.Lock()
node.close = make(chan bool)
node.active = true
node.lock.Unlock()
t := time.NewTicker(time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second)
defer t.Stop()
util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name)
retry := 0
loop:
for {
select {
case <-t.C:
case <-node.ctx.Done():
util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name)
res, err := node.Ping(&serializer.NodePingReq{})
if err != nil {
util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err)
retry++
if retry > model.GetIntSetting("slave_node_retry", 3) {
util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name)
node.changeStatus(false)
break loop
}
break
}
util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res)
node.changeStatus(true)
case <-node.close:
util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name)
break loop
}
}
}
// PingLoop
// RecoverLoop