kubernetes中的ParallelizeUntil()框架源码解读与使用

概述

摘要:本文从源码层面解读了kubernetes源码中常用的workqueue.ParallelizeParallelizeUntil()框架的源码实现,并且本文也将举例说明了workqueue.ParallelizeUntil()方法的典型使用场景。

正文

说明:基于 kubernetes v1.18.0 源码分析

在Kubernetes源码中, 我们经常会读到workqueue.ParallelizeUntil()函数,它的作用是在并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成。workqueue.ParallelizeUntil源码位于k8s.io/client-go/util/workqueue/parallelizer.go,该workqueue是client-go中的一个工作队列,队列包括三种:FIFO、延迟队列和限速队列。关于workqueue我已经在之前的文章中有详细的介绍,如需了解请阅览informer中的WorkQueue机制的实现分析与源码解读(1)

ParallelizeUntil()的源码解读

ParallelizeUntil()方法作用是并发多个worker来处理任务,直到接收到context发出的停止信号或任务完成

workers 表示启动多少个worker并发处理任务

pieces 表示要处理任务对应的index的数量

DoWorkPieceFunc 表示用于处理任务的工作函数

ctx 使用context控制并发任务的停止

// 定义worker函数
type DoWorkPieceFunc func(piece int)// ParallelizeUntil is a framework that allows for parallelizing N 
// independent pieces of work until done or the context is canceled.
// parallelelizeuntil是一个框架,它允许并行处理任务,直到完成或上下文被取消。
func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {// 定义一个stop信号,当手动ctx.Done()信号后,就让整个 ParallelizeUntil 任务停止var stop <-chan struct{}if ctx != nil {stop = ctx.Done()}// 将把需要多的工作对应的索引放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。toProcess := make(chan int, pieces)for i := 0; i < pieces; i++ {toProcess <- i}close(toProcess)// 对于要处理的对象数量少于worker数量时if pieces < workers {workers = pieces}wg := sync.WaitGroup{}wg.Add(workers)// 启动多个worker,同时从chan中取对象进行处理,知道收到停止信号或完成任务for i := 0; i < workers; i++ {go func(x int) {defer utilruntime.HandleCrash()defer wg.Done()for piece := range toProcess {fmt.Printf("work %d  ",x)select {case <-stop:returndefault:doWorkPiece(piece)}}}(i)}wg.Wait()
}

从源码分析知道,ParallelizeUntil()的逻辑比较简单,先将要处理的任务,放到chan中,放入完成后就关闭chan,这里借助chan,是因为chan是线程安全的。之后再启动多个worker协程,这些协程不断的从chan去job进行处理。逻辑示意图如下。

在这里插入图片描述

如果在多想象一下,ParallelizeUntil()的逻辑,是不是与工厂中多个机器人不断的从传输带上去东西来进行加工的场景啊。

在这里插入图片描述

接下来我们再看下,worker协程中捕获panic的HandleCrash的源码。源码逻辑比较简单,就是worker协程中如果出现panic将被recover捕获,捕获之后如果定义了额外的handler函数,会遍历执行。

var PanicHandlers = []func(interface{}){logPanic}// PanicHandlers is a list of functions which will be invoked when a panic happens.
var PanicHandlers = []func(interface{}){logPanic}// HandleCrash simply catches a crash and logs an error. Meant to be called via
// defer.  Additional context-specific handlers can be provided, and will be
// called in case of panic.  HandleCrash actually crashes, after calling the
// handlers and logging the panic message.
//
// E.g., you can provide one or more additional handlers for something like shutting down go routines gracefully.
func HandleCrash(additionalHandlers ...func(interface{})) {// 捕获panicif r := recover(); r != nil {for _, fn := range PanicHandlers {fn(r)}for _, fn := range additionalHandlers {fn(r)}if ReallyCrash {// Actually proceed to panic.panic(r)}}
}

代码测试

编写一个简单程序,测试workqueue.ParallelizeUntil()方法的使用.

需求: 通过workqueue.ParallelizeUntil(),并发的找出100以内的素数

import ("context""fmt""k8s.io/client-go/util/workqueue""testing""time"
)// 判断一个数是否是素数
func isPrieme(num int) bool {for i := 2; i < num; i++ {if num%i == 0 {return false}}return true
}// 定义用于保存结果的chan
var ResultChan = make(chan int, 100)func WorkFunc(num int) {fmt.Println(" check num: ", num)if isPrieme(num) {ResultChan <- num}
}// go test -mod=vendor -run="^TestParallelizeUntil" -v
func TestParallelizeUntil(t *testing.T) {// 定义超时信号对应的ctxctx, _ := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))// 判断一个数是否是素数myFunc := WorkFunc// 执行查找workqueue.ParallelizeUntil(ctx, 3, 100, myFunc)// 将结果放入结果chanfmt.Println("result:")close(ResultChan)for i := range ResultChan {fmt.Printf("%d ", i)}fmt.Println()}---------------执行与输出如下-----------------
(base) @PM80280051 ➜ utils git:(dev-xw)go test  -run="^TestParallelizeUntil" -v
=== RUN   TestParallelizeUntilcheck num:  0check num:  3check num:  4check num:  5check num:  6check num:  7check num:  8check num:  9check num:  10check num:  11check num:  12check num:  13check num:  14check num:  15check num:  16check num:  17check num:  1check num:  19check num:  20check num:  21check num:  22check num:  23check num:  24check num:  25check num:  26check num:  27check num:  28check num:  29check num:  30check num:  31check num:  32check num:  2check num:  34check num:  33check num:  36check num:  37check num:  38check num:  39check num:  40check num:  41check num:  42check num:  43check num:  44check num:  45check num:  46check num:  47check num:  48check num:  49check num:  35check num:  51check num:  52check num:  53check num:  54check num:  55check num:  56check num:  57check num:  58check num:  59check num:  60check num:  61check num:  62check num:  63check num:  64check num:  65check num:  66check num:  67check num:  68check num:  69check num:  70check num:  18check num:  72check num:  71check num:  74check num:  75check num:  76check num:  77check num:  78check num:  79check num:  80check num:  81check num:  82check num:  50check num:  84check num:  85check num:  86check num:  87check num:  88check num:  89check num:  90check num:  91check num:  92check num:  93check num:  94check num:  95check num:  96check num:  97check num:  83check num:  99check num:  98check num:  73result: 0 3 5 7 11 13 17 1 19 23 29 31 2 37 41 43 47 53 59 61 67 71 79 89 97 83 73 
--- PASS: TestParallelizeUntil (0.00s)
PASS
ok      kubecmdb/utils  0.552s

在这个例子中,ParallelizeUntil 函数,启动了3个worker,并发的处理10个任务,直到任务处理完,或者接收到ctx定义的超时信号。 注意,ParallelizeUntil 函数不会返回任何值,如果需要记录worker协程中的结果,可以自己定义。`

使用场景

在kubernetes源码中,Kube-scheduler在进行预选算法时,使用了workqueue.ParallelizeUntil(),并发16个worker同时执行预选算法。在集群规模较大时,并发处理能提升kube-scheduler调度任务的整体效能。

预选算法执行预选的源码findNodesThatPassFilters

// k8s.io/kubernetes/pkg/scheduler/core/generic_scheduler.go// findNodesThatPassFilters finds the nodes that fit the filter plugins.
// findNodesThatPassFilters使用过滤插件来查找适合的节点。
func (g *genericScheduler) findNodesThatPassFilters(ctx context.Context, prof *profile.Profile, state *framework.CycleState, pod *v1.Pod, statuses framework.NodeToStatusMap) ([]*v1.Node, error) {allNodes, err := g.nodeInfoSnapshot.NodeInfos().List()if err != nil {return nil, err}numNodesToFind := g.numFeasibleNodesToFind(int32(len(allNodes)))// Create filtered list with enough space to avoid growing it// and allow assigning.filtered := make([]*v1.Node, numNodesToFind)// 代码略// Stops searching for more nodes once the configured number of feasible nodes// are found.// !!!重点,启动了16个work,并发的执行 checkNode 函数,对nodes节点进行预选操作workqueue.ParallelizeUntil(ctx, 16, len(allNodes), checkNode)processedNodes := int(filteredLen) + len(statuses)g.nextStartNodeIndex = (g.nextStartNodeIndex + processedNodes) % len(allNodes)// filter 列表预选算法,过滤出满足预选算法的node节点filtered = filtered[:filteredLen]if err := errCh.ReceiveError(); err != nil {statusCode = framework.Errorreturn nil, err}return filtered, nil
}

结论

workqueue.ParallelizeUntil框架广泛的适用于kubernetes源码,通过对其源码的解读,我们了解到了其如何实现与使用场景。我们可以在平时日常开发中,也可以多多尝试使用这个成熟的并发任务框架。

我们可以通过阅读kubernetes源码,学习Kubernetes内部机制,同时Kubernets项目中有一些好的成熟框架,我们可以学以致用,多在用在日常开发中才能真正掌握。

参考资料

Kubernete-v1.18源码

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/416607.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【Qt 即时通讯系统】信息消息核心类的编写

文章目录 1. 获得唯一的 messageId2. 转成格式化时间3. 把QByteArray数据转成QIcon 1. 获得唯一的 messageId &#x1f427;通过createUuid()可以获得全球唯一的身份标识&#xff0c;Qt中对UUID是有封装的&#xff0c;获取的结果其实是一串十六进制数。 2. 转成格式化时间 …

数字化转型的内容框架解析,附华为数字化转型内容框架及方法论

数字化转型的内容框架是一个系统性、多维度的体系&#xff0c;旨在通过数字技术的融入和应用&#xff0c;对传统业务、流程和模式进行重构、升级&#xff0c;以提升效率、创造更多价值。以下是对数字化转型内容框架的详细阐述&#xff1a; 一、总体要求 数字化转型的总体要求…

Open3D 体素随机下采样

目录 一、概述 1.1原理 1.2实现步骤 1.3应用场景 二、代码实现 三、实现效果 3.1原始点云 3.2体素下采样后点云 Open3D点云算法汇总及实战案例汇总的目录地址&#xff1a; Open3D点云算法与点云深度学习案例汇总&#xff08;长期更新&#xff09;-CSDN博客 一、概述 体…

AD19基础应用技巧:捕捉对象功能的讲解鼠标”绿色十字”大光标、小光标切换

AD PCB 中心点捕捉功能&#xff1a; 线段、圆、边框中心点捕捉。 有时候不想要鼠标自动捕捉中心点怎么办&#xff1f; 关于Altium Designer 20 的捕抓功能的讲解&#xff08;https://blog.csdn.net/weixin_44599693/article/details/126177841&#xff09; ——- AD PCB画板…

详解si5338 si53xx 设计使用及STM32 iic驱动设计

背景 在实际项目中经常使用si5338 si53xx&#xff0c;进行多路时钟的倍频以生成想要的时钟信号&#xff0c;但是针对si5338 si53xx设计使用缺少相关的资料&#xff0c;本文详解si5338 si53xx 设计使用及STM32 iic驱动设计&#xff0c;本文使用工程在项目中得到测试&#xff0c…

虚拟系统VS

定义 虚拟系统VS&#xff08;Virtual System&#xff09;是指将一台物理设备PS&#xff08;Physical System&#xff09;虚拟成多个相互隔离的逻辑系统。每个VS独立工作&#xff0c;在业务功能上等同于一台独立的传统物理设备&#xff0c;如图2-1所示。 目的 随着网络规模的不…

省钱的开源项目「GitHub 热点速览」

本期&#xff0c;我从上周的热门开源项目中挑选了 5 个既省钱又省事&#xff0c;还好玩的开源项目。 首先&#xff0c;推荐的是省钱的电动汽车智能充电管理平台 evcc&#xff0c;它可以根据分时电价智能安排电动车充电时间&#xff0c;从而降低电费&#xff0c;如果你家还有太阳…

第五篇——数学边界:从毕达哥拉斯定理到费马大定理

目录 一、背景介绍二、思路&方案三、过程1.思维导图2.文章中经典的句子理解3.学习之后对于投资市场的理解4.通过这篇文章结合我知道的东西我能想到什么&#xff1f; 四、总结五、升华 一、背景介绍 数学世界让我敬畏的心&#xff0c;再一次蠢蠢欲动&#xff0c;那些伟大的…

VXLAN 为何采用UDP

VXLAN 简介 VXLAN是一种网络虚拟化技术&#xff0c;它通过在UDP数据包中封装MAC地址和IP信息&#xff0c;使得二层网络&#xff08;如以太网&#xff09;能够跨越三层网络&#xff08;如IP网络&#xff09;进行扩展。这种封装方式不仅支持TCP流量的传输&#xff0c;还能有效处…

Open3D 点云导向滤波

目录 一、概述 1.1原理 1.2实现步骤 1.3应用场景 二、代码实现 三、实现效果 3.1原始点云 3.2滤波后点云 Open3D点云算法汇总及实战案例汇总的目录地址&#xff1a; Open3D点云算法与点云深度学习案例汇总&#xff08;长期更新&#xff09;-CSDN博客 一、概述 点云导向…

香橙派开启vnc

1连接香橙派 2. 更新系统 在SSH会话中&#xff0c;首先更新系统软件包列表并升级现有软件包&#xff1a; sudo apt update sudo apt upgrade3. 安装VNC服务器 安装VNC服务器软件&#xff0c;这里以x11vnc为例&#xff1a; sudo apt install x11vnc 出现如图输入如下代码即可…

AI模拟面试记录

一&#xff1a;Java模拟面试&#xff08;9.4&#xff09; 1.哪些问题是HTTPS无法解决的&#xff1f; 1.网络延迟&#xff1a;HTTPS是基于TCP协议的&#xff0c;因此会受到TCP握手和TLS挥手带来的时间延迟&#xff0c;会导致加载的页面时间变长。 2.数据包大小&#xff1a;由…

二进制方式安装K8S

⼀、安装说明 本⽂章将演示Rocky 8 ⼆进制⽅式安装⾼可⽤k8s 1.28.0版本。 ⽣产环境中&#xff0c;建议使⽤⼩版本⼤于5的Kubernetes版本&#xff0c;⽐如1.19.5 以后的才可⽤于⽣产环境。 ⼆、集群安装 2.1 基本环境配置 请统⼀替换这些⽹段&#xff0c;Pod⽹段和service和…

Golang | Leetcode Golang题解之第393题UTF-8编码验证

题目&#xff1a; 题解&#xff1a; const mask1, mask2 1 << 7, 1<<7 | 1<<6func getBytes(num int) int {if num&mask1 0 {return 1}n : 0for mask : mask1; num&mask ! 0; mask >> 1 {nif n > 4 {return -1}}if n > 2 {return n}r…

Hadoop是什么?

Hadoop 是什么 1&#xff09;Hadoop 是一个由 Apache 开发的分布式系统基础架构&#xff1b; 2&#xff09;主要解决&#xff1a;海量数据的存储和海量数据的分析计算问题&#xff1b; 3&#xff09;广义上来说&#xff0c;HADOOP 通常是指——HADOOP 生态圈&#xff1b; H…

v-bind后面不加属性和v-bind的动态属性

v-bind 平常常见的用法我们应该都知道&#xff0c;说一说 v-bind 的不常用的方式&#xff0c;第一个就是 v-bind 后面直接不添加任何属性&#xff0c;此时会将一个对象的所有 property 都作为 prop 传入 先看官网的说法案例&#xff1a; 父组件&#xff1a; <templat…

828华为云征文|华为云Flexus X实例docker部署srs6并调优,协议使用webrtc与rtmp

828华为云征文&#xff5c;华为云Flexus X实例docker部署srs6并调优&#xff0c;协议使用webrtc与rtmp 华为云最近正在举办828 B2B企业节&#xff0c;Flexus X实例的促销力度非常大&#xff0c;特别适合那些对算力性能有高要求的小伙伴。如果你有自建MySQL、Redis、Nginx等服务…

异地多活架构计算设计

随着互联网的飞速发展,企业对业务连续性和高可用性的需求日益增加。异地多活架构作为一种高可靠性的系统设计方案,通过在地理上分散的多个数据中心部署应用和数据,有效降低了单一故障点对整个系统的影响,确保业务在灾难发生时能够持续运行。 架构设计策略 业务解耦:将系…

git如何设置嵌套仓库(设置子树或子模块),并解决直接将一个仓库拖拽到另一个仓库中导致的问题

git 将一个仓库拷贝到另一个仓库的文件夹下。默认git并不会处理&#xff0c;上传上去之后&#xff0c;只会创建一个文件夹&#xff0c;但是这个文件夹点不开。 在 git add . 的时候&#xff0c;会报出警告&#xff1a; 警告&#xff1a;正在添加嵌入式 git 仓库&#xff1a;cl…

什么是云计算?

1.云计算的概念&#xff1f; 现阶段广为人们所接受的是美国国家标准与技术研究院&#xff08;National Institute of Standards and Technology&#xff0c;NIST&#xff09;给出的定义&#xff1a;“云计算”是一种按使用量付费的模式&#xff0c;这种模式提供可用的、便捷的、…