Raft 一致性算法
Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。
一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一:
- follower(跟随者):所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程叫做Leader选举(Leader Election)。未当选Leader的节点会将状态转换为follower。
- leader(领导者):所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。
- 基于leader的方法,Raft算法可以分解成三个子问题。
- Leader election(领导选举):原来的leader挂掉后,必须选出一个新的leader。
- Log replication(日志复制):leader从客户端接收日志,并复制到整个集群中。
- Safety(安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项。
动画演示
地址:http://thesecretlivesofdata.com/raft/
网络节点必须是奇数个。
动画主要包含三部分内容:
- 第一部分介绍简单版的领导者选举和日志复制的过程
- 第二部分介绍详细版的领导者选举和日志复制过程
- 第三部分介绍如果遇到网络分区(脑裂),raft算法是如何回复网络一致的。
Leader election(领导选举)
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
Log replication(日志复制)
- 当选出 leader 后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中
- leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端
- 当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry
- raft的log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令
- leader在一个特定的term和index下,只会创建一个log entry
代码
package mainimport ("os""strconv""fmt""sync""net/rpc""net/http""math/rand""time""log"
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount = 2var leader = Leader{0, -1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId = 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) > 1 {//接收终端输入的信息userId := os.Args[1]//字符串转换整型id, _ := strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable = map[string]string{"1": ":8000","2": ":8001",}//封装nodeInfo对象node := nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf := Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node = node//注册rpcgo func() {//注册rpc,为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId == "1" {go func() {//回调方法http.HandleFunc("/req", rf.getRequest)fmt.Println("监听8080")if err := http.ListenAndServe(":8080", nil); err != nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form["age"]) > 0 {clientWriter = writerfmt.Println("主节点广播客户端请求age:", request.Form["age"][0])param := Param{Msg: request.Form["age"][0], MsgId: strconv.Itoa(messageId)}messageId++if leader.LeaderId == rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId := nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)if err != nil {log.Fatal("\nrpc转发连接server错误:", leader.LeaderId, err)}var bo = false//首先给leader传递err = rpc.Call("Raft.ForwardingMessage", param, &bo)if err != nil {log.Fatal("\nrpc转发调用server错误:", leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] = param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader == rf.me {var success_count = 0fmt.Printf("领导者发送数据中 。。。\n")go func() {rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {//需要其它服务端回应rf.message <- ok})}()for i := 0; i < raftCount-1; i++ {fmt.Println("等待其它服务端回应")select {case ok := <-rf.message:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter != nil {fmt.Fprintf(clientWriter, "OK")}fmt.Printf("\n领导者发送数据结束\n")rf.mu.Unlock()}}}}}
}//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err := http.ListenAndServe(port, nil)if err != nil {fmt.Println("注册rpc服务失败", err)}
}//创建节点对象
func Make(me int) *Raft {rf := &Raft{}rf.me = merf.votedFor = -1//0 follower ,1 candidate ,2 leaderrf.state = 0rf.timeout = 0rf.currentLeader = -1rf.setTerm(0)//初始化通道rf.message = make(chan bool)rf.heartbeat = make(chan bool)rf.heartbeatRe = make(chan bool)rf.eclectCh = make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后,应该广播所有的节点,本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0go func() {param := Param{Msg: "leader heartbeat",Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, "Raft.Heartbeat", func(ok bool) {rf.heartbeatRe <- ok})}()for i := 0; i < raftCount-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()fmt.Println("接收到了子节点们的返回信息")rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) + min
}//获得当前时间(毫秒)
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout := randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime = milliseconds()select {//间隔时间为1500-3000ms的随机值case <-time.After(time.Duration(timeout) * time.Millisecond):}result = falsefor !result {//选择leaderresult = rf.election_one_round(&leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader,并且不是自己,那么returnif args.LeaderId > -1 && args.LeaderId != rf.me {fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout = 2000last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf("candidate=%d start electing leader\n", rf.me)for {fmt.Printf("candidate=%d send request vote to server\n", rf.me)go func() {rf.broadcast(Param{Msg: "send request vote"}, "Raft.ElectingLeader", func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh <- ok})}()vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount-1; i++ {fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.eclectCh:if ok {vote++success = vote >= raftCount/2 || rf.currentLeader > -1if success && !triggerHeartbeat {fmt.Println("okok", args)triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()args.Term = rf.currentTerm + 1args.LeaderId = rf.merf.mu.Unlock()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)rf.heartbeat <- true}}}fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (vote >= raftCount/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state = 2fmt.Println(rf.me, "成为了leader")rf.currentLeader = rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port := range nodeTable {if nodeID == rf.node.id {continue;}//链接远程rpcrp, err := rpc.DialHTTP("tcp", "127.0.0.1"+port)if err != nil {fun(false)continue}var bo = falseerr = rp.Call(path, msg, &bo)if err != nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state == 0 || rf.currentLeader == -1 {//候选人状态rf.state = 1rf.votedFor = rf.merf.setTerm(rf.currentTerm + 1)rf.currentLeader = -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a = truerf.lastMessageTime = milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println("\nrpc:heartbeat:", rf.me, param.Msg)if param.Arg.Term < rf.currentTerm {*a = false} else {leader = param.Argfmt.Printf("%d收到leader%d心跳\n", rf.currentLeader, leader.LeaderId)*a = truerf.mu.Lock()rf.currentLeader = leader.LeaderIdrf.votedFor = leader.LeaderIdrf.state = 0rf.lastMessageTime = milliseconds()fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println("\nrpc:forwardingMessage:", rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a = truerf.lastMessageTime = milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println("\nrpc:LogDataCopy:", r.me, param.Msg)bufferMessage[param.MsgId] = param.Msg*a = truereturn nil
}
Reference
老男孩 Go 5期
相关文章:
Raft 一致性算法
Raft Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一: …...

驱动程序开发:基于EC20 4G模块自动拨号联网的两种方式(GobiNet工具拨号和PPP工具拨号)
目录一、EC20 4G模块简介二、根据移远官方文档修改EC20 4G模组驱动 1、因为EC20 4G模组min-pice接口其实就是usb接口,因此需要修改Linux内核源码drivers/usb/serial/option.c文件,如下图: 2、根据USB协议的要求,需要在drive…...
Web自动化测试——常见问题篇
文章目录一、什么是自动化测试二、为啥进行自动化测试(优点)三、Webdriver 的工作原理四、显示等待和隐式等待的区别五、什么样的项目适合做自动化六、自动化测试的流程七、如何分析生成的自动化测试报告一、什么是自动化测试 所谓的自动化测试就是使用…...

快速实现Modbus TCP转BACnet IP协议的方案
一、需求背景 BACnet是用于智能楼宇自控系统的主流通信协议,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。楼宇中的受控设备都通过BACnet协议连接到…...

Unity CircleLayoutGroup 如何实现一个圆形自动布局组件
文章目录简介实现原理Editor 编辑器简介 Unity中提供了三种类型的自动布局组件,分别是Grid Layou Group、Horizontal Layout Group、Vertical Layout Group,本文自定义了一个圆形的自动布局组件Circle Layout Group,如图所示: Ra…...

springcloud+nacos+gateway案例
一、先搭建好springcloudnacos项目地址:https://javazhong.blog.csdn.net/article/details/128899999二、spring cloud gateway简述Spring Cloud Gateway 是Spring Cloud家族中的一款API网关。Gateway 建立在 Spring Webflux上,目标是提供一个简洁、高效的API网关&a…...
实习这么久,你知道Maven是如何从代码仓库中找到需要的依赖吗?
目录 碎碎念 Maven是如何找到代码仓库里需要的依赖的? 如何根据坐标在本地仓库中寻找所需要的依赖? 如何根据坐标在远程仓库中寻找所需要的依赖? Maven 如何使用 HTTP 或 HTTPS 协议从远程仓库中获取依赖项,请详细解释其原理…...

低代码/零代码的快速开发框架
目前国内主流的低代码开发平台有:宜搭、简道云、明道云、云程、氚云、伙伴云、道一云、JEPaaS、华炎魔方、搭搭云、JeecgBoot 、RuoYi等。这些平台各有优劣势,定位也不同,用户可以根据自己需求选择。 一、阿里云宜搭 宜搭是阿里巴巴集团在20…...
C# 中常见的设计模式
设计模式是一套被广泛应用于软件设计的最佳实践,它们可以帮助开发者解决特定的问题,提高代码的可重用性、可读性和可维护性。本文将介绍 C# 中常见的几种设计模式,并提供相应的示例代码。 工厂模式 工厂模式是一种创建型设计模式,…...
promethues/servicemonitor
目录 1.promethues 能保证源源不断地采集/metrics 信息吗?每次都是最新的吗 2.部署servicemonitor 的作用是什么? 3.pod 部署采集数据直接上报promthues ,不通过servicemonitor 可以吗? 4.你说的"此外,如果部署…...

postman使用简介
1、介绍 postman是一款功能强大的网页调试和模拟发送HTTP请求的Chrome插件,支持几乎所有类型的HTTP请求 2、下载及安装 官方文档:https://www.getpostman.com/docs/v6/ chrome插件:chrome浏览器应用商店直接搜索添加即可(需墙&…...
@DS注解在事务中实现数据源的切换@DS在事务中失效【已解决】
在Springboot的application.yml中的配置: spring:datasource:url: jdbc:mysql://localhost:3306/test2?serverTimezoneUTC&useUnicodetrue&characterEncodingutf8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rootdynamic:primar…...
Java I/O之文件系统
一、全文概览 在学习文件系统之前,需要了解下Java在I/O上的发展史:在Java7之前,打开和读取文件需要编写特别笨拙的代码,涉及到很多的InputStream、OutputStream等组合起来使用,每次在使用时或许都需要查一下文档才能记…...

Mysql元数据获取方法(information_schema绕过方法)
前提:如果waf或其它过滤了information_schema关键字,那我们该如何获取元数据呢?能够代替information_schema的有:sys.schema_auto_increment_columnssys.schema_table_statistics_with_bufferx$schema_table_statistics_with_buff…...
Eclipse快捷键
* 1.补全代码的声明:alt /* 2.快速修复: ctrl 1 * 3.批量导包:ctrl shift o* 4.使用单行注释:ctrl /* 5.使用多行注释: ctrl shift / * 6.取消多行注释:ctrl shift \* 7.复制指定行的代码:ctrl a…...

java ssm自习室选座预约系统开发springmvc
人工管理显然已无法应对时代的变化,而自习室选座预约系统开发能很好地解决这一问题,既能提高人力物力,又能提高预约选座的知名度,取代人工管理是必然趋势。 本自习室选座预约系统开发以SSM作为框架,JSP技术,…...

分享我从功能测试转型到测试开发的真实故事
由于这段时间我面试了很多家公司,也经历了之前公司的不愉快。所以我想写一篇文章来分享一下自己的面试体会。希望能对我在之后的工作或者面试中有一些帮助,也希望能帮助到正在找工作的你。 找工作 我们总是草率地进入一个自己不了解的公司工作…...

TypeScript快速入门———(二)TypeScript常用类型
文章目录概述1 类型注解2 常用基础类型概述3.原始类型4 数组类型5 类型别名6.函数类型7 对象类型8 接口9 元组10 类型推论11 类型断言12 字面量类型13 枚举14 any 类型15 typeof概述 TypeScript 是 JS 的超集,TS 提供了 JS 的所有功能,并且额外的增加了…...

Mac M1 使用Centos8➕VMware Fusion进行静态网络配置
大部分的流程网络上面都有当我们已经下载好mac m1版的Centos8链接: https://pan.baidu.com/s/1UTl4Lo-_c17s-PDj3dA6kA 提取码: 7xh2 和VMware Fusionhttps://www.vmware.com/cn/products/fusion.html之后就可以进行安装了在导入过后 记得将硬盘和内存都设置好了 记得在关机状态…...

RadGraph: Extracting Clinical Entities and Relations from Radiology Reports代码
文章来源:NeurIPS 文章类别:IE(Information Extraction) RadGraph主要基于dygie,主要文件为inference.py。 inference.py: 1、get_file_list(data_path) def get_file_list(path):file_list [item for item in glob.glob(f&q…...

Spark 之 入门讲解详细版(1)
1、简介 1.1 Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发通用内存并行计算框架。Spark在2013年6月进入Apache成为孵化项目,8个月后成为Apache顶级项目,速度之快足见过人之处&…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
MySQL中【正则表达式】用法
MySQL 中正则表达式通过 REGEXP 或 RLIKE 操作符实现(两者等价),用于在 WHERE 子句中进行复杂的字符串模式匹配。以下是核心用法和示例: 一、基础语法 SELECT column_name FROM table_name WHERE column_name REGEXP pattern; …...
【JavaSE】绘图与事件入门学习笔记
-Java绘图坐标体系 坐标体系-介绍 坐标原点位于左上角,以像素为单位。 在Java坐标系中,第一个是x坐标,表示当前位置为水平方向,距离坐标原点x个像素;第二个是y坐标,表示当前位置为垂直方向,距离坐标原点y个像素。 坐标体系-像素 …...
JS手写代码篇----使用Promise封装AJAX请求
15、使用Promise封装AJAX请求 promise就有reject和resolve了,就不必写成功和失败的回调函数了 const BASEURL ./手写ajax/test.jsonfunction promiseAjax() {return new Promise((resolve, reject) > {const xhr new XMLHttpRequest();xhr.open("get&quo…...

VisualXML全新升级 | 新增数据库编辑功能
VisualXML是一个功能强大的网络总线设计工具,专注于简化汽车电子系统中复杂的网络数据设计操作。它支持多种主流总线网络格式的数据编辑(如DBC、LDF、ARXML、HEX等),并能够基于Excel表格的方式生成和转换多种数据库文件。由此&…...
ubuntu22.04 安装docker 和docker-compose
首先你要确保没有docker环境或者使用命令删掉docker sudo apt-get remove docker docker-engine docker.io containerd runc安装docker 更新软件环境 sudo apt update sudo apt upgrade下载docker依赖和GPG 密钥 # 依赖 apt-get install ca-certificates curl gnupg lsb-rel…...
面试高频问题
文章目录 🚀 消息队列核心技术揭秘:从入门到秒杀面试官1️⃣ Kafka为何能"吞云吐雾"?性能背后的秘密1.1 顺序写入与零拷贝:性能的双引擎1.2 分区并行:数据的"八车道高速公路"1.3 页缓存与批量处理…...

客户案例 | 短视频点播企业海外视频加速与成本优化:MediaPackage+Cloudfront 技术重构实践
01技术背景与业务挑战 某短视频点播企业深耕国内用户市场,但其后台应用系统部署于东南亚印尼 IDC 机房。 随着业务规模扩大,传统架构已较难满足当前企业发展的需求,企业面临着三重挑战: ① 业务:国内用户访问海外服…...
如何通过git命令查看项目连接的仓库地址?
要通过 Git 命令查看项目连接的仓库地址,您可以使用以下几种方法: 1. 查看所有远程仓库地址 使用 git remote -v 命令,它会显示项目中配置的所有远程仓库及其对应的 URL: git remote -v输出示例: origin https://…...