协程池
WaitGroup
WaitGroup
Golang中goroutine
和channel
的一个非常重要的应用就是构建协程池
。虽然Golang可以创建成千上万的协程,但是协程池能提升协程的利用效率,避免协程的频繁创建、销毁开销。为了创建协程池,我们先来看下WaitGroup
,因为后面创建协程池会用到WaitGroup
。
WaitGroup
可用来等待一组goroutines
完成执行,有点类似于Node.js
中的Promise.all
。假设我们在主协程中创建了三个并发的子协程,我们可能需要在主协程中等待这三个子协程完成执行,这时就可以使用WaitGroup
来完成。下面我们通过一个例子来看下。
package main
import (
"fmt"
"sync"
"time"
)
func process(i int, wg *sync.WaitGroup) {
fmt.Println("started Goroutine ", i)
time.Sleep(2 * time.Second)
fmt.Printf("Goroutine %d ended\n", i)
wg.Done()
}
func main() {
no := 3
var wg sync.WaitGroup
for i := 0; i < no; i++ {
wg.Add(1)
go process(i, &wg)
}
wg.Wait()
fmt.Println("All go routines finished executing")
}
WaitGroup
是一个结构体类型,这里我们声明了一个WaitGroup
类型的变量wg
。WaitGroup
原理是使用一个计数器。我们每创建一个协程,都调用wg.Add(1)
将WaitGroup
的计数器加1。我们可以通过wg.Done()
方法来将计数器减1。wg.Wait()
会阻塞主协程的执行,直到计数器减为0。该程序中创建了3个协程,并且将计数器增加到了3,每个协程执行完时调用wg.Done()
将计数器减1,因此wgWait()
会阻塞主协程,直到3个子协程都执行完毕。
这里有一点需要注意,我们是将wg
的指针传递给了每个协程,如果我们不传递指针而是传递值,那么每个子协程都会生成自己的拷贝,就无法通知到主协程的wg
。
该程序执行结果如下,由于协程池是异步的,因此不是稳定输出。
started Goroutine 2
started Goroutine 0
started Goroutine 1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
实现协程池
现在我们基于WaitGroup
和带缓存channel
来实现一个协程池。
协程池指的是一组随时等待分配任务的协程。协程池中的协程每接到一个任务就去执行,执行完某个任务后又继续等待分配下一个任务。
本节我们将实现的协程池比较简单,用来计算数字各位的和,比如输入为234,输出为9(2 + 3 + 4)。主要包括如下几点功能:
创建一个带缓存
channel
,用来写入要执行的任务,该channel
就是后面的jobs
。创建协程池来读取
jobs
中的任务。每个任务执行完毕后将执行结果写入一个带缓存
channel
,该channel
就是后面的results
。从
result
channel
中读取并打印结果。
首先我们来创建两个结构体类型,分别表示待执行的任务和执行结果:
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
结构体类型Job
含有两个属性,id
和randomno
。randomno
用来表示待执行的任务输入。
结构体类型Result
含有两个属性,job
和sumofdigits
。sumofdigits
用来存储计算结果。
下面我们创建两个带缓存channel
,jobs
和results
。其中jobs
用来写入和读取任务,results
用来写入各个任务的计算结果。
var jobs = make(chan Job, 10)
var results = mak(chan Result, 10)
协程池中的每个协程监听jobs channel
,从该channel
中读取任务并执行,执行完成后将结果写入results channel
。该流程可参考下图:
函数digits
用来完整每个任务的计算,即计算一个数组各位数字之和。这里我们让计算过程暂停2秒以模拟真实应用的计算场景。
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
函数worker
用来创建协程池中的每个协程。
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
worker
函数用来创建协程池中的一个协程,该协程内部不断的去jobs channel
中读取数据并调用digits
完成计算结果,然后写入带缓存的results channel
。当jobs channel
中所有任务都执行完毕后,即为空后,调用wg.Done()
。
函数createWorkerPool
用来创建整个协程池。
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
createWorkerPool
函数的入参为协程池中协程的协程个数。这里每创建一个协程,都调用wg.Add(1)
将计数器加1。创建完所有协程后进入阻塞状态,等待所有协程完成任务并将计算结果写入results channel
。接着调用close(results)
来关闭results channel
。这便创建完了协程池。
下面我们通过函数allocate
来创建多个任务。
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
函数allocate
创建的任务写入带缓存channel
,完成所有的任务写入后关闭该channel
。
接着我们创建一个函数来读取results channel
中的计算结果并打印:
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
最后我们创建main
函数来将所有操作串起来。
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
这里需要留意一点,我们通过time
来计算整个程序的执行时间。后面我们会修改协程池的大小来观察所有任务总体的完成时间。这里我们分配了100
个任务,并在协程池中创建了10
个协程。
完整程序为:
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
type Job struct {
id int
randomno int
}
type Result struct {
job Job
sumofdigits int
}
var jobs = make(chan Job, 10)
var results = make(chan Result, 10)
func digits(number int) int {
sum := 0
no := number
for no != 0 {
digit := no % 10
sum += digit
no /= 10
}
time.Sleep(2 * time.Second)
return sum
}
func worker(wg *sync.WaitGroup) {
for job := range jobs {
output := Result{job, digits(job.randomno)}
results <- output
}
wg.Done()
}
func createWorkerPool(noOfWorkers int) {
var wg sync.WaitGroup
for i := 0; i < noOfWorkers; i++ {
wg.Add(1)
go worker(&wg)
}
wg.Wait()
close(results)
}
func allocate(noOfJobs int) {
for i := 0; i < noOfJobs; i++ {
randomno := rand.Intn(999)
job := Job{i, randomno}
jobs <- job
}
close(jobs)
}
func result(done chan bool) {
for result := range results {
fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
}
done <- true
}
func main() {
startTime := time.Now()
noOfJobs := 100
go allocate(noOfJobs)
done := make(chan bool)
go result(done)
noOfWorkers := 10
createWorkerPool(noOfWorkers)
<-done
endTime := time.Now()
diff := endTime.Sub(startTime)
fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
在我的机器上执行结果如下:
Job id 2, input random no 407 , sum of digits 11
Job id 0, input random no 878 , sum of digits 23
Job id 9, input random no 150 , sum of digits 6
...
total time taken 20.022653316 seconds
由于我们分配了100个任务,因此最终会打印100行结果,以及一行执行时间。由于协程池中的所有协程执行并不保证先后顺序,因此这100行打印结果输出肯定不稳定,另外由于机器的差异,执行时间也不一定是这个数值。
前面我们创建的协程池的大小为10,即共有10个协程可用。现在我们修改为20重新执行,会发现花费的时间减少了一半:
...
total time taken 10.016740997 seconds
因此,协程池中的协程越多,所有任务执行完成花费的时间也就越少,这其实非常容易理解,我们可以把协程理解成要完成某个项目的工人,肯定工位越多,花费的时间越低嘛。
Last updated
Was this helpful?