当前位置: 首页 > news >正文

Kafka~消息发送过程与ISR机制了解

消息发送过程

使用Kafka发送消息时,一般有两种方式分别是:

  1. 同步发送
  2. 异步发送

同步发送时,可以在发送消息后,通过get方法等待消息结果,这种情况能够准确的拿到消息最终的发送结果,要么是成功、要么是失败。

而异步发送,是采用了callback的方式进行回调的,可以大大的的提升消息的吞吐量,也可以根据回调来判断消息是否发送成功。

不管是同步发送还是异步发送,最终都需要在Producer端把消息发送到Broker中,那么这个过程大致如下:
在这里插入图片描述
Kafka的Producer在发送消息时通常涉及两个线程,主线程(Main)和发送线程(Sender)和一个消息累加器(RecordAccumulator)

  • Main线程是Producer的入口,负责初始化Producer的配置、创建KafkaProducer实例并执行发送逻辑。它会按照用户定义的发送方式(同步或异步)发送消息,然后等待消息发送完成。一条消息的发送,在调用send方法后,会经过拦截器、序列化器及分区器。拦截器主要用于在消息发送之前和之后对消息进行定制化的处理,如对消息进行修改、记录日志、统计信息等。序列化器负责将消息的键和值对象转换为字节数组,以便在网络上传输。分区器决定了一条消息被发送到哪个Partition中。它根据消息的键(如果有)或者特定的分区策略,选择出一个目标Partition。

  • RecordAccumulator在Kafka Producer中起到了消息积累和批量发送的作用,当Producer发送消息时,不会立即将每条消息发送到Broker,而是将消息添加到RecordAccumulator维护的内部缓冲区中,RecordAccumulator会根据配置的条件(如batch.size、linger.ms)对待发送的消息进行批量处理。当满足指定条件时,RecordAccumulator将缓冲区中的消息组织成一个批次(batch),然后一次性发送给Broker。如果发送失败或发生错RecordAccumulator可以从将消息重新分配到新的批次中进行重试。这样可以确保消息不会丢失,同时提高消息的可靠性。

  • Send线程是负责实际的消息发送和处理的。发送线程会定期从待发送队列中取出消息,并将其发送到对应的Partition的 Leader Broker上。它主要负责网络通信操作,并处理发送请求的结果,包括确认的接收、错误处理等。

  • NetworkClient和Selector是两个重要的组件,分别负责网络通信和1/0多路复用。

发送线程会把消息发送到Kafka集群中对应的Partition的Parrtition Leader,Partition Leader接收到消息后,会对消息进行一系列的处理。它会将消息写入本地的日志文件(Log),存储为segment文件,因为是顺序写,segment文件也是顺序截断,为了保证数据的可靠性和高可用性,Kafka使用了消息复制机制。Leader Broker接收到消息后,会将消息复制到其他副本(Partition Follower)。副本是通过网络复制数据的,它们门会定期从LeaderBroker同步消息。

每一个Partition Follower在写入本地log之后,会向Leader发送一个TACK,但是我们的Producer其实也是需要依赖ACK才能知道消息有没有投递成功的,而这个ACK是何时发送的,Producer又要不要关心呢? 这就涉及到了kafka的ack机制,生产者会根据设置的request.required.acks参数不同,选择等待或或直接发送下一条消息:

  • request.required.acks = 0
    表示Producer不等待来自Leader的ACK确认,直接发送送下一条消息。在这种情况下,如果Leader分片所在服务器发生宕机,那么这些已经发送的数据会丢失。
  • request.required.acks = 1
    表示Producer等待来自Leader的ACK确认,当收到确认人后才发送下一条消息。在这种情况下,消息一定会被写入到 Leader服务器,但并不保证Follow节点已已经同步完成。所以如果在消息已经被写入Leader分片,但是还未同步到Follower节点,此时Leade分片所在服务器宕机了,那么这条消息也就丢失了,无法被消费到。
  • request.required.acks = -1
    Leader会把消息复制到集群中的所有ISR(In-Sync Replicas,同步副本),要等待所有ISR的ACK确认后,再向Producer发送ACK消息,然后Producer再继续发下下一条消息。

ISR机制

Kafka 中的 ISR(In-Sync Replicas)机制是一种用于确保数据可靠性和一致性的重要机制。ISR 是一组副本,它包括分区的领导者(Leader)和追随者(Follower)副本,这些副本与领导者保持数据同步。以下是关于 Kafka 的 ISR 机制的详细介绍:

  • 意义:

ISR 机制动态维护了一个与 Leader 副本保持同步的副本集合,只有在 ISR 集合中的副本才有资格参与 Leader 的选举。通过 ISR 机制,可以确保在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。

  • 用途:
  1. 保证数据可靠性:ISR 机制通过副本冗余机制,提供了 Kafka 消息的高可靠性。
  2. 实现故障转移:ISR 机制可以做到故障转移,保障服务的可用性。当 Leader 副本出现故障时,Kafka 会从 ISR 集合中选举出新的 Leader,从而保证服务的连续性。
  3. 平衡复制方案:ISR 机制平衡了主从架构下,复制方案的选择(同步/异步/少数服从多数),让使用者根据参数自行选择。
  • 实现方式:
  1. 数据同步:Leader Replica 接收到 Producer 发送的消息后,将其写入本地日志,并通过 Pull 模式等待 Follower Replica 主动拉取。Follower Replica 从 Leader Replica 拉取数据并写入本地日志后,将拉取偏移量(fetch offset)返回给 Leader。
  2. 同步状态监测:Leader Replica 持续监控每个 Follower Replica 的拉取偏移量,将其与自身的最新消息偏移量(log end offset)进行比较。若 Follower Replica 的拉取偏移量与 Leader 相差不超过一定阈值(由 replica.lag.time.max.ms 参数控制),则认为该 Follower 处于同步状态,将其纳入 ISR。
  3. ISR 调整:当 Follower Replica 因网络延迟、Broker 故障等原因导致拉取偏移量落后过多,超出阈值时,Leader Replica 会将其从 ISR 中移除。当 Follower Replica 恢复同步后,再次将其加入 ISR。
  • 详细过程

当消息被写入Kafka的分区时,它首先会被写入Leader,然后LLeader将消息复制给ISR中的所有副本。只有当ISR中的所有副本都成功地接收到并确认了消息后,主副本才会认为消息已成功提交。这种机制确保了数据的可靠性和一致性。

在Kafka中,ISR(In-Sync Replicas)列表的维护是通过副本状态和配置参数来进行的。具体的ISR列表维护机制在不同的Kafka版本中有所变化。

  • 在0.9.x之前的版本,Kafka有一个核心的参数:replica.lag.max.messages,表示如果Follower落后
    Leader的消息数量超过了这个参数值,就认为Follower就会从ISR列表里移除。

但是,基于replica.lag.max.messages 这种实现,在瞬间高并发访问的情况下会有问题:比如Leader瞬间接收到几万条消息,然后所有Follower还没来得及同步过去,此时所有follower都会被踢出ISR列表。

  • Kafka从0.9.x版本开始,引入了 replica.lag.max.ms参数,表示如果果某个Follower的LEO (latest end
    offset)一直落后Leader超过了10秒,那么才会被从ISR列表里移除。

这样的话,即使出现瞬间流量,导致Follower落后很多数据,但是只要在限定的时间内尽快追上来就行了。

总之,通过 ISR 机制,Kafka 可以保证在 Leader 副本出现故障时,能够快速从 ISR 集合中选举出新的 Leader,从而避免数据丢失和服务中断。同时,ISR 机制也可以提高 Kafka 系统的可靠性和可用性。

相关文章:

Kafka~消息发送过程与ISR机制了解

消息发送过程 使用Kafka发送消息时,一般有两种方式分别是: 同步发送异步发送 同步发送时,可以在发送消息后,通过get方法等待消息结果,这种情况能够准确的拿到消息最终的发送结果,要么是成功、要么是失败…...

multiprocessing.Queue 多个进程生产和多个进程消费怎么处理

在这个示例中,我们创建了一个队列 q,并通过 multiprocessing.Manager().Queue() 来确保队列可以在多个进程之间共享。我们定义了 consumer 和 producer 函数,分别用于从队列中获取数据和向队列中放入数据。 在主进程中,我们创建了…...

配置 Python 解释器及虚拟环境

配置 Python 解释器及虚拟环境 配置 Python 解释器: 1. 打开 PyCharm,进入“File”(文件)菜单,选择“Settings”(设置)。 2. 在弹出的设置窗口中,选择“Project: [项目名称]”下的…...

JeecgBoot中如何对敏感信息进行脱敏处理?

数据脱敏即将一些敏感信息通过加密、格式化等方式处理,展示给用户一个新的或是格式化后的信息,避免了敏感信息的暴露。 一、接口脱敏注解 针对接口数据实现脱敏加密,只加密,一般此方案用于数据加密展示。 1.1 注解介绍 注解作用域…...

【Docker】存储数据卷

目录 1、挂载数据卷到容器里 2、查询挂载文件 3、容器与主机之间映射共享卷 4、三个容器之间使用共享卷 5、卷数据的备份与恢复 5.1 备份 5.2 恢复 1、挂载数据卷到容器里 docker run -itd --name test02 -v /data nginx docker exec -it test02 bashls / docker inspe…...

《昇思25天学习打卡营第12天 | 昇思MindSpore基于MindSpore的GPT2文本摘要》

12天 本节学习了基于MindSpore的GPT2文本摘要。 1.数据集加载与处理 1.1.数据集加载 1.2.数据预处理 2.模型构建 2.1构建GPT2ForSummarization模型 2.2动态学习率 3.模型训练 4.模型推理...

深入解析npm unpublish命令:使用场景与实践指南

npm(Node Package Manager)是JavaScript编程语言的包管理器,广泛用于Node.js应用程序。npm unpublish命令允许用户从npm仓库中撤回(unpublish)一个包的特定版本。本文将详细介绍npm unpublish命令的使用场景、操作步骤…...

有趣的仿神经猫html5圈小猫游戏源码

有趣的仿神经猫html5圈小猫游戏源码,点击小圆点,围住小猫游戏。猫已经跑到地图边缘,你输了。内含json数据,部署到服务器方可运行 微信扫码免费获取源码...

Redis 7.x 系列【10】数据类型之有序集合(ZSet)

有道无术,术尚可求,有术无道,止于术。 本系列Redis 版本 7.2.5 源码地址:https://gitee.com/pearl-organization/study-redis-demo 文章目录 1. 概述2. 常用命令2.1 ZADD2.2 ZCARD2.3 ZSCORE2.4 ZRANGE2.5 ZREVRANGE2.6 ZRANK2.7…...

操作系统-文件的物理结构(文件分配方式)

文章目录 总览文件块和磁盘块连续分配顺序访问直接访问(随机访问)为什么连续分配同时支持这两种访问模式? 链接分配隐式链接显示链接小结索引分配链接方案多层索引混合索引小结 总结 总览 文件数据存放在外存中 文件块和磁盘块 文件内通过逻…...

Spring Boot集成jsoup实现html解析

1.什么是jsoup jsoup 是一款 Java 的 HTML 解析器,可直接解析某个 URL 地址、HTML 文本内容。它提供了一套非常省力的 API,可通过 DOM,CSS 以及类似于 jQuery 的操作方法来取出和操作数据,可操作 HTML 元素、属性、文本。 JSo…...

[240629] 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练 | Jina AI 发布最新的神经网络重排序模型

目录 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练Jina AI 发布最新的神经网络重排序模型 阿里云揭秘其数据中心设计和自研网络,用于大语言模型训练 阿里云近日公布了其专为大型语言模型 (LLM) 训练流量而设计的基于以太网的网络设计&#x…...

【Docker0】网络更改

目录 1. 停止docker服务 2. 关闭docker默认桥接网络接口 3. 从系统删除docker0接口 4. 创建一个名为bridge0的新接口 5. 添加ip地址和子网掩码 6. 启用bridge0接口 7. (如果没起来就执行该句) 8. 查看ip 1. 停止docker服务 sudo service docker…...

IDEA中导入Maven项目

IDEA中导入Maven项目 方式1:使用Maven面板,快速导入项目 打开IDEA,选择右侧Maven面板,点击 号,选中对应项目的pom.xml文件,双击即可 说明:如果没有Maven面板,选择 View > Appe…...

px、em、rem、rpx 作用和用法详解

px px像素(Pixel)。相对长度单位。像素px是相对于显示器屏幕分辨率而言的。 PX特点 IE无法调整那些使用px作为单位的字体大小; 国外的大部分网站能够调整的原因在于其使用了em或rem作为字体单位; Firefox能够调整px和em&#xff…...

Linux 常用命令 - dd 【复制及转换文件内容】

简介 dd 命令源自于磁盘复制(disk dump)的缩写,是 Linux 和 Unix 系统中用于转换和复制文件的一个强大工具。它可以在复制过程中进行格式转换,支持不同的块大小,能够直接对硬盘设备进行操作,非常适合进行备…...

全网唯一免费无水印AI视频工具!

最近Morph Studio开始免费公测!支持高清画质,可以上传语音,同步口型,最重要的是生成的视频没有水印! Morph Studio国内就可以访问,可以使用国内邮箱注册(我用的163邮箱),…...

kafka(四)消息类型

一、同步消息 1、生产者 同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调…...

Emacs之显示blame插件:blamer、git-messenger(一百四十四)

简介: CSDN博客专家,专注Android/Linux系统,分享多mic语音方案、音视频、编解码等技术,与大家一起成长! 优质专栏:Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏:多媒…...

【10分钟速通webpack,全流程打包,编译,发包,全干货,附代码 】

需求 后端有个nodejs 基础库,用typescript编写,需要发包到代码仓库上,被其它业务引入。这其中就涉及了: 编译, 打包,发包。 工作流速览 前提依赖 webpack主体 npm install --save-dev webpack webpack…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时,你可能需要保留重要的数据,例如通讯录。好在,将通讯录从 iPhone 转移到 Android 手机非常简单,你可以从本文中学习 6 种可靠的方法,确保随时保持连接,不错过任何信息。 第 1…...

linux 下常用变更-8

1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...

浅谈不同二分算法的查找情况

二分算法原理比较简单,但是实际的算法模板却有很多,这一切都源于二分查找问题中的复杂情况和二分算法的边界处理,以下是博主对一些二分算法查找的情况分析。 需要说明的是,以下二分算法都是基于有序序列为升序有序的情况&#xf…...

大学生职业发展与就业创业指导教学评价

这里是引用 作为软工2203/2204班的学生,我们非常感谢您在《大学生职业发展与就业创业指导》课程中的悉心教导。这门课程对我们即将面临实习和就业的工科学生来说至关重要,而您认真负责的教学态度,让课程的每一部分都充满了实用价值。 尤其让我…...

HDFS分布式存储 zookeeper

hadoop介绍 狭义上hadoop是指apache的一款开源软件 用java语言实现开源框架,允许使用简单的变成模型跨计算机对大型集群进行分布式处理(1.海量的数据存储 2.海量数据的计算)Hadoop核心组件 hdfs(分布式文件存储系统)&a…...

论文笔记——相干体技术在裂缝预测中的应用研究

目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...

基于 TAPD 进行项目管理

起因 自己写了个小工具,仓库用的Github。之前在用markdown进行需求管理,现在随着功能的增加,感觉有点难以管理了,所以用TAPD这个工具进行需求、Bug管理。 操作流程 注册 TAPD,需要提供一个企业名新建一个项目&#…...

基于Java+MySQL实现(GUI)客户管理系统

客户资料管理系统的设计与实现 第一章 需求分析 1.1 需求总体介绍 本项目为了方便维护客户信息为了方便维护客户信息,对客户进行统一管理,可以把所有客户信息录入系统,进行维护和统计功能。可通过文件的方式保存相关录入数据,对…...

windows系统MySQL安装文档

概览:本文讨论了MySQL的安装、使用过程中涉及的解压、配置、初始化、注册服务、启动、修改密码、登录、退出以及卸载等相关内容,为学习者提供全面的操作指导。关键要点包括: 解压 :下载完成后解压压缩包,得到MySQL 8.…...