Raft 一致性算法
Raft
Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。
一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一:
- follower(跟随者):所有节点都以follower的状态开始。如果没有收到leader的消息则会变成candidate状态。
- candidate(候选人):会向其他节点“拉选票”,如果得到大部分的票则成为leader,这个过程叫做Leader选举(Leader Election)。未当选Leader的节点会将状态转换为follower。
- leader(领导者):所有对系统的修改都会先经过leader。
raft 是 etcd 和consoul的核心算法。
Raft 一致性算法
- Raft通过选出一个leader来简化日志副本的管理,例如,日志项(log entry)只允许从leader流向follower。
- 基于leader的方法,Raft算法可以分解成三个子问题。
- Leader election(领导选举):原来的leader挂掉后,必须选出一个新的leader。
- Log replication(日志复制):leader从客户端接收日志,并复制到整个集群中。
- Safety(安全性):如果有任意的server将日志项回放到状态机中了,那么其他的server只会回放相同的日志项。
动画演示
地址:http://thesecretlivesofdata.com/raft/
网络节点必须是奇数个。
动画主要包含三部分内容:
- 第一部分介绍简单版的领导者选举和日志复制的过程
- 第二部分介绍详细版的领导者选举和日志复制过程
- 第三部分介绍如果遇到网络分区(脑裂),raft算法是如何回复网络一致的。
Leader election(领导选举)
- Raft 使用一种心跳机制来触发领导人选举
- 当服务器程序启动时,节点都是 follower(跟随者) 身份
- 如果一个跟随者在一段时间里没有接收到任何消息,也就是选举超时,然后他就会认为系统中没有可用的领导者然后开始进行选举以选出新的领导者
- 要开始一次选举过程,follower 会给当前term加1并且转换成candidate状态,然后它会并行的向集群中的其他服务器节点发送请求投票的 RPCs 来给自己投票。
- 候选人的状态维持直到发生以下任何一个条件发生的时候
- 他自己赢得了这次的选举
- 其他的服务器成为领导者
- 一段时间之后没有任何一个获胜的人
Log replication(日志复制)
- 当选出 leader 后,它会开始接收客户端请求,每个请求会带有一个指令,可以被回放到状态机中
- leader 把指令追加成一个log entry,然后通过AppendEntries RPC并行地发送给其他的server,当该entry被多数server复制后,leader 会把该entry回放到状态机中,然后把结果返回给客户端
- 当 follower 宕机或者运行较慢时,leader 会无限地重发AppendEntries给这些follower,直到所有的follower都复制了该log entry
- raft的log replication要保证如果两个log entry有相同的index和term,那么它们存储相同的指令
- leader在一个特定的term和index下,只会创建一个log entry
代码
package mainimport ("os""strconv""fmt""sync""net/rpc""net/http""math/rand""time""log"
)//对每个节点id和端口的封装类型
type nodeInfo struct {id stringport string
}//声明节点对象类型Raft
type Raft struct {node nodeInfomu sync.Mutex//当前节点编号me intcurrentTerm intvotedFor intstate inttimeout intcurrentLeader int//该节点最后一次处理数据的时间lastMessageTime int64message chan booleclectCh chan boolheartbeat chan bool//子节点给主节点返回心跳信号heartbeatRe chan bool
}//声明leader对象
type Leader struct {//任期Term int//leader 编号LeaderId int
}//设置节点个数
const raftCount = 2var leader = Leader{0, -1}
//存储缓存信息
var bufferMessage = make(map[string]string)
//处理数据库信息
var mysqlMessage = make(map[string]string)
//操作消息数组下标
var messageId = 1
//用nodeTable存储每个节点中的键值对
var nodeTable map[string]stringfunc main() {//终端接收来的是数组if len(os.Args) > 1 {//接收终端输入的信息userId := os.Args[1]//字符串转换整型id, _ := strconv.Atoi(userId)fmt.Println(id)//定义节点id和端口号nodeTable = map[string]string{"1": ":8000","2": ":8001",}//封装nodeInfo对象node := nodeInfo{id: userId, port: nodeTable[userId]}//创建节点对象rf := Make(id)//确保每个新建立的节点都有端口对应//127.0.0.1:8000rf.node = node//注册rpcgo func() {//注册rpc,为了实现远程链接rf.raftRegisterRPC(node.port)}()if userId == "1" {go func() {//回调方法http.HandleFunc("/req", rf.getRequest)fmt.Println("监听8080")if err := http.ListenAndServe(":8080", nil); err != nil {fmt.Println(err)return}}()}}for {;}
}var clientWriter http.ResponseWriterfunc (rf *Raft) getRequest(writer http.ResponseWriter, request *http.Request) {request.ParseForm()if len(request.Form["age"]) > 0 {clientWriter = writerfmt.Println("主节点广播客户端请求age:", request.Form["age"][0])param := Param{Msg: request.Form["age"][0], MsgId: strconv.Itoa(messageId)}messageId++if leader.LeaderId == rf.me {rf.sendMessageToOtherNodes(param)} else {//将消息转发给leaderleaderId := nodeTable[strconv.Itoa(leader.LeaderId)]//连接远程rpc服务rpc, err := rpc.DialHTTP("tcp", "127.0.0.1"+leaderId)if err != nil {log.Fatal("\nrpc转发连接server错误:", leader.LeaderId, err)}var bo = false//首先给leader传递err = rpc.Call("Raft.ForwardingMessage", param, &bo)if err != nil {log.Fatal("\nrpc转发调用server错误:", leader.LeaderId, err)}}}
}func (rf *Raft) sendMessageToOtherNodes(param Param) {bufferMessage[param.MsgId] = param.Msg// 只有领导才能给其它服务器发送消息if rf.currentLeader == rf.me {var success_count = 0fmt.Printf("领导者发送数据中 。。。\n")go func() {rf.broadcast(param, "Raft.LogDataCopy", func(ok bool) {//需要其它服务端回应rf.message <- ok})}()for i := 0; i < raftCount-1; i++ {fmt.Println("等待其它服务端回应")select {case ok := <-rf.message:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()mysqlMessage[param.MsgId] = bufferMessage[param.MsgId]delete(bufferMessage, param.MsgId)if clientWriter != nil {fmt.Fprintf(clientWriter, "OK")}fmt.Printf("\n领导者发送数据结束\n")rf.mu.Unlock()}}}}}
}//注册Raft对象,注册后的目的为确保每个节点(raft) 可以远程接收
func (node *Raft) raftRegisterRPC(port string) {//注册一个服务器rpc.Register(node)//把服务绑定到http协议上rpc.HandleHTTP()err := http.ListenAndServe(port, nil)if err != nil {fmt.Println("注册rpc服务失败", err)}
}//创建节点对象
func Make(me int) *Raft {rf := &Raft{}rf.me = merf.votedFor = -1//0 follower ,1 candidate ,2 leaderrf.state = 0rf.timeout = 0rf.currentLeader = -1rf.setTerm(0)//初始化通道rf.message = make(chan bool)rf.heartbeat = make(chan bool)rf.heartbeatRe = make(chan bool)rf.eclectCh = make(chan bool)//每个节点都有选举权go rf.election()//每个节点都有心跳功能go rf.sendLeaderHeartBeat()return rf
}//选举成功后,应该广播所有的节点,本节点成为了leader
func (rf *Raft) sendLeaderHeartBeat() {for {select {case <-rf.heartbeat:rf.sendAppendEntriesImpl()}}
}func (rf *Raft) sendAppendEntriesImpl() {if rf.currentLeader == rf.me {var success_count = 0go func() {param := Param{Msg: "leader heartbeat",Arg: Leader{rf.currentTerm, rf.me}}rf.broadcast(param, "Raft.Heartbeat", func(ok bool) {rf.heartbeatRe <- ok})}()for i := 0; i < raftCount-1; i++ {select {case ok := <-rf.heartbeatRe:if ok {success_count++if success_count >= raftCount/2 {rf.mu.Lock()rf.lastMessageTime = milliseconds()fmt.Println("接收到了子节点们的返回信息")rf.mu.Unlock()}}}}}
}func randomRange(min, max int64) int64 {//设置随机时间rand.Seed(time.Now().UnixNano())return rand.Int63n(max-min) + min
}//获得当前时间(毫秒)
func milliseconds() int64 {return time.Now().UnixNano() / int64(time.Millisecond)
}func (rf *Raft) election() {var result bool//每隔一段时间发一次心跳for {//延时时间timeout := randomRange(1500, 3000)//设置该节点最有一次处理消息的时间rf.lastMessageTime = milliseconds()select {//间隔时间为1500-3000ms的随机值case <-time.After(time.Duration(timeout) * time.Millisecond):}result = falsefor !result {//选择leaderresult = rf.election_one_round(&leader)}}
}func (rf *Raft) election_one_round(args *Leader) bool {//已经有了leader,并且不是自己,那么returnif args.LeaderId > -1 && args.LeaderId != rf.me {fmt.Printf("%d已是leader,终止%d选举\n", args.LeaderId, rf.me)return true}var timeout int64var vote intvar triggerHeartbeat booltimeout = 2000last := milliseconds()success := falserf.mu.Lock()rf.becomeCandidate()rf.mu.Unlock()fmt.Printf("candidate=%d start electing leader\n", rf.me)for {fmt.Printf("candidate=%d send request vote to server\n", rf.me)go func() {rf.broadcast(Param{Msg: "send request vote"}, "Raft.ElectingLeader", func(ok bool) {//无论成功失败都需要发送到通道 避免堵塞rf.eclectCh <- ok})}()vote = 0triggerHeartbeat = falsefor i := 0; i < raftCount-1; i++ {fmt.Printf("candidate=%d waiting for select for i=%d\n", rf.me, i)select {case ok := <-rf.eclectCh:if ok {vote++success = vote >= raftCount/2 || rf.currentLeader > -1if success && !triggerHeartbeat {fmt.Println("okok", args)triggerHeartbeat = truerf.mu.Lock()rf.becomeLeader()args.Term = rf.currentTerm + 1args.LeaderId = rf.merf.mu.Unlock()fmt.Printf("candidate=%d becomes leader\n", rf.currentLeader)rf.heartbeat <- true}}}fmt.Printf("candidate=%d complete for select for i=%d\n", rf.me, i)}if (timeout+last < milliseconds()) || (vote >= raftCount/2 || rf.currentLeader > -1) {break} else {select {case <-time.After(time.Duration(5000) * time.Millisecond):}}}fmt.Printf("candidate=%d receive votes status=%t\n", rf.me, success)return success
}func (rf *Raft) becomeLeader() {rf.state = 2fmt.Println(rf.me, "成为了leader")rf.currentLeader = rf.me
}//设置发送参数的数据类型
type Param struct {Msg stringMsgId stringArg Leader
}func (rf *Raft) broadcast(msg Param, path string, fun func(ok bool)) {//设置不要自己给自己广播for nodeID, port := range nodeTable {if nodeID == rf.node.id {continue;}//链接远程rpcrp, err := rpc.DialHTTP("tcp", "127.0.0.1"+port)if err != nil {fun(false)continue}var bo = falseerr = rp.Call(path, msg, &bo)if err != nil {fun(false)continue}fun(bo)}}func (rf *Raft) becomeCandidate() {if rf.state == 0 || rf.currentLeader == -1 {//候选人状态rf.state = 1rf.votedFor = rf.merf.setTerm(rf.currentTerm + 1)rf.currentLeader = -1}
}func (rf *Raft) setTerm(term int) {rf.currentTerm = term
}//Rpc处理
func (rf *Raft) ElectingLeader(param Param, a *bool) error {//给leader投票*a = truerf.lastMessageTime = milliseconds()return nil
}func (rf *Raft) Heartbeat(param Param, a *bool) error {fmt.Println("\nrpc:heartbeat:", rf.me, param.Msg)if param.Arg.Term < rf.currentTerm {*a = false} else {leader = param.Argfmt.Printf("%d收到leader%d心跳\n", rf.currentLeader, leader.LeaderId)*a = truerf.mu.Lock()rf.currentLeader = leader.LeaderIdrf.votedFor = leader.LeaderIdrf.state = 0rf.lastMessageTime = milliseconds()fmt.Printf("server = %d learned that leader = %d\n", rf.me, rf.currentLeader)rf.mu.Unlock()}return nil
}//连接到leader节点
func (rf *Raft) ForwardingMessage(param Param, a *bool) error {fmt.Println("\nrpc:forwardingMessage:", rf.me, param.Msg)rf.sendMessageToOtherNodes(param)*a = truerf.lastMessageTime = milliseconds()return nil
}//接收leader传过来的日志
func (r *Raft) LogDataCopy(param Param, a *bool) error {fmt.Println("\nrpc:LogDataCopy:", r.me, param.Msg)bufferMessage[param.MsgId] = param.Msg*a = truereturn nil
}
Reference
老男孩 Go 5期
相关文章:
Raft 一致性算法
Raft Raft提供了一种在计算系统集群中分布状态机的通用方法,确保集群中的每个节点都同意一系列相同的状态转换。 一个Raft集群包含若干个服务器节点,通常为5个,这允许整个系统容忍2个节点的失效。每个节点处于以下三种状态之一: …...

驱动程序开发:基于EC20 4G模块自动拨号联网的两种方式(GobiNet工具拨号和PPP工具拨号)
目录一、EC20 4G模块简介二、根据移远官方文档修改EC20 4G模组驱动 1、因为EC20 4G模组min-pice接口其实就是usb接口,因此需要修改Linux内核源码drivers/usb/serial/option.c文件,如下图: 2、根据USB协议的要求,需要在drive…...
Web自动化测试——常见问题篇
文章目录一、什么是自动化测试二、为啥进行自动化测试(优点)三、Webdriver 的工作原理四、显示等待和隐式等待的区别五、什么样的项目适合做自动化六、自动化测试的流程七、如何分析生成的自动化测试报告一、什么是自动化测试 所谓的自动化测试就是使用…...

快速实现Modbus TCP转BACnet IP协议的方案
一、需求背景 BACnet是用于智能楼宇自控系统的主流通信协议,可用在暖通空调系统(HVAC,包括暖气、通风、空气调节),也可以用在照明控制、门禁系统、火警侦测系统及其相关的设备。楼宇中的受控设备都通过BACnet协议连接到…...

Unity CircleLayoutGroup 如何实现一个圆形自动布局组件
文章目录简介实现原理Editor 编辑器简介 Unity中提供了三种类型的自动布局组件,分别是Grid Layou Group、Horizontal Layout Group、Vertical Layout Group,本文自定义了一个圆形的自动布局组件Circle Layout Group,如图所示: Ra…...

springcloud+nacos+gateway案例
一、先搭建好springcloudnacos项目地址:https://javazhong.blog.csdn.net/article/details/128899999二、spring cloud gateway简述Spring Cloud Gateway 是Spring Cloud家族中的一款API网关。Gateway 建立在 Spring Webflux上,目标是提供一个简洁、高效的API网关&a…...
实习这么久,你知道Maven是如何从代码仓库中找到需要的依赖吗?
目录 碎碎念 Maven是如何找到代码仓库里需要的依赖的? 如何根据坐标在本地仓库中寻找所需要的依赖? 如何根据坐标在远程仓库中寻找所需要的依赖? Maven 如何使用 HTTP 或 HTTPS 协议从远程仓库中获取依赖项,请详细解释其原理…...

低代码/零代码的快速开发框架
目前国内主流的低代码开发平台有:宜搭、简道云、明道云、云程、氚云、伙伴云、道一云、JEPaaS、华炎魔方、搭搭云、JeecgBoot 、RuoYi等。这些平台各有优劣势,定位也不同,用户可以根据自己需求选择。 一、阿里云宜搭 宜搭是阿里巴巴集团在20…...
C# 中常见的设计模式
设计模式是一套被广泛应用于软件设计的最佳实践,它们可以帮助开发者解决特定的问题,提高代码的可重用性、可读性和可维护性。本文将介绍 C# 中常见的几种设计模式,并提供相应的示例代码。 工厂模式 工厂模式是一种创建型设计模式,…...
promethues/servicemonitor
目录 1.promethues 能保证源源不断地采集/metrics 信息吗?每次都是最新的吗 2.部署servicemonitor 的作用是什么? 3.pod 部署采集数据直接上报promthues ,不通过servicemonitor 可以吗? 4.你说的"此外,如果部署…...

postman使用简介
1、介绍 postman是一款功能强大的网页调试和模拟发送HTTP请求的Chrome插件,支持几乎所有类型的HTTP请求 2、下载及安装 官方文档:https://www.getpostman.com/docs/v6/ chrome插件:chrome浏览器应用商店直接搜索添加即可(需墙&…...
@DS注解在事务中实现数据源的切换@DS在事务中失效【已解决】
在Springboot的application.yml中的配置: spring:datasource:url: jdbc:mysql://localhost:3306/test2?serverTimezoneUTC&useUnicodetrue&characterEncodingutf8driver-class-name: com.mysql.cj.jdbc.Driverusername: rootpassword: rootdynamic:primar…...
Java I/O之文件系统
一、全文概览 在学习文件系统之前,需要了解下Java在I/O上的发展史:在Java7之前,打开和读取文件需要编写特别笨拙的代码,涉及到很多的InputStream、OutputStream等组合起来使用,每次在使用时或许都需要查一下文档才能记…...

Mysql元数据获取方法(information_schema绕过方法)
前提:如果waf或其它过滤了information_schema关键字,那我们该如何获取元数据呢?能够代替information_schema的有:sys.schema_auto_increment_columnssys.schema_table_statistics_with_bufferx$schema_table_statistics_with_buff…...
Eclipse快捷键
* 1.补全代码的声明:alt /* 2.快速修复: ctrl 1 * 3.批量导包:ctrl shift o* 4.使用单行注释:ctrl /* 5.使用多行注释: ctrl shift / * 6.取消多行注释:ctrl shift \* 7.复制指定行的代码:ctrl a…...

java ssm自习室选座预约系统开发springmvc
人工管理显然已无法应对时代的变化,而自习室选座预约系统开发能很好地解决这一问题,既能提高人力物力,又能提高预约选座的知名度,取代人工管理是必然趋势。 本自习室选座预约系统开发以SSM作为框架,JSP技术,…...

分享我从功能测试转型到测试开发的真实故事
由于这段时间我面试了很多家公司,也经历了之前公司的不愉快。所以我想写一篇文章来分享一下自己的面试体会。希望能对我在之后的工作或者面试中有一些帮助,也希望能帮助到正在找工作的你。 找工作 我们总是草率地进入一个自己不了解的公司工作…...

TypeScript快速入门———(二)TypeScript常用类型
文章目录概述1 类型注解2 常用基础类型概述3.原始类型4 数组类型5 类型别名6.函数类型7 对象类型8 接口9 元组10 类型推论11 类型断言12 字面量类型13 枚举14 any 类型15 typeof概述 TypeScript 是 JS 的超集,TS 提供了 JS 的所有功能,并且额外的增加了…...

Mac M1 使用Centos8➕VMware Fusion进行静态网络配置
大部分的流程网络上面都有当我们已经下载好mac m1版的Centos8链接: https://pan.baidu.com/s/1UTl4Lo-_c17s-PDj3dA6kA 提取码: 7xh2 和VMware Fusionhttps://www.vmware.com/cn/products/fusion.html之后就可以进行安装了在导入过后 记得将硬盘和内存都设置好了 记得在关机状态…...

RadGraph: Extracting Clinical Entities and Relations from Radiology Reports代码
文章来源:NeurIPS 文章类别:IE(Information Extraction) RadGraph主要基于dygie,主要文件为inference.py。 inference.py: 1、get_file_list(data_path) def get_file_list(path):file_list [item for item in glob.glob(f&q…...
Python爬虫实战:研究MechanicalSoup库相关技术
一、MechanicalSoup 库概述 1.1 库简介 MechanicalSoup 是一个 Python 库,专为自动化交互网站而设计。它结合了 requests 的 HTTP 请求能力和 BeautifulSoup 的 HTML 解析能力,提供了直观的 API,让我们可以像人类用户一样浏览网页、填写表单和提交请求。 1.2 主要功能特点…...
mongodb源码分析session执行handleRequest命令find过程
mongo/transport/service_state_machine.cpp已经分析startSession创建ASIOSession过程,并且验证connection是否超过限制ASIOSession和connection是循环接受客户端命令,把数据流转换成Message,状态转变流程是:State::Created 》 St…...
【磁盘】每天掌握一个Linux命令 - iostat
目录 【磁盘】每天掌握一个Linux命令 - iostat工具概述安装方式核心功能基础用法进阶操作实战案例面试题场景生产场景 注意事项 【磁盘】每天掌握一个Linux命令 - iostat 工具概述 iostat(I/O Statistics)是Linux系统下用于监视系统输入输出设备和CPU使…...
在 Nginx Stream 层“改写”MQTT ngx_stream_mqtt_filter_module
1、为什么要修改 CONNECT 报文? 多租户隔离:自动为接入设备追加租户前缀,后端按 ClientID 拆分队列。零代码鉴权:将入站用户名替换为 OAuth Access-Token,后端 Broker 统一校验。灰度发布:根据 IP/地理位写…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...

SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...

推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材)
推荐 github 项目:GeminiImageApp(图片生成方向,可以做一定的素材) 这个项目能干嘛? 使用 gemini 2.0 的 api 和 google 其他的 api 来做衍生处理 简化和优化了文生图和图生图的行为(我的最主要) 并且有一些目标检测和切割(我用不到) 视频和 imagefx 因为没 a…...
A2A JS SDK 完整教程:快速入门指南
目录 什么是 A2A JS SDK?A2A JS 安装与设置A2A JS 核心概念创建你的第一个 A2A JS 代理A2A JS 服务端开发A2A JS 客户端使用A2A JS 高级特性A2A JS 最佳实践A2A JS 故障排除 什么是 A2A JS SDK? A2A JS SDK 是一个专为 JavaScript/TypeScript 开发者设计的强大库ÿ…...

七、数据库的完整性
七、数据库的完整性 主要内容 7.1 数据库的完整性概述 7.2 实体完整性 7.3 参照完整性 7.4 用户定义的完整性 7.5 触发器 7.6 SQL Server中数据库完整性的实现 7.7 小结 7.1 数据库的完整性概述 数据库完整性的含义 正确性 指数据的合法性 有效性 指数据是否属于所定…...

认识CMake并使用CMake构建自己的第一个项目
1.CMake的作用和优势 跨平台支持:CMake支持多种操作系统和编译器,使用同一份构建配置可以在不同的环境中使用 简化配置:通过CMakeLists.txt文件,用户可以定义项目结构、依赖项、编译选项等,无需手动编写复杂的构建脚本…...