diff --git a/models/migration.go b/models/migration.go index 516b93f..9bbe3a9 100644 --- a/models/migration.go +++ b/models/migration.go @@ -103,6 +103,7 @@ func addDefaultSettings() { {Name: "slave_api_timeout", Value: `60`, Type: "timeout"}, {Name: "slave_node_retry", Value: `3`, Type: "slave"}, {Name: "slave_ping_interval", Value: `300`, Type: "slave"}, + {Name: "slave_recover_interval", Value: `600`, Type: "slave"}, {Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"}, {Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"}, {Name: "onedrive_callback_check", Value: `20`, Type: "timeout"}, diff --git a/pkg/cluster/node.go b/pkg/cluster/node.go index 4060277..dcbff47 100644 --- a/pkg/cluster/node.go +++ b/pkg/cluster/node.go @@ -21,6 +21,7 @@ func getNodeFromDBModel(node *model.Node) Node { Model: node, AuthInstance: auth.HMACAuth{SecretKey: []byte(node.SecretKey)}, Client: request.HTTPClient{}, + Active: true, } go slave.StartPingLoop() return slave diff --git a/pkg/cluster/pool.go b/pkg/cluster/pool.go index 9060dcc..09b04ee 100644 --- a/pkg/cluster/pool.go +++ b/pkg/cluster/pool.go @@ -57,7 +57,7 @@ func (pool *NodePool) buildIndexMap() { } func (pool *NodePool) nodeStatusChange(isActive bool, id uint) { - util.Log().Debug("从机节点 [ID=%d] 状态变更 [active=%t]", id, isActive) + util.Log().Debug("从机节点 [ID=%d] 状态变更 [Active=%t]", id, isActive) pool.lock.Lock() if isActive { node := pool.inactive[id] diff --git a/pkg/cluster/slave.go b/pkg/cluster/slave.go index 65aacce..aa74364 100644 --- a/pkg/cluster/slave.go +++ b/pkg/cluster/slave.go @@ -19,8 +19,8 @@ type SlaveNode struct { Model *model.Node AuthInstance auth.Auth Client request.Client + Active bool - active bool callback func(bool, uint) close chan bool lock sync.RWMutex @@ -80,7 +80,7 @@ func (node *SlaveNode) Ping(req *serializer.NodePingReq) (*serializer.NodePingRe // IsActive 返回节点是否在线 func (node *SlaveNode) IsActive() bool { - return node.active + return node.Active } // getAPIUrl 获取接口请求地址 @@ -101,10 +101,10 @@ func (node *SlaveNode) getAPIUrl(scope string) string { func (node *SlaveNode) changeStatus(isActive bool) { node.lock.RLock() id := node.Model.ID - if isActive != node.active { + if isActive != node.Active { node.lock.RUnlock() node.lock.Lock() - node.active = isActive + node.Active = isActive node.lock.Unlock() node.callback(isActive, id) } else { @@ -116,34 +116,51 @@ func (node *SlaveNode) changeStatus(isActive bool) { func (node *SlaveNode) StartPingLoop() { node.lock.Lock() node.close = make(chan bool) - node.active = true node.lock.Unlock() - t := time.NewTicker(time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second) - defer t.Stop() + tickDuration := time.Duration(model.GetIntSetting("slave_ping_interval", 300)) * time.Second + recoverDuration := time.Duration(model.GetIntSetting("slave_recover_interval", 600)) * time.Second + pingTicker := time.NewTicker(tickDuration) + defer pingTicker.Stop() util.Log().Debug("从机节点 [%s] 启动心跳循环", node.Model.Name) retry := 0 + recoverMode := false loop: for { select { - case <-t.C: + case <-pingTicker.C: util.Log().Debug("从机节点 [%s] 发送Ping", node.Model.Name) res, err := node.Ping(&serializer.NodePingReq{}) if err != nil { util.Log().Debug("Ping从机节点 [%s] 时发生错误: %s", node.Model.Name, err) retry++ - if retry > model.GetIntSetting("slave_node_retry", 3) { + if retry >= model.GetIntSetting("slave_node_retry", 3) { util.Log().Debug("从机节点 [%s] Ping 重试已达到最大限制,将从机节点标记为不可用", node.Model.Name) node.changeStatus(false) - break loop + + if !recoverMode { + // 启动恢复监控循环 + util.Log().Debug("从机节点 [%s] 进入恢复模式", node.Model.Name) + pingTicker.Stop() + pingTicker = time.NewTicker(recoverDuration) + recoverMode = true + } } - break + } else { + if recoverMode { + util.Log().Debug("从机节点 [%s] 复活", node.Model.Name) + pingTicker.Stop() + pingTicker = time.NewTicker(tickDuration) + recoverMode = false + } + + util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res) + node.changeStatus(true) + retry = 0 } - util.Log().Debug("从机节点 [%s] 状态: %s", node.Model.Name, res) - node.changeStatus(true) case <-node.close: util.Log().Debug("从机节点 [%s] 收到关闭信号", node.Model.Name) break loop