文章目录
- channel底层实现
- channel发送、接收数据
- 有缓冲 channel
- channel 先写再读
- channel 先读再写(when the receiver comes first)
- 无缓冲channel
- channel存在3种状态:
channel底层实现
// channel 类型定义
type hchan struct {// channel 中的元素数量, lenqcount uint // total data in the queue// channel 的大小, capdataqsiz uint // size of the circular queue// channel 的缓冲区,环形数组实现 buf unsafe.Pointer // points to an array of dataqsiz elements// 单个元素(数据类型)的大小elemsize uint16// closed 标志位closed uint32// 元素的类型elemtype *_type // element type 指向类型元数据 (内存复制、垃圾回收等机制依赖数据的类型信息)// send 和 recieve 的索引,用于实现环形数组队列(用于记录 交替读写的下标位置)sendx uint // send indexrecvx uint // receive index// recv goroutine 等待队列 想读取数据但又被阻塞住的 goroutine 队列recvq waitq // list of recv waiters// send goroutine 等待队列 想发送数据但又被阻塞住的 goroutine 队列sendq waitq // list of send waiters// lock protects all fields in hchan, as well as several// fields in sudogs blocked on this channel.//// Do not change another G's status while holding this lock// (in particular, do not ready a G), as this can deadlock// with stack shrinking.lock mutex
}// 等待队列的链表实现
type waitq struct { first *sudog last *sudog
}// in src/runtime/runtime2.go
// 对 G 的封装
type sudog struct {// The following fields are protected by the hchan.lock of the// channel this sudog is blocking on. shrinkstack depends on// this for sudogs involved in channel ops.g *gselectdone *uint32 // CAS to 1 to win select race (may point to stack)next *sudogprev *sudogelem unsafe.Pointer // data element (may point to stack)// The following fields are never accessed concurrently.// For channels, waitlink is only accessed by g.// For semaphores, all fields (including the ones above)// are only accessed when holding a semaRoot lock.acquiretime int64releasetime int64ticket uint32parent *sudog // semaRoot binary treewaitlink *sudog // g.waiting list or semaRootwaittail *sudog // semaRootc *hchan // channel
}
sendq 和 recvq 存储了当前 Channel 由于缓冲区空间不足而阻塞的 Goroutine 列表,这些等待队列使用双向链表 waitq 表示,链表中所有的元素都是 sudog 结构:
type waitq struct {first *sudoglast *sudog
}
一个通道发送和接收通道,默认是阻塞的。
如果没有缓冲区,单纯的往其中放入元素立马就会进入阻塞状态,必须有其他的线程从其中取走元素。通俗的讲要有一个线程不断的取这个管道的元素,才能往其中放入元素。它就像一个窄窄的门框,进去就得出来。
而有一个缓冲区的管道想一段地道,放入的元素不会马上进入阻塞状态,只有第二个准备进入而第一个还没有进入的情况下才会阻塞。
package mainimport ("fmt""time"
)func main() {intChan := make(chan int, 1)go func() {for {v, ok := <-intChanif !ok {break}else{fmt.Println(v)}}}()intChan <- 1close(intChan)time.Sleep(time.Second * 1)
}
Channel是异步进行的。
channel发送、接收数据
// G1
func main() { ... for _, task := range tasks { taskCh <- task} ...
}// G2
func worker() { for { task := <-taskChprocess(task) }
}
有缓冲 channel
channel 先写再读
这一次会优先判断缓冲数据区域是否已满,如果未满,则将数据保存在缓冲数据区域,即环形队列里。
如果已满,G1 暂时被挂在了 recvq ,让G1调gopark()休眠起来, G1与M解绑。
当 G2 要读取数据时,会优先从缓冲数据区域去读取,并且在读取完后,会检查 sendq 队列,如果 goroutine 有等待队列,则会将它上面的 data 补充到缓冲数据区域,并且也对其设置 goready 函数。同时设置 G1 goready 函数,G1状态从waitting改为runnable,调度到本地队列,等待下次调度运行。
channel 先读再写(when the receiver comes first)
G2 先读,但没数据,暂时被挂在了 recvq 队列,然后休眠起来。
G1 在写数据时,发现 recvq 队列有 goroutine 存在,于是直接将数据发送给 G2。同时设置 G2 goready 函数,等待下次调度运行。
G2因为有runnext指针,因为亲和性的原因优先级较高,会把G2调度到原来的P local quene中(p.runext指针)
所以说go语言的goroutine调度是协作式的,你阻塞靠别人唤醒, 因为由runtime实现
On resuming, G2 does not need to acquire channel lock and manipulate the buffer. Also, one fewer memory copy.
优点:不重复入队出队,没有锁的开销, 减少一次内存拷贝开销,效率很高
无缓冲channel
跟有缓冲情况类似
channel存在3种状态:
- nil,未初始化的状态,只进行了声明,或者手动赋值为nil
- active,正常的channel,可读或者可写
- closed,已关闭,千万不要误认为关闭channel后,channel的值是nil