back to index

ants

2025/12/10
loading

为什么需要协程池

超大规模并发的场景下。不加限制的大规模的goroutine可能造成内存暴涨,给及其带来极大的压力,要解决这个问题就是要限制运行的goroutine食量,合理复用,节省资源,具体就是goroutine池化

设计思路

启动服务之时先初始化一个 Goroutine Pool 池,这个 Pool 维护了一个类似栈的 LIFO 队列 ,里面存放负责处理任务的 Worker,然后在 client 端提交 task 到 Pool 中之后,在 Pool 内部,接收 task 之后的核心操作是:

  1. 检查当前 Worker 队列中是否有可用的 Worker,如果有,取出执行当前的 task;
  2. 没有可用的 Worker,判断当前在运行的 Worker 是否已超过该 Pool 的容量,如果是,则判断当前是否为非阻塞模式,是则返回nil,否则阻塞等待直到有Worker放回pool中,如果不是则新建一个worker线程处理

具体实现

Pool

Pool 是一个通用的协程池,支持不同类型的任务,亦即每一个任务绑定一个函数提交到池中,批量执行不同类型任务,是一种广义的协程池;项目通过多种Pool类型和统一的poolCommon来支持不同任务,

pool类型

Pool 类型 任务形式 Worker 类型 通道类型
Pool func() goWorker chan func()
PoolWithFunc any 参数 goWorkerWithFunc chan any
PoolWithFuncGeneric[T] 泛型参数 T goWorkerWithFuncGeneric chan T
  • pool不同任务通过闭包统一为func()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (p *Pool) Submit(task func()) error {
w, err := p.retrieveWorker()
if w != nil {
w.inputFunc(task) // task 始终是 func()
}
return err
}

// 任务 A
pool.Submit(func() { doSomething(42) })
// 任务 B
pool.Submit(func() { process(user) })
// 任务 C
pool.Submit(func() { obj.Method() })
  • poolWithFunc:用于批量执行同类任务的协程池,这种pool使用于大批量相同任务的场景,
1
2
3
4
5
6
7
8
9
10
11
type PoolWithFunc struct {
*poolCommon
fn func(any) // 统一处理函数
}
func (p *PoolWithFunc) Invoke(arg any) error {
w, err := p.retrieveWorker()
if w != nil {
w.inputArg(arg) // 传任意类型参数
}
return err
}

Woker收到arg后调用p.fn(arg),在fn里做类型分支

1
2
3
4
5
6
7
fn := func(arg any) {
switch v := arg.(type) {
case int: /* 处理 int */
case string: /* 处理 string */
case *Task: v.Execute()
}
}

pool结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
type poolCommon struct {

capacity int32

running int32

lock sync.Locker

workers workerQueue

state int32

cond *sync.Cond

allDone chan struct{}

once *sync.Once

workerCache sync.Pool

waiting int32

now int64

options *Options
}
  • capacity: Pool 的容量,也就是开启 worker 数量的上限,每一个 worker 绑定一个 goroutine;
  • running: 当前正在执行任务的 worker 数量;
  • lock:保护worker队列和配合sync.Cond保证多goroutine并发访问workers不会产生数据竞争
  • workers 是一个 slice,用来存放空闲 worker,请求进入 Pool 之后会首先检查 workers 中是否有空闲 worker,若有则取出绑定任务执行,否则判断当前运行的 worker 是否已经达到容量上限,是—阻塞等待,否—新开一个 worker 执行任务;
  • workerCache:复用worker对象,减少内存分配和GC压力

pool初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func NewPool(size int, options ...Option) (*Pool, error) {
pc, err := newPool(size, options...)
if err != nil {
return nil, err
}

pool := &Pool{poolCommon: pc}
pool.workerCache.New = func() any {
return &goWorker{
pool: pool,
task: make(chan func(), workerChanCap),
}
}

return pool, nil
}

func newPool(size int, options ...Option) (*poolCommon, error) {
opts := loadOptions(options...)
if opts.Logger == nil {
opts.Logger = defaultLogger
}
p := &poolCommon{
capacity: int32(size),
allDone: make(chan struct{}),
lock: syncx.NewSpinLock(),
once: &sync.Once{},
options: opts,
}
if p.options.PreAlloc {
if size == -1 {
return nil, ErrInvalidPreAllocSize
}
p.workers = newWorkerQueue(queueTypeLoopQueue, size)
} else {
p.workers = newWorkerQueue(queueTypeStack, 0)
}
p.cond = sync.NewCond(p.lock)
p.goPurge() //按照ExpiryDuration 定期清理过期 worker
p.goTicktock()
return p, nil
}

提交任务

第一个 if 判断当前 Pool 是否已被关闭,若是则不再接受新任务,否则获取一个 Pool 中可用的 worker,绑定该 task 执行。

1
2
3
4
5
6
7
8
9
10
11
func (p *Pool) Submit(task func()) error {
if p.IsClosed() {
return ErrPoolClosed
}

w, err := p.retrieveWorker()
if w != nil {
w.inputFunc(task)
}
return err
}

获取worker

从池中获取一个可用的worker,优先复用空闲worker,在容量允许时新建worker,否则阻塞等待或返回

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
func (p *poolCommon) retrieveWorker() (w worker, err error) {
p.lock.Lock()
retry:
if w = p.workers.detach(); w != nil {
p.lock.Unlock()
return
}
if capacity := p.Cap(); capacity == -1 || capacity > p.Running() {
w = p.workerCache.Get().(worker) // 获取对象
w.run() // 启动goroutine
p.lock.Unlock()
return
}
if p.options.Nonblocking || (p.options.MaxBlockingTasks != 0 && p.Waiting() >= p.options.MaxBlockingTasks) {
p.lock.Unlock()
return nil, ErrPoolOverload
}
p.addWaiting(1)
p.cond.Wait()
p.addWaiting(-1)
if p.IsClosed() {
p.lock.Unlock()
return nil, ErrPoolClosed
}
goto retry
}

首先加锁保证后续操作在临界区执行,通过p.workers.detach() 从空闲队列取一个 worker;若取到,解锁并返回。若队列为空且 Cap() == -1(无限制)或 Running() < Cap(),则从 workerCache 取一个 worker、调用 run() 启动,解锁并返回。若开启 Nonblocking,或 MaxBlockingTasks > 0 且当前等待数已达上限,则解锁并返回 nil, ErrPoolOverload,否则调用 p.addWaiting(1) 增加等待计数,p.cond.Wait() 阻塞;被唤醒后 p.addWaiting(-1) 减少等待计数。若池已关闭,解锁并返回 nil, ErrPoolClosed;否则 goto retry 重新尝试获取 worker。被唤醒后不直接返回,而是回到开头重新尝试,因为唤醒时可能仍无可用 worker(例如被 Broadcast 唤醒但其他 goroutine 先抢到 worker)。

Worker队列

1
2
3
4
5
6
7
8
type workerQueue interface {
len() int
isEmpty() bool
insert(worker) error
detach() worker
refresh(duration time.Duration) []worker
reset()
}

workerQueue协程池,通过 insert / detach 管理空闲 worker 的分配与回收,让 worker 在队列中等待新任务,而不是频繁创建/销毁 goroutine,refresh清理长期空闲的worker,队列包含两种实现,Stack和LoopQueue

Stack

stack结构采用动态slice存储work,插入到尾部,从尾部detach,实现简单,无容量限制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func (ws *workerStack) insert(w worker) error {
ws.items = append(ws.items, w)
return nil
}

func (ws *workerStack) detach() worker {
l := ws.len()
if l == 0 {
return nil
}
w := ws.items[l-1]
ws.items[l-1] = nil // avoid memory leaks
ws.items = ws.items[:l-1]
return w

LoopQueue

采用固定大小items+head/tail环形索引,固定容量,无扩容。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (wq *loopQueue) insert(w worker) error {
if wq.isFull {
return errQueueIsFull
}
wq.items[wq.tail] = w
wq.tail = (wq.tail + 1) % wq.size

if wq.tail == wq.head {
wq.isFull = true
}
return nil
}
func (wq *loopQueue) detach() worker {
if wq.isEmpty() {
return nil
}
w := wq.items[wq.head]
wq.items[wq.head] = nil
wq.head = (wq.head + 1) % wq.size
wq.isFull = false
return w
}

任务执行

Worker 结构

1
2
3
4
5
6
7
8
9
type goWorker struct {
worker

pool *Pool

task chan func()

lastUsed int64
}
  • pool:所属的pool,用于归还worker。更新计数
  • task:任务通道,接收func()类型任务

执行任务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
func (w *goWorker) run() {
w.pool.addRunning(1)
go func() {
defer func() {
if w.pool.addRunning(-1) == 0 && w.pool.IsClosed() {
w.pool.once.Do(func() {
close(w.pool.allDone)
})
}
w.pool.workerCache.Put(w)
if p := recover(); p != nil {
if ph := w.pool.options.PanicHandler; ph != nil {
ph(p)
} else {
w.pool.options.Logger.Printf("worker exits from panic: %v\n%s\n", p, debug.Stack())
}
}

w.pool.cond.Signal()
}()

for fn := range w.task {
if fn == nil {
return
}
fn()
if ok := w.pool.revertWorker(w); !ok {
return
}
}
}()
}
  1. addRunning(1):增加运行中的worker数量
  2. 启动goroutine,在主循环中从task获取任务
  3. revertWorker:将worker放回空闲队列,返回false表示池已关闭或者需要缩减,此时退出循环
  4. defer阶段:减少运行中worker数量,将worker放回sync.Pool复用,环形在retrieveWorker中等待的goroutine

放回空闲队列的worker,其goroutine一直在循环中,阻塞在range w.task上。由submit所在的goroutine调用w.inputFunc往w.task里发任务,worker 的 goroutine 在 for fn := range w.task 上收到 fn,执行fn

workerQueue和workerCache的区别

workerQueue:revertWorker把空闲但仍在运行的worker放回队列,共detach()取出并分配新任务,本质是复用goroutine,避免频繁创建/销毁

workerCache:在goroutine即将退出,放入sync.pool中,回收worker结构体,供之后get复用,减少分配和GC,本质是复用对象内存,不是复用goroutine

CATALOG
  1. 1. 为什么需要协程池
  2. 2. 设计思路
  3. 3. 具体实现
    1. 3.1. Pool
      1. 3.1.1. pool类型
      2. 3.1.2. pool结构体
      3. 3.1.3. pool初始化
    2. 3.2. 提交任务
    3. 3.3. 获取worker
    4. 3.4. Worker队列
      1. 3.4.1. Stack
      2. 3.4.2. LoopQueue
    5. 3.5. 任务执行
      1. 3.5.1. Worker 结构
      2. 3.5.2. 执行任务
    6. 3.6. workerQueue和workerCache的区别