# 协程池

#### `WaitGroup`

Golang中`goroutine`和`channel`的一个非常重要的应用就是构建`协程池`。虽然Golang可以创建成千上万的协程，但是协程池能提升协程的利用效率，避免协程的频繁创建、销毁开销。为了创建协程池，我们先来看下`WaitGroup`，因为后面创建协程池会用到`WaitGroup`。

`WaitGroup`可用来等待一组`goroutines`完成执行，有点类似于`Node.js`中的`Promise.all`。假设我们在主协程中创建了三个并发的子协程，我们可能需要在主协程中等待这三个子协程完成执行，这时就可以使用`WaitGroup`来完成。下面我们通过一个例子来看下。

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
```

`WaitGroup`是一个结构体类型，这里我们声明了一个`WaitGroup`类型的变量`wg`。`WaitGroup`原理是使用一个计数器。我们每创建一个协程，都调用`wg.Add(1)`将`WaitGroup`的计数器加1。我们可以通过`wg.Done()`方法来将计数器减1。`wg.Wait()`会阻塞主协程的执行，直到计数器减为0。该程序中创建了3个协程，并且将计数器增加到了3，每个协程执行完时调用`wg.Done()`将计数器减1，因此`wgWait()`会阻塞主协程，直到3个子协程都执行完毕。

**这里有一点需要注意，我们是将`wg`的指针传递给了每个协程，如果我们不传递指针而是传递值，那么每个子协程都会生成自己的拷贝，就无法通知到主协程的`wg`。**

该程序执行结果如下，由于协程池是异步的，因此不是稳定输出。

```
started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
```

#### 实现协程池

现在我们基于`WaitGroup`和带缓存`channel`来实现一个协程池。

协程池指的是一组随时等待分配任务的协程。协程池中的协程每接到一个任务就去执行，执行完某个任务后又继续等待分配下一个任务。

本节我们将实现的协程池比较简单，用来计算数字各位的和，比如输入为234，输出为9(2 + 3 + 4)。主要包括如下几点功能：

* 创建一个带缓存`channel`，用来写入要执行的任务，该`channel`就是后面的`jobs`。
* 创建协程池来读取`jobs`中的任务。
* 每个任务执行完毕后将执行结果写入一个带缓存`channel`，该`channel`就是后面的`results`。
* 从`result` `channel`中读取并打印结果。

首先我们来创建两个结构体类型，分别表示待执行的任务和执行结果：

```go
type Job struct {
    id int
    randomno int
}

type Result struct {
    job Job
    sumofdigits int
}
```

结构体类型`Job`含有两个属性，`id`和`randomno`。`randomno`用来表示待执行的任务输入。

结构体类型`Result`含有两个属性，`job`和`sumofdigits`。`sumofdigits`用来存储计算结果。

下面我们创建两个带缓存`channel`，`jobs`和`results`。其中`jobs`用来写入和读取任务，`results`用来写入各个任务的计算结果。

```go
var jobs = make(chan Job, 10)
var results = mak(chan Result, 10)
```

协程池中的每个协程监听`jobs channel`，从该`channel`中读取任务并执行，执行完成后将结果写入`results channel`。该流程可参考下图：&#x20;

![](https://1527815414-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-L_XyF-PcxCv1bX8ii73%2F-Lgwgs4Seb8N-CU9NYEc%2F-LgwhFA28Kqi2_sFGO6G%2F4.png?alt=media\&token=658fc693-de51-4c92-b91a-af4fa4bea9ca)

函数`digits`用来完整每个任务的计算，即计算一个数组各位数字之和。这里我们让计算过程暂停2秒以模拟真实应用的计算场景。

```go
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
```

函数`worker`用来创建协程池中的每个协程。

```go
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
```

`worker`函数用来创建协程池中的一个协程，该协程内部不断的去`jobs channel`中读取数据并调用`digits`完成计算结果，然后写入带缓存的`results channel`。当`jobs channel`中所有任务都执行完毕后，即为空后，调用`wg.Done()`。

函数`createWorkerPool`用来创建整个协程池。

```go
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
```

`createWorkerPool`函数的入参为协程池中协程的协程个数。这里每创建一个协程，都调用`wg.Add(1)`将计数器加1。创建完所有协程后进入阻塞状态，等待所有协程完成任务并将计算结果写入`results channel`。接着调用`close(results)`来关闭`results channel`。这便创建完了协程池。

下面我们通过函数`allocate`来创建多个任务。

```go
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
```

函数`allocate`创建的任务写入带缓存`channel`，完成所有的任务写入后关闭该`channel`。

接着我们创建一个函数来读取`results channel`中的计算结果并打印：

```go
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
```

最后我们创建`main`函数来将所有操作串起来。

```go
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
```

这里需要留意一点，我们通过`time`来计算整个程序的执行时间。后面我们会修改协程池的大小来观察所有任务总体的完成时间。这里我们分配了`100`个任务，并在协程池中创建了`10`个协程。

完整程序为：

```go
package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
```

在我的机器上执行结果如下：

```
Job id 2, input random no 407 , sum of digits 11
Job id 0, input random no 878 , sum of digits 23
Job id 9, input random no 150 , sum of digits 6
...
total time taken  20.022653316 seconds
```

由于我们分配了100个任务，因此最终会打印100行结果，以及一行执行时间。由于协程池中的所有协程执行并不保证先后顺序，因此这100行打印结果输出肯定不稳定，另外由于机器的差异，执行时间也不一定是这个数值。

前面我们创建的协程池的大小为10，即共有10个协程可用。现在我们修改为20重新执行，会发现花费的时间减少了一半：

```
...
total time taken  10.016740997 seconds
```

因此，协程池中的协程越多，所有任务执行完成花费的时间也就越少，这其实非常容易理解，我们可以把协程理解成要完成某个项目的工人，肯定工位越多，花费的时间越低嘛。
