Feat: add and delete node in node pool

This commit is contained in:
HFO4 2021-10-29 20:24:07 +08:00
parent c205111d26
commit 0ed7839479
6 changed files with 83 additions and 13 deletions

View file

@ -106,8 +106,8 @@ func addDefaultSettings() {
{Name: "upload_session_timeout", Value: `86400`, Type: "timeout"},
{Name: "slave_api_timeout", Value: `60`, Type: "timeout"},
{Name: "slave_node_retry", Value: `3`, Type: "slave"},
{Name: "slave_ping_interval", Value: `300`, Type: "slave"},
{Name: "slave_recover_interval", Value: `600`, Type: "slave"},
{Name: "slave_ping_interval", Value: `60`, Type: "slave"},
{Name: "slave_recover_interval", Value: `120`, Type: "slave"},
{Name: "slave_transfer_timeout", Value: `172800`, Type: "timeout"},
{Name: "onedrive_monitor_timeout", Value: `600`, Type: "timeout"},
{Name: "share_download_session_timeout", Value: `2073600`, Type: "timeout"},

View file

@ -16,6 +16,7 @@ type Node struct {
MasterKey string `gorm:"type:text"` // 从->主 通信密钥
Aria2Enabled bool // 是否支持用作离线下载节点
Aria2Options string `gorm:"type:text"` // 离线下载配置
Rank int // 负载均衡权重
// 数据库忽略字段
Aria2OptionsSerialized Aria2Option `gorm:"-"`

View file

@ -19,6 +19,12 @@ type Pool interface {
// Returns node by ID
GetNodeByID(id uint) Node
// Add given node into pool. If node existed, refresh node.
Add(node *model.Node)
// Delete and kill node from pool by given node id
Delete(id uint)
}
// NodePool 通用节点池
@ -95,17 +101,7 @@ func (pool *NodePool) initFromDB() error {
pool.active = make(map[uint]Node)
pool.inactive = make(map[uint]Node)
for i := 0; i < len(nodes); i++ {
newNode := NewNodeFromDBModel(&nodes[i])
if newNode.IsActive() {
pool.active[nodes[i].ID] = newNode
} else {
pool.inactive[nodes[i].ID] = newNode
}
// 订阅节点状态变更
newNode.SubscribeStatusChange(func(isActive bool, id uint) {
pool.nodeStatusChange(isActive, id)
})
pool.add(&nodes[i])
}
pool.lock.Unlock()
@ -113,6 +109,54 @@ func (pool *NodePool) initFromDB() error {
return nil
}
func (pool *NodePool) add(node *model.Node) {
newNode := NewNodeFromDBModel(node)
if newNode.IsActive() {
pool.active[node.ID] = newNode
} else {
pool.inactive[node.ID] = newNode
}
// 订阅节点状态变更
newNode.SubscribeStatusChange(func(isActive bool, id uint) {
pool.nodeStatusChange(isActive, id)
})
}
func (pool *NodePool) Add(node *model.Node) {
pool.lock.Lock()
defer pool.lock.Unlock()
if _, ok := pool.active[node.ID]; ok {
// TODO: refresh node
return
}
if _, ok := pool.inactive[node.ID]; ok {
return
}
pool.add(node)
}
func (pool *NodePool) Delete(id uint) {
pool.lock.Lock()
defer pool.lock.Unlock()
if node, ok := pool.active[id]; ok {
node.Kill()
delete(pool.active, id)
return
}
if node, ok := pool.inactive[id]; ok {
node.Kill()
delete(pool.inactive, id)
return
}
}
// BalanceNodeByFeature 根据 feature 和 LoadBalancer 取出节点
func (pool *NodePool) BalanceNodeByFeature(feature string, lb balancer.Balancer) (error, Node) {
pool.lock.RLock()

View file

@ -454,3 +454,14 @@ func AdminAddNode(c *gin.Context) {
c.JSON(200, ErrorResponse(err))
}
}
// AdminToggleNode 启用/暂停节点
func AdminToggleNode(c *gin.Context) {
var service admin.ToggleNodeService
if err := c.ShouldBindUri(&service); err == nil {
res := service.Toggle()
c.JSON(200, res)
} else {
c.JSON(200, ErrorResponse(err))
}
}

View file

@ -452,6 +452,8 @@ func InitMasterRouter() *gin.Engine {
node.POST("aria2/test", controllers.AdminTestAria2)
// 创建/保存节点
node.POST("", controllers.AdminAddNode)
// 启用/暂停节点
node.PATCH("enable/:id/:desired", controllers.AdminToggleNode)
}
}

View file

@ -69,3 +69,15 @@ func (service *AdminListService) Nodes() serializer.Response {
"active": isActive,
}}
}
// ToggleNodeService 开关节点服务
type ToggleNodeService struct {
ID uint `uri:"id"`
Desired int `uri:"desired"`
}
// Toggle 开关节点
func (service *ToggleNodeService) Toggle() serializer.Response {
return serializer.Response{}
}