# 协程池

#### `WaitGroup`

Golang中`goroutine`和`channel`的一个非常重要的应用就是构建`协程池`。虽然Golang可以创建成千上万的协程，但是协程池能提升协程的利用效率，避免协程的频繁创建、销毁开销。为了创建协程池，我们先来看下`WaitGroup`，因为后面创建协程池会用到`WaitGroup`。

`WaitGroup`可用来等待一组`goroutines`完成执行，有点类似于`Node.js`中的`Promise.all`。假设我们在主协程中创建了三个并发的子协程，我们可能需要在主协程中等待这三个子协程完成执行，这时就可以使用`WaitGroup`来完成。下面我们通过一个例子来看下。

```go
package main

import (
    "fmt"
    "sync"
    "time"
)

func process(i int, wg *sync.WaitGroup) {
    fmt.Println("started Goroutine ", i)
    time.Sleep(2 * time.Second)
    fmt.Printf("Goroutine %d ended\n", i)
    wg.Done()
}

func main() {
    no := 3
    var wg sync.WaitGroup
    for i := 0; i < no; i++ {
        wg.Add(1)
        go process(i, &wg)
    }
    wg.Wait()
    fmt.Println("All go routines finished executing")
}
```

`WaitGroup`是一个结构体类型，这里我们声明了一个`WaitGroup`类型的变量`wg`。`WaitGroup`原理是使用一个计数器。我们每创建一个协程，都调用`wg.Add(1)`将`WaitGroup`的计数器加1。我们可以通过`wg.Done()`方法来将计数器减1。`wg.Wait()`会阻塞主协程的执行，直到计数器减为0。该程序中创建了3个协程，并且将计数器增加到了3，每个协程执行完时调用`wg.Done()`将计数器减1，因此`wgWait()`会阻塞主协程，直到3个子协程都执行完毕。

**这里有一点需要注意，我们是将`wg`的指针传递给了每个协程，如果我们不传递指针而是传递值，那么每个子协程都会生成自己的拷贝，就无法通知到主协程的`wg`。**

该程序执行结果如下，由于协程池是异步的，因此不是稳定输出。

```
started Goroutine  2
started Goroutine  0
started Goroutine  1
Goroutine 0 ended
Goroutine 2 ended
Goroutine 1 ended
```

#### 实现协程池

现在我们基于`WaitGroup`和带缓存`channel`来实现一个协程池。

协程池指的是一组随时等待分配任务的协程。协程池中的协程每接到一个任务就去执行，执行完某个任务后又继续等待分配下一个任务。

本节我们将实现的协程池比较简单，用来计算数字各位的和，比如输入为234，输出为9(2 + 3 + 4)。主要包括如下几点功能：

* 创建一个带缓存`channel`，用来写入要执行的任务，该`channel`就是后面的`jobs`。
* 创建协程池来读取`jobs`中的任务。
* 每个任务执行完毕后将执行结果写入一个带缓存`channel`，该`channel`就是后面的`results`。
* 从`result` `channel`中读取并打印结果。

首先我们来创建两个结构体类型，分别表示待执行的任务和执行结果：

```go
type Job struct {
    id int
    randomno int
}

type Result struct {
    job Job
    sumofdigits int
}
```

结构体类型`Job`含有两个属性，`id`和`randomno`。`randomno`用来表示待执行的任务输入。

结构体类型`Result`含有两个属性，`job`和`sumofdigits`。`sumofdigits`用来存储计算结果。

下面我们创建两个带缓存`channel`，`jobs`和`results`。其中`jobs`用来写入和读取任务，`results`用来写入各个任务的计算结果。

```go
var jobs = make(chan Job, 10)
var results = mak(chan Result, 10)
```

协程池中的每个协程监听`jobs channel`，从该`channel`中读取任务并执行，执行完成后将结果写入`results channel`。该流程可参考下图：&#x20;

![](https://1527815414-files.gitbook.io/~/files/v0/b/gitbook-legacy-files/o/assets%2F-L_XyF-PcxCv1bX8ii73%2F-Lgwgs4Seb8N-CU9NYEc%2F-LgwhFA28Kqi2_sFGO6G%2F4.png?alt=media\&token=658fc693-de51-4c92-b91a-af4fa4bea9ca)

函数`digits`用来完整每个任务的计算，即计算一个数组各位数字之和。这里我们让计算过程暂停2秒以模拟真实应用的计算场景。

```go
func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
```

函数`worker`用来创建协程池中的每个协程。

```go
func worker(wg *sync.WaitGroup) {
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
```

`worker`函数用来创建协程池中的一个协程，该协程内部不断的去`jobs channel`中读取数据并调用`digits`完成计算结果，然后写入带缓存的`results channel`。当`jobs channel`中所有任务都执行完毕后，即为空后，调用`wg.Done()`。

函数`createWorkerPool`用来创建整个协程池。

```go
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
```

`createWorkerPool`函数的入参为协程池中协程的协程个数。这里每创建一个协程，都调用`wg.Add(1)`将计数器加1。创建完所有协程后进入阻塞状态，等待所有协程完成任务并将计算结果写入`results channel`。接着调用`close(results)`来关闭`results channel`。这便创建完了协程池。

下面我们通过函数`allocate`来创建多个任务。

```go
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
```

函数`allocate`创建的任务写入带缓存`channel`，完成所有的任务写入后关闭该`channel`。

接着我们创建一个函数来读取`results channel`中的计算结果并打印：

```go
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
```

最后我们创建`main`函数来将所有操作串起来。

```go
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
```

这里需要留意一点，我们通过`time`来计算整个程序的执行时间。后面我们会修改协程池的大小来观察所有任务总体的完成时间。这里我们分配了`100`个任务，并在协程池中创建了`10`个协程。

完整程序为：

```go
package main

import (  
    "fmt"
    "math/rand"
    "sync"
    "time"
)

type Job struct {  
    id       int
    randomno int
}
type Result struct {  
    job         Job
    sumofdigits int
}

var jobs = make(chan Job, 10)  
var results = make(chan Result, 10)

func digits(number int) int {  
    sum := 0
    no := number
    for no != 0 {
        digit := no % 10
        sum += digit
        no /= 10
    }
    time.Sleep(2 * time.Second)
    return sum
}
func worker(wg *sync.WaitGroup) {  
    for job := range jobs {
        output := Result{job, digits(job.randomno)}
        results <- output
    }
    wg.Done()
}
func createWorkerPool(noOfWorkers int) {  
    var wg sync.WaitGroup
    for i := 0; i < noOfWorkers; i++ {
        wg.Add(1)
        go worker(&wg)
    }
    wg.Wait()
    close(results)
}
func allocate(noOfJobs int) {  
    for i := 0; i < noOfJobs; i++ {
        randomno := rand.Intn(999)
        job := Job{i, randomno}
        jobs <- job
    }
    close(jobs)
}
func result(done chan bool) {  
    for result := range results {
        fmt.Printf("Job id %d, input random no %d , sum of digits %d\n", result.job.id, result.job.randomno, result.sumofdigits)
    }
    done <- true
}
func main() {  
    startTime := time.Now()
    noOfJobs := 100
    go allocate(noOfJobs)
    done := make(chan bool)
    go result(done)
    noOfWorkers := 10
    createWorkerPool(noOfWorkers)
    <-done
    endTime := time.Now()
    diff := endTime.Sub(startTime)
    fmt.Println("total time taken ", diff.Seconds(), "seconds")
}
```

在我的机器上执行结果如下：

```
Job id 2, input random no 407 , sum of digits 11
Job id 0, input random no 878 , sum of digits 23
Job id 9, input random no 150 , sum of digits 6
...
total time taken  20.022653316 seconds
```

由于我们分配了100个任务，因此最终会打印100行结果，以及一行执行时间。由于协程池中的所有协程执行并不保证先后顺序，因此这100行打印结果输出肯定不稳定，另外由于机器的差异，执行时间也不一定是这个数值。

前面我们创建的协程池的大小为10，即共有10个协程可用。现在我们修改为20重新执行，会发现花费的时间减少了一半：

```
...
total time taken  10.016740997 seconds
```

因此，协程池中的协程越多，所有任务执行完成花费的时间也就越少，这其实非常容易理解，我们可以把协程理解成要完成某个项目的工人，肯定工位越多，花费的时间越低嘛。


---

# Agent Instructions: Querying This Documentation

If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter:

```
GET https://sunwenfei.gitbook.io/sunwenfei/golang/golang-ji-chu-jiao-cheng/bing-fa/xie-cheng-chi.md?ask=<question>
```

The question should be specific, self-contained, and written in natural language.
The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
