etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。
1、基础知识
在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。
1、version
作用域为key,表示某个key的版本,每个key刚创建的version为1,每次更新key这个值都会自增,表示这个key自创建以来更新的次数。
2、revision
作用域为集群,单调递增,集群内任何key的增删改都会使它自增。可以把它理解为集群的一个逻辑状态标志。记录了每一次集群内的增删改操作。
3、ModRevision
作用域为key,表示某个key修改时的版本,它等于修改这个key时的revision的值。
4、CreateRevision
作用域为key,表示某个key创建时的版本,等于创建这个key时revision的值,再删除之前会保持不变。
现在在etcd里面放一个key:
可以看到,Version是1,CreateRevision与Modrevision和Revision相等,都是7677720。
在修改了key的值以后:
version自增变为2,CreateRevision没有变动。modRevision等于修改时的Revsion。Revision已经跑到前面去了,因为此时还有其他的程序在修改etcd里面的key。
2、选举
先初始化一个session和选举:
electionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)
在启动了三台服务后,在etcd里面找到以下三个key:
现在的leader是第一台:
去看下compaign的源码:
func (e *Election) Campaign(ctx context.Context, val string) error {s := e.sessionclient := e.session.Client()//用leaseID与前面的key前缀拼成keyk := fmt.Sprintf("%s%x", e.keyPrefix, s.Lease())//判断当前key的createRevision是否是0,也就是否创建txn := client.Txn(ctx).If(v3.Compare(v3.CreateRevision(k), "=", 0))txn = txn.Then(v3.OpPut(k, val, v3.WithLease(s.Lease())))txn = txn.Else(v3.OpGet(k))resp, err := txn.Commit()if err != nil {return err}//获取key的Revisione.leaderKey, e.leaderRev, e.leaderSession = k, resp.Header.Revision, sif !resp.Succeeded {kv := resp.Responses[0].GetResponseRange().Kvs[0]e.leaderRev = kv.CreateRevisionif string(kv.Value) != val {if err = e.Proclaim(ctx, val); err != nil {e.Resign(ctx)return err}}}_, err = waitDeletes(ctx, client, e.keyPrefix, e.leaderRev-1)if err != nil {// clean up in case of context cancelselect {case <-ctx.Done():e.Resign(client.Ctx())default:e.leaderSession = nil}return err}e.hdr = resp.Headerreturn nil
}
用当前key和leaseid在etcd中创建一个key,并获取到key此时的Revision。
// waitDeletes efficiently waits until all keys matching the prefix and no greater
// than the create revision.
func waitDeletes(ctx context.Context, client *v3.Client, pfx string, maxCreateRev int64) (*pb.ResponseHeader, error) {getOpts := append(v3.WithLastCreate(), v3.WithMaxCreateRev(maxCreateRev))for {resp, err := client.Get(ctx, pfx, getOpts...)if err != nil {return nil, err}if len(resp.Kvs) == 0 {return resp.Header, nil}lastKey := string(resp.Kvs[0].Key)if err = waitDelete(ctx, client, lastKey, resp.Header.Revision); err != nil {return nil, err}}
}
func waitDelete(ctx context.Context, client *v3.Client, key string, rev int64) error {cctx, cancel := context.WithCancel(ctx)defer cancel()var wr v3.WatchResponsewch := client.Watch(cctx, key, v3.WithRev(rev))for wr = range wch {for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {return nil}}}if err := wr.Err(); err != nil {return err}if err := ctx.Err(); err != nil {return err}return fmt.Errorf("lost watcher waiting for delete")
}
这里get的option,找到/my-ection前缀下最新创建的key,并且createRev的值小于等于当前key创建的createRev-1.
// WithLastCreate gets the key with the latest creation revision in the request range.
func WithLastCreate() []OpOption { return withTop(SortByCreateRevision, SortDescend) }
// WithMaxCreateRev filters out keys for Get with creation revisions greater than the given revision.
func WithMaxCreateRev(rev int64) OpOption { return func(op *Op) { op.maxCreateRev = rev } }
此时/my-election,在这个revision之前没有任何key,所以node2启动直接竞选到了主。
紧接着node3启动,在这个get,它能获取到node2创建的key,所以node3去watchnode2的的删除事件。
同理,node4启动以后,在这里的get,它获取的是node3的key,它去wachnode3的删除事件。
当node2释放后,node3获取到node2的删除事件变成主。node3释放以后,node4变成watch到node3
的删除事件变成主。
3、观察者
在上面的例子中,如果node4挂了以后,node2能继续变回为leader吗?
在引入observe以后,可以做到。
找到/my-election这个前缀下最开始创建的key,也就是node2。
然后把这个key放入返回ch中。
func (e *Election) observe(ctx context.Context, ch chan<- v3.GetResponse) {client := e.session.Client()defer close(ch)for {resp, err := client.Get(ctx, e.keyPrefix, v3.WithFirstCreate()...)if err != nil {return}var kv *mvccpb.KeyValuevar hdr *pb.ResponseHeaderif len(resp.Kvs) == 0 {cctx, cancel := context.WithCancel(ctx)// wait for first key put on prefixopts := []v3.OpOption{v3.WithRev(resp.Header.Revision), v3.WithPrefix()}wch := client.Watch(cctx, e.keyPrefix, opts...)for kv == nil {wr, ok := <-wchif !ok || wr.Err() != nil {cancel()return}// only accept puts; a delete will make observe() spinfor _, ev := range wr.Events {if ev.Type == mvccpb.PUT {hdr, kv = &wr.Header, ev.Kv// may have multiple revs; hdr.rev = the last rev// set to kv's rev in case batch has multiple Putshdr.Revision = kv.ModRevisionbreak}}}cancel()} else {hdr, kv = resp.Header, resp.Kvs[0]}select {case ch <- v3.GetResponse{Header: hdr, Kvs: []*mvccpb.KeyValue{kv}}:case <-ctx.Done():return}cctx, cancel := context.WithCancel(ctx)wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))keyDeleted := falsefor !keyDeleted {wr, ok := <-wchif !ok {cancel()return}for _, ev := range wr.Events {if ev.Type == mvccpb.DELETE {keyDeleted = truebreak}resp.Header = &wr.Headerresp.Kvs = []*mvccpb.KeyValue{ev.Kv}select {case ch <- *resp:case <-cctx.Done():cancel()return}}}cancel()}
}
wch := client.Watch(cctx, string(kv.Key), v3.WithRev(hdr.Revision+1))
接下里watch/my-election下第一个创建的key,在当前revision以后的delete事件。如果有delete事件则把这个消息通知出去。也就是发现了当前主的节点发生了delete事件,主发生了变化。
同理,当node2 down掉,它的key被删除,node3变成了主,observe此时watch的就是node3的key,因为此时node3创建的key是/my-election下的第一个key。
完整代码:
package mainimport ("context""fmt""log""os""time"clientv3 "go.etcd.io/etcd/client/v3""go.etcd.io/etcd/client/v3/concurrency"
)const (FOLLOWER = "follower"LEADER = "leader"
)var state string = FOLLOWER
var preState string = FOLLOWERfunc compaign(election *concurrency.Election, val string) {// Campaign for leadershipfmt.Println("start compaign!")if err := election.Campaign(context.Background(), val); err != nil {log.Fatal(err)}fmt.Println("Became leader!")preState = statestate = LEADER// Hold leadership until a key pressfmt.Println("Press Enter to release leadership...")fmt.Scanln()// Resign leadershipif err := election.Resign(context.Background()); err != nil {log.Fatal(err)}//preState = state//state = FOLLOWERfmt.Println("Released leadership.")
}func observe(election *concurrency.Election, val string) {ch := election.Observe(context.Background())for {select {case rsp, ok := <-ch:if !ok {fmt.Println("now I am follower")//election.Campaign(context.Background(), args[1])//重新开始观察go observe(election, val)return} else {fmt.Printf("leader now is:%s\n", string(rsp.Kvs[0].Value))if string(rsp.Kvs[0].Value) == val {fmt.Println("still be leader")preState = statestate = LEADER} else {fmt.Println("now become follower")preState = statestate = FOLLOWERif preState == LEADER {go compaign(election, val)}}}}}
}func main() {// Connect to etcdargs := os.Argsclient, err := clientv3.New(clientv3.Config{Endpoints: []string{"ipdizhi"}, // Replace with your etcd endpointsDialTimeout: 5 * time.Second,Username: "user",Password: "password",})if err != nil {log.Fatal(err)}defer client.Close()// Key for leader electionelectionKey := "/my-election"// Create an election sessionsession, err := concurrency.NewSession(client, concurrency.WithTTL(10))if err != nil {log.Fatal(err)}defer session.Close()election := concurrency.NewElection(session, electionKey)go compaign(election, args[1])go observe(election, args[1])for {}
}
相关文章:

etcd选举源码分析和例子
本文主要介绍etcd在分布式多节点服务中如何实现选主。 1、基础知识 在开始之前,先介绍etcd中 Version, Revision, ModRevision, CreateRevision 几个基本概念。 1、version 作用域为key,表示某个key的版本,每个key刚创建的version为1&#…...

Android 网络配置
ip tables 和 ip route 是两个不同的工具,它们在不同的阶段执行不同的功能。ip route 是用来管理和控制路由表的,它决定了数据包应该从哪个网卡或网关发送出去。ip tables 是用来配置、管理和控制网络数据包的过滤、转发和转换的,它根据用户定…...

【网络通信 -- WebRTC】Open WebRTC Toolkit 环境搭建指南
【网络通信 -- WebRTC】Open WebRTC Toolkit -- OWT-Server 编译安装指南 【1】OWT Server 与 Web Demo 视频会议环境搭建 【1.1】编译 OWT Server 安装依赖 ./scripts/installDepsUnattended.sh编译 scripts/build.js -t all --check 注意若不支持硬件加速则采用如下命令 s…...

文件上传漏洞(CVE-2022-30887)
简介 多语言药房管理系统(MPMS)是用PHP和MySQL开发的,该软件的主要目的是在药房和客户之间提供一套接口,客户是该软件的主要用户。该软件有助于为药房业务创建一个综合数据库,并根据到期、产品等各种参数提供各种报告…...

LeetCode-77-组合
一:题目描述: 给定两个整数 n 和 k,返回范围 [1, n] 中所有可能的 k 个数的组合。 你可以按 任何顺序 返回答案。 二:示例与提示 示例 1: 输入:n 4, k 2 输出: [[2,4],[3,4],[2,3],[1,2],[1,3],[1,4…...
Oracle中instr,rtrim,XMLPARSE,XMLAGG,GETCLOBVAL函数的使用
1:INSTR()函数 INSTR 是一个字符串函数,用于查找子字符串在源字符串中的位置。 它的语法如下: INSTR(source_string, search_string)source_string 是源字符串,即要在其中进行搜索的字符串。search_string 是要查找的子字符串。…...

java接入apiv3微信小程序支付(以java的eladmin框架为例)
一、需要准备的资料 1.小程序AppID 如:wx2e56f5****** 2.商户号 如:1641****** 3.商户API私钥路径:什么是商户API证书?如何获取商户API证书? 获取文件如下图: 如: 本地路径:E:\Env\e…...
第19节-PhotoShop基础课程-历史记录画笔工具
文章目录 前言1.历史记录画笔工具1.从当前状态创建文档2.创建新快照 2.历史记录艺术画笔工具 前言 任何记录都会被记录下来,并且可以拍快照,从历史中恢复,特别适合艺术创作的孩子 1.历史记录画笔工具 不只是画笔,所有操作记录都…...

MongoDB常用的比较符号和一些功能符号
比较符号 results collection.find({age: {$gt: 20}})功能符号 results collection.find({name: {$regex: ^M.*}})...

网络安全(黑客)技术自学
前言 一、什么是网络安全 网络安全可以基于攻击和防御视角来分类,我们经常听到的 “红队”、“渗透测试” 等就是研究攻击技术,而“蓝队”、“安全运营”、“安全运维”则研究防御技术。 无论网络、Web、移动、桌面、云等哪个领域,都有攻与防…...
C++ 引用
C 引用 引用变量是一个别名,也就是说,它是某个已存在变量的另一个名字。一旦把引用初始化为某个变量,就可以使用该引用名称或变量名称来指向变量。 C 引用 vs 指针 引用很容易与指针混淆,它们之间有三个主要的不同:…...

9.1.tensorRT高级(4)封装系列-自动驾驶案例项目self-driving-道路分割分析
目录 前言1. 道路分割总结 前言 杜老师推出的 tensorRT从零起步高性能部署 课程,之前有看过一遍,但是没有做笔记,很多东西也忘了。这次重新撸一遍,顺便记记笔记。 本次课程学习 tensorRT 高级-自动驾驶案例项目self-driving-道路分…...

稳定的 Glance 来了,安卓小部件有救了!
稳定的 Glance 来了,安卓小部件有救了! 稳定版本的 Glance 终于发布了,来一起看看吧,看看这一路的旅程,看看好用么,再看看如何使用! 前世今生 故事发生在两年的一天吧,其实夸张了…...

用友U8与MES系统API接口对接案例分析
企业数字化转型:轻易云数据集成平台助力 U8 ERPMES 系统集成 为什么选择数字化转型? 领导层对企业资源规划(ERP)的深刻理解促使了数字化转型的启动。采用精确的“N5”滚动计划,为供应商提供充分的预期信息,…...

web UI自动化介绍
文章目录 一、web UI自动化介绍1.1 执行UI自动化测试前提1.2 Selenium介绍以及知识点梳理 二、Selenium 学习2.1 基础2.1.1 环境安装与基础使用2.1.2 web浏览器控制2.1.3 常见控件的八大定位方式2.1.3.1 八大定位方式介绍2.1.3.2 NAME、ID定位2.1.3.3 css_selector定位2.1.3.4 …...

小米13Pro/13Ultra刷面具ROOT后激活LSPosed框架微X模块详细教程
喜欢买小米手机,很多是因为小米手机的开放,支持root权限,而ROOT对普通用户来说更多的是刷入DIY模块功能,今天ROM乐园小编就教大家如何使用面具ROOT,实现大家日常情况下非常依赖的微X模块功能,体验微X模块的…...
文盘Rust -- 给程序加个日志 | 京东云技术团队
日志是应用程序的重要组成部分。无论是服务端程序还是客户端程序都需要日志做为错误输出或者业务记录。在这篇文章中,我们结合log4rs聊聊rust 程序中如何使用日志。 log4rs类似java生态中的log4j,使用方式也很相似 log4rs中的基本概念 log4rs 的功能组件也由 appe…...

C语言深入理解指针(非常详细)(五)
目录 回调函数qsort使用举例qsort函数的模拟实现sizeof和strlen的对比sizeofstrlensizeof和strlen的对比一道关于sizeof的题 回调函数 回调函数就是一个通过函数指针调用的函数 如果你把函数的指针(地址)作为参数传递给另一个函数,当这个指…...

[docker]笔记-portainer的安装
1、portainer是一款可视化的容器管理软件,利用portainer可以轻松方便的管理和创建容器。portainer本身是一个容器,完全免费并且具有汉化版。本文介绍portainer的安装和使用。 2、安装好容器并配置好容器环境,可参照https://blog.csdn.net/bl…...

详解TCP/IP的三次握手和四次挥手
文章目录 前言一、TCP/IP协议的三次握手1.1 三次握手流程 二、TCP/IP的四次挥手2.1 四次挥手流程 三、主要字段3.1、标志位(Flags)3.2、序号(sequence number)3.3、确认号(acknowledgement number) 四、状态…...

页面渲染流程与性能优化
页面渲染流程与性能优化详解(完整版) 一、现代浏览器渲染流程(详细说明) 1. 构建DOM树 浏览器接收到HTML文档后,会逐步解析并构建DOM(Document Object Model)树。具体过程如下: (…...

(转)什么是DockerCompose?它有什么作用?
一、什么是DockerCompose? DockerCompose可以基于Compose文件帮我们快速的部署分布式应用,而无需手动一个个创建和运行容器。 Compose文件是一个文本文件,通过指令定义集群中的每个容器如何运行。 DockerCompose就是把DockerFile转换成指令去运行。 …...

【JVM面试篇】高频八股汇总——类加载和类加载器
目录 1. 讲一下类加载过程? 2. Java创建对象的过程? 3. 对象的生命周期? 4. 类加载器有哪些? 5. 双亲委派模型的作用(好处)? 6. 讲一下类的加载和双亲委派原则? 7. 双亲委派模…...

【C++进阶篇】智能指针
C内存管理终极指南:智能指针从入门到源码剖析 一. 智能指针1.1 auto_ptr1.2 unique_ptr1.3 shared_ptr1.4 make_shared 二. 原理三. shared_ptr循环引用问题三. 线程安全问题四. 内存泄漏4.1 什么是内存泄漏4.2 危害4.3 避免内存泄漏 五. 最后 一. 智能指针 智能指…...
快刀集(1): 一刀斩断视频片头广告
一刀流:用一个简单脚本,秒杀视频片头广告,还你清爽观影体验。 1. 引子 作为一个爱生活、爱学习、爱收藏高清资源的老码农,平时写代码之余看看电影、补补片,是再正常不过的事。 电影嘛,要沉浸,…...

逻辑回归暴力训练预测金融欺诈
简述 「使用逻辑回归暴力预测金融欺诈,并不断增加特征维度持续测试」的做法,体现了一种逐步建模与迭代验证的实验思路,在金融欺诈检测中非常有价值,本文作为一篇回顾性记录了早年间公司给某行做反欺诈预测用到的技术和思路。百度…...
怎么让Comfyui导出的图像不包含工作流信息,
为了数据安全,让Comfyui导出的图像不包含工作流信息,导出的图像就不会拖到comfyui中加载出来工作流。 ComfyUI的目录下node.py 直接移除 pnginfo(推荐) 在 save_images 方法中,删除或注释掉所有与 metadata …...
0x-3-Oracle 23 ai-sqlcl 25.1 集成安装-配置和优化
是不是受够了安装了oracle database之后sqlplus的简陋,无法删除无法上下翻页的苦恼。 可以安装readline和rlwrap插件的话,配置.bahs_profile后也能解决上下翻页这些,但是很多生产环境无法安装rpm包。 oracle提供了sqlcl免费许可,…...

Axure 下拉框联动
实现选省、选完省之后选对应省份下的市区...

Kubernetes 节点自动伸缩(Cluster Autoscaler)原理与实践
在 Kubernetes 集群中,如何在保障应用高可用的同时有效地管理资源,一直是运维人员和开发者关注的重点。随着微服务架构的普及,集群内各个服务的负载波动日趋明显,传统的手动扩缩容方式已无法满足实时性和弹性需求。 Cluster Auto…...