为什么需要协程池
超大规模并发的场景下。不加限制的大规模的goroutine可能造成内存暴涨,给及其带来极大的压力,要解决这个问题就是要限制运行的goroutine食量,合理复用,节省资源,具体就是goroutine池化
设计思路
启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列 ,里面存放负责处理任务的 Worker,然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是:
- 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task;
- 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量,如果是,则判断当前是否为非阻塞模式,是则返回nil,否则阻塞等待直到有Worker放回pool中,如果不是则新建一个worker线程处理
具体实现
Pool
Pool 是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中,批量执行不同类型任务,是一种广义的协程池;项目通过多种Pool类型和统一的poolCommon来支持不同任务,
pool类型
| Pool 类型 |
任务形式 |
Worker 类型 |
通道类型 |
| Pool |
func() |
goWorker |
chan func() |
| PoolWithFunc |
any 参数 |
goWorkerWithFunc |
chan any |
| PoolWithFuncGeneric[T] |
泛型参数 T |
goWorkerWithFuncGeneric |
chan T |
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (p *Pool) Submit(task func()) error { w, err := p.retrieveWorker() if w != nil { w.inputFunc(task) } return err }
pool.Submit(func() { doSomething(42) })
pool.Submit(func() { process(user) })
pool.Submit(func() { obj.Method() })
|
- poolWithFunc:用于批量执行同类任务的协程池,这种pool使用于大批量相同任务的场景,
1 2 3 4 5 6 7 8 9 10 11
| type PoolWithFunc struct { *poolCommon fn func(any) } func (p *PoolWithFunc) Invoke(arg any) error { w, err := p.retrieveWorker() if w != nil { w.inputArg(arg) } return err }
|
Woker收到arg后调用p.fn(arg),在fn里做类型分支
1 2 3 4 5 6 7
| fn := func(arg any) { switch v := arg.(type) { case int: case string: case *Task: v.Execute() } }
|
pool结构体
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| type poolCommon struct {
capacity int32
running int32
lock sync.Locker
workers workerQueue
state int32
cond *sync.Cond
allDone chan struct{}
once *sync.Once
workerCache sync.Pool
waiting int32
now int64
options *Options }
|
capacity: Pool 的容量,也就是开启 worker 数量的上限,每一个 worker 绑定一个 goroutine;
running: 当前正在执行任务的 worker 数量;
lock:保护worker队列和配合sync.Cond保证多goroutine并发访问workers不会产生数据竞争
workers 是一个 slice,用来存放空闲 worker,请求进入 Pool 之后会首先检查 workers 中是否有空闲 worker,若有则取出绑定任务执行,否则判断当前运行的 worker 是否已经达到容量上限,是—阻塞等待,否—新开一个 worker 执行任务;
workerCache:复用worker对象,减少内存分配和GC压力
pool初始化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| func NewPool(size int, options ...Option) (*Pool, error) { pc, err := newPool(size, options...) if err != nil { return nil, err }
pool := &Pool{poolCommon: pc} pool.workerCache.New = func() any { return &goWorker{ pool: pool, task: make(chan func(), workerChanCap), } }
return pool, nil }
func newPool(size int, options ...Option) (*poolCommon, error) { opts := loadOptions(options...) if opts.Logger == nil { opts.Logger = defaultLogger } p := &poolCommon{ capacity: int32(size), allDone: make(chan struct{}), lock: syncx.NewSpinLock(), once: &sync.Once{}, options: opts, } if p.options.PreAlloc { if size == -1 { return nil, ErrInvalidPreAllocSize } p.workers = newWorkerQueue(queueTypeLoopQueue, size) } else { p.workers = newWorkerQueue(queueTypeStack, 0) } p.cond = sync.NewCond(p.lock) p.goPurge() p.goTicktock() return p, nil }
|
提交任务
第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该 task 执行。
1 2 3 4 5 6 7 8 9 10 11
| func (p *Pool) Submit(task func()) error { if p.IsClosed() { return ErrPoolClosed }
w, err := p.retrieveWorker() if w != nil { w.inputFunc(task) } return err }
|
获取worker
从池中获取一个可用的worker,优先复用空闲worker,在容量允许时新建worker,否则阻塞等待或返回
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| func (p *poolCommon) retrieveWorker() (w worker, err error) { p.lock.Lock() retry: if w = p.workers.detach(); w != nil { p.lock.Unlock() return } if capacity := p.Cap(); capacity == -1 || capacity > p.Running() { w = p.workerCache.Get().(worker) w.run() p.lock.Unlock() return } if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) { p.lock.Unlock() return nil, ErrPoolOverload } p.addWaiting(1) p.cond.Wait() p.addWaiting(-1) if p.IsClosed() { p.lock.Unlock() return nil, ErrPoolClosed } goto retry }
|
首先加锁保证后续操作在临界区执行,通过p.workers.detach() 从空闲队列取一个 worker;若取到,解锁并返回。若队列为空且 Cap() == -1(无限制)或 Running() < Cap(),则从 workerCache 取一个 worker、调用 run() 启动,解锁并返回。若开启 Nonblocking,或 MaxBlockingTasks > 0 且当前等待数已达上限,则解锁并返回 nil, ErrPoolOverload,否则调用 p.addWaiting(1) 增加等待计数,p.cond.Wait() 阻塞;被唤醒后 p.addWaiting(-1) 减少等待计数。若池已关闭,解锁并返回 nil, ErrPoolClosed;否则 goto retry 重新尝试获取 worker。被唤醒后不直接返回,而是回到开头重新尝试,因为唤醒时可能仍无可用 worker(例如被 Broadcast 唤醒但其他 goroutine 先抢到 worker)。
Worker队列
1 2 3 4 5 6 7 8
| type workerQueue interface { len() int isEmpty() bool insert(worker) error detach() worker refresh(duration time.Duration) []worker reset() }
|
workerQueue协程池,通过 insert / detach 管理空闲 worker 的分配与回收,让 worker 在队列中等待新任务,而不是频繁创建/销毁 goroutine,refresh清理长期空闲的worker,队列包含两种实现,Stack和LoopQueue
Stack
stack结构采用动态slice存储work,插入到尾部,从尾部detach,实现简单,无容量限制
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| func (ws *workerStack) insert(w worker) error { ws.items = append(ws.items, w) return nil }
func (ws *workerStack) detach() worker { l := ws.len() if l == 0 { return nil } w := ws.items[l-1] ws.items[l-1] = nil ws.items = ws.items[:l-1] return w
|
LoopQueue
采用固定大小items+head/tail环形索引,固定容量,无扩容。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| func (wq *loopQueue) insert(w worker) error { if wq.isFull { return errQueueIsFull } wq.items[wq.tail] = w wq.tail = (wq.tail + 1) % wq.size
if wq.tail == wq.head { wq.isFull = true } return nil } func (wq *loopQueue) detach() worker { if wq.isEmpty() { return nil } w := wq.items[wq.head] wq.items[wq.head] = nil wq.head = (wq.head + 1) % wq.size wq.isFull = false return w }
|
任务执行
Worker 结构
1 2 3 4 5 6 7 8 9
| type goWorker struct { worker
pool *Pool
task chan func()
lastUsed int64 }
|
- pool:所属的pool,用于归还worker。更新计数
- task:任务通道,接收func()类型任务
执行任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| func (w *goWorker) run() { w.pool.addRunning(1) go func() { defer func() { if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() { w.pool.once.Do(func() { close(w.pool.allDone) }) } w.pool.workerCache.Put(w) if p := recover(); p != nil { if ph := w.pool.options.PanicHandler; ph != nil { ph(p) } else { w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack()) } }
w.pool.cond.Signal() }()
for fn := range w.task { if fn == nil { return } fn() if ok := w.pool.revertWorker(w); !ok { return } } }() }
|
- addRunning(1):增加运行中的worker数量
- 启动goroutine,在主循环中从task获取任务
- revertWorker:将worker放回空闲队列,返回false表示池已关闭或者需要缩减,此时退出循环
- defer阶段:减少运行中worker数量,将worker放回sync.Pool复用,环形在retrieveWorker中等待的goroutine
放回空闲队列的worker,其goroutine一直在循环中,阻塞在range w.task上。由submit所在的goroutine调用w.inputFunc往w.task里发任务,worker 的 goroutine 在 for fn := range w.task 上收到 fn,执行fn
workerQueue和workerCache的区别
workerQueue:revertWorker把空闲但仍在运行的worker放回队列,共detach()取出并分配新任务,本质是复用goroutine,避免频繁创建/销毁
workerCache:在goroutine即将退出,放入sync.pool中,回收worker结构体,供之后get复用,减少分配和GC,本质是复用对象内存,不是复用goroutine