协程池

WaitGroup

Golang中goroutinechannel的一个非常重要的应用就是构建协程池。虽然Golang可以创建成千上万的协程,但是协程池能提升协程的利用效率,避免协程的频繁创建、销毁开销。为了创建协程池,我们先来看下WaitGroup,因为后面创建协程池会用到WaitGroup

WaitGroup可用来等待一组goroutines完成执行,有点类似于Node.js中的Promise.all。假设我们在主协程中创建了三个并发的子协程,我们可能需要在主协程中等待这三个子协程完成执行,这时就可以使用WaitGroup来完成。下面我们通过一个例子来看下。

package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}

WaitGroup是一个结构体类型,这里我们声明了一个WaitGroup类型的变量wgWaitGroup原理是使用一个计数器。我们每创建一个协程,都调用wg.Add(1)WaitGroup的计数器加1。我们可以通过wg.Done()方法来将计数器减1。wg.Wait()会阻塞主协程的执行,直到计数器减为0。该程序中创建了3个协程,并且将计数器增加到了3,每个协程执行完时调用wg.Done()将计数器减1,因此wgWait()会阻塞主协程,直到3个子协程都执行完毕。

这里有一点需要注意,我们是将wg的指针传递给了每个协程,如果我们不传递指针而是传递值,那么每个子协程都会生成自己的拷贝,就无法通知到主协程的wg

该程序执行结果如下,由于协程池是异步的,因此不是稳定输出。

started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended

实现协程池

现在我们基于WaitGroup和带缓存channel来实现一个协程池。

协程池指的是一组随时等待分配任务的协程。协程池中的协程每接到一个任务就去执行,执行完某个任务后又继续等待分配下一个任务。

本节我们将实现的协程池比较简单,用来计算数字各位的和,比如输入为234,输出为9(2 + 3 + 4)。主要包括如下几点功能:

  • 创建一个带缓存channel,用来写入要执行的任务,该channel就是后面的jobs

  • 创建协程池来读取jobs中的任务。

  • 每个任务执行完毕后将执行结果写入一个带缓存channel,该channel就是后面的results

  • result channel中读取并打印结果。

首先我们来创建两个结构体类型,分别表示待执行的任务和执行结果:

type Job struct {
    id int
    randomno int
}

type Result struct {
    job Job
    sumofdigits int
}

结构体类型Job含有两个属性,idrandomnorandomno用来表示待执行的任务输入。

结构体类型Result含有两个属性,jobsumofdigitssumofdigits用来存储计算结果。

下面我们创建两个带缓存channeljobsresults。其中jobs用来写入和读取任务,results用来写入各个任务的计算结果。

var jobs = make(chan Job, 10)
var results = mak(chan Result, 10)

协程池中的每个协程监听jobs channel,从该channel中读取任务并执行,执行完成后将结果写入results channel。该流程可参考下图:

函数digits用来完整每个任务的计算,即计算一个数组各位数字之和。这里我们让计算过程暂停2秒以模拟真实应用的计算场景。

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}

函数worker用来创建协程池中的每个协程。

func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}

worker函数用来创建协程池中的一个协程,该协程内部不断的去jobs channel中读取数据并调用digits完成计算结果,然后写入带缓存的results channel。当jobs channel中所有任务都执行完毕后,即为空后,调用wg.Done()

函数createWorkerPool用来创建整个协程池。

func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}

createWorkerPool函数的入参为协程池中协程的协程个数。这里每创建一个协程,都调用wg.Add(1)将计数器加1。创建完所有协程后进入阻塞状态,等待所有协程完成任务并将计算结果写入results channel。接着调用close(results)来关闭results channel。这便创建完了协程池。

下面我们通过函数allocate来创建多个任务。

func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}

函数allocate创建的任务写入带缓存channel,完成所有的任务写入后关闭该channel

接着我们创建一个函数来读取results channel中的计算结果并打印:

func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}

最后我们创建main函数来将所有操作串起来。

func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

这里需要留意一点,我们通过time来计算整个程序的执行时间。后面我们会修改协程池的大小来观察所有任务总体的完成时间。这里我们分配了100个任务,并在协程池中创建了10个协程。

完整程序为:

package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}

在我的机器上执行结果如下:

Job id 2, input random no 407 , sum of digits 11
Job id 0, input random no 878 , sum of digits 23
Job id 9, input random no 150 , sum of digits 6
...
total time taken  20.022653316 seconds

由于我们分配了100个任务,因此最终会打印100行结果,以及一行执行时间。由于协程池中的所有协程执行并不保证先后顺序,因此这100行打印结果输出肯定不稳定,另外由于机器的差异,执行时间也不一定是这个数值。

前面我们创建的协程池的大小为10,即共有10个协程可用。现在我们修改为20重新执行,会发现花费的时间减少了一半:

...
total time taken  10.016740997 seconds

因此,协程池中的协程越多,所有任务执行完成花费的时间也就越少,这其实非常容易理解,我们可以把协程理解成要完成某个项目的工人,肯定工位越多,花费的时间越低嘛。

Last updated