Cloudreve/pkg/task/pool.go
2019-12-13 09:46:53 +08:00

124 lines
2.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package task
import "sync"
// Pool 带有最大配额的goroutines任务池
type Pool struct {
// 容量
capacity int
// 初始容量
initialCapacity int
// 终止信号
terminateSignal chan error
// 全部任务完成的信号
finishSignal chan bool
// 有空余位置的信号
freeSignal chan bool
// 是否已关闭
closed bool
// 是否正在等待任务结束
waiting bool
// 互斥锁
lock sync.Mutex
// 等待队列
pending []Job
}
// Job 任务
type Job interface {
// 任务处理方法如果error不为nil
// 任务池会关闭并中止接受新任务
Do() error
}
// NewGoroutinePool 创建一个容量为capacity的任务池
func NewGoroutinePool(capacity int) *Pool {
pool := &Pool{
capacity: capacity,
initialCapacity: capacity,
terminateSignal: make(chan error),
finishSignal: make(chan bool),
freeSignal: make(chan bool),
}
go pool.Schedule()
return pool
}
// Schedule 为等待队列的任务分配Worker以及检测错误状态、所有任务完成
func (pool *Pool) Schedule() {
for {
select {
case <-pool.freeSignal:
// 有新的空余名额
pool.lock.Lock()
if len(pool.pending) > 0 {
// 有待处理的任务,开始处理
var job Job
job, pool.pending = pool.pending[0], pool.pending[1:]
go pool.start(job)
} else {
if pool.waiting && pool.capacity == pool.initialCapacity {
// 所有任务已结束
pool.lock.Unlock()
pool.finishSignal <- true
return
}
pool.lock.Unlock()
}
case <-pool.terminateSignal:
// 有任务意外中止,则发送完成信号
pool.finishSignal <- true
return
}
}
}
// Wait 等待队列中所有任务完成或有Job返回错误中止
func (pool *Pool) Wait() chan bool {
pool.lock.Lock()
pool.waiting = true
pool.lock.Unlock()
return pool.finishSignal
}
// Submit 提交新任务
func (pool *Pool) Submit(job Job) {
if pool.closed {
return
}
pool.lock.Lock()
if pool.capacity < 1 {
// 容量为空时,加入等待队列
pool.pending = append(pool.pending, job)
pool.lock.Unlock()
return
}
// 还有空闲容量时,开始执行任务
go pool.start(job)
}
// 开始执行任务
func (pool *Pool) start(job Job) {
pool.capacity--
pool.lock.Unlock()
err := job.Do()
if err != nil {
pool.closed = true
select {
case <-pool.terminateSignal:
default:
close(pool.terminateSignal)
}
}
pool.lock.Lock()
pool.capacity++
pool.lock.Unlock()
pool.freeSignal <- true
}