16 go语言(golang) - 并发编程select和workerpool

select

在Go语言中,select语句用于处理多个channel的操作。它类似于switch语句,但专门用于channel通信。通过使用select,可以同时等待多个channel操作,并在其中一个操作准备好时执行相应的代码块。这对于需要处理并发任务和协调goroutine之间的通信非常有用。

基本用法

  • 多路复用:同时监听多个channel上的数据传输。
  • 非阻塞选择:如果没有任何case可以执行,可以使用default分支来实现非阻塞行为。
  • 超时控制:结合time.After函数,可以实现超时机制。
select {
case <-ch1:// 如果ch1成功读取到数据,则执行该case
case ch2 <- x:// 如果x成功发送到ch2,则执行该case
default:// 如果上面都没有成功,则进入default处理流程
}

具体示例

  • 使用select语句来等待任意一个channel的数据传入,并根据哪个通道先收到消息来决定执行哪个分支。
func Test1(t *testing.T) {ch1 := make(chan string)ch2 := make(chan string)ch3 := make(chan string)go func() {time.Sleep(100 * time.Millisecond)ch1 <- "线程1"}()go func() {time.Sleep(200 * time.Millisecond)ch2 <- "线程2"}()go func() {// 只管消费数据,阻塞着一直消费for range ch3 {}}()for i := 0; i < 5; i++ {select {// 这样写会有双重读取的问题// 因为尝试从 ch1 接收数据,然后立即再次接收数据并打印。这会导致问题,因为第二次接收是在没有检查是否有数据可用的情况下进行的,这可能会阻塞程序。//case <-ch1://	value := <-ch1//	fmt.Printf("接受到来自 %s 的消息 \n", value)case value := <-ch1:fmt.Printf("接受到来自 %s 的消息 \n", value)case value := <-ch2:fmt.Printf("接受到来自 %s 的消息 \n", value)case ch3 <- "msg":time.Sleep(800 * time.Millisecond)fmt.Printf("尝试向ch3发送消息 \n")}}}

输出

尝试向ch3发送消息 
接受到来自 线程2 的消息 
尝试向ch3发送消息 
尝试向ch3发送消息 
尝试向ch3发送消息 

注意:selectcase不是顺序执行的,在Go语言中,select语句的行为与switch语句不同。它并不是按顺序从上到下检查每个case,而是随机选择一个可以执行的case。这种设计是为了避免优先级问题,从而使得所有可用的channel操作都有平等的机会被选中。

default

func Test2(t *testing.T) {ch1 := make(chan string)ch2 := make(chan string)go func() {time.Sleep(100 * time.Millisecond)ch1 <- "线程1"}()go func() {time.Sleep(200 * time.Millisecond)ch2 <- "线程2"}()for i := 0; i < 3; i++ {select {case value := <-ch1:fmt.Printf("接受到来自 %s 的消息 \n", value)case value := <-ch2:fmt.Printf("接受到来自 %s 的消息 \n", value)default:fmt.Println("没有数据")time.Sleep(1000 * time.Millisecond) // 没有等待的话会直接输出三次『没有数据』}}}

工作原理

  1. 随机选择
    • 当有多个case都准备好时,Go会随机选择其中一个进行执行。这意味着如果有多个channel同时可以接收或发送数据,具体哪个case会被选中是不可预测的。
    • default分支比较特殊,只有在没有其他case可以执行时才会被选择。因此,它并不是与其他case一起随机选择的,而是作为一种“兜底”机制。
  2. 非阻塞检查
    • 如果没有任何channel操作可以立即进行,并且提供了default分支,那么default分支将被执行。
    • 如果没有default分支,则select将阻塞直到某个channel操作可以进行。
    • 如果所有channel操作都无法立即进行(即没有可用的数据接收或发送),且存在一个default分支,那么select将立即执行该默认分支,而不会阻塞。
  3. 公平性
    • 这种随机选择机制确保了所有准备好的通道都有机会被处理,而不会因为代码中的位置而导致某些通道总是优先于其他通道。

超时控制

  • 使用 time.After 函数可以实现对某个操作设置超时时间。如果在指定时间内没有接收到数据,可以执行超时逻辑。
func Test3(t *testing.T) {ch := make(chan string)go func() {var random = rand.Intn(2000)// 模拟随机等待0到2秒time.Sleep(time.Duration(random) * time.Millisecond)ch <- "msg"}()select {case msg := <-ch:fmt.Println("接受到消息:", msg)case <-time.After(1 * time.Second):fmt.Println("Timeout!")}
}

关闭通道检测

  • 当一个通道被关闭并且所有的数据都被读取完毕后,再次读取会立即返回零值,这可以通过 select 来检测。
func Test4(t *testing.T) {ch := make(chan string)go func() {ch <- "msg"}()select {case msg, ok := <-ch:if ok {fmt.Println("接受到消息:", msg)} else {fmt.Println("channel 已经关闭")}}close(ch)select {case msg, ok := <-ch:if ok {fmt.Println("接受到消息:", msg)} else {fmt.Println("channel 已经关闭")}}}

workerpool

在Golang中,Worker Pool是一种并发编程模式,用于限制同时运行的goroutine数量,以控制资源使用和提高程序的性能。通过使用Worker Pool,可以有效地管理任务执行,避免因过多的goroutine导致系统资源耗尽。

基本原理

  1. 任务队列:一个用于存储待处理任务的通道。
  2. Workers:一组固定数量的goroutine,从任务队列中获取任务并执行。
  3. 结果收集:通常会有一个结果通道,用于收集每个worker完成后的结果。

工作流程

  • 主线程将所有待处理的任务放入到任务队列中。
  • 一组预先启动好的worker从该队列中获取任务进行处理。
  • 每个worker在完成其当前工作后,会继续从队列中获取下一个可用的工作,直到所有工作都被完成。

与线程池的区别

Worker Pool与线程池在概念上非常相似。两者都是用于管理并发任务执行的设计模式,旨在限制同时运行的工作单元(goroutines或线程)的数量,以有效利用系统资源并提高性能。

相似

  1. 资源管理:都用于限制并发执行单元(如goroutine或线程)的数量,从而控制对系统资源(如CPU、内存等)的使用。
  2. 复用工作单元:通过复用已有的工作单元来减少创建和销毁它们所带来的开销,提高效率。
  3. 异步执行:允许提交大量任务,并由池中的工作单元异步地完成这些任务。

不同

  1. 实现机制

    • 在Golang中,Worker Pool通常基于goroutines实现,而不是操作系统级别的线程。这使得Golang中的Worker Pool更加轻量级,因为goroutine比传统线程更小且启动速度更快。
    • 传统语言中的线程池通常直接基于操作系统提供的线程模型,这可能会导致较高的上下文切换开销和内存消耗。
  2. 调度方式

    • Golang有自己的调度器来管理goroutines,它可以自动将数千个甚至更多个goroutine映射到少量OS线程上运行。
    • 传统语言中的线程池依赖于操作系统调度器来管理和分配CPU时间片给各个线程。

实现

Golang标准库中没有内置的专门用于实现Worker Pool的包或功能。不过,Golang提供了强大的goroutine和channel机制,使得实现自定义的Worker Pool变得相对简单。

Worker Pool(工作池)的实现思路主要围绕如何有效管理和调度一组有限的工作者(goroutine)来执行任务。

1、定义 Worker Pool 结构

首先,定义一个 WorkerPool 结构,它包含以下元素:

  • 最大工作者数(maxWorkers:控制同时运行的 goroutine 的最大数量。
  • 任务队列(taskQueue:用于存储待处理任务,通常使用channel来实现。
  • 停止信号(stopSignal:一个通道,用于发送停止信号给所有工作者,让它们停止执行。
  • 同步机制:如互斥锁(sync.Mutex)或 WaitGroup(sync.WaitGroup),用于同步和等待所有工作者完成。
// WorkerPool
// 池
type WorkerPool struct {maxWorkerNums inttaskQueue     chan TaskstopSignal    chan int // 停止信号,接受到数据时时停止waitGroup     sync.WaitGroup// 保证停止后不能再提交任务isStop boolmu     sync.Mutex
}

2、初始化 Worker Pool

实现一个 New 函数来初始化 WorkerPool,设置初始状态,并启动一定数量的工作者。

  • 根据 maxWorkers 初始化工作者队列。
  • 创建 taskQueue
// NewWorkerPool
/* 初始化池*/
func NewWorkerPool(maxWorkerNums int) WorkerPool {return WorkerPool{maxWorkerNums,make(chan Task),make(chan int, maxWorkerNums),sync.WaitGroup{},false,sync.Mutex{},}
}

3、提交任务

实现一个 Submit 方法,用于提交任务到 Worker Pool。

func (p *WorkerPool) submit(task Task) {p.mu.Lock()defer p.mu.Unlock()// 前提是队列还没有关闭的情况下,提交任务if !p.isStop {p.taskQueue <- task}
}

4、启动池

工作者运行在一个无限循环中,不断从 workerQueue 中取出任务并执行。

  • 使用 for 循环和 select 语句监听 workerQueue 中的任务。
  • 执行任务,然后等待下一个任务。
  • 如果接收到任务队列关闭或 stopSignal,工作者退出循环。
// worker执行任务
func (p *WorkerPool) worker(num int) {defer p.waitGroup.Done()for {select {case <-p.stopSignal:// 接受到停止信号fmt.Printf("【%d号】接受到停止讯号,结束!\n", num)returncase task, ok := <-p.taskQueue:if ok {// 具体的业务逻辑process(task, num)} else {// 任务队列被关闭了,表示没有任务了fmt.Printf("【%d号】完成!\n", num)return}default:// 暂时没有任务,睡眠10毫秒fmt.Println("!空闲!没有任务")time.Sleep(time.Millisecond * 100)}}
}

5、停止

实现一个 Stop 方法,用于优雅地关闭 Worker Pool。

  • 发送停止信号给所有工作者,让它们结束循环。
  • 使用 WaitGroup 等待所有工作者完成当前任务。
  • 关闭任务队列和工作者队列。
func (p *WorkerPool) stop() {p.mu.Lock()defer p.mu.Unlock()// 发送停止讯号for i := 0; i < p.maxWorkerNums; i++ {p.stopSignal <- 1}if !p.isStop { // 确保只关闭一次。p.isStop = trueclose(p.taskQueue)}p.waitGroup.Wait()close(p.stopSignal)
}

6、具体任务和业务逻辑

// Task 具体需要执行单元任务
type Task interface{}func process(t Task, num int) {fmt.Printf("...【%d号】正在处理任务:%v\n", num, t)// 模拟耗时操作time.Sleep(time.Duration(rand.Intn(1000)) * time.Millisecond)fmt.Printf("✓完成任务:%v\n", t)
}

7、启动测试程序

package mainimport ("fmt""math/rand""sync""time"
)func main() {// 初始化一个工作者池pool := NewWorkerPool(5)// 初始化并开始工作pool.start()// 模拟发送1000个任务wg := sync.WaitGroup{}wg.Add(1)go func() {for i := 0; i < 1000; i++ {pool.submit(fmt.Sprintf("任务%d", i))}}()wg.Done()wg.Wait()// 模拟程序运行中time.Sleep(2 * time.Second)// 强制停止程序pool.stop()fmt.Println("main结束")
}

输出:

!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
...【1号】正在处理任务:任务0
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
!空闲!没有任务
...【3号】正在处理任务:任务1
...【0号】正在处理任务:任务2
!空闲!没有任务
...【4号】正在处理任务:任务3
✓完成任务:任务3
...【4号】正在处理任务:任务4
...【2号】正在处理任务:任务5
✓完成任务:任务4
...【4号】正在处理任务:任务6
✓完成任务:任务6
...【4号】正在处理任务:任务7
✓完成任务:任务2
...【0号】正在处理任务:任务8
✓完成任务:任务5
...【2号】正在处理任务:任务9
✓完成任务:任务0
...【1号】正在处理任务:任务10
✓完成任务:任务7
...【4号】正在处理任务:任务11
✓完成任务:任务1
...【3号】正在处理任务:任务12
✓完成任务:任务8
...【0号】正在处理任务:任务13
✓完成任务:任务11
...【4号】正在处理任务:任务14
✓完成任务:任务10
...【1号】正在处理任务:任务15
✓完成任务:任务12
...【3号】正在处理任务:任务16
✓完成任务:任务13
...【0号】正在处理任务:任务17
✓完成任务:任务14
...【4号】正在处理任务:任务18
✓完成任务:任务17
...【0号】正在处理任务:任务19
✓完成任务:任务9
...【2号】正在处理任务:任务20
✓完成任务:任务15
...【1号】正在处理任务:任务21
✓完成任务:任务18
...【4号】正在处理任务:任务22
✓完成任务:任务21
...【1号】正在处理任务:任务23
✓完成任务:任务22
...【4号】正在处理任务:任务24
✓完成任务:任务24
...【4号】正在处理任务:任务25
✓完成任务:任务16
...【3号】正在处理任务:任务26
✓完成任务:任务23
...【1号】正在处理任务:任务27
✓完成任务:任务20
...【2号】正在处理任务:任务28
✓完成任务:任务26
【3号】完成!
✓完成任务:任务19
【0号】完成!
✓完成任务:任务27
【1号】完成!
✓完成任务:任务25
【4号】接受到停止讯号,结束!
✓完成任务:任务28
【2号】接受到停止讯号,结束!
main结束

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/480457.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

TCP socket api详解 续

文章目录 守护进程怎么做到&#xff1f;setsid返回值 dev/null字符文件 daemonTCP协议 退出的时候呢&#xff1f; 会话有很多后台任务&#xff0c;bash肯定会退&#xff0c;那后台会话怎么办呢&#xff1f; 理论上也要退的&#xff0c;但实际上关了bash&#xff0c;bash肯定要…

06_数据类型

数据类型 数据类型分类 JavaScript 语言的每一个值,都属于某一种数据类型。JavaScript 的数据类型,共有六种。(ES6 又新增了第七种 Symbol 类型的值和第八种 BigInt类型,当前课程暂不涉及) 据类型分类 原始类型(基础类型) var age = 20, var name = 尚学堂"; var le…

scrapy豆瓣爬虫增强-批量随机请求头

1.1 豆瓣爬虫增强,中间件随机请求头 1.2 清除原有的中间件,进行中间件测试 1.3 导入全新的中间件 1.4 运行爬虫,这个时候的请求头是固定的 1.5 强化对agent的输出,会舍弃输出cookie,使输出更明了 1.6 转移输出请求头位置 新增输出 造成这样问题的原因是Douban/Douban/settings…

非关系型数据库有哪些特点?

非关系型数据库&#xff08;NoSQL&#xff09;具有以下主要特点‌&#xff1a;‌1 ‌灵活的数据存储方式‌&#xff1a;非关系型数据库不采用传统的基于表格的数据存储方式&#xff0c;而是采用更加灵活的数据存储方式。它可以存储各种类型的数据&#xff0c;包括文本、图像、音…

智慧防汛平台在城市生命线安全建设中的应用

随着城市化进程的加快&#xff0c;城市基础设施的复杂性和互联性不断增强&#xff0c;城市生命线的安全管理面临前所未有的挑战。智慧防汛平台作为城市生命线安全建设的重要组成部分&#xff0c;通过现代信息技术提升城市防汛应急管理的智能化水平&#xff0c;保障城市安全。 …

【ChatGPT大模型开发调用】如何获得 OpenAl API Key?

如何获取 OpenAI API Key 获取 OpenAI API Key 主要有以下三种途径&#xff1a; OpenAI 官方平台 (推荐): 开发者用户可以直接在 OpenAI 官方网站 (platform.openai.com) 注册并申请 API Key。 通常&#xff0c;您可以在账户设置或开发者平台的相关页面找到申请入口。 Azure…

vue3 发送 axios 请求时没有接受到响应数据

<script setup> import Edit from ./components/Edit.vue import axios from axios import { onMounted,ref } from vue// TODO: 列表渲染 //装数据的列表 const list ref([]) const count ref(0) const getList async () > {//通过发送 /list 请求从后端拿到列表数…

衡山派D133EBS 开发环境安装及SDK编译烧写镜像烧录

1.创建新文件夹&#xff0c;用来存放SDK包&#xff08;其实本质就是路径要对就ok了&#xff09;&#xff0c;右键鼠标通过Open Git Bash here来打开git 输入命令 git clone --depth1 https://gitee.com/lcsc/luban-lite.git 来拉取&#xff0c;如下所示&#xff1a;&#xff0…

关于Vscode配置Unity环境时的一些报错问题(持续更新)

第一种报错&#xff1a; 下载net请求超时&#xff08;一般都会超时很正常的&#xff09; 实际时并不需要解决&#xff0c;它对你的项目毫无影响 第二种报错&#xff1a; .net版本不匹配 解决&#xff1a;&#xff08;由于造成问题不一样&#xff0c;所以建议都尝试一次&…

快速理解微服务中Fegin的概念

一.由来 1.在传统的架构里面&#xff0c;我们是通过使用RestTemplate来访问其他的服务&#xff0c;但是这种方式就存在了一个很大的缺陷&#xff0c;也就是被调用方如果发生了服务的迁移(IP和端口发生了变化)&#xff0c;那么调用方也需要同步的在代码里面进行修改&#xff0c;…

【网络安全 | 漏洞挖掘】绕过SAML认证获得管理员面板访问权限

未经许可,不得转载。 文章目录 什么是SAML认证?SAML是如何工作的?SAML响应结构漏洞结果什么是SAML认证? SAML(安全断言标记语言)用于单点登录(SSO)。它是一种功能,允许用户在多个服务之间切换时无需多次登录。例如,如果你已经登录了facebook.com,就不需要再次输入凭…

STM32C011开发(1)----开发板测试

STM32C011开发----1.开发板测试 概述硬件准备视频教学样品申请源码下载参考程序生成STM32CUBEMX串口配置LED配置堆栈设置串口重定向主循环演示 概述 STM32C011F4P6-TSSOP20 评估套件可以使用户能够无缝评估 STM32C0 系列TSSOP20 封装的微控制器功能&#xff0c;基于 ARM Corte…

医院分诊管理系统|Java|SSM|VUE| 前后端分离

【重要1⃣️】前后端源码万字文档部署文档 【重要2⃣️】正版源码有问题包售后 【重要3⃣️】可复制品不支持退换货 【包含内容】 【一】项目提供非常完整的源码注释 【二】相关技术栈文档 【三】源码讲解视频 【其它服务】 【一】可…

Android数据存储——文件存储、SharedPreferences、SQLite、Litepal

数据存储全方案——详解持久化技术 Android系统中主要提供了3中方式用于简单地实现数据持久化功能&#xff0c;即文件存储、SharedPreference存储以及数据库存储。除了这三种方式外&#xff0c;还可以将数据保存在手机的SD卡中&#xff0c;不给使用文件、SharedPreference或者…

vue3 + vite + antdv 项目中自定义图标

前言&#xff1a; 去iconfont-阿里巴巴矢量图标库 下载自己需要的icon图标&#xff0c;下载格式为svg&#xff1b;项目中在存放静态资源的文件夹下 assets 创建一个存放svg格式的图片的文件夹。 步骤&#xff1a; 1、安装vite-plugin-svg-icons npm i vite-plugin-svg-icons …

【H2O2|全栈】Node.js(2)

目录 前言 开篇语 准备工作 npm 概念 常见指令 项目中的包 创建项目 启动项目 服务器搭建 express 基本步骤 搭建应用 创建路由 监听端口 启动服务器 面试相关 结束语 前言 开篇语 本系列博客分享Node.js的相关知识点&#xff0c;本章讲解npm与服务器的简单…

QChart数据可视化

目录 一、QChart基本介绍 1.1 QChart基本概念与用途 1.2 主要类的介绍 1.2.1 QChartView类 1.2.2 QChart类 1.2.3QAbstractSeries类 1.2.4 QAbstractAxis类 1.2.5 QLegendMarker 二、与图表交互 1. 动态绘制数据 2. 深入数据 3. 缩放和滚动 4. 鼠标悬停 三、主题 …

Harbor安装、HTTPS配置、修改端口后不可访问?

Harbor安装、HTTPS配置、修改端口后不可访问&#xff1f; 大家好&#xff0c;我是秋意零。今天分享Harbor相关内容&#xff0c;安装部分可完全参考官方文档&#xff0c;写的也比较详细。 安装Harbor 官方文档&#xff1a;https://goharbor.io/docs/2.12.0/install-config/ …

MTK 展锐 高通 sensorhub架构

一、MTK平台 MTK框架可以分为两部分&#xff0c;AP和SCP。 AP是主芯片&#xff0c;SCP是协处理器&#xff0c;他们一起工作来处理sensor数据。 SCP 是用来处理sensor和audio相关功能和其他客制化需求的一个协处理理器&#xff0c;MTK SCP选择freeRTOS作为操作系统&#xff0c…

JDK的版本演化,JDK要收费吗?

Java版本演化历史 Java的版本历史可以追溯到1995年&#xff0c;以下是Java语言自诞生以来的主要版本及其关键特性&#xff1a; 一、早期版本 Java 1.0&#xff08;1996年1月发布&#xff09; 引入了Java虚拟机&#xff08;JVM&#xff09;和Java应用编程接口&#xff08;API&…