总结:worker pool (工作池) – Goroutine池
本质上是生产者消费者模型。
可以有效控制goroutine数量,防止暴涨。
以下是内容解释:
1、为什么需要 Goroutine 池?
试想这样一种情况,如果给每个客户端都分配一个 session 来维护连接,然后每个 session 会启动3个 goroutine,一个用来读取消息,一个用来发送消息,一个用来检测生命周期。那么,当大量客户端连接时,那 goroutine 数量就是3倍客户端的数量啊。即使 goroutine 再轻量,几百万的客户都涌上来的时候,内存也绷不住啊。
golang 号称可以百万级别并发,但 goroutine 也不应该无限制的创建吧,毕竟每次都向系统申请内存,系统内存总有耗尽的一天吧。
那有没有一个池一样的东西,可以让 goroutine 可以重用,而不需要不节制的创建呢?
也就是说,有什么方案可以减缓大规模 Goroutine 对系统的调度和内存压力?要想解决问题,最重要的是找到造成问题的根源,这个问题根源是什么?Goroutine 的数量过多导致资源侵占,那要解决这个问题就要限制运行的 Goroutine 数量,合理复用,节省资源,具体就是 — Goroutine池化。
这就诞生了 Goroutine 池的概念。
Goroutine 池不是 golang 官方给出的一个概念,而是程序员在实践中发现存在上述问题而给出的一种用于解决使用 goroutine 时所产生的实际问题的思路。
试想,Goroutine 池中预先保存一定数量的 Goroutine ,而新任务将不再以创建新 Goroutine 的方式去执行,而是将任务发布到任务队列,Goroutine 池中的 Goroutine 不断的从任务队列中取出任务并执行,可以有效的减少 Goroutine 创建和销毁所带来的开销。
总结一下为什么要实现 Goroutine 池:
即便每个goroutine只分配4KB的内存,但如果是恐怖如斯的数量,聚少成多,内存会占用过高。
会对GC造成极大的负担,首先GC会在回收 goroutine 上消耗性能,其次GC本身也是 goroutine ,内存吃紧的状态下连GC的调度都会出现问题。
提高响应速度,减少创建协程的时间。
更好的管理协程,控制最大并发数量,定期回收。
2、Goroutine 池是什么?
Goroutine 池是一个池子,里面有一些 Goroutine 。
这个池子有一个最大容量,其内部的 Goroutine 数量不能超过其最大容量。
可以将池子中的每个 Goroutine 看作是一个 worker ,用于执行任务。
更准确的说,Goroutine 池是一个架构。该架构由两部分组成:
- 一个池子,里面有一些 Goroutine 。
- 一个任务队列,里面放着给池子里的 Goroutine 执行的任务。
新来了一个任务,如果池子存满了 Goroutine ,而且它们都在工作,那么就将该任务放入任务队列,等待被处理;
如果池子没满,就新开一个 Goroutine 去处理该任务。
3、怎么实现 Goroutine 池?
3.1、Goroutine 池只是一个抽象的概念
Golang 没有封装好的线程池。
Goroutine 池只是一个概念,需要我们自己写代码时有意识地实现 Goroutine 池。
3.2、Goroutine 池的设计思路
- 启动服务的时候初始化一个 Goroutine Pool,这个协程池维护了 任务的管道 和 worker(也就是 Goroutine)。
- 外部将请求投递到 Goroutine Pool,Goroutine Pool 的操作是:判断当前运行的 worker 是否已经超过 Pool 的容量,如果超过就将请求放到任务管道中直到运行的 worker 将管道中的任务执行;如果没有超过就新开一个 worker 处理。
3.3、生产者消费者模型
在这个 投递 —> 等待 —> 执行 的过程中,我们很容易想到生产者消费者模型:
生产者 –(生产任务)–> 队列 –(消费任务)–> 消费者
实际上,用来执行任务的 goroutine 就是消费者,操作任务池的 goroutine 就是生产者, 而队列则可以使用 go 的 buffer channel,至此,任务池的建模到此结束。
4、一个实现 Goroutine 池的实例
需求:
计算一个数字的各个位数之和,例如:数字123,结果为1+2+3=6。随机生成数字进行计算。
代码示例:
package main
import (
"fmt"
"math/rand"
)
type Job struct {
// id
Id int
// 需要计算的随机数
RandNum int
}
type Result struct {
// 这里必须传对象实例
job *Job
// 求和
sum int
}
// Goroutine池
func main(){
// 需要2个管道
// 1.job管道
jobChan := make(chan *Job, 128)
// 2.结果管道
resultChan := make(chan *Result, 128)
// 3.创建工作池
createPool(64, jobChan, resultChan)
// 4.开个打印的协程
go func(resultChan chan *Result) {
// 遍历结果管道打印
for result := range resultChan {
fmt.Printf("job id:%v randnum:%v result:%d\n", result.job.Id,
result.job.RandNum, result.sum)
}
}(resultChan)
var id int
// 循环创建job,输入到管道
for {
id++
// 生成随机数
r_num := rand.Int()
job := &Job{
Id: id,
RandNum: r_num,
}
jobChan <- job
}
}
// 创建工作池
// 参数1:开几个协程
func createPool(num int, jobChan chan *Job, resultChan chan *Result) {
// 根据开协程个数,去跑运行
for i := 0; i < num; i++ {
go func(jobChan chan *Job, resultChan chan *Result, i int) {
// 执行运算
// 遍历job管道所有数据,进行相加
for job := range jobChan {
// 随机数接过来
r_num := job.RandNum
// 随机数每一位相加
// 定义返回值
var sum int
for r_num != 0 {
tmp := r_num % 10
sum += tmp
r_num /= 10
}
// 想要的结果是Result
r := &Result{
job: job,
sum: sum,
}
//运算结果扔到管道
resultChan <- r
}
}(jobChan, resultChan, i)
}
}
输出结果的一部分:
解释:
如上图,本程序由两个协程、一个协程池、两个管道构成。
job 协程不断产生任务,然后放入 jobchan 任务队列;协程池中有64个协程,不断地从任务队列 jobchan 中取任务来执行,将结果放入 resultchan 任务队列;printf 协程从 resultchan 任务队列中取出结果来进行打印。
其中,我们可以看到两对生产者消费者:job 协程和协程池是一对生产者消费者,协程池和printf 协程也是一对生产者消费者。
说明:
在这个例子中,两个管道和协程池可以理解为 “Goroutine 池”。
它的作用是一直在用有限的 Goroutine 数量进行一项无限的工作。
试想,如果没有这两个管道和协程池:
job 协程一直在产生任务,来一个任务就要创建一个协程计算它…那将是无数个协程…
5、一个 Goroutine 池的实践 —— ants
项目地址:ants
最后编辑:joker.liu 更新时间:2023-04-21 14:33