逐步学习Go-并发通道chan(channel)

概述

Go的Routines并发模型是基于CSP,如果你看过七周七并发,那么你应该了解。

什么是CSP?

"Communicating Sequential Processes"(CSP)这个词组的含义来自其英文直译以及在计算机科学中的使用环境。

CSP是 Tony Hoare 在1978年提出的,论文地址在:Communicating sequential processes | Communications of the ACM

拆字解释下

Communicating Sequential Processes(CSP)的三个单词:

  • C for Communicating: 通信,什么的通信那?进程/线程/协程的通信。

  • S for Sequenctial: 顺序的,什么的顺序?进程/线程/协程之间执行任务时应该是有顺序的,完全并行执行是理想化的,现实中就是要先指定完第一个或者第一批任务才能执行第二个或者第二批任务。

  • P for Processses: 进程,这个是进程,估计是因为这个概念提出来的时候比较早。我们这儿得抽象一下,Processes指的是进程/线程/协程。

那么我们来总结一下CSP,CSP就是多个能够进行通信,并且按照顺序执行任务的独立进程。这些进程在各自执行自己的任务的时候,还可以通过某种方式是进行通信。

在Golang中就是通过Channel进行通信。

好了,CSP解释完了,我们来看Go中的Channel,另外CSP的参与者Go Routine我在之前的文章中有提到过,大家可以去:逐步学习Go-协程goroutine

这张图就描述了CSP编程模型。

file

Go中routine代表图中的Process,Channel就是goroutine之间的连接。通道可以让一个goroutine发送信息到另一个goroutine。

Go中的channel

Go中Channel有两种类型:

  1. 无缓冲Channel(Unbuffered)
  2. 有缓冲Channel(Buffered)
    有缓冲的Channel其实就是一个环形缓冲队列;无缓冲的没有队列,因为读写都会阻塞。

Channel的定义

var channel名称 chan channel类型// 类型自动推断
channel名称 := make(chan channel类型, buffer数量(int可以为0))

COPY

比如:我们可以这样来定义:

// 定义了一个channel,还没有make,不确定是否为有缓冲和无缓冲channel
var ch chan int// 定义了一个chnnel, 容量为0,无缓冲channel
ch := make(chan int, 0)// 定义了一个channel,容量为1,有缓冲channel
ch := make(chan int, 1)

COPY

我们实际使用的时候把Channel理解为队列就可以了。

Go中的Channel有两种类型:

  1. 无缓冲channel
  2. 有缓冲channel

无缓冲和有缓冲的特性如下:

  • 无缓冲Channel
    • 无缓冲Channel没有存储数据的能力
    • 发送方向Channel中发送数据的时候,发送方会阻塞直到有接受者接受这个数据
    • 无缓冲Channel典型应用就是go协程同步通信
    • 无缓冲Channel保证通信双方都要准备好数据交换
  • 有缓冲Channel
    • 有缓冲Channel需要定义Channel的容量
    • 发送方向有缓冲Channel发送数据的时候,只有容量满的时候才会阻塞
    • 接收方只有在有缓冲Channel为空时才会阻塞
    • 有缓冲通道的典型应用场景是生产者和消费者

Channel的操作

Channel主要支持2中操作:

  1. 发送(send)
  2. 接收(recv)

这三种操作在代码中的的定义和使用:

  1. 发送和接收都使用<-

来看代码:

// 先定义一个无缓冲channel
ch := make(chan int, 0)
ch := make(chan int)// 发送数据到channel
ch <- 1// 从channel中接收数据
<- ch

COPY

我们看到发送和接收都是使用<-,差别在于:

  1. ch在<-的左边,操作为发送
  2. ch在<-的右边,操作为接收

另外,channel在使用之前都要先创建,使用完毕后要关闭,分别使用makeclose关闭。

// 创建相当于分配channel(allocation)
ch := make(chan int, 0)// 关闭channel,释放channel资源
defer close(ch)

COPY

channel创建完直接关闭了还能操作发送和接收吗?

这个问题我们通过写代码来测试,我们先来测试发送,然后再测试接收。

  • 发送数据到关闭的Channel

    func TestUnbufferedChannel_ShouldPanic_whenWriteValueToAClosedChannel(t *testing.T) {f := func() {ch := make(chan int)close(ch)ch <- 1
    }assert.Panics(t, f, "should panic")
    }
    COPY

    运行截图:

    file

我们的UT PASS了表示发生了panic,这就说明我们不能向已经关闭的channel发送数据。

  • 在已经关闭的Channel上接收

func TestUnbufferedChannel_ShouldSuccess_whenRecvValueAtAClosedChannel(t *testing.T) {ch := make(chan int)close(ch)var val = <-chassert.Equal(t, 0, val)
}func TestUnbufferedChannel_ShouldSuccess_whenRecvEmptyValueAtAClosedChannel(t *testing.T) {ch := make(chan string)close(ch)var val = <-chassert.Equal(t, "", val)
}

COPY

运行截图:

file

这两个UT都可以PASS,我只截图了一个PASS,这说明我们可以在一个关闭的channel上接收数据,只是接收到的都是0值。关于0值要特别说明一下,0值是针对不同类型的,比如:int的0值就是0,string的0值就是空字符串,指针的0值就是nil,看下面代码:

file

并非“任何后续的接收操作都将立即返回零值”,而是当channel中所有已发送的值都被接收后,接下来的接收操作会立即返回零值。

无缓冲channel

无缓冲通道顾名思义:就是没有数据缓冲能力的Channel,有goroutine向无缓冲Channel发送了数据就必须有另一个goroutine来接受,否则发送的goroutine会阻塞;反之,有goroutine从这个channel接受数据而没有另一个goroutine向这个channel发送,那么接受的goroutine也会阻塞。

应用场景:

  1. 部分任务需要同步就用无缓冲channel

来看场景代码:

有发送无接受

发送goroutine会被阻塞。


func TestUnbufferedChannel_ShouldWriteTimeout_WhenNoRoutineReadTheChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falsetry_to_write_value := 1// whenselect {// 直接向channel中发送case c <- try_to_write_value:case <-time.After(3 * time.Second):// shouldis_timeout = true}assert.True(t, is_timeout)}

COPY

file

有接受无发送

接收goroutine会被阻塞


func TestUnbufferedChannel_ShouldReadTimeout_WhenNoValueWriteToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)is_timeout := falseselect {// 直接接受channel中的数据case <-c:case <-time.After(3 * time.Second):// shouldis_timeout = true}// 三秒后超时assert.True(t, is_timeout)}

COPY

有发送有接受

有发送有接收,一切正常。

func sum(s []int, c chan int) {sum := 0for _, v := range s {sum += v}// 将累加结果发送到channelc <- sum
}func TestUnbufferedChannel_ShouldRecvValues_WhenWriteValueToChannel(t *testing.T) {// 创建无缓冲channelc := make(chan int)// givens := []int{1, 2, 3, 4, 5, 6}// when// 执行数组累加go sum(s[:], c)ret1 := <-c// should// 和应该是21assert.Equal(t, 21, ret1)
}

COPY

file

使用无缓冲Channel控制并发

// 先定义一个worker函数
// worker函数从无缓冲channel中接收
// 可以接到到数据就执行后面的打印内容
// 打印完成后退出
func worker(id int, lock chan bool) {var shouldRun = <-lockif shouldRun {fmt.Printf("time: %v Worker %d is working\n", time.Now(), id)time.Sleep(time.Second)fmt.Printf("time: %v Worker %d has finished\n", time.Now(), id)}
}func TestUnbufferedChannel_ShouldRunOneByOne_When(t *testing.T) {lock := make(chan bool, 1)// 启动5个goroutine等待释放接收for i := 0; i < 5; i++ {go worker(i, lock)}// 发送5个true到channelfor i := 0; i < 5; i++ {lock <- truetime.Sleep(time.Second)}close(lock)time.Sleep(10 * time.Second)
}

COPY

file

使用无缓冲Channel实现CompleteFuture.anyOf()

CompleteFuture.anyOf() 是 Java 中的一个函数,它返回一个新的 CompletableFuture,当给定的任何 CompletableFuture 完成时,返回的 CompletableFuture 也完成,并带有完成的 CompletableFuture 的结果。


// future函数使用time.Sleep模拟实际业务处理延迟
// 业务处理完成后将业务数据写入无缓冲Channel
func future(id int, delay time.Duration, resChan chan int) {time.Sleep(delay)fmt.Printf("Hi, I have finished my task, my id is %d\n", id)resChan <- id
}// 接收一系列上面的future, 然后使用go routine启动这些future函数并将结果写入到result channel,最后再返回result channel。
func anyOf(futures ...<-chan int) <-chan int {result := make(chan int)for _, future := range futures {go func(f <-chan int) {result <- <-f}(future)}return result
}func TestAnyOf_ShouldSuccess(t *testing.T) {// 创建无缓冲的 channelresChan1 := make(chan int)resChan2 := make(chan int)resChan3 := make(chan int)// 启动 goroutinesgo future(1, 3*time.Second, resChan1)go future(2, 2*time.Second, resChan2)go future(3, 5*time.Second, resChan3)result := anyOf(resChan1, resChan2, resChan3)assert.Equal(t, 2, <-result)
}

COPY

上面有两个比较让人纠结的语法:

  1. <-chan int
  2. result <- <-f
  • <-chan int表示只读通道,anyOf只能读取通道内的数据;有了只读就有只写,只写通道chan<- int
  • result <- <-f表示从通道f中接收数据并将数据写入到result通道。这一行相当于执行了
    v := <-f
    result <- v
    COPY

    file

有缓冲channel

有缓冲channel就是你可以暂时把数据发送到channel,如果channel的缓冲区没有被占用完就不会阻塞,缓冲区被占用完了就被阻塞了。

特性:

  1. 发送goroutine在缓冲区没有用完之前不会阻塞,缓冲区被使用完了之后发送goroutine就会被阻塞
  2. 接受goroutine在缓冲区有数据时,不会阻塞,缓冲区没有数据时会被阻塞

有缓冲channel应用场景是什么?

  1. 任务队列就是最典型的场景,生产者消费者模型
  2. 其他无缓冲channel搞不定的就用有缓冲channel

实现一个有缓冲channel的RateLimiter

import ("sync""sync/atomic""testing""fmt""time""github.com/stretchr/testify/assert"
)type RateLimiter struct {tokens       chan struct{}refillTicker *time.TickercloseCh      chan struct{}
}func NewRateLimiter(rate int) *RateLimiter {r := &RateLimiter{tokens:       make(chan struct{}, rate),refillTicker: time.NewTicker(time.Second / time.Duration(rate)),closeCh:      make(chan struct{}),}go r.refill()return r
}func (r *RateLimiter) refill() {for {select {case <-r.refillTicker.C:select {case r.tokens <- struct{}{}:default:}case <-r.closeCh:r.refillTicker.Stop()return}}
}func (r *RateLimiter) Acquire() {<-r.tokens
}func (r *RateLimiter) TryAcquire() bool {select {case <-r.tokens:return truedefault:return false}
}func (r *RateLimiter) Close() {close(r.closeCh)
}func myTask(id int) {fmt.Printf("time: %v workder %d is working\n", time.Now(), id)time.Sleep(20 * time.Millisecond)fmt.Printf("time: %v workder %d has finished\n", time.Now(), id)
}func TestRateLimiter_ShouldPermitWithBlocking_WhenRequestOnce(t *testing.T) {rateLimiter := NewRateLimiter(100)startTime := time.Now()for i := 0; i < 1; i++ {rateLimiter.TryAcquire()myTask(i)}endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("explect time: %v\n", 300*time.Millisecond)assert.True(t, elapsedTime < 300*time.Millisecond)
}func TestRateLimiter_ShouldLimitPermits_WhenGivenLimitedResource(t *testing.T) {var counter int32 = 0rateLimiter := NewRateLimiter(100)wg := sync.WaitGroup{}startTime := time.Now()for i := range 1000 {wg.Add(1)go func() {rateLimiter.Acquire()myTask(i)atomic.AddInt32(&counter, 1)wg.Done()}()}wg.Wait()endTime := time.Now()elapsedTime := endTime.Sub(startTime)fmt.Printf("elapsed time: %v\n", elapsedTime)fmt.Printf("should greater than explect time: %v\n", 10*time.Second)assert.Equal(t, counter, int32(1000))assert.True(t, 10*time.Second < elapsedTime)
}

COPY

file

实现无缓冲Channel实现Java中的CyclicBarrier

CyclicBarrier 是一个同步工具,它允许一组线程互相等待,直到他们都到达了一个共同的屏障点。在涉及固定大小的线程团队必须偶尔相互等待的程序中,CyclicBarriers 非常有用。之所以称之为“循环”屏障,是因为在等待的线程被释放之后,它可以被重复使用。

  • await() 所有的参与者都调用了wait方法后返回或者被中断
    我们就实现这个await方法,暂时不支持中断,代码如下:

package mainimport ("fmt""sync""sync/atomic""time"
)// CyclicBarrier 让一组goroutine在到达某个点之后才能继续执行
type CyclicBarrier struct {// 总goroutine数量participant int// 用于等待所有goroutine准备好waitGroup sync.WaitGroup// 无缓冲channel,用于goroutine间同步barrierChan chan struct{}running     int32
}// NewCyclicBarrier 创建一个新的CyclicBarrier
func NewCyclicBarrier(participant int) *CyclicBarrier {b := &CyclicBarrier{participant: participant,barrierChan: make(chan struct{}),running:     int32(participant),}// 设置等待的goroutine数b.waitGroup.Add(participant)return b
}// 当一个goroutine调用Wait时,
// 它将在屏障处等待,
// 直到所有goroutine都到达这里
func (b *CyclicBarrier) Wait() {// 一个goroutine准备好了b.waitGroup.Done()// 等待所有goroutine都准备好b.waitGroup.Wait()// 当所有goroutine都准备好了,关闭channel进行广播通知if atomic.AddInt32(&b.running, -1) == 0 {close(b.barrierChan)} else {// 等待通知<-b.barrierChan}}// 阻塞调用goroutine直到所有goroutine都调用了Wait方法,
// 屏障开放后,重新置为待关闭状态
func (b *CyclicBarrier) Await() {// 等待屏障开放的信号<-b.barrierChan// 重置屏障状态b.barrierChan = make(chan struct{})b.waitGroup.Add(b.participant)
}func (b *CyclicBarrier) Close() {close(b.barrierChan)
}func main() {// 这里我们设置3个goroutine参与barrier := NewCyclicBarrier(100)for i := 0; i < 100; i++ {go func(i int) {fmt.Printf("Goroutine %d is working...\n", i)// 模拟工作time.Sleep(time.Duration(i+1) * time.Second)fmt.Printf("Goroutine %d reached the barrier.\n", i)barrier.Wait()fmt.Printf("Goroutine %d passed the barrier.\n", i)}(i)}// 主goroutine等待所有goroutine都到达屏障barrier.Await()fmt.Println("All goroutines have passed the barrier")
}

COPY

参考

  1. go/src/runtime/chan.go at master · golang/go · GitHub
  2. 逐步学习Go-并发通道chan(channel) – FOF编程网

编写不易,如有问题请评论告知

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/288765.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

day 36 贪心算法 part05● 435. 无重叠区间 ● 763.划分字母区间 ● 56. 合并区间

一遍过。首先把区间按左端点排序&#xff0c;然后右端点有两种情况。 假设是a区间&#xff0c;b区间。。。这样排列的顺序&#xff0c;那么 假设a[1]>b[0],如果a[1]>b[1]&#xff0c;就应该以b[1]为准&#xff0c;否则以a[1]为准。 class Solution { public:static bo…

Android中运动事件的处理

1.目录 目录 1.目录 2.前言 3.程序演示 4.第二种程序示例 5.扩展 2.前言 触摸屏&#xff08;TouchScreen&#xff09;和滚动球&#xff08;TrackBall&#xff09;是 Android 中除了键盘之外的主要输入设备。如果需要使用触摸屏和滚动球&#xff0c;主要可以通过使用运动事…

渐变色x轴换行柱状图

// 系统上云率const optionBar {title: {text: 系统上云率,left: left,textStyle: {color: "#fff",fontSize: 14,fontWeight: 650,align: "center",},},color: [#32C5FF, #00F766, #EECB5F],grid: {top: 40,bottom: 0,},legend: { // 控制图例组件show: …

K8s Pod亲和性、污点、容忍度、生命周期与健康探测详解(上)

&#x1f407;明明跟你说过&#xff1a;个人主页 &#x1f3c5;个人专栏&#xff1a;《Kubernetes航线图&#xff1a;从船长到K8s掌舵者》 &#x1f3c5; &#x1f516;行路有良友&#xff0c;便是天堂&#x1f516; 在上一章节中&#xff0c;我们详细探讨了Pod的概念、创建、…

逐步学习Go-协程goroutine

参考&#xff1a;逐步学习Go-协程goroutine – FOF编程网 什么是线程&#xff1f; 简单来说线程就是现代操作系统使用CPU的基本单元。线程基本包括了线程ID&#xff0c;程序计数器&#xff0c;寄存器和线程栈。线程共享进程的代码区&#xff0c;数据区和操作系统的资源。 线…

数据结构——排序算法

1、排序的概念 排序是指的是将一组数据&#xff08;如数字、单词、记录等&#xff09;按照某种特定的顺序&#xff08;升序或降序&#xff09;进行排列的过程。排序算法是实现排序的程序或方法&#xff0c;它们在软件开发和数据处理中扮演着至关重要的角色。 排序算法可以根据…

servlet开发详解

一、什么是servlet&#xff0c;干什么用的&#xff1f;&#xff1f;&#xff1f; tomcat作为一个web服务器&#xff0c;也称作servlet容器。servlet只有放在web服务器中才能运行&#xff0c;不能独立运行。tomcat这个容器要做三件事&#xff1a;接收请求、处理请求和响应请求。…

文生视频大模型Sora的复现经验

大家好&#xff0c;我是herosunly。985院校硕士毕业&#xff0c;现担任算法研究员一职&#xff0c;热衷于机器学习算法研究与应用。曾获得阿里云天池比赛第一名&#xff0c;CCF比赛第二名&#xff0c;科大讯飞比赛第三名。拥有多项发明专利。对机器学习和深度学习拥有自己独到的…

java调用jacob进行文件转换ppt转pdf或者png

java调用jacob进行文件转换ppt转pdf或者png 前情提要 最近项目上&#xff0c;遇到一个复杂的ppt&#xff0c;最终要求是要将ppt每一页转成图片原本这个是不难&#xff0c;网上一搜一大堆案例&#xff0c;外加我本身也比较精通aspose&#xff0c;那还不是分分钟搞定。结果就是…

Healix Protocol 的 HLX 通证预售:医疗领域的未来展望

Healix Protocol推出 HLX 通证预售&#xff0c;将带来医疗领域的重要变革。通过其区块链技术&#xff0c;Healix Protocol致力于重新定义医疗服务的可及性与负担性&#xff0c;成为医疗行业的希望之光。该项目旨在增强透明度、可及性和效率&#xff0c;推动医疗体系向更加公平和…

Hadoop面试重点

文章目录 1. Hadoop 常用端口号2.Hadoop特点3.Hadoop1.x、2.x、3.x区别 1. Hadoop 常用端口号 hadoop2.xhadoop3.x访问HDFS 端口500709870访问 MR 执行情况端口80888088历史服务器1988819888客户端访问集群端口90008020 2.Hadoop特点 高可靠&#xff1a;Hadoop底层维护多个数…

Rust语言中Regex正则表达式,匹配和查找替换等

官方仓库&#xff1a;https://crates.io/crates/regex 文档地址&#xff1a;regex - Rust github仓库地址&#xff1a;GitHub - rust-lang/regex: An implementation of regular expressions for Rust. This implementation uses finite automata and guarantees linear tim…

LoadBalance 负载均衡服务调用

前身:Ribbon LB负载均衡(Load Balance)是什么 简单的说就是将用户的请求平摊的分配到多个服务上&#xff0c;从而达到系统的HA&#xff08;高可用&#xff09;&#xff0c;常见的负载均衡有软件Nginx&#xff0c;LVS&#xff0c;硬件 F5等 spring-cloud-starter-loadbalancer组…

OSG编程指南<二十一>:OSG视图与相机视点更新设置及OSG宽屏变形

1、概述 什么是视图?在《OpenGL 编程指南》中有下面的比喻,从笔者开始学习图形学就影响深刻,相信对读者学习场景管理也会非常有帮助。 产生目标场景视图的变换过程类似于用相机进行拍照,主要有如下的步骤: (1)把照相机固定在三脚架上,让它对准场景(视图变换)。 (2)…

【办公类-21-11】 20240327三级育婴师 多个二级文件夹的docx合并成docx有页码,转PDF

背景展示&#xff1a;有页码的操作题 背景需求&#xff1a; 实操课终于全部结束了&#xff0c;把考试内容&#xff08;docx&#xff09;都写好了 【办公类-21-10】三级育婴师 视频转文字docx&#xff08;等线小五单倍行距&#xff09;&#xff0c;批量改成“宋体小四、1.5倍行…

洛谷day3

B2053 求一元二次方程 - 洛谷 掌握printf用法&#xff1b; #include <iostream> #include <cmath> using namespace std; double a,b,c; double delta; double x1,x2;int main() {cin>>a>>b>>c;delta b*b-4*a*c;if(delta>0){x1 (-bsqrt…

从根本上优雅地解决 VSCode 中的 Python 模块导入问题

整体概述&#xff1a; 在我尝试运行 test_deal_file.py 时&#xff0c;我遇到了一个 ModuleNotFoundError 错误&#xff0c;Python告诉我找不到名为 controllers 的模块。这意味着我无法从 deal_file.py 中导入 read_excel 函数。 为了解决这个问题&#xff0c;我尝试了几种方法…

JAVA面试大全之JVM和调优篇

目录 1、类加载机制 1.1、类加载的生命周期&#xff1f; 1.2、类加载器的层次? 1.3、Class.forName()和ClassLoader.loadClass()区别? 1.4、JVM有哪些类加载机制&#xff1f; 2、内存结构 2.1、说说JVM内存整体的结构&#xff1f;线程私有还是共享的&#xff1f; 2.2…

Rust使用原始字符串字面量实现Regex双引号嵌套双引号正则匹配

rust使用Regex实现正则匹配的时候&#xff0c;如果想实现匹配双引号&#xff0c;就需要使用原始字符串字面量&#xff0c;不然无法使用双引号嵌套的。r#"..."# 就表示原始字符串字面量。 比如使用双引号匹配&#xff1a; use regex::Regex;fn main() {println!(&qu…

【性能优化】 【回溯】 【字符串】1307. 口算难题

作者推荐 视频算法专题 本文涉及知识点 数学 回溯 字符串 性能优化 LeetCode1307. 口算难题 给你一个方程&#xff0c;左边用 words 表示&#xff0c;右边用 result 表示。 你需要根据以下规则检查方程是否可解&#xff1a; 每个字符都会被解码成一位数字&#xff08;0 - …