Go协程池gopool源码解析

1、gopool简介

Repository:https://github.com/bytedance/gopkg/tree/develop/util/gopool

gopool is a high-performance goroutine pool which aims to reuse goroutines and limit the number of goroutines.

It is an alternative to the go keyword.

gopool的用法非常简单,将曾经我们经常使用的go func(){...}替换为gopool.Go(func(){...})即可

此时gopool将会使用默认的配置来管理你启动的协程,也可以选择针对业务场景配置池子大小以及扩容上限

old:

go func() {// do your job
}()

new:

gopool.Go(func(){// do your job
})

2、核心数据结构

1)、Pool

Pool是一个定义了协程池的接口,代码如下:

// util/gopool/pool.go
type Pool interface {// Name returns the corresponding pool name.// 协程池的名称Name() string// SetCap sets the goroutine capacity of the pool.// 设置协程池内goroutine的容量SetCap(cap int32)// Go executes f.// 执行f函数Go(f func())// CtxGo executes f and accepts the context.// 带ctx,执行f函数CtxGo(ctx context.Context, f func())// SetPanicHandler sets the panic handler.// 设置发生panic时调用的函数SetPanicHandler(f func(context.Context, interface{}))
}

gopool提供了Pool这个接口的默认实现pool,代码如下:

// util/gopool/pool.go
type pool struct {// The name of the pool// 协程池的名字name string// capacity of the pool, the maximum number of goroutines that are actually working// 协程池实际工作的goroutine的最大数量cap int32// Configuration information// 配置信息config *Config// linked list of tasks// task队列的元信息,每一个task代表一个待执行的函数taskHead  *tasktaskTail  *tasktaskLock  sync.MutextaskCount int32// Record the number of running workers// 当前有多少个worker在运行中,每个worker代表一个goroutineworkerCount int32// This method will be called when the worker panic// 协程池中的协程引发的panic会由该函数处理panicHandler func(context.Context, interface{})
}

pool数据结构如下图:

2)、task
// util/gopool/pool.go
type task struct {// 当前task的ctxctx context.Context// 当前task需要执行的函数ff func()// 指向下一个task的指针next *task
}

task是一个链表结构,可以把它理解为一个待执行的任务,包含了当前task需要执行的函数f func()以及指向下一个task的指针

一个协程池pool对应了一组task,pool维护了指向链表的头尾的两个指针:taskHead和taskTail以及链表的长度taskCount和对应的锁taskLock

3)、worker
// util/gopool/worker.go
type worker struct {pool *pool
}

一个worker就是逻辑上的一个执行器,它对应到一个协程池pool

当一个worker被唤起,将会开启一个goroutine,不断从pool中的task链表获取任务并执行,代码如下:

// util/gopool/worker.go
func (w *worker) run() {go func() {for {var t *task// 操作pool中的task链表前,加锁保证并发安全w.pool.taskLock.Lock()if w.pool.taskHead != nil {// 拿到taskHead准备执行t = w.pool.taskHead// 更新链表的head以及数量w.pool.taskHead = w.pool.taskHead.nextatomic.AddInt32(&w.pool.taskCount, -1)}if t == nil {// if there's no task to do, exit// 如果前一步拿到的taskHead为空,说明无任务需要执行,清理后返回(关闭goroutine)w.close()w.pool.taskLock.Unlock()w.Recycle()return}w.pool.taskLock.Unlock()// 执行任务,针对panic会recover,并调用配置的handlerfunc() {defer func() {if r := recover(); r != nil {msg := fmt.Sprintf("GOPOOL: panic in pool: %s: %v: %s", w.pool.name, r, debug.Stack())logger.CtxErrorf(t.ctx, msg)if w.pool.panicHandler != nil {w.pool.panicHandler(t.ctx, r)}}}()t.f()}()t.Recycle()}}()
}

3、核心API

来看下使用gopool的核心APIGo(f func()),实现如下:

// util/gopool/gopool.go
func Go(f func()) {CtxGo(context.Background(), f)
}func CtxGo(ctx context.Context, f func()) {defaultPool.CtxGo(ctx, f)
}
// util/gopool/pool.go
func (p *pool) CtxGo(ctx context.Context, f func()) {// 创建一个task对象,将ctx和待执行的函数赋值t := taskPool.Get().(*task)t.ctx = ctxt.f = f// 将task插入pool的链表的尾部,更新链表数量p.taskLock.Lock()if p.taskHead == nil {p.taskHead = tp.taskTail = t} else {p.taskTail.next = tp.taskTail = t}p.taskLock.Unlock()atomic.AddInt32(&p.taskCount, 1)// The following two conditions are met:// 1. the number of tasks is greater than the threshold.// 2. The current number of workers is less than the upper limit p.cap.// or there are currently no workers.// 以下任意条件满足时,创建新的worker并唤起执行// 1.待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)// 2.无worker运行if (atomic.LoadInt32(&p.taskCount) >= p.config.ScaleThreshold && p.WorkerCount() < atomic.LoadInt32(&p.cap)) || p.WorkerCount() == 0 {// worker数量+1p.incWorkerCount()// 创建一个新的worker,并把当前pool赋值w := workerPool.Get().(*worker)w.pool = p// 唤起worker执行w.run()}
}

以下任意条件满足时,会扩容worker:

  1. 待执行的task超过了扩容阈值(默认值为1)且当前运行的worker数量小于上限(默认值为10000)
  2. 无worker运行

gopool自行维护一个defaultPool,这是一个默认的pool结构体,在引入包的时候就进行初始化。当我们直接调用gopool.Go()时,本质上是调用了defaultPool的同名方法

// util/gopool/gopool.go
var defaultPool Poolfunc init() {defaultPool = NewPool("gopool.DefaultPool", 10000, NewConfig())
}
// util/gopool/config.go
const (defaultScalaThreshold = 1
)type Config struct {// threshold for scale.// new goroutine is created if len(task chan) > ScaleThreshold.// defaults to defaultScalaThreshold.// 控制扩容的阈值,一旦待执行的task超过此值,且worker数量未达到上限,就开始启动新的workerScaleThreshold int32
}func NewConfig() *Config {c := &Config{ScaleThreshold: defaultScalaThreshold,}return c
}

defaultPool的名称为gopool.DefaultPool,池子容量一万,扩容阈值为1

当调用gopool.Go()时,gopool就会更新维护的任务链表,并且判断是否需要扩容worker:

  • 若此时已经有很多worker启动(底层一个worker对应一个goroutine),不需要扩容,就直接返回
  • 若判断需要扩容,就创建一个新的worker,并调用worker.run()方法启动,各个worker会异步地检查pool里面的任务链表是否还有待执行的任务,如果有就执行

gopool中三个角色的定位:

  • task是一个待执行的任务节点,同时还包含了指向下一个任务的指针,链表结构
  • worker是一个实际执行任务的执行器,它会异步启动一个goroutine执行协程池里面未执行的task
  • pool是一个逻辑上的协程池,对应了一个task链表,同时负责维护task状态的更新,以及在需要的时候创建新的worker

gopool核心实现原理如下图:

4、使用sync.Pool进行性能优化

gopool中多次使用了sync.Pool来池化对象的创建,复用woker和task对象

task池化:

// util/gopool/pool.go
var taskPool sync.Poolfunc init() {taskPool.New = newTask
}func newTask() interface{} {return &task{}
}func (t *task) Recycle() {t.zero()taskPool.Put(t)
}

worker池化:

// util/gopool/worker.go
var workerPool sync.Poolfunc init() {workerPool.New = newWorker
}func newWorker() interface{} {return &worker{}
}func (w *worker) Recycle() {w.zero()workerPool.Put(w)
}

参考:

解析 Golang 协程池 gopool 设计与实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/301610.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

基于Difussion图像、视频生成综述

2024年大年初七&#xff08;02.16&#xff09;OpenAI 发布视频生成模型 Sora 在各大平台转疯了&#xff0c;和2022年发布ChatGPT3.5时一样的疯狂。在开工第一天&#xff0c;我就去官网上看了 Sora 的技术报告&#xff0c;遗憾的是&#xff0c;在这份技术报告中只披露了一些模型…

memcached集群

一、介绍 memcache本身没有像redis所具备的数据持久化功能&#xff0c;但是可以通过做集群同步的方式&#xff0c;让各memcache服务器的数据进行同步&#xff0c;从而实现数据的一致性&#xff0c;即保证各memcache的数据是一样的&#xff0c;即使有任何一台memcache发生故障&…

2024-04-07 作业

作业要求&#xff1a; 1> 思维导图 2> 自由发挥应用场景实现一个登录窗口界面。 【可以是QQ登录界面、也可以是自己发挥的登录界面】 要求&#xff1a;尽量每行代码都有注释 作业1&#xff1a; 作业2&#xff1a; 运行代码&#xff1a; #include "myqwidget.h&quo…

提高网站安全性,漏洞扫描能带来什么帮助

随着互联网的蓬勃发展&#xff0c;网站已经成为人们获取信息、交流思想、开展业务的重要平台。然而&#xff0c;与之伴随的是日益严重的网络安全问题&#xff0c;包括恶意攻击、数据泄露、隐私侵犯等。 为了保障网站的安全性&#xff0c;提前做好网站的安全检测非常有必要&…

Spring学习笔记:IOC控制反转、AOP面向切面

挺快的&#xff0c;框架这一部分 文章目录 一、Spring概述入门案例导入依赖包在src下写配置文件创建普通类和测试类 二、IOC&#xff08;控制反转&#xff09;2.1 IOC bean 的XML操作&#xff08;创建对象&#xff0c;注入属性2.2 IOC bean 的 注解 操作 三、AOP&#xff08;面…

「每日跟读」英语常用句型公式 第6篇

「每日跟读」英语常用句型公式 第6篇 1. As ___ as possible 越 ___ 越好 As soon as possible (ASAP)(越快越好) As happy as possible (越快乐越好) As prepared as possible (越有准备越好) As much/many as possible (越多越好 *不可数/可数) As early as possible …

STM32一个地址未对齐引起的 HardFault 异常

1. 概述 客户在使用 STM32G070 的时候&#xff0c;KEIL MDK 为编译工具&#xff0c;当编译优化选项设置为Level0 的时候&#xff0c;程序会出现 Hard Fault 异常&#xff0c;而当编译优化选项设置为 Level1 的时候&#xff0c;则程序运行正常。表面上看&#xff0c;这似乎是 K…

云计算(五)—— OpenStack基础环境配置与API使用

OpenStack基础环境配置与API使用 项目实训一 【实训题目】 使用cURL命令获取实例列表 【实训目的】 理解OpenStack的身份认证和API请求流程。 【实训准备】 &#xff08;1&#xff09;复习OpenStack的认证与API请求流程的相关内容。 &#xff08;2&#xff09;熟悉cURL…

mysql慢sql排查与分析

当MySQL遇到慢查询&#xff08;慢SQL&#xff09;时&#xff0c;我们可以通过以下步骤进行排查和优化&#xff1a; 标题开启慢查询日志&#xff1a; 确保MySQL的慢查询日志已经开启。通过查看slow_query_log和slow_query_log_file变量来确认。 如果没有开启&#xff0c;可以…

2.SpringBoot利用Thymeleaf实现页面的展示

什么是Thymeleaf&#xff1f; Thymeleaf是一个现代服务器端Java模板引擎&#xff0c;适用于Web和独立环境&#xff0c;能够处理HTML&#xff0c;XML&#xff0c;JavaScript&#xff0c;CSS甚至纯文本。 Thymeleaf的主要目标是提供一种优雅且高度可维护的模板创建方式。为实现这…

css实现更改checkbox的样式;更改checkbox选中后的背景色;更改checkbox选中后的icon

<input class"check-input" type"checkbox"> .check-input {width: 16px;height: 16px;} /* 设置默认的checkbox样式 */input.check-input[type"checkbox"] {-webkit-appearance: none; /* 移除默认样式 */border: 1px solid #999;outl…

Windows系统下安装java开发环境所需的JDK开发工具包

目录 一、JDK开发工具包下载二、安装三、环境变量配置3.1 添加安装包路径3.2 添加lib路径3.3 添加bin目录 四、检查是否安装成功五、总结 一、JDK开发工具包下载 官网地址&#xff1a;JDK下载 打开网址后有多个版本的JDK&#xff0c;学者根据自己电脑需求选择对应版本下载。如…

6.10物联网RK3399项目开发实录-驱动开发之SPI接口的使用(wulianjishu666)

嵌入式实战开发例程&#xff0c;珍贵资料&#xff0c;开发必备&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1149x7q_Yg6Zb3HN6gBBAVA?pwdhs8b SPI 使用 SPI 简介 SPI 是一种高速的&#xff0c;全双工&#xff0c;同步串行通信接口&#xff0c;用于连接微控制器、…

ics-05-攻防世界

题目 点了半天只有设备维护中心能进去 御剑扫一下 找到一个css 没什么用 再点击云平台设备维护中心url发生了变化 设备维护中心http://61.147.171.105:65103/index.php?pageindex试一下php伪协议 php://filter/readconvert.base64-encode/resourceindex.php base64解一下…

设计模式总结-建造者模式

建造者模式 模式动机模式定义模式结构模式分析建造者模式实例与解析实例&#xff1a;KFC套餐 模式动机 无论是在现实世界中还是在软件系统中&#xff0c;都存在一些复杂的对象&#xff0c;它们拥有多个组成部分&#xff0c;如汽车&#xff0c;它包括车轮、方向盘、发送机等各种…

LC低通滤波

LC滤波器&#xff0c;是指将电感L与电容器 C进行组合设计构成的滤波电路&#xff0c;可去除或通过特定频率的无源器件。电容器具有隔直流通交流&#xff0c;且交流频率越高越容易通过的特性。而电感则具有隔交流通直流&#xff0c;且交流频率越高越不易通过的特性。因此&#x…

C#仿OutLook的特色窗体设计

目录 1. 资源图片准备 2. 设计流程&#xff1a; &#xff08;1&#xff09;用MenuStrip控件设计菜单栏 &#xff08;2&#xff09;用ToolStrip控件设计工具栏 &#xff08;3&#xff09;用StatusStrip控件设计状态栏 &#xff08;4&#xff09;ImageList组件装载树节点图…

考虑预同步的虚拟同步机T型三电平逆变器并离网MATLAB仿真模型

微❤关注“电气仔推送”获得资料&#xff08;专享优惠&#xff09; 模型简介 三相 T 型三电平逆变器电路如图所示&#xff0c;逆变器主回路由三个单相 T 型逆变器组成。 直流侧输入电压为 UPV&#xff0c;直流侧中点电位 O 设为零电位&#xff0c;交流侧输出侧是三相三线制连…

RFID涉密载体柜 RFID智能文件柜系统

涉密载体管控RFID智能柜&#xff08;载体柜DW-G101R&#xff09;通过对涉密物资、设备进行RFID唯一标识并放置于RFID设备涉密物资柜柜体&#xff0c;通过定位每台设备每件涉密物资的位置&#xff0c;实现涉密物资审批、自助借还、防盗等出入库全流程自动化管理。主要管理对象移…

每天五分钟掌握深度学习框架pytorch:本专栏说明

专栏大纲 专栏计划更新章节在100章左右&#xff0c;之后还会不断更新&#xff0c;都会配备代码实现。以下是专栏大纲 部分代码实现 代码获取 为了方便用户浏览代码&#xff0c;本专栏将代码同步更新到github中&#xff0c;所有用户可以读完专栏内容和代码解析之后&#xff0c…