管道(channel)

上一节我们介绍了Golang中如何使用goroutine实现并发,本节我们介绍下channel,即管道,来看看Golang中是怎么使用channel来进行goroutine之间的通信的。

什么是channel

channel可以看作是goroutine互相通信的管道。就像水可以从管道的一端流向另一端,数据也可以从channel的一端流向另一端。

声明channel

每个channel都需要绑定一个数据类型,这个数据类型就是channel可以传输的数据的类型,channel不允许传输其他任何数据类型。

一个channel的类型声明如下:

chan T

channel的零值为nilnilchannel没有任何用处,我们需要使用make来初始化一个channel

package main

import "fmt"

func main() {
    var a chan int
    if a == nil {
        fmt.Println("channel a is nil, going to define it")
        a = make(chan int)
        fmt.Printf("Type of a is %T", a)
    }
}

这里声明的channel未手动初始化,因此自动初始化为零值nil。后面的判断条件a == nil成立后执行if语句内部的语句。我们使用make(chan int)创建了一个chan int类型的channel。该程序输出如下:

channel a is nil, going to define it
Type of a is chan int

当然,我们也可以声明channel的同时进行手动初始化:

a := make(chan int)

发送 & 接收数据

channel中发送、接收数据的语法如下:

data := <- a // read from channel a
a <- data // write to channel a

箭头的指向形象的表明了是从channel中接收数据,还是向challel发送数据。data := <- a中箭头从channel指出,表示从channel a中读取数据并赋值给变量dataa <- data中箭头指向一个channel,因此是向channel a中写入数据data

channel发送、从channel接收数据会阻塞goroutine执行

channel发送数据,以及从channel接收数据会阻塞当前goroutine的执行。这句话怎么理解呢?其实很简单,无论我们是向channel发送数据,还是从channel接收数据,无非是通过类似于data := <- a或者a <- data的语句来完成,而语句本身的执行肯定是位于一个goroutine内。如果执行的是a <- data,即向一个channel发送数据,这时执行流会阻塞,直到有其他goroutine从这个channel中读取了刚刚写入的数据。类似的,如果执行的是data := <- a,即从一个channel中读取数据,这时执行流也也会阻塞,直到有其他goroutine向这个channel中写入数据。

channel就是基于这个特性而不是类似于其他编程语言中的锁来实现不同goroutine之间的高效通信的。

channel使用举例

下面我们通过例子来看下goroutine是如何使用channel进行通信的。

我们首先还用上一节的那个简单例子:

package main

import (
    "fmt"
    "time"
)

func hello() {
    fmt.Println("Hello world goroutine")
}
func main() {
    go hello()
    time.Sleep(1 * time.Second)
    fmt.Println("main function")
}

该程序使用了time.Sleep来让主协程等待hello协程的执行,下面我们改用正规做法,即使用channel来同步主协程和hello子协程的执行:

package main

import (
    "fmt"
)

func hello(done chan bool) {
    fmt.Println("Hello world goroutine")
    done <- true
}
func main() {
    done := make(chan bool)
    go hello(done)
    <-done
    fmt.Println("main function")
}

这里我们通过done := make(chan bool)创建了一个channel,该channel的类型为chan bool,即支持传递bool类型的数据。我们将该channel作为入参传给了hello协程,因此该协程内部可以给该channel传递数据。注意我们开启协程之后的一行代码:<-done。这行代码是从done channel内读取数据,因此会阻塞主协程的执行,直到该channel内有数据能够读到为止,直到读到数据主协程才会继续执行。

这里的<-done仅仅等待done channel有数据可读,并没有将读到的数据赋值给某个变量,这是完全合法的。比如这里我们仅仅是为了找到同步点,而并不需要读到什么数据。

协程hello内部先打印了Hello world goroutine,然后向done channel内部写入了一个bool类型的数据true,这时主协程收到了这个数据,不再被阻塞,接着打印main function

该程序的执行结果如下,注意是稳定输出:

Hello world goroutine
main function

我们再修改下前面的程序,在hello协程里面加个sleep,让hello协程的执行时间变长点,看看主协程会不会等待该子协程向channel内写入数据。

package main

import (
    "fmt"
    "time"
)

func hello(done chan bool) {
    fmt.Println("hello go routine is going to sleep")
    time.Sleep(4 * time.Second)
    fmt.Println("hello go routine awake and going to write to done")
    done <- true
}
func main() {
    done := make(chan bool)
    fmt.Println("Main going to call hello go goroutine")
    go hello(done)
    <-done
    fmt.Println("Main received data")
}

这里我们在hello协程内sleep了四秒钟才向channel内写入了数据,由于<-done是阻塞的,因此仍然需要等待channel中写入数据,该程序执行结果如下,也是稳定输出:

Main going to call hello go goroutine
hello go routine is going to sleep
hello go routine awake and going to write to done
Main received data

下面我们再举个例子,以更好的理解goroutinechannel。这个程序用来计算一个数子的各个位的平方和,各个位的立方和,再将二者加起来。比如输出是123,那么算法如下:

squares = (1 1) + (2 2) + (3 3) cubes = (1 1 1) + (2 2 2) + (3 3 * 3) output = squares + cubes = 50

我们实现上将平方和、立方和的计算放到不同的协程内去执行,在主协程内等待两个协程执行完成后再将两个协程的计算结果想加。

package main

import "fmt"

func calcSquares(number int, squareop chan int) {
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit
        number /= 10
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
    sum := 0
    for number != 0 {
        digit := number % 10
        sum += digit * digit * digit
        number /= 10
    }
    cubeop <- sum
}

func main() {
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares+cubes)
}

这里squares, cubes := <-sqrch, <-cubech会等待两个协程将计算结果写到对应的channel内。该程序执行结果如下:

Final output 1536

死锁

使用channel时需要注意别陷入死锁状态。如果某个goroutine向某个channel发送数据,那么需要某个其他goroutine接收该channel的数据,否则会发送运行时错误:deadlock。类似的,如果某个goroutine正在等待从某个channel中接收数据,那么需要某个其他goroutine向该channel中写入数据,否则也会发送运行时错误:deadlock

package main

func main() {
    ch := make(chan int)
    ch <- 5
}

这里我们创建了一个channelch,类型为chan int。然后我们向该channel中写入了一个值5。但是在该程序中并没有其他goroutine接收该channel中的数据。因此会报运行时错误:

fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan send]:
main.main()
        /Users/goWorkspace/src/test/main.go:5 +0x55
exit status 2

可读写channel、读channel、写channel

前面我们讨论的channel都是既可读、又可写的。我们也可以创建仅支持可读,或仅支持可写的channel

package main

import "fmt"

func sendData(sendch chan<- int) {
    sendch <- 10
}

func main() {
    sendch := make(chan<- int)
    go sendData(sendch)
    fmt.Println(<-sendch)
}

这里我们创建了一个写channel sendch,从箭头指向也能看出来这是一个写channelfmt.Println(<-sendch)尝试从该channel中读取数据,这是不允许的,Golang不允许从写channel中读取数据,因此报以下错误:

./main.go:12:14: invalid operation: <-sendch (receive from send-only type chan<- int)

尽然channel是用来做通信、数据传输用的,那么仅支持可写、或仅支持可读有什么意义呢?答案是channel 转换。我们可以将读写channel转换为读channel或者写channel;但反之不行,不能将读channel或者写channel转换为读写channel

package main

import "fmt"

func sendData(sendch chan<- int) {
    sendch <- 10
}

func main() {
    chnl := make(chan int)
    go sendData(chnl)
    fmt.Println(<-chnl)
}

这里我们首先创建了一个读写channel chnl,然后将该读写channel传给goroutine sendDatasendData通过参数将该读写channel转换为了写channel,因此sendData内部该channel是仅可写的,而在main channel中是既可读又可写的。该程序输出如下:10

关闭channel

channel的发送方有权关闭channel,关闭channel会通知数据接收方,告知对方数据已发送完成,没有后续数据了。

数据接收方可通过如下方式来判断channel是否已经关闭:

v, ok := <-ch

如果是正常的数据发送,则oktrue,如果ch被数据发送方关闭了,则okfalse,并且数据v接收到的值为对应数据类型的默认零值。

package main

import "fmt"

func producer(chnl chan int) {
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {
    ch := make(chan int)
    go producer(ch)
    for {
        v, ok := <-ch
        if ok == false {
            break
        }
        fmt.Println("Received ", v, ok)
    }
}

这里的producer协程向channel中写入了0到9,接着关闭了channel。主函数通过一个无限循环来不断的读取channel中的数据,通过ok字段来判断channel是否关闭,如果为false则表示关闭。该程序执行结果如下:

Received  0 true
Received  1 true
Received  2 true
Received  3 true
Received  4 true
Received  5 true
Received  6 true
Received  7 true
Received  8 true
Received  9 true

for range

前面使用无限for循环来从channel中读取数据,Golang提供了一种更加方便的方式来不断的读取channel中的数据,直到channel关闭。

package main

import "fmt"

func producer(chnl chan int) {
    for i := 0; i < 10; i++ {
        chnl <- i
    }
    close(chnl)
}
func main() {
    ch := make(chan int)
    go producer(ch)
    for v := range ch {
        fmt.Println("Received ", v)
    }
}

这里for range语句从channel ch中不断读取数据,直到channel关闭。该程序输出如下:

Received  0
Received  1
Received  2
Received  3
Received  4
Received  5
Received  6
Received  7
Received  8
Received  9

我们使用for range重构一下前面计算平方和、立方和的程序,细心的话会发现前面calcSquares函数和calcCubes函数中有几行数字循环的样板代码,我们使用for range重构下:

package main

import "fmt"

func digits(number int, dchnl chan int) {
    for number != 0 {
        digit := number % 10
        dchnl <- digit
        number /= 10
    }
    close(dchnl)
}
func calcSquares(number int, squareop chan int) {
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit
    }
    squareop <- sum
}

func calcCubes(number int, cubeop chan int) {
    sum := 0
    dch := make(chan int)
    go digits(number, dch)
    for digit := range dch {
        sum += digit * digit * digit
    }
    cubeop <- sum
}

func main() {
    number := 589
    sqrch := make(chan int)
    cubech := make(chan int)
    go calcSquares(number, sqrch)
    go calcCubes(number, cubech)
    squares, cubes := <-sqrch, <-cubech
    fmt.Println("Final output", squares+cubes)
}

我们将样板代码抽成了一个函数digits,在calcSquares函数和calcCubes函数中分别创建了协程来不断输出迭代出来的数字,直到协程被关闭。该程序输出结果为:

Final output 1536

Last updated