etcd选举源码分析和例子

本文主要介绍etcd在分布式多节点服务中如何实现选主。

1、基础知识

在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。
1、version
作用域为key,表示某个key的版本,每个key刚创建的version为1,每次更新key这个值都会自增,表示这个key自创建以来更新的次数。
2、revision
作用域为集群,单调递增,集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。
3、ModRevision
作用域为key,表示某个key修改时的版本,它等于修改这个key时的revision的值。
4、CreateRevision
作用域为key,表示某个key创建时的版本,等于创建这个key时revision的值,再删除之前会保持不变。
现在在etcd里面放一个key:
!
可以看到,Version是1,CreateRevision与Modrevision和Revision相等,都是7677720。
在修改了key的值以后:
在这里插入图片描述
version自增变为2,CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了,因为此时还有其他的程序在修改etcd里面的key。

2、选举

先初始化一个session和选举:
	electionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)
在启动了三台服务后,在etcd里面找到以下三个key:

在这里插入图片描述
现在的leader是第一台:
在这里插入图片描述
去看下compaign的源码:

func (e *Election) Campaign(ctx context.Context, val string) error {s := e.sessionclient := e.session.Client()//用leaseID与前面的key前缀拼成keyk := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())//判断当前key的createRevision是否是0,也就是否创建txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))txn = txn.Else(v3.OpGet(k))resp, err := txn.Commit()if err != nil {return err}//获取key的Revisione.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, sif !resp.Succeeded {kv := resp.Responses[0].GetResponseRange().Kvs[0]e.leaderRev = kv.CreateRevisionif string(kv.Value) != val {if err = e.Proclaim(ctx, val); err != nil {e.Resign(ctx)return err}}}_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)if err != nil {// clean up in case of context cancelselect {case <-ctx.Done():e.Resign(client.Ctx())default:e.leaderSession = nil}return err}e.hdr = resp.Headerreturn nil
}

用当前key和leaseid在etcd中创建一个key,并获取到key此时的Revision。

// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))for {resp, err := client.Get(ctx, pfx, getOpts...)if err != nil {return nil, err}if len(resp.Kvs) == 0 {return resp.Header, nil}lastKey := string(resp.Kvs[0].Key)if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {return nil, err}}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel := context.WithCancel(ctx)defer cancel()var wr v3.WatchResponsewch := client.Watch(cctx, key, v3.WithRev(rev))for wr = range wch {for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {return nil}}}if err := wr.Err(); err != nil {return err}if err := ctx.Err(); err != nil {return err}return fmt.Errorf("lost watcher waiting for delete")
}

这里get的option,找到/my-ection前缀下最新创建的key,并且createRev的值小于等于当前key创建的createRev-1.

// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }

此时/my-election,在这个revision之前没有任何key,所以node2启动直接竞选到了主。
紧接着node3启动,在这个get,它能获取到node2创建的key,所以node3去watchnode2的的删除事件。
同理,node4启动以后,在这里的get,它获取的是node3的key,它去wachnode3的删除事件。
当node2释放后,node3获取到node2的删除事件变成主。node3释放以后,node4变成watch到node3
的删除事件变成主。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

3、观察者

在上面的例子中,如果node4挂了以后,node2能继续变回为leader吗?
在引入observe以后,可以做到。
在这里插入图片描述
找到/my-election这个前缀下最开始创建的key,也就是node2。
然后把这个key放入返回ch中。

func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {client := e.session.Client()defer close(ch)for {resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)if err != nil {return}var kv *mvccpb.KeyValuevar hdr *pb.ResponseHeaderif len(resp.Kvs) == 0 {cctx, cancel := context.WithCancel(ctx)// wait for first key put on prefixopts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}wch := client.Watch(cctx, e.keyPrefix, opts...)for kv == nil {wr, ok := <-wchif !ok || wr.Err() != nil {cancel()return}// only accept puts; a delete will make observe() spinfor _, ev := range wr.Events {if ev.Type == mvccpb.PUT {hdr, kv = &wr.Header, ev.Kv// may have multiple revs; hdr.rev = the last rev// set to kv's rev in case batch has multiple Putshdr.Revision = kv.ModRevisionbreak}}}cancel()} else {hdr, kv = resp.Header, resp.Kvs[0]}select {case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:case <-ctx.Done():return}cctx, cancel := context.WithCancel(ctx)wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))keyDeleted := falsefor !keyDeleted {wr, ok := <-wchif !ok {cancel()return}for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {keyDeleted = truebreak}resp.Header = &wr.Headerresp.Kvs = []*mvccpb.KeyValue{ev.Kv}select {case ch <- *resp:case <-cctx.Done():cancel()return}}}cancel()}
}

wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
接下里watch/my-election下第一个创建的key,在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件,主发生了变化。
在这里插入图片描述
同理,当node2 down掉,它的key被删除,node3变成了主,observe此时watch的就是node3的key,因为此时node3创建的key是/my-election下的第一个key。
完整代码:

package mainimport ("context""fmt""log""os""time"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)const (FOLLOWER = "follower"LEADER   = "leader"
)var state string = FOLLOWER
var preState string = FOLLOWERfunc compaign(election *concurrency.Election, val string) {// Campaign for leadershipfmt.Println("start compaign!")if err := election.Campaign(context.Background(), val); err != nil {log.Fatal(err)}fmt.Println("Became leader!")preState = statestate = LEADER// Hold leadership until a key pressfmt.Println("Press Enter to release leadership...")fmt.Scanln()// Resign leadershipif err := election.Resign(context.Background()); err != nil {log.Fatal(err)}//preState = state//state = FOLLOWERfmt.Println("Released leadership.")
}func observe(election *concurrency.Election, val string) {ch := election.Observe(context.Background())for {select {case rsp, ok := <-ch:if !ok {fmt.Println("now I am follower")//election.Campaign(context.Background(), args[1])//重新开始观察go observe(election, val)return} else {fmt.Printf("leader now is:%s\n", string(rsp.Kvs[0].Value))if string(rsp.Kvs[0].Value) == val {fmt.Println("still be leader")preState = statestate = LEADER} else {fmt.Println("now become follower")preState = statestate = FOLLOWERif preState == LEADER {go compaign(election, val)}}}}}
}func main() {// Connect to etcdargs := os.Argsclient, err := clientv3.New(clientv3.Config{Endpoints:   []string{"ipdizhi"}, // Replace with your etcd endpointsDialTimeout: 5 * time.Second,Username:    "user",Password:    "password",})if err != nil {log.Fatal(err)}defer client.Close()// Key for leader electionelectionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)go compaign(election, args[1])go observe(election, args[1])for {}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/125590.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【猿灰灰赠书活动 - 05期】- 【速学Linux:系统应用从入门到精通】

&#x1f468;‍&#x1f4bb;本文专栏&#xff1a;赠书活动专栏&#xff08;为大家争取的福利&#xff0c;免费送书&#xff09; &#x1f468;‍&#x1f4bb;本文简述&#xff1a;博文为大家争取福利&#xff0c;与机械工业出版社合作进行送书活动 &#x1f468;‍&#x1f…

【虚拟机】

虚拟机 简介VMware Workstation简介下载安装许可证密钥 CentOS简介下载 创建新的虚拟机 简介 虚拟机&#xff08;Virtual Machine&#xff0c;简称VM&#xff09;是一种软件模拟的计算机&#xff0c;它在一台物理计算机上创建了一个独立的虚拟计算环境。这个虚拟环境可以运行操…

在线实时监测离子风机的功能

离子风机是一种能够通过释放大量负离子来净化空气并提供清新环境的设备。要实现联网实时在线监测离子风机&#xff0c;可以考虑以下几个步骤&#xff1a; 1. 设备接入互联网&#xff1a;离子风机需要具备网络连接功能&#xff0c;可以通过无线网络或者以太网接入路由器&#x…

[uni-app] 海报图片分享方案 -canvas绘制

文章目录 canvas使用记录先看下实际效果图绘制流程及思路1. 绘制头像, 通过drawImage来绘制2.绘制文字部分 具体代码 分享海报图片的方式,以前再RN端采用的是截图方案, 我记得组件好像是 react-native-view-shot 现在要处理uni-app的海报图片分享, 一般也有 html2canvas的相关插…

索尼 toio™应用创意开发征文|联盟国战

✨ 能用众力&#xff0c;则无敌于天下矣&#xff1b;能用众智&#xff0c;则无畏于圣人矣。 —— 孙权 前言&#xff1a; 从火爆全网的ChatGPT&#xff0c;到人人都是开发者。AI无疑贯彻了整个2023年的主题&#xff0c;从刚上幼儿园的小朋友到耄耋之年的老顽童&#xff0c;都对…

0基础学习VR全景平台篇 第96篇:VR电子楼书

大家好&#xff0c;欢迎观看蛙色VR官方系列课程——VR电子楼书&#xff01; 作为2021年底全新上线的行业解决方案&#xff0c;是专门针对地产、园区数字化营销的一站式VR解决方案&#xff0c;为行业潜在客户提供优质的7x24小时线上看房体验。 本期教程将通过功能介绍后台操作&…

SpringBoot项目集成Druid

文章目录 一、前言二、Druid相关内容1、Druid简介1.1数据库连接池 2、项目集成Druid2.1、环境准备2.2、依赖准备2.3、编写配置文件2.4、测试访问 3、功能介绍3.1、查看数据源3.2、SQL监控3.3、URI监控 三、总结提升 一、前言 本文将介绍Druid的相关内容以及项目如何集成Druid&…

C#,《小白学程序》第十二课:日历的编制,时间DateTime的计算方法与代码

1 文本格式 /// <summary> /// 《小白学程序》第十二课&#xff1a;日历的编制&#xff0c;时间DateTime的计算方法与代码 /// 本课学习时间类型的数据 DateTime 的简单方法&#xff0c;并编制一个月的日历。 /// </summary> /// <param name"sender"…

OpenWrt系统开发笔记

openWrt英文官网&#xff1a; https://openwrt.org/ 中文官网&#xff1a; http://www.openwrt.org.cn/ 一、开发环境及编译 在github上有两个源码使用的比较多   一个是lede,地址为&#xff1a;https://github.com/coolsnowwolf/lede   另一个为OpenWrt的官方源码&#…

笔记 | 排序算法实现(Python)

排序算法 一、选择排序二、合并/归并排序三、快速排序四、计数排序 排序类型时间复杂度选择排序(Selection Sort) O ( n 2 ) O(n^{2} ) O(n2)合并/归并排序&#xff08;Merge Sort&#xff09; O ( n log ⁡ n ) O(n\log n ) O(nlogn)快速排序(Quick Sort)平均情况 O ( n log ⁡…

STM32F4X RTC

STM32F4X RTC 什么是RTCSTM32F4X RTCSTM32F4X RTC框图STM32F4X RTC计数频率STM32F4X RTC日历STM32F4X RTC闹钟 STM32F4X RTC例程 什么是RTC RTC全程叫Real-Time Clock实时时钟&#xff0c;是MCU中一个用来计时的模块。RTC的一个主要作用是用来显示实时时间&#xff0c;就像日常…

利用less实现多主题切换(配合天气现象)

1. 先看效果&#xff1a; 2. 话不多说直接撸吧&#xff1a; 原理&#xff1a;先给body元素添加style&#xff0c;再根据天气现象动态更改style 开撸&#xff1a; 创建src/assets/style/variables.less 使用 XXX:var(–XXX,‘style’) 声明系列变量&#xff0c;之后添加其他变…

单臂路由实验:通过Trunk和子接口实现VLAN互通

文章目录 一、实验背景与目的二、实验拓扑三、实验需求四、实验解法1. PC 配置 IP 地址2. PC3 属于 Vlan10&#xff0c;PC4 属于 Vlan20&#xff0c;配置单臂路由实现 Vlan10 和 Vlan20 三层互通3. 测试在 PC3 上 Ping PC4 &#xff0c;可以 Ping 通 PC4 摘要&#xff1a; 本文…

任意文件读取及漏洞复现

文章目录 渗透测试漏洞原理任意文件读取1. 任意文件读取概述1.1 漏洞成因1.2 漏洞危害1.3 漏洞分类1.4 任意文件读取1.4.1 文件读取1.4.2 任意文件读取1.4.3 权限问题 1.5 任意文件下载1.5.1 一般情况1.5.2 PHP实现1.5.3 任意文件下载 2. 任意文件读取攻防2.1 路径过滤2.1.1 过…

引爆用户参与:消息重弹,让您的推送不再被忽略

在当前各大APP拉新促活成本居高不下的大背景下&#xff0c;如何稳定存量用户、提升用户粘性就显得尤为关键。从促销活动到个性化推荐&#xff0c;从互动通知到功能提醒&#xff0c;消息推送已成为各大APP连接存量用户和目标市场之间的桥梁&#xff0c;通过点击推送&#xff0c;…

springboot初试elasticsearch

引入依赖 elasticsearch的依赖版本与你elasticsearch要一致 <dependency><groupId>org.elasticsearch.client</groupId><artifactId>elasticsearch-rest-high-level-client</artifactId> </dependency> 索引库的操作 创建索引库 impo…

Visual Studio 2019下使用C++与Python进行混合编程——环境配置与C++调用Python API接口

前言 在vs2019下使用C与Python进行混合编程,在根源上讲&#xff0c;Python 本身就是一个C库&#xff0c;那么这里使用其中最简单的一种方法是把Python的C API来嵌入C项目中&#xff0c;来实现混合编程。当前的环境是&#xff0c;win10,IDE是vs2019,python版本是3.9&#xff0c…

LeetCode 刷题记录——从零开始记录自己一些不会的

1. 最多可以摧毁的敌人城堡数目 题意 思路 两层循环&#xff0c;太low了 用一个变量记录前一个位置 代码 class Solution { public:int captureForts(vector<int>& forts) {int ans 0, pre -1;for (int i 0; i < forts.size(); i) {if (forts[i] 1 || forts…

04、javascript 修改对象中原有的属性值、修改对象中原有属性的名字(两种方式)、添加对象中新属性等的操作

1、修改对象中原有的属性值 其一、代码为&#xff1a; // 想将 obj 中的 flag 值&#xff0c;根据不同的值来变化(即&#xff1a;修改对象中原有的属性值)&#xff1b; let obj {"port": "port_0","desc": "desc_0","flag&quo…

IP应用场景查询API:深入了解网络用户行为的利器

前言 随着数字时代的不断发展&#xff0c;互联网已经成为人们生活的重要组成部分。而随着越来越多的业务和社交活动迁移到在线平台上&#xff0c;了解和理解网络用户行为变得至关重要。为了满足这个需求&#xff0c;IP 应用场景查询 API 崭露头角&#xff0c;成为深入了解网络…