1. Introduction
Do not communicate by sharing memory; instead, share memory by communicating.
This is Go's concurrency philosophy: goroutines should pass data and synchronize through channels rather than through shared variables (memory).
func write(chanInt chan int) {
    for i := 0; i < 10; i++ {
        chanInt <- i
    }
    close(chanInt)
}

func read(chanInt chan int, chanExit chan bool) {
    for {
        v, ok := <-chanInt
        if !ok {
            break
        }
        fmt.Println(v)
    }
    chanExit <- true
    close(chanExit)
}

func TestCSP(t *testing.T) {
    chanInt := make(chan int, 10)
    chanExit := make(chan bool)
    go write(chanInt)
    go read(chanInt, chanExit)
    for {
        select {
        case _, ok := <-chanExit:
            if !ok {
                fmt.Println("done")
                return
            }
        }
    }
}
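In the example above, write produces the values and read consumes them. chanInt carries the data between the two goroutines, and chanExit signals that all data has been processed so that the test can return. There is no lock anywhere in the program, which is what makes it elegant.
The next section covers how channels behave, and the last one walks through the runtime source code to reinforce it.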
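2. Characteristics
2.1 Basic usage
A channel is a reference type and has to be initialized with make:
chanBuffer := make(chan int, 10)
chanNoBuffer := make(chan int)
Both are bidirectional (readable and writable) channels; the difference is whether they have a capacity.
- A buffered channel can hold up to cap elements.
- An unbuffered channel is generally used for synchronization.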
chanWriteOnly := make(chan<- int)
chanReadOnly := make(<-chan int)
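These create a send-only and a receive-only channel. Creating them directly like this is of little use; directional channel types are normally used for function parameters. Here are the write and read functions from the introduction rewritten with them:
func write(chanInt chan<- int) {
    for i := 0; i < 10; i++ {
        chanInt <- i
    }
    close(chanInt)
}

func read(chanInt <-chan int, chanExit chan bool) {
    for {
        v, ok := <-chanInt
        if !ok {
            break
        }
        fmt.Println(v)
    }
    chanExit <- true
    close(chanExit)
}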
Checking the length and capacity of a channel:
func TestChanLenCAP(t *testing.T) {
    chanInt := make(chan int, 2)
    chanInt <- 1
    fmt.Println(len(chanInt)) // 1
    fmt.Println(cap(chanInt)) // 2
}
Closing a channel:
close(ch)
Checking whether a channel has been closed:
func TestChanIsClosed(t *testing.T) {
    chanInt := make(chan int, 10)
    close(chanInt)
    if _, ok := <-chanInt; !ok {
        fmt.Println("closed")
    }
}
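Receiving from a closed channel never blocks: once any buffered values have been drained, every receive returns the zero value of the element type with ok set to false. Since a zero value could also have been sent legitimately, it is the ok flag, not the value, that tells you the channel is closed.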
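To make the drain-then-zero behavior concrete, here is a minimal sketch. The helper name drainClosed is made up for this illustration; it buffers two values, closes the channel, and then receives four times.

```go
package main

import "fmt"

// drainClosed (hypothetical helper) fills a buffered channel, closes it,
// and records the value and ok flag of four consecutive receives.
func drainClosed() (values []int, oks []bool) {
	ch := make(chan int, 2)
	ch <- 7
	ch <- 8
	close(ch)
	for i := 0; i < 4; i++ {
		v, ok := <-ch // never blocks on a closed channel
		values = append(values, v)
		oks = append(oks, ok)
	}
	return values, oks
}

func main() {
	values, oks := drainClosed()
	fmt.Println(values) // [7 8 0 0]
	fmt.Println(oks)    // [true true false false]
}
```

The buffered values are still delivered with ok == true after close; only then do receives start returning the zero value with ok == false.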
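2.2 Error cases
The following cases need special attention.
Note: the examples were run on Go 1.19.12. Scheduler and runtime behavior, in particular deadlock detection, can differ between versions: some versions report a deadlock quickly, while others may simply block without reporting an error.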
2.2.1 Sending to a nil channel
func TestWriteNil(t *testing.T) {
    var chanInt chan int
    chanInt <- 1
}
chanInt has not been initialized, so its value is nil. The code blocks forever at chanInt <- 1 and eventually ends in a deadlock.
fatal error: all goroutines are asleep - deadlock!
Fix: initialize the channel with make before using it.
2.2.2 Receiving from a nil channel
func TestReadNil(t *testing.T) {
    var chanInt chan int
    <-chanInt
}
chanInt has not been initialized, so its value is nil. The code blocks forever at <-chanInt and eventually ends in a deadlock.
fatal error: all goroutines are asleep - deadlock!
Fix: initialize the channel with make before using it.
2.2.3 Closing a nil channel
func TestCloseNil(t *testing.T) {
    var chanInt chan int
    close(chanInt)
}
Closing a nil channel causes the runtime error panic: close of nil channel.
panic: close of nil channel [recovered]
    panic: close of nil channel
Fix: initialize the channel with make before using it.
These first three cases all make the same point: a channel must be initialized with make before it is used.
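2.2.4 Sending to a closed channel
func TestWriteClosed(t *testing.T) {
    chanNoBuffer := make(chan int)
    close(chanNoBuffer)
    chanNoBuffer <- 1
}
Sending to a closed channel panics.
panic: send on closed channel [recovered]
    panic: send on closed channel
Once a channel is closed, nothing more can be sent to it, although receiving from it is still allowed.
Fix: make sure no send can happen after the close. Go offers no way to test whether a channel is closed without receiving from it, so the usual convention is that only the sending side closes the channel.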
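The panic can be observed without crashing the program by recovering from it. The following is only a sketch for experimentation; sendOnClosed is a hypothetical helper, and recovering from this panic is not a recommended way to structure real code.

```go
package main

import "fmt"

// sendOnClosed (hypothetical helper) sends on an already closed channel
// and returns the message of the panic raised by the runtime.
func sendOnClosed() (msg string) {
	defer func() {
		if r := recover(); r != nil {
			msg = fmt.Sprint(r)
		}
	}()
	ch := make(chan int, 1) // free buffer space does not help
	close(ch)
	ch <- 1 // panics: the closed check comes before the buffer is used
	return "no panic"
}

func main() {
	fmt.Println(sendOnClosed()) // send on closed channel
}
```

Even though the buffer has room, the send panics, because the runtime checks the closed flag before it looks at the buffer.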
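2.2.5 Closing a channel that is already closed
func TestClosedOnceMore(t *testing.T) {
    chanNoBuffer := make(chan int)
    close(chanNoBuffer)
    close(chanNoBuffer)
}
Closing an already closed channel causes the runtime error panic: close of closed channel. It typically shows up when several goroutines try to close the same channel, or when faulty logic closes one channel more than once.
panic: close of closed channel [recovered]
    panic: close of closed channel
Fix: make sure the channel is closed exactly once, for example by giving it a single owner that is responsible for closing it.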
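One common way to guarantee a single close when several goroutines may all want to close the channel is sync.Once. The sketch below is an illustration, not part of the original article; the onceChan type and its methods are hypothetical names.

```go
package main

import (
	"fmt"
	"sync"
)

// onceChan (hypothetical type) wraps a signal channel so that Close
// can be called any number of times, from any goroutine.
type onceChan struct {
	C    chan struct{}
	once sync.Once
}

func newOnceChan() *onceChan {
	return &onceChan{C: make(chan struct{})}
}

// Close closes C the first time it is called and does nothing afterwards.
func (o *onceChan) Close() {
	o.once.Do(func() { close(o.C) })
}

func main() {
	oc := newOnceChan()
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			oc.Close() // safe: only the first call closes the channel
		}()
	}
	wg.Wait()
	_, ok := <-oc.C
	fmt.Println("closed:", !ok) // closed: true
}
```

This only protects against double close. It does not make sending safe, so it fits signal channels (chan struct{}) that nobody sends on.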
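2.2.6 Sending to an unbuffered channel with no receiver
func TestSendNoBuffer(t *testing.T) {
    ch := make(chan int)
    ch <- 4
}
An unbuffered channel is a synchronous communication mechanism. With a sender but no receiver, the send blocks and the program deadlocks.
fatal error: all goroutines are asleep - deadlock!
Fix: an unbuffered channel needs a send and a receive to take place at the same time, so a receiver must exist.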
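2.2.7 Receiving from an unbuffered channel with no sender
func TestReadNoBuffer(t *testing.T) {
    ch := make(chan int)
    <-ch
}
When receiving from an unbuffered channel, the receive blocks if no other goroutine sends to that channel. The program deadlocks and ends with a runtime error.
fatal error: all goroutines are asleep - deadlock!
Fix: an unbuffered channel needs a send and a receive to take place at the same time, so a sender must exist.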
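2.2.8 Send and receive on an unbuffered channel not happening at the same time
func ReadNoBufferChan(chanBool chan bool) {
    <-chanBool
}

func TestSendNoBufferChan(t *testing.T) {
    ch := make(chan bool)
    ch <- true
    go ReadNoBufferChan(ch)
    time.Sleep(1 * time.Second)
}
As the previous two cases stress, an unbuffered channel is a synchronous mechanism that needs the send and the receive to happen at the same time. When execution reaches ch <- true, no goroutine is receiving yet (the go ReadNoBufferChan(ch) statement on the next line has not run), so the send blocks and the program deadlocks.
fatal error: all goroutines are asleep - deadlock!
Fix: start the receiver before sending.
func TestSendNoBufferChan(t *testing.T) {
    ch := make(chan bool)
    go ReadNoBufferChan(ch)
    ch <- true
    time.Sleep(1 * time.Second)
}
Moving go ReadNoBufferChan(ch) ahead of the send guarantees that a goroutine is there to receive the value the send is waiting to hand over.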
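As a variation on the fixed version, the sketch below waits on a second channel instead of calling time.Sleep, so the caller knows the value was actually received. The helper handoff and the done channel are names invented for this illustration.

```go
package main

import "fmt"

// handoff (hypothetical helper) starts the receiver first, sends one value
// over an unbuffered channel, and waits on done rather than sleeping.
func handoff(v bool) bool {
	ch := make(chan bool)
	done := make(chan bool)
	go func() {
		got := <-ch // the receiver is started before the send below
		done <- got // report what was received
	}()
	ch <- v       // completes once the receiver takes the value
	return <-done // wait for the receiver to finish
}

func main() {
	fmt.Println(handoff(true)) // true
}
```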
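For unbuffered channels:
- Sends and receives must come in pairs, and the two sides of a pair cannot be in the same goroutine.
- When the receiver reads in a for loop, the sender has to close the channel.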
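The second rule can be sketched with a for range loop, which ends only when the channel is closed. The function names produce and sum are made up for this example.

```go
package main

import "fmt"

// produce (hypothetical helper) sends 0..n-1 on an unbuffered channel
// and closes it when done, so that range loops over it can terminate.
func produce(n int) <-chan int {
	ch := make(chan int)
	go func() {
		defer close(ch) // the sender owns the close
		for i := 0; i < n; i++ {
			ch <- i
		}
	}()
	return ch
}

// sum receives until the channel is closed and adds up the values.
func sum(ch <-chan int) int {
	total := 0
	for v := range ch { // exits when ch is closed and drained
		total += v
	}
	return total
}

func main() {
	fmt.Println(sum(produce(10))) // 45
}
```

Without the close in produce, the range loop in sum would block forever after the last value.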
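2.2.9 Receiving from a buffered channel before anything is sent
func TestWriteBufferChan(t *testing.T) {
    ch := make(chan int, 1)
    if _, ok := <-ch; !ok {
        fmt.Println("closed")
    }
}
Receiving from an empty buffered channel blocks until a value is written to the channel. Even a buffered channel needs data to be present when it is read.
The main difference between the two kinds is that a buffered channel can store a certain number of values, whereas an unbuffered channel needs the send and the receive to happen together. That does not change the following fact: a goroutine that tries to receive from an empty channel blocks until some other goroutine sends a value.
fatal error: all goroutines are asleep - deadlock!
Fix: make sure there is data to read when receiving.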
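2.2.10 Sending to a buffered channel with no receiver
func TestReadBufferChan(t *testing.T) {
    ch := make(chan int, 1)
    ch <- 1
    ch <- 2
}
When the buffer of a buffered channel is full, a send blocks until a value is received and frees a slot. With no receiver, the send stays blocked and the program deadlocks.
fatal error: all goroutines are asleep - deadlock!
Fix: when the buffer is full there must be a receiver; alternatively, increase the buffer size.
Note: unlike an unbuffered channel, a buffered channel accepts sends without any receiver as long as the buffer is not full, so no deadlock occurs in that case.
The last two cases for buffered channels can be summarized as follows:
- Within a single goroutine, the send must come before the receive.
- If the channel is empty, the receiver blocks.
- If the channel is full, the sender blocks.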
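3. Implementation
3.1 Data structure
At run time a Go channel is represented by the runtime.hchan struct.
// runtime/chan.go
type hchan struct {
    qcount   uint           // number of elements in the queue
    dataqsiz uint           // size of the circular buffer
    buf      unsafe.Pointer // pointer to the circular buffer
    elemsize uint16         // size of a single element
    closed   uint32         // whether the channel is closed
    elemtype *_type         // element type
    sendx    uint           // send index
    recvx    uint           // receive index
    recvq    waitq          // queue of goroutines waiting to receive
    sendq    waitq          // queue of goroutines waiting to send
    lock     mutex          // lock protecting the channel
}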
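First, the fields related to the circular buffer:
- qcount: the number of elements currently in the buffer.
- dataqsiz: the capacity of the circular buffer.
- buf: the buffer that actually stores the data, of type unsafe.Pointer (similar to void * in C).
- elemsize: the size of each element.
- sendx: the next position to write in the circular buffer.
- recvx: the next position to read in the circular buffer.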
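How these fields cooperate can be sketched with a toy ring buffer over int values. This is a simplified model, not the runtime code: the ring type and its push and pop methods are invented here, there is no locking, and the real buf is untyped memory.

```go
package main

import "fmt"

// ring is a toy model of the circular-buffer fields of hchan.
type ring struct {
	buf      []int
	qcount   int // elements currently stored
	dataqsiz int // capacity
	sendx    int // next slot to write
	recvx    int // next slot to read
}

func newRing(n int) *ring {
	return &ring{buf: make([]int, n), dataqsiz: n}
}

// push stores v at sendx and reports false when the buffer is full.
func (r *ring) push(v int) bool {
	if r.qcount == r.dataqsiz {
		return false
	}
	r.buf[r.sendx] = v
	r.sendx++
	if r.sendx == r.dataqsiz {
		r.sendx = 0 // wrap around
	}
	r.qcount++
	return true
}

// pop removes the value at recvx and reports false when the buffer is empty.
func (r *ring) pop() (int, bool) {
	if r.qcount == 0 {
		return 0, false
	}
	v := r.buf[r.recvx]
	r.recvx++
	if r.recvx == r.dataqsiz {
		r.recvx = 0 // wrap around
	}
	r.qcount--
	return v, true
}

func main() {
	r := newRing(2)
	r.push(1)
	r.push(2)
	fmt.Println(r.push(3)) // false: buffer is full
	v, _ := r.pop()
	fmt.Println(v) // 1
	r.push(3)
	fmt.Println(r.sendx, r.recvx, r.qcount) // 1 1 2
}
```

Note that sendx and recvx are equal both when the buffer is empty and when it is full, which is why qcount is tracked separately.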
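Next, the send and receive queues:
- recvq: the queue of goroutines waiting to receive.
- sendq: the queue of goroutines waiting to send.
Both queues are implemented with the waitq struct, which is essentially a doubly linked list. Each node in the list is a sudog, and a sudog represents one waiting goroutine.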
type waitq struct {
    first *sudog
    last  *sudog
}
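The FIFO behavior of such a queue can be sketched with a stripped-down doubly linked list. The waiter type below is a hypothetical stand-in for sudog, and the toy omits everything the runtime version does for select.

```go
package main

import "fmt"

// waiter is a hypothetical stand-in for sudog: one parked goroutine.
type waiter struct {
	id         int
	next, prev *waiter
}

// waitq is a toy FIFO queue shaped like the runtime's waitq.
type waitq struct {
	first, last *waiter
}

// enqueue appends w to the tail of the queue.
func (q *waitq) enqueue(w *waiter) {
	w.next = nil
	if q.last == nil {
		w.prev = nil
		q.first, q.last = w, w
		return
	}
	w.prev = q.last
	q.last.next = w
	q.last = w
}

// dequeue removes and returns the head of the queue, or nil if it is empty.
func (q *waitq) dequeue() *waiter {
	w := q.first
	if w == nil {
		return nil
	}
	if w.next == nil {
		q.first, q.last = nil, nil
	} else {
		w.next.prev = nil
		q.first = w.next
		w.next = nil
	}
	return w
}

func main() {
	var q waitq
	for id := 1; id <= 3; id++ {
		q.enqueue(&waiter{id: id})
	}
	for w := q.dequeue(); w != nil; w = q.dequeue() {
		fmt.Println(w.id) // 1, then 2, then 3
	}
}
```

Waiters leave in the order they arrived, which is what gives blocked senders and receivers first-come, first-served treatment.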
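Finally, the lock field.
lock is the mutex that protects the channel data structure. It is the runtime's own mutex, which combines spinning with sleeping, and it is what makes channel operations thread-safe.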
3.2 Initialization
func makechan(t *chantype, size int) *hchan {
    elem := t.elem

    // compiler checks this but be safe.
    if elem.size >= 1<<16 {
        throw("makechan: invalid channel element type")
    }
    if hchanSize%maxAlign != 0 || elem.align > maxAlign {
        throw("makechan: bad alignment")
    }

    mem, overflow := math.MulUintptr(elem.size, uintptr(size))
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }

    // Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
    // buf points into the same allocation, elemtype is persistent.
    // SudoG's are referenced from their owning thread so they can't be collected.
    // TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }

    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    lockInit(&c.lock, lockRankHchan)

    if debugChan {
        print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
    }
    return c
}
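The part worth discussing is the switch statement:
- First branch: if the buffer size is 0 (an unbuffered channel), or the element size is 0 (for example struct{}, since an empty struct occupies no memory in Go), mallocgc() allocates only hchanSize bytes on the heap for the channel.
    - About c.buf = c.raceaddr(): c.raceaddr() returns an address that is never used to store data. It only gives the race detector a location to use for its synchronization events.
- Second branch: if the elements contain no pointers, a single mallocgc call allocates the memory for hchan and buf together.
- Third branch: by default the element type contains pointers, so two separate allocations are made, one with new and one with mallocgc.
All three branches allocate on the heap (the third through new(hchan) plus mallocgc), which means the channel itself is reclaimed automatically by the GC.
At the end, the function initializes the fields of the channel structure: element size, element type, buffer size, and the lock.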
3.2 Sending data
// entry point for c <- x from compiled code
//
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed. it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    // Handle a nil channel.
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    if debugChan {
        print("chansend: chan=", c, "\n")
    }

    // Race detection, used to look for data races (go test -race ./...).
    if raceenabled {
        racereadpc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(chansend))
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    //
    // After observing that the channel is not closed, we observe that the channel is
    // not ready for sending. Each of these observations is a single word-sized read
    // (first c.closed and second full()).
    // Because a closed channel cannot transition from 'ready for sending' to
    // 'not ready for sending', even if the channel is closed between the two observations,
    // they imply a moment between the two when the channel was both not yet closed
    // and not ready for sending. We behave as if we observed the channel at that moment,
    // and report that the send cannot proceed.
    //
    // It is okay if the reads are reordered here: if we observe that the channel is not
    // ready for sending and then observe that it is not closed, that implies that the
    // channel wasn't closed during the first observation. However, nothing here
    // guarantees forward progress. We rely on the side effects of lock release in
    // chanrecv() and closechan() to update this thread's view of c.closed and full().
    if !block && c.closed == 0 && full(c) {
        return false
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    // Acquire the lock.
    lock(&c.lock)

    // Check whether the channel is closed.
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }

    // Check whether a goroutine is waiting to receive.
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }

    // Check whether the channel buffer has free space.
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            racenotify(c, c.sendx, nil)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }

    // Non-blocking mode.
    if !block {
        unlock(&c.lock)
        return false
    }

    // Blocking mode: put the current goroutine on the send queue and park it.
    // Block on the channel. Some receiver will complete our operation for us.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    // Fill in the sudog.
    mysg.elem = ep
    mysg.waitlink = nil
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.waiting = mysg
    gp.param = nil
    // Enqueue the sudog for this sending goroutine on the channel's sendq.
    c.sendq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    // Move this sending g from Grunning to Gwaiting and go to sleep.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
    // Ensure the value being sent is kept alive until the
    // receiver copies it out. The sudog has a pointer to the
    // stack object, but sudogs aren't considered as roots of the
    // stack tracer.
    KeepAlive(ep)

    // The code below runs after the goroutine has been woken up.
    // someone woke us up.
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    closed := !mysg.success
    gp.param = nil
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    mysg.c = nil
    releaseSudog(mysg)
    if closed {
        // After waking up, the channel turns out to have been closed.
        if c.closed == 0 {
            throw("chansend: spurious wakeup")
        }
        panic(plainError("send on closed channel"))
    }
    return true
}
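The code is long, but it splits into two parts: precondition checks and sending the data.
3.2.1 Precondition checks
The function starts by handling the nil channel case from the error-cases section, that is, a channel variable that was declared but never initialized with make. A non-blocking send returns false, and a blocking send parks the goroutine forever.
Next comes the fast path for non-blocking mode. Non-blocking here means block == false, which is how a send inside a select with a default branch is executed; it has nothing to do with whether the channel is buffered. If the channel is not yet closed and is full (for an unbuffered channel: no receiver is waiting), the function returns false right away without taking the lock.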
func TestASyncSendFull(t *testing.T) {
    ch := make(chan int, 1) // create a channel with a buffer size of 1
    ch <- 1                 // send one element; the buffer is now full
    select {
    case ch <- 2: // try to send a second element
        fmt.Println("Successfully sent 2")
    default: // the buffer is full, so the default branch runs
        fmt.Println("channel is full, unable to send 2")
    }
}
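3.2.2 Sending the data
Sending falls into three cases:
- Direct send: when recvq holds a waiting receiver, runtime.send copies the data straight to the blocked receiver.
    - Note: the blocked receiver does not start running immediately. Its goroutine is marked runnable (_Grunnable) and placed in the runnext slot of the sender's processor, which runs it at its next scheduling point.
- Buffered (asynchronous) send: when buf has free space, the value is written into the channel's buffer.
- Blocking send: when there is no buffer or the buffer is full, the sender waits for another goroutine to receive from the channel.
    - The current goroutine is added to the sendq queue and parked until another goroutine receives from the channel.
    - After being woken up, it checks whether the wake-up was caused by the channel being closed; if so, it panics.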
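Two points in the send path interact with goroutine scheduling:
- When a goroutine is already waiting to receive on the channel, the sender sets its processor's runnext to that goroutine right away, but this does not trigger scheduling immediately.
- When no receiver is found and the buffer is full, the sender adds itself to the channel's sendq and calls runtime.gopark, which triggers goroutine scheduling and gives up the processor.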
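The order in which chansend tries these cases can be summarized in a small decision function. This is a simplified model for a non-nil channel, not runtime code: sendOutcome and its parameters are hypothetical, and the returned strings are labels chosen for this sketch.

```go
package main

import "fmt"

// sendOutcome (hypothetical) mirrors the order of the checks chansend makes
// on a non-nil channel: closed, waiting receiver, buffer space, blocking mode.
func sendOutcome(closed, receiverWaiting bool, qcount, dataqsiz int, block bool) string {
	switch {
	case closed:
		return "panic: send on closed channel"
	case receiverWaiting:
		return "direct send to the waiting receiver"
	case qcount < dataqsiz:
		return "copy into the buffer"
	case !block:
		return "return false"
	default:
		return "enqueue on sendq and park"
	}
}

func main() {
	fmt.Println(sendOutcome(false, true, 0, 0, true))   // unbuffered, receiver parked
	fmt.Println(sendOutcome(false, false, 1, 4, true))  // buffered, space left
	fmt.Println(sendOutcome(false, false, 4, 4, false)) // full, select with default
	fmt.Println(sendOutcome(false, false, 4, 4, true))  // full, plain send
	fmt.Println(sendOutcome(true, false, 0, 4, true))   // closed
}
```

The closed check comes first, which is why a send on a closed channel panics even when the buffer still has room.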
3.3 Receiving data
// entry points for <- c from compiled code
//
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
    _, received = chanrecv(c, elem, true)
    return
}

// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
// Otherwise, if c is closed, zeros *ep and returns (true, false).
// Otherwise, fills in *ep with an element and returns (true, true).
// A non-nil ep must point to the heap or the caller's stack.
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    // raceenabled: don't need to check ep, as it is always on the stack
    // or is new memory allocated by reflect.

    if debugChan {
        print("chanrecv: chan=", c, "\n")
    }

    // A receive on a nil channel blocks forever.
    if c == nil {
        // In non-blocking mode return immediately; this happens in some select cases.
        if !block {
            return
        }
        gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }

    // Fast path: check for failed non-blocking operation without acquiring the lock.
    if !block && empty(c) {
        // After observing that the channel is not ready for receiving, we observe whether the
        // channel is closed.
        //
        // Reordering of these checks could lead to incorrect behavior when racing with a close.
        // For example, if the channel was open and not empty, was closed, and then drained,
        // reordered reads could incorrectly indicate "open and empty". To prevent reordering,
        // we use atomic loads for both checks, and rely on emptying and closing to happen in
        // separate critical sections under the same lock. This assumption fails when closing
        // an unbuffered channel with a blocked send, but that is an error condition anyway.
        if atomic.Load(&c.closed) == 0 {
            // Because a channel cannot be reopened, the later observation of the channel
            // being not closed implies that it was also not closed at the moment of the
            // first observation. We behave as if we observed the channel at that moment
            // and report that the receive cannot proceed.
            return
        }
        // The channel is irreversibly closed. Re-check whether the channel has any pending data
        // to receive, which could have arrived between the empty and closed checks above.
        // Sequential consistency is also required here, when racing with such a send.
        if empty(c) {
            // The channel is irreversibly closed and empty.
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
    }

    var t0 int64
    if blockprofilerate > 0 {
        t0 = cputicks()
    }

    lock(&c.lock)

    // Closed channel with no data left to read.
    if c.closed != 0 {
        if c.qcount == 0 {
            if raceenabled {
                raceacquire(c.raceaddr())
            }
            unlock(&c.lock)
            if ep != nil {
                typedmemclr(c.elemtype, ep)
            }
            return true, false
        }
        // The channel has been closed, but the channel's buffer have data.
    } else {
        // A sudog is waiting in the sender queue:
        // copy the data from that sudog to the current g.
        // Just found waiting sender with not closed.
        if sg := c.sendq.dequeue(); sg != nil {
            // Found a waiting sender. If buffer is size 0, receive value
            // directly from sender. Otherwise, receive from head of queue
            // and add sender's value to the tail of the queue (both map to
            // the same buffer slot because the queue is full).
            recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
            return true, true
        }
    }

    if c.qcount > 0 {
        // Receive directly from queue
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
        }
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        typedmemclr(c.elemtype, qp)
        // Advance the receive index.
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        // Decrement the buffer element count.
        c.qcount--
        unlock(&c.lock)
        return true, true
    }

    // Non-blocking mode and nothing to receive.
    if !block {
        unlock(&c.lock)
        return false, false
    }

    // no sender available: block on this channel.
    gp := getg()
    mysg := acquireSudog()
    mysg.releasetime = 0
    if t0 != 0 {
        mysg.releasetime = -1
    }
    // No stack splits between assigning elem and enqueuing mysg
    // on gp.waiting where copystack can find it.
    // Fill in the sudog.
    mysg.elem = ep
    mysg.waitlink = nil
    gp.waiting = mysg
    mysg.g = gp
    mysg.isSelect = false
    mysg.c = c
    gp.param = nil
    // Enqueue on recvq.
    c.recvq.enqueue(mysg)
    // Signal to anyone trying to shrink our stack that we're about
    // to park on a channel. The window between when this G's status
    // changes and when we set gp.activeStackChans is not safe for
    // stack shrinking.
    atomic.Store8(&gp.parkingOnChan, 1)
    gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)

    // someone woke us up
    if mysg != gp.waiting {
        throw("G waiting list is corrupted")
    }
    gp.waiting = nil
    gp.activeStackChans = false
    if mysg.releasetime > 0 {
        blockevent(mysg.releasetime-t0, 2)
    }
    success := mysg.success
    gp.param = nil
    mysg.c = nil
    releaseSudog(mysg)
    // If the channel was not closed, data was really received.
    return true, success
}

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        if raceenabled {
            racesync(c, sg)
        }
        if ep != nil {
            // copy data from sender
            recvDirect(c.elemtype, sg, ep)
        }
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            racenotify(c, c.recvx, nil)
            racenotify(c, c.recvx, sg)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
    sg.elem = nil
    gp := sg.g
    unlockf()
    gp.param = unsafe.Pointer(sg)
    sg.success = true
    if sg.releasetime != 0 {
        sg.releasetime = cputicks()
    }
    goready(gp, skip+1)
}
Go has two forms of channel receive:
num := <-ch
num, ok := <-ch
They correspond to chanrecv1 and chanrecv2 in the source above, and both end up in the chanrecv function.
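3.3.1 Precondition checks
When receiving from a nil channel (a channel variable that was never initialized), a non-blocking receive returns immediately; otherwise runtime.gopark is called to give up the processor, and the goroutine never resumes.
If the channel has been closed and its buffer holds no data, the memory that ep points to is cleared and the function returns at once. This is why you can receive from a closed channel any number of times without an error.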
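3.3.2 Receiving the data
Receiving from a channel falls into three cases:
3.3.2.1 Direct receive
When sendq holds a waiting sender, runtime.recv obtains the data from the blocked sender or from the buffer. There are two scenarios; the recv function shows the details.
- Scenario 1
When the buffer capacity dataqsiz is 0, that is, a synchronous channel, recvDirect copies the value referenced by the elem field of the sudog at the head of sendq straight into the receiver's memory.
- Scenario 2
When the buffer is full (two memory copies take place):
- First, the value at the head of buf is copied to the receiver (first copy).
- Then the value of the sender at the head of sendq is copied into buf, and the goroutine blocked on that sudog is released (second copy).
You might ask why the receiver gets the value from buf instead of the value taken directly from sendq.
The reason is that in buffered mode a channel stores its data in FIFO (first in, first out) order. The value at the head of the buffer was stored first, so it has to be taken out first.
A few more words on how recvx and sendx are updated in scenario 2.
- State when the buffer is full
When buf is full, recvx points to the head of buf, which is the next value to be received. sendx points to the same slot: the ring has wrapped around, so there is no free slot left. Nothing is overwritten in this state; senders simply block.
- Reading from the buffer
The receiver takes the value at recvx, the head of the full buffer, and that value is handed to the receiving side.
- Copying from sendq into the buffer
The head slot has just been consumed, so the value of the sender at the head of sendq is written into that same slot, which is now logically the tail of the queue. recvx then advances to the next value to be received, and sendx is set equal to recvx so that it points to the next position to send.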
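The slot reuse in scenario 2 can be modeled in a few lines. This sketch assumes int elements and a buffer that is already full; fullChan and its methods are invented names, and the waiting sender is reduced to the single value it wants to send.

```go
package main

import "fmt"

// fullChan is a toy model of a full ring buffer with recvx == sendx.
type fullChan struct {
	buf   []int
	recvx int
	sendx int
}

// recvWithWaitingSender models the buffered branch of recv: hand the head
// value to the receiver, then store the blocked sender's value in that slot.
func (c *fullChan) recvWithWaitingSender(senderValue int) int {
	v := c.buf[c.recvx]          // first copy: buffer head -> receiver
	c.buf[c.recvx] = senderValue // second copy: sender -> the slot just freed
	c.recvx++
	if c.recvx == len(c.buf) {
		c.recvx = 0
	}
	c.sendx = c.recvx // the buffer is full again
	return v
}

// receiveOrder receives once with a sender waiting, then drains the buffer.
func receiveOrder() []int {
	c := &fullChan{buf: []int{1, 2, 3}} // full buffer; a sender waits with 4
	order := []int{c.recvWithWaitingSender(4)}
	for i := 0; i < len(c.buf); i++ { // plain receives, no sender waiting
		order = append(order, c.buf[c.recvx])
		c.recvx = (c.recvx + 1) % len(c.buf)
	}
	return order
}

func main() {
	fmt.Println(receiveOrder()) // [1 2 3 4]
}
```

The blocked sender's value ends up behind everything that was already buffered, so FIFO order is preserved.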
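In both scenarios the runtime calls runtime.goready, which puts the sending goroutine into the current processor's runnext, so that the blocked sender is resumed at the scheduler's next scheduling point.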
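3.3.2.2 Buffered (asynchronous) receive
When qcount is greater than 0, that is, a buffered channel holds data, the value at index recvx is taken from buf and processed:
- If the destination address is not nil, runtime.typedmemmove copies the value from the buffer into it; runtime.typedmemclr then clears the slot in the queue.
- Finally the channel's bookkeeping is updated: recvx moves to the next position (wrapping back to the start when it reaches the end of the ring), qcount is decremented by one, and the channel lock is released.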
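3.3.2.3 Blocking receive
When neither of the above applies, that is, sendq holds no waiting goroutine and buf holds no data, the receive blocks. The current goroutine is parked and added to the channel's receive queue recvq so that it can be woken up once data is available.
After the goroutine is woken up, the blocking receive completes. It then runs some basic sanity checks, detaches the sudog from the channel, and releases the sudog.
Together with the precondition checks, this gives two points at which a receive triggers goroutine scheduling:
- When the channel is nil.
- When buf holds no data and no sender is waiting.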
3.4 Closing a channel
Finally, the implementation of close:
func closechan(c *hchan) {
    // Closing a nil channel panics immediately.
    if c == nil {
        panic(plainError("close of nil channel"))
    }

    // Take the lock. It is held for a long stretch, until all sudogs have been released.
    lock(&c.lock)
    // Closing a channel that is already closed panics.
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }

    if raceenabled {
        callerpc := getcallerpc()
        racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))
        racerelease(c.raceaddr())
    }

    c.closed = 1

    var glist gList

    // release all readers
    for {
        sg := c.recvq.dequeue()
        // A nil sudog means the receive queue is empty.
        if sg == nil {
            break
        }
        // sg.elem is an unsafe.Pointer to the sudog's data element,
        // which may live on the heap or on a stack.
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        // Push the goroutine onto glist so that
        // all goroutines can be made ready at the end.
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }

    // release all writers (they will panic)
    // Pop every writer parked on the channel from sendq.
    // Each of them panics once it resumes (sending to a closed channel panics).
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = unsafe.Pointer(sg)
        sg.success = false
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    // All reader and writer sudogs were released inside the critical section.
    unlock(&c.lock)

    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        // Switch the g to the Grunnable state.
        goready(gp, 3)
    }
}
3.4.1 Precondition checks
- Closing a nil channel panics immediately.
- Closing a channel that has already been closed also panics.
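3.4.2 Releasing all receivers and senders
The main job of closing a channel is to release all readers and writers.
The sudogs in recvq and sendq are dequeued and their goroutines are collected in glist, the list of goroutines to be woken up; along the way the function clears any unprocessed element still referenced by a runtime.sudog. Note that closechan itself does not panic while draining sendq. Each blocked sender panics after it is woken up and resumes in chansend, because, as listed in the error cases, sending to a closed channel panics.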
Finally, runtime.goready is called for every blocked goroutine to hand it to the scheduler. Each goroutine in glist moves from _Gwaiting to _Grunnable and waits to be scheduled.
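The effect is observable from user code: a single close releases every receiver. The sketch below uses a hypothetical helper, wakeAll, that starts n receivers on an unbuffered channel, closes it, and counts how many receives reported ok == false.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll (hypothetical helper) starts n receivers, closes the channel once,
// and returns how many receivers observed the close (ok == false).
func wakeAll(n int) int {
	ch := make(chan int)
	results := make(chan bool, n)
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, ok := <-ch // parked here, or arrives after the close
			results <- ok
		}()
	}
	close(ch) // releases every receiver, parked or not
	wg.Wait()
	close(results)

	released := 0
	for ok := range results {
		if !ok {
			released++
		}
	}
	return released
}

func main() {
	fmt.Println(wakeAll(5)) // 5
}
```

Receivers that reach the channel only after the close are not parked at all; they take the closed-and-empty path in chanrecv and also see ok == false.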
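3.4.3 Closing a channel gracefully
Finally, how to close a channel gracefully.
The error cases above showed that:
- Sending to a closed channel panics.
- Closing a channel twice also panics.
We also learned that:
- Receiving from a closed channel yields the zero value and does not cause an error.
- When a channel is closed, every select case that receives from it becomes ready.
To wrap up, here is the graceful-close approach described in How to Gracefully Close Channels.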
package _0240623

import (
    "log"
    "math/rand"
    "strconv"
    "sync"
    "testing"
    "time"
)

func TestGracefullyCloseChannel(t *testing.T) {
    rand.Seed(time.Now().UnixNano()) // needed before Go 1.20
    log.SetFlags(0)

    // ...
    const Max = 100000
    const NumReceivers = 10
    const NumSenders = 1000

    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)

    // ...
    dataCh := make(chan int)
    stopCh := make(chan struct{})
    // stopCh is an additional signal channel.
    // Its sender is the moderator goroutine shown
    // below, and its receivers are all senders
    // and receivers of dataCh.
    toStop := make(chan string, 1)
    // The channel toStop is used to notify the
    // moderator to close the additional signal
    // channel (stopCh). Its senders are any senders
    // and receivers of dataCh, and its receiver is
    // the moderator goroutine shown below.
    // It must be a buffered channel.

    var stoppedBy string

    // moderator
    go func() {
        stoppedBy = <-toStop
        close(stopCh)
    }()

    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(Max)
                if value == 0 {
                    // Here, the try-send operation is
                    // to notify the moderator to close
                    // the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }

                // The try-receive operation here is to
                // try to exit the sender goroutine as
                // early as possible. Try-receive and
                // try-send select blocks are specially
                // optimized by the standard Go
                // compiler, so they are very efficient.
                select {
                case <-stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and for ever in theory) if the send
                // to dataCh is also non-blocking. If
                // this is unacceptable, then the above
                // try-receive operation is essential.
                select {
                case <-stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }

    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()

            for {
                // Same as the sender goroutine, the
                // try-receive operation here is to
                // try to exit the receiver goroutine
                // as early as possible.
                select {
                case <-stopCh:
                    return
                default:
                }

                // Even if stopCh is closed, the first
                // branch in this select block might be
                // still not selected for some loops
                // (and forever in theory) if the receive
                // from dataCh is also non-blocking. If
                // this is not acceptable, then the above
                // try-receive operation is essential.
                select {
                case <-stopCh:
                    return
                case value := <-dataCh:
                    if value == Max-1 {
                        // Here, the same trick is
                        // used to notify the moderator
                        // to close the additional
                        // signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }

                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }

    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}
The core of this code is the following part:
// moderator
go func() {
    stoppedBy = <-toStop
    close(stopCh)
}()
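When there are M producers and N consumers, the channel can safely be closed neither on the producer side nor on the consumer side. The way out is to introduce a third party: a moderator goroutine that blocks on stoppedBy = <-toStop. As soon as any producer or consumer meets its stop condition and sends to toStop, the moderator closes stopCh. That matches the moderator comment in the code: a coordinator that lets producers and consumers shut down gracefully in the M*N case.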