使用 Channel 等待任务结束
首先回顾上一节 channel 这一概念介绍时所写的代码:
package mainimport ("fmt""time"
)func worker(id int, c chan int) {for n := range c {fmt.Printf("Worker %d received %c\n",id, n)}
}func createWorker(id int) chan<- int {c := make(chan int)go worker(id, c)return c
}func chanDemo() {var channels [10]chan<- intfor i := 0; i < 10; i++ {channels[i] = createWorker(i)}for i := 0; i < 10; i++ {channels[i] <- 'a' + i}for i := 0; i < 10; i++ {channels[i] <- 'A' + i}time.Sleep(time.Millisecond)
}func main() {chanDemo()
}
👆 上述代码的流程如下:
首先在 main 函数中找到入口,即 chanDemo。在 chanDemo 中我们使用 var 新建了一个存储 10 个接收消息的 chan int 类型的数组,并定义了 createWorker 方法来对每一个 chan<- int
类型对象进行构造。(值得注意的是,createWorker 方法是一个返回值是chan<- int
的函数)
在 createWorker 中,使用 go worker(id, c)
来启动协程,c 既作为返回值返回,也会作为 worker 函数的参数在启动的协程中被使用。
此时,程序有多个流程在并行执行(并发),分别是 10 个 worker 和一个 chanDemo。我们在 chanDemo 中向 chan<- int
类型的通道发送消息,当 worker 当中的 c 有消息接收到的时候,会将消息打印出来。
现在我们希望对上述代码进行进一步的优化,在 worker 函数当中,当 n 打印完毕时,我们希望通知外部,此处的输出已经完成了。过去的做法可能会通过共享内存来进行通信,但是在 Golang 当中我们可以使用 channel 来实现上述需求,即:直接通过通信来共享内存。
我们对 worker 函数进行改进,添加一个名为 done 的 chan bool 类型,用来告知外部当前是否但因完毕:
func worker(id int, c chan int, done chan bool) {for n := range c {fmt.Printf("Worker %d received %c\n",id, n)done <- true}
}
当然,在 chanDemo 函数中,需要有一个变量来接受 done 这个 chan bool 的值。
为此,我们新建一个结构,命名为 worker,并将上面同名的函数修改为 goWorker。worker 的定义如下:
type worker struct {in chan intdone chan bool
}
相应地对 createWorker 方法进行修改:
func createWorker(id int) worker {w := worker{ // 使用花括号显式地对 worker 进行构造in: make(chan int),done: make(chan bool),}go doWorker(id, w.in, w.done)return w
}
函数 doWorker:
func doWorker(id int, c chan int, done chan bool) {for n := range c {fmt.Printf("Worker %d received %c\n",id, n)done <- true // 当打印执行结束时, 将 true 输入到 chan bool 的 done 当中}
}
最后的 chanDemo 函数定义如下:
func chanDemo() {var workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i)}for i := 0; i < 10; i++ {workers[i].in <- 'a' + i<-workers[i].done // 不用使用变量来对值进行接受// 直接显式地将值写出来, 即可在函数结束之前完成 chan int 的值的打印}for i := 0; i < 10; i++ {workers[i].in <- 'A' + i<-workers[i].done}}
完整的代码如下:
package mainimport ("fmt"
)func doWorker(id int, c chan int, done chan bool) {for n := range c {fmt.Printf("Worker %d received %c\n",id, n)done <- true}
}type worker struct {in chan intdone chan bool
}func createWorker(id int) worker {w := worker{in: make(chan int),done: make(chan bool),}go doWorker(id, w.in, w.done)return w
}func chanDemo() {var workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i)}for i := 0; i < 10; i++ {workers[i].in <- 'a' + i<-workers[i].done}for i := 0; i < 10; i++ {workers[i].in <- 'A' + i<-workers[i].done}}func main() {chanDemo()
}
得到的输出如下:
一个问题在于,上述输出是顺序执行的,失去了并行的意义。
我们想要做的是,一次性将想要打印的 20 个信息发送给 10 个通道,发送所有信息之后再等待所有通道的打印结束。
一个可能的改进如下:
func chanDemo() {var workers [10]workerfor i := 0; i < 10; i++ {workers[i] = createWorker(i)}for i, worker := range workers {worker.in <- 'a' + i // 向 channel 发送数据}for _, worker := range workers {<-worker.done // 首先接受数据, 再发送数据}for i, worker := range workers {worker.in <- 'A' + i}for _, worker := range workers {<-worker.done}}
此时得到的结果是:
使用 WaitGroup
除了显式地新建一个名为 done 的 chan bool 之外,还可以使用一个名为 WaitGroup 对象来完成等操作,它定义在头文件 sync 当中。
WaitGroup 有三个主要的方法,分别是 Add、Done 和 Wait。Add 负责告知 WaitGroup 当前还有多少个任务要完成(对于我们上面的例子,新建 10 个通道,对每个通道传入两个值,则有 20 个任务);Done 在 worker 内部打印结束后执行,告知外部任务已经完成;Wait 在外部执行,表示等待协程运行完毕之后再执行下一条语句,在我们的示例中,没有下一条语句,则函数返回。
完整的示例如下:
package mainimport ("fmt""sync"
)func doWorker(id int, c chan int, wg *sync.WaitGroup) {for n := range c {fmt.Printf("Worker %d received %c\n",id, n)wg.Done()}
}type worker struct {in chan intwg *sync.WaitGroup
}func createWorker(id int, wg *sync.WaitGroup) worker {w := worker{in: make(chan int),wg: wg, // 传递的是指针, 因为 WaitGroup 只有一个, 大家共用}go doWorker(id, w.in, wg)return w
}func chanDemo() {var wg sync.WaitGroupvar workers [10]workerwg.Add(20) // 有 20 个任务for i := 0; i < 10; i++ {workers[i] = createWorker(i, &wg)}for i, worker := range workers {worker.in <- 'a' + i// Wg.Add(1) // 这种做法也是可以的}for i, worker := range workers {worker.in <- 'A' + i}wg.Wait()}func main() {chanDemo()
}
输出如下: