Cloudreve/pkg/task/pool.go

61 lines
1.2 KiB
Go
Raw Normal View History

package task
2020-02-02 14:40:07 +08:00
import (
model "github.com/cloudreve/Cloudreve/v3/models"
"github.com/cloudreve/Cloudreve/v3/pkg/util"
2020-02-02 14:40:07 +08:00
)
2019-12-12 11:33:41 +08:00
2020-02-02 14:40:07 +08:00
// TaskPoll 要使用的任务池
var TaskPoll *Pool
// Pool 带有最大配额的任务池
type Pool struct {
// 容量
2020-02-02 14:40:07 +08:00
idleWorker chan int
}
2020-02-02 14:40:07 +08:00
// Add 增加可用Worker数量
func (pool *Pool) Add(num int) {
for i := 0; i < num; i++ {
pool.idleWorker <- 1
2019-12-12 11:33:41 +08:00
}
}
2020-02-02 14:40:07 +08:00
// ObtainWorker 阻塞直到获取新的Worker
func (pool *Pool) ObtainWorker() Worker {
select {
case <-pool.idleWorker:
// 有空闲Worker名额时返回新Worker
return &GeneralWorker{}
2019-12-12 11:33:41 +08:00
}
}
2020-02-02 14:40:07 +08:00
// FreeWorker 添加空闲Worker
func (pool *Pool) FreeWorker() {
pool.Add(1)
2019-12-12 11:33:41 +08:00
}
2020-02-02 14:40:07 +08:00
// Submit 开始提交任务
2019-12-12 11:33:41 +08:00
func (pool *Pool) Submit(job Job) {
2020-02-02 14:40:07 +08:00
go func() {
util.Log().Debug("等待获取Worker")
worker := pool.ObtainWorker()
util.Log().Debug("获取到Worker")
worker.Do(job)
util.Log().Debug("释放Worker")
pool.FreeWorker()
}()
2019-12-12 11:33:41 +08:00
}
2020-02-02 14:40:07 +08:00
// Init 初始化任务池
func Init() {
maxWorker := model.GetIntSetting("max_worker_num", 10)
TaskPoll = &Pool{
idleWorker: make(chan int, maxWorker),
}
2020-02-02 14:40:07 +08:00
TaskPoll.Add(maxWorker)
util.Log().Info("初始化任务队列WorkerNum = %d", maxWorker)
2019-12-12 11:33:41 +08:00
2020-02-02 14:40:07 +08:00
Resume()
}