Go源码--channel源码解读

简介

channel顾名思义就是channel的意思,主要用来在协程之间传递数据,所以是并发安全的。其实现原理,其实就是一个共享内存再加上锁,底层阻塞机制使用的是GMP模型。可见 GMP模型就是那个道,道生一,一生二,二生三,三生万物。
一个简单的例子 如下:

func main() {c := make(chan int32, 1)go func() {c <- 1}()go func() {fmt.Println(<-c)}()time.Sleep(2 * time.Second)close(c)
}

运行结果是:

1

其汇编部分代码如下:

 CALL    runtime.makechan(SB)  // 对应 make(chan int32,1)...CALL    runtime.chanrecv1(SB) // 对应 <-c ...CALL    runtime.chansend1(SB) // 对应 c<-1 其中 <- 编译后就代表运行时 chanrecv1 函数 ...CALL    runtime.closechan(SB) // 对应 close(c)

CALL runtime.closechan(SB)
协程部分汇编代码,因为不是重点,略过 感兴趣的自己可以编译看看。

chan的所有内容都存放在runtime.hchan这个结构体中,makechan,chanrecv1和chansend1函数都是操作hchan这个结构体来实现chan的功能的。下面我们来看下 hchan结构体。

两种重要结构体

hchan 结构体

hchan结构体 如下

type hchan struct {qcount   uint           // total data in the queue  环形队列里面的总的数据量 dataqsiz uint           // size of the circular queue // 环形队列大小 就是 make chan时 申请的大小buf      unsafe.Pointer // points to an array of dataqsiz elements // 指向环形队列的指针elemsize uint16         // 储存的元素类型占空间大小closed   uint32         // chan 状态 1 关闭 0 未关闭elemtype *_type         // element type // 元素类型   // make chan是 指定的类型 不过这个类型要进行运行时转换 但对应关系是这样的 sendx    uint           // send index // 发送索引recvx    uint           // receive index //  获取索引recvq    waitq          // list of recv waiters // 获取协程等待队列sendq    waitq          // list of send waiters // 发送携程等待队列lock mutex // 锁
}
waitq 结构体

waitq结构体用来存储阻塞在chan上的协程状态sudog结构体(内部包含了 协程信息)
其结构如下:

// 等待队列
type waitq struct {first *sudog // 双向链表 头last  *sudog // 双向链表 尾
}

这个结构体有两个函数 enqueuq 和dequeue 插入链表和删除链表 就是双向链表的基础操作

hchan的结构丑图如下:
在这里插入图片描述
接下来我们按照 执行流程来梳理下部分源码 源码位置在 runtime/chan.go中,首先是 make(chan int32,1)函数,我们都知道 make函数可以初始化,map,slice和chan。编译时根据不同类型,会调用makechan,makeslcie和makemap函数。我们来看下 makechan函数。

makechan

其源码如下:
请结合比较丑流程图来看,比较好理解。


func makechan(t *chantype, size int) *hchan {elem := t.Elem// compiler checks this but be safe.if elem.Size_ >= 1<<16 {throw("makechan: invalid channel element type")}if hchanSize%maxAlign != 0 || elem.Align_ > maxAlign {throw("makechan: bad alignment")}// 计算内存mem, overflow := math.MulUintptr(elem.Size_, uintptr(size))if overflow || mem > maxAlloc-hchanSize || size < 0 {panic(plainError("makechan: size out of range"))}// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.// buf points into the same allocation, elemtype is persistent.// SudoG's are referenced from their owning thread so they can't be collected.// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.var c *hchanswitch {case mem == 0:// Queue or element size is zero.// 如果chan是空的 则需要申请hchanSize空间 以满足 内存对齐要求c = (*hchan)(mallocgc(hchanSize, nil, true))// Race detector uses this location for synchronization.c.buf = c.raceaddr()case elem.PtrBytes == 0:// Elements do not contain pointers.// Allocate hchan and buf in one call.c = (*hchan)(mallocgc(hchanSize+mem, nil, true))c.buf = add(unsafe.Pointer(c), hchanSize)default:// 分配内存// Elements contain pointers.c = new(hchan)c.buf = mallocgc(mem, elem, true)}// 初始化hchanc.elemsize = uint16(elem.Size_)c.elemtype = elemc.dataqsiz = uint(size)lockInit(&c.lock, lockRankHchan)if debugChan {print("makechan: chan=", c, "; elemsize=", elem.Size_, "; dataqsiz=", size, "\n")}return c
}

其实 就是 根据 chan 中元素类型和 大小 来计算需要的存储空间。逻辑还是比较清晰的。初始化了空间后,接下来就应该向chan存数据了,上例中是c <- 1,在编译时会转译成 chansend1函数。其源码如下:

func chansend1(c *hchan, elem unsafe.Pointer) {chansend(c, elem, true, getcallerpc())
}

它只是调用了 chansend 函数 且 阻塞 状态是 true(阻塞状态true代表 如果 chan 的 buf满员,则写协程放入sendq,如果 chan 为空,则读协程会放入 recvq。 如果阻塞状态是 false 如果chan buf,满员 则返回错误。如果chan 为空,则读协程读取chan返回错误。阻塞状态为false 主要是select函数用)。接下来我们来看下其源码:

chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {// 省略部分代码// chan没关闭 且 缓存buf已满 且 是非阻塞模式(select方法使用)  则不会写入 sendq里 直接返回错误if !block && c.closed == 0 && full(c) {return false}// todovar t0 int64if blockprofilerate > 0 {t0 = cputicks()}// 加锁lock(&c.lock)// 如果chan已经关闭 直接panicif c.closed != 0 {unlock(&c.lock)panic(plainError("send on closed channel"))}// 有阻塞的取协程 送的数据 直接交给取协程 并将取协程唤醒if sg := c.recvq.dequeue(); sg != nil {// Found a waiting receiver. We pass the value we want to send// directly to the receiver, bypassing the channel buffer (if any).send(c, sg, ep, func() { unlock(&c.lock) }, 3)return true}// 如果没有阻塞的获取协程 且 channel容量没满 则需要插入channel中if c.qcount < c.dataqsiz {// 获取要放的位置的指针qp := chanbuf(c, c.sendx)if raceenabled {racenotify(c, c.sendx, nil)}// 将元素 放入其中typedmemmove(c.elemtype, qp, ep)c.sendx++// ring buffer 循环数组 到尾 就 从头开始if c.sendx == c.dataqsiz {c.sendx = 0}c.qcount++// 解锁unlock(&c.lock)return true}// buf满了 但是没有 阻塞 则直接返回失败if !block {unlock(&c.lock)return false}// 如果channel满了 则所有send协程就需要加入 sendq 里排队,创建一个 等待状态的 sudog 包装当前 协程和数据 ep 放入 sendq中// Block on the channel. Some receiver will complete our operation for us.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.// 初始化 sudogmysg.elem = epmysg.waitlink = nilmysg.g = gpmysg.isSelect = falsemysg.c = cgp.waiting = mysggp.param = nil// 放入 发送等待队列c.sendq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)// 调用GMP模型 阻塞协程 并释放 chan的 lock 锁,释放后别的协程可以进来gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceBlockChanSend, 2)// Ensure the value being sent is kept alive until the// receiver copies it out. The sudog has a pointer to the// stack object, but sudogs aren't considered as roots of the// stack tracer.// ep保持活着 不被垃圾回收KeepAlive(ep)// 从等待队列唤醒 // someone woke us up.if mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseclosed := !mysg.successgp.param = nilif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}mysg.c = nil// 释放sudogreleaseSudog(mysg)if closed {if c.closed == 0 {throw("chansend: spurious wakeup")}panic(plainError("send on closed channel"))}return true
}

其执行主流程如下:
首先加锁

  1. 如果recvq里有数据,则将recvq链表头节点中的sudog拿出来,将send的数据直接交给sudog的recv协程,然后唤醒这个协程,最后释放当前send协程拥有的锁,返回。
  2. 否则 如果 hchan buf没满,则将数据存入其中,更新 qcount的值,释放锁,返回。
  3. 否则 初始化一个sudog并将 当前 send协程放入其中,并阻塞(调用GMP模型),然后释放当前send协程拥有的锁(别的send协程可以执行 chansend)
  4. 当前阻塞的send协程被待被recv协程唤醒。
  5. 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回

这里有注意的点,sendq里的协程只能被 recv协程唤醒,反之亦然。这里就带来一个有趣的问题,sendq和recvq能都有数据吗?大神们可以思考下。

send讲完了,接下来改recv了,要不不就阻塞了吗,
例子中的 < - c 是 编译时会转译成 chanrecv1,我们来看下源码:

func chanrecv1(c *hchan, elem unsafe.Pointer) {chanrecv(c, elem, true)
}

其block参数也是 true ,这阻塞跟 send一样。我们来看下 chanrecv方法吧

chanrecv

其源码如下:

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {// 非主逻辑代码 跳过lock(&c.lock)// 如果chan已关闭 且qcount==0 则 证明chan中已没有数据 返回if c.closed != 0 {if c.qcount == 0 {if raceenabled {raceacquire(c.raceaddr())}unlock(&c.lock)if ep != nil {typedmemclr(c.elemtype, ep)}return true, false}// The channel has been closed, but the channel's buffer have data.} else {// 如果chan没关闭 先看 sendq里 有没有阻塞获取sudog  如果有 取出来 直接将 数据给其中的协程 并唤醒 阻塞的读操作 并解锁// Just found waiting sender with not closed.if sg := c.sendq.dequeue(); sg != nil {// Found a waiting sender. If buffer is size 0, receive value// directly from sender. Otherwise, receive from head of queue// and add sender's value to the tail of the queue (both map to// the same buffer slot because the queue is full).recv(c, sg, ep, func() { unlock(&c.lock) }, 3)return true, true}}// 走到这里 证明 chan没关闭 且 sendq没数据 则从缓存区取数据if c.qcount > 0 {// Receive directly from queueqp := chanbuf(c, c.recvx)if raceenabled {racenotify(c, c.recvx, nil)}if ep != nil {typedmemmove(c.elemtype, ep, qp)}typedmemclr(c.elemtype, qp)c.recvx++if c.recvx == c.dataqsiz {c.recvx = 0}c.qcount--unlock(&c.lock)return true, true}// 非阻塞(waitq不能有数据), 返回if !block {unlock(&c.lock)return false, false}// 将 协程 构造sudog 存放到 recvq 中 阻塞协程, 下面代码跟 sendq类似 // no sender available: block on this channel.gp := getg()mysg := acquireSudog()mysg.releasetime = 0if t0 != 0 {mysg.releasetime = -1}// No stack splits between assigning elem and enqueuing mysg// on gp.waiting where copystack can find it.mysg.elem = epmysg.waitlink = nilgp.waiting = mysgmysg.g = gpmysg.isSelect = falsemysg.c = cgp.param = nil// 插入 recvq 链表尾部c.recvq.enqueue(mysg)// Signal to anyone trying to shrink our stack that we're about// to park on a channel. The window between when this G's status// changes and when we set gp.activeStackChans is not safe for// stack shrinking.gp.parkingOnChan.Store(true)gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceBlockChanRecv, 2)// someone woke us upif mysg != gp.waiting {throw("G waiting list is corrupted")}gp.waiting = nilgp.activeStackChans = falseif mysg.releasetime > 0 {blockevent(mysg.releasetime-t0, 2)}success := mysg.successgp.param = nilmysg.c = nilreleaseSudog(mysg)return true, success
}

其执行主流程如下:
首先加锁

  1. 如果协程已关闭 且 buf中没有数据 则 返回
  2. 如果协程没关闭 则从 sendq里取sudog数据,如果取到了 就 将 数据 传递给 sudog的send协程 当前recv协程释放锁,唤醒send协程 返回。
  3. 如果 没有从sendq里取到数据,则从 buf 里取数据,更新qcount 然后 解锁,返回
  4. 如果 buf 为空 ,则初始化一个sudog 将 当前协程放入其中,阻塞当前recv协程,释放当前recv协程拥有的锁。
  5. 等待其他send协程唤醒 当前recv协程。
  6. 唤醒后 将 当前协程状态变为非等待,释放当前协程对应的sudog 返回

send和recv后,接下来就该 close了
close( c)编译后 函数 closechan函数 我们来看下

closechan

源码如下:


func closechan(c *hchan) {if c == nil {panic(plainError("close of nil channel"))}// 加锁lock(&c.lock)if c.closed != 0 {unlock(&c.lock)panic(plainError("close of closed channel"))}if raceenabled {callerpc := getcallerpc()racewritepc(c.raceaddr(), callerpc, abi.FuncPCABIInternal(closechan))racerelease(c.raceaddr())}// 将锁状态变为1 c.closed = 1var glist gList// 释放所有 读协程 // release all readersfor {sg := c.recvq.dequeue()if sg == nil {break}if sg.elem != nil {typedmemclr(c.elemtype, sg.elem)sg.elem = nil}if sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}// 释放所有写协程 包括其数据 所以 我们从关闭的协程里读的数据 就是 buf 中的 不会有 sendq里的数据// release all writers (they will panic)for {sg := c.sendq.dequeue()if sg == nil {break}sg.elem = nilif sg.releasetime != 0 {sg.releasetime = cputicks()}gp := sg.ggp.param = unsafe.Pointer(sg)sg.success = falseif raceenabled {raceacquireg(gp, c.raceaddr())}glist.push(gp)}unlock(&c.lock)// 将所有协程唤醒 // Ready all Gs now that we've dropped the channel lock.for !glist.empty() {gp := glist.pop()gp.schedlink = 0goready(gp, 3)}
}

close 主要是将阻塞的 其读和写协程 携带的 元素 释放,(但是缓存buf里的数据并没释放)可以使得GC捕获,然后将阻塞的协程唤醒。这时 在 chansend函数 lock()处 阻塞的协程会panic,被goready 唤醒的协程会正常退出;在 chanrecv 函数 lock()处 阻塞的协程(或者继续执行<-c的协程)会继续从 buf 拿数据,当数据获取完后,会退出。

总结

channel 其实就是用了共享内存加锁这种机制来处理协程之间的共享数据的,这次阅读源码还是有些细节没整明白,整体理解的也不够透彻,还望大神指正。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/370946.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

[数据结构] 基于选择的排序 选择排序堆排序

标题&#xff1a;[数据结构] 基于选择的排序 选择排序&&堆排序 水墨不写bug &#xff08;图片来源于网络&#xff09; 目录 &#xff08;一&#xff09;选择排序 实现&#xff1a;(默认从小到大排序) 优化后实现方法&#xff1a; &#xff08;二&#xff09;堆排序…

UNIAPP_顶部导航栏右侧添加uni-icons图标,并绑定点击事件,自定义导航栏右侧图标

效果 1、导入插件 uni-icons插件&#xff1a;https://ext.dcloud.net.cn/plugin?nameuni-icons 复制 uniicons.ttf 文件到 static/fonts/ 下 仅需要那个uniicons.ttf文件&#xff0c;不引入插件、单独把那个文件下载到本地也是可以的 2、配置页面 "app-plus":…

1.Python学习笔记

一、环境配置 1.Python解释器 把程序员用编程语言编写的程序&#xff0c;翻译成计算机可以执行的机器语言 安装&#xff1a; 双击Python3.7.0-选择自定义安装【Customize installation】-勾选配置环境变量 如果没有勾选配置环境变量&#xff0c;输入python就会提示找不到命令…

论文略读:Large Language Models Relearn Removed Concepts

通过神经元修剪在模型编辑方面取得的进展为从大型语言模型中去除不良概念提供了希望。 然而&#xff0c;目前尚不清楚在编辑后模型是否具有重新学习修剪概念的能力——>论文通过在重新训练期间跟踪修剪神经元中的概念显著性和相似性来评估模型中的概念重新学习 研究结果表明…

vue3+electron项目搭建,遇到的坑

我主要是写后端,所以对前端的vue啊vue-cli只是知其然,不知其所以然 这样也导致了我在开发前端时候遇到了很多的坑 第一个坑, vue2升级vue3始终升级不成功 第二个坑, vue add electron-builder一直卡进度,进度条走完就是不出提示succes 第一个坑的解决办法: 按照网上说的升级v…

DAMA学习笔记(四)-数据建模与设计

1.引言 数据建模是发现、分析和确定数据需求的过程&#xff0c;用一种称为数据模型的精确形式表示和传递这些数据需求。建模过程中要求组织发现并记录数据组合的方式。数据常见的模式: 关系模式、多维模式、面向对象模式、 事实模式、时间序列模式和NoSQL模式。按照描述详细程度…

如何用简单的html,css,js写出一个带有背景层的删除弹出框

虽然每次项目都是主要写后端&#xff0c;但是有时候前端的样式太丑了&#xff0c;也有点看不下去。弹出框是项目中用的比较多的&#xff0c;比如删除&#xff0c;修改或者添加什么的&#xff0c;都需要一个弹出框。 所以这里简单记录一下&#xff0c;应该如何实现。实现效果如…

Android TextView的属性与用法

文本控件包括TextView、EditText、AutoCompleteTextView、CheckedTextView、MultiAutoCompleteTextView、TextInputLayout等&#xff0c;其中TextView、EditText是最基本最重要的文本控件&#xff0c;是必须要掌握的文本控件。 1.TextView TextView控件用于显示文本信息&…

深度学习原理与Pytorch实战

深度学习原理与Pytorch实战 第2版 强化学习人工智能神经网络书籍 python动手学深度学习框架书 TransformerBERT图神经网络&#xff1a; 技术讲解 编辑推荐 1.基于PyTorch新版本&#xff0c;涵盖深度学习基础知识和前沿技术&#xff0c;由浅入深&#xff0c;通俗易懂&#xf…

分布式整合

一、分布式架构介绍 什么是分布式系统 分布式系统指一个硬件或软件组件分布在不同的网络计算机上&#xff0c;彼此之间仅仅通过消息传递进行通信和协调的系统。 通俗的理解&#xff0c;分布式系统就是一个业务拆分成多个子业务&#xff0c;分布在不同的服务器节点&#xff0…

LabVIEW与OpenCV图像处理对比

LabVIEW和OpenCV在图像处理方面各有特点。LabVIEW擅长图形化编程、实时处理和硬件集成&#xff0c;而OpenCV则提供丰富的算法和多语言支持。通过DLL、Python节点等方式&#xff0c;OpenCV的功能可在LabVIEW中实现。本文将结合具体案例详细分析两者的特点及实现方法。 LabVIEW与…

vue3+antd 实现文件夹目录右键菜单功能

原本的目录结构&#xff1a; 右键菜单&#xff1a; 点击菜单以后会触发回调&#xff1a; 完整的前端代码&#xff1a; <template><a-directory-treev-model:expandedKeys"expandedKeys"v-model:selectedKeys"selectedKeys"multipleshow-li…

解决后端限制导致前端配置跨域仍请求失败报504的问题

文章目录 问题一、通过配置跨域方式二、直接真实接口请求三、解决方式四、后端这样做的原因 总结 问题 前端项目设置跨域proxy处理&#xff0c;接口请求不会报跨域&#xff0c;但是接口请求报了504&#xff0c;这种情况如何处理呢&#xff0c;后端又为何要这么做&#xff0c;下…

【初阶数据结构】深入解析循环队列:探索底层逻辑

&#x1f525;引言 本篇将介绍如何实现循环队列并实现过程需要注意的事项&#xff0c;虽然篇幅较小&#xff0c;但是其中逻辑还是值得引人思考的&#xff0c;循环队列可以采用数组或链表实现&#xff0c;这篇将采用数组实现循环队列 &#x1f308;个人主页&#xff1a;是店小二…

【数据结构】05.双向链表

一、双向链表的结构 注意&#xff1a;这里的“带头”跟前面我们说的“头节点”是两个概念&#xff0c;带头链表里的头节点&#xff0c;实际为“哨兵位”&#xff0c;哨兵位节点不存储任何有效元素&#xff0c;只是站在这里“放哨的”。 “哨兵位”存在的意义&#xff1a;遍历循…

cs231n作业1——KNN

参考文章&#xff1a;assignment1——KNN KNN 测试时分别计算测试样本和训练集中的每个样本的距离&#xff0c;然后选取距离最近的k个样本的标签信息来进行分类。 方法1&#xff1a;Two Loops for i in range(num_test):for j in range(num_train):dist X[i, :] - self.X…

Python爬虫获取视频

验证电脑是否安装python 1.winr输入cmd 2.在黑窗口输入 python.exe 3.不是命令不存在就说明python环境安装完成 抓取快手视频 1.在phcharm应用中新建一个项目 3.新建一个python文件 4.选择python文件,随便起一个名字后按回车 5.安装requests pip install requests 6.寻找需要的…

苹果电脑能玩赛博朋克2077吗 如何在mac上运行赛博朋克2077 crossover能玩什么游戏

各位喜欢赛博朋克风的一定不能错过《赛博朋克2077》。那么《赛博朋克2077》是一款什么样的游戏&#xff1f;《赛博朋克2077》在苹果电脑上可以运行吗&#xff1f;一起来看看介绍吧。 一、《赛博朋克2077》是一款什么样的游戏&#xff1f; 《赛博朋克2077》是一款由CD Projekt …

【Java探索之旅】初识多态_概念_实现条件

文章目录 &#x1f4d1;前言一、多态1.1 概念1.2 多态的实现条件 &#x1f324;️全篇总结 &#x1f4d1;前言 多态作为面向对象编程中的重要概念&#xff0c;为我们提供了一种灵活而强大的编程方式。通过多态&#xff0c;同一种操作可以应用于不同的对象&#xff0c;并根据对象…

Spring Boot基础篇

快速上手 SpringBoot是由Pivotal团队提高的全新框架&#xff0c;其设计目的是用来简化Spring应用的初始化搭建以及开发过程 入门案例 在Idea创建 创建时要选择Spring Initializr。 Server URL为要连接的网站&#xff0c;默认为官网start.spring.io&#xff08;访问速度慢&…