Kafka进阶
Kafka进阶
Kafka事务
kafka的事务机制是指kafka支持跨多个主题和分区的原子性写入,即在一个事务中发送的所有消息要么全部成功,要么全部失败。
kafka的事务机制涉及到以下几个方面:
- 事务生产者(transactional producer):可以在一个事务中发送多个消息到不同的主题和分区,也可以从其他主题消费消息并发送到新的主题(实现流处理)。事务生产者需要指定一个唯一的transactional.id,用于标识不同的事务。
- 事务消费者(transactional consumer):可以消费事务生产者发送的消息,并且只有当事务提交后才能看到这些消息。事务消费者需要设置isolation.level为read_committed,以过滤掉未提交或中止的事务消息。
- 事务协调器(transaction coordinator):是运行在每个kafka broker上的一个模块,负责管理和分配ProducerID,维护每个transactional.id对应的事务状态,以及处理事务的提交或中止。
- 事务日志(transaction log):是kafka的一个内部主题,用于存储每个transactional.id对应的事务元数据,包括ProducerID、epoch、分区列表、状态等。¹²
kafka的事务机制大致流程如下:
- 事务生产者调用initTransactions方法,向集群请求一个ProducerID,并找到对应的事务协调器。
- 事务生产者调用beginTransaction方法,向事务协调器发送开始事务的请求,并递增epoch。
- 事务生产者调用send方法,向目标主题和分区发送消息,并将这些分区注册到事务协调器。
- 事务生产者调用commitTransaction或abortTransaction方法,向事务协调器发送提交或中止事务的请求,并将控制消息写入到已注册的分区中。
- 事务协调器根据控制消息和事务状态,决定是否将该事务标记为已提交或已中止,并更新事务日志。
- 事务消费者根据isolation.level设置,只消费已提交的事务消息,并忽略未提交或已中止的事务消息。
Kafka生产者幂等性
幂等性介绍
Kafka的幂等性是指生产者在发送消息时,可以保证同一个消息不会被重复写入到同一个分区中,即使发生了网络错误或者重试;
幂等性原理
Kafka的幂等性是基于生产者的ID和序号来实现的,每个生产者都有一个唯一的ID和一个递增的序号,当生产者发送消息时,会把这两个信息附加到消息中,当分区收到消息时,会根据这两个信息来判断是否是重复的消息。
Kafka的幂等性只能保证单个分区内的消息不重复,不能保证跨分区或跨主题的消息不重复。如果要实现更强的事务保证,需要使用Kafka的事务机制。
分区机制
分区的文件存储形式
Kafka分区中的文件是按照一定的规则进行存储的,主要有以下几个特点:
- 每个分区对应一个日志文件夹(log file),日志文件中存储的是生产者发送的消息。
- 日志文件又被分成多个段文件(segment file),每个段文件都有固定的大小限制,当达到限制时,就会关闭当前段文件,创建新的段文件。
- 段文件由两部分组成:一个是存储消息内容的“.log”文件,另一个是存储消息位置信息的“.index”文件。
- “.index”文件是稀疏索引文件,它记录了消息的偏移量(offset)和物理位置(position)之间的映射关系,方便消费者快速定位消息。
- 消息在日志文件中是顺序追加的,消息在分区中也是有序的,每个消息都有一个递增的偏移量,偏移量在分区内是唯一的。
- Kafka会定期删除过期的或者超过大小限制的段文件,以回收磁盘空间。删除策略可以根据时间或者大小来配置。
消费者如何消费分区
- 消费者消费数据时,首先需要知道自己要消费的分区和偏移量
- 分区是由消费者组(Consumer Group)内部的分区分配策略(Partition Assignor)来决定的,不同的策略会有不同的分配逻辑
- 偏移量是由消费者自己维护的,每次消费完一批消息后,消费者会把当前的偏移量提交到 Kafka 或者其他存储中,下次消费时会从上次提交的偏移量开始继续消费
- 当消费者知道了要消费的分区和偏移量后,它会向分区的 Leader Broker 发送拉取请求,请求从指定的偏移量开始拉取一批消息。
- Leader Broker 收到请求后,会根据偏移量在“.index”文件中查找对应的物理位置(Position),然后从“.log”文件中读取一批消息返回给消费者。
这样,消费者就可以在多个段文件中找到自己要消费的数据了。
生产者分区写入策略
按key分配策略(默认)
它会根据消息的键(key)来计算一个哈希值,并根据哈希值对分区数取模,得到目标分区的编号。如果消息没有键,或者键为空,它会随机选择一个可用的分区。
轮询策略
轮询的分区写入策略,它会按照分区的顺序依次将消息发送到每个分区上,不考虑消息的键或者值。这种策略可以实现消息的均匀分布。
自定义分区策略
自行实现Partitioner接口,自定义分区策略。
指定分区(与写入策略无关)
手动指定写入哪个分区。
随机策略(较早版本)
随机写入某个分区。
消息乱序问题
- 轮询策略和随机策略,造成kafka中的数据是乱序存储的
- 按 key 分区,一定程度上可以实现数据的有序存储——局部有序,但是又可能会造成数据倾斜
Producer的ACKs参数
producer配置的acks参数了,acks参数表示当生产者生产消息的时候,写入到副本的要求严格程度。它决定了生产者如何在性能和可靠性之间做取舍。
acks有3个值可选 0、1和-1(或者all),默认值为1,值为字符串类型,不是整数类型
-
0:producer发送后即为成功,无需分区partition的leader确认写入成功,性能最高
-
1:producer发送后需要接收到partition的leader发送确认收到的回复,性能中等
-
-1或者all:producer发送后,需要ISR中所有副本都成功写入成功才能收到成功响应,性能最慢
分区的leader与follower机制
AR、ISR、OSR
在实际环境中,leader有可能会出现一些故障,所以Kafka一定会选举出新的leader。在讲解leader选举之前,我们先要明确几个概念。Kafka中,把follower可以按照不同状态分为三类——AR、ISR、OSR
- AR(Assigned Replicas) 分区的所有副本
- ISR(In-Sync Replicas) 所有与leader副本保持一定程度同步的副本(包括 leader 副本)
- OSR(Out-of-Sync Replias) 由于follower副本同步滞后过多的副本(不包括 leader 副本)
AR = ISR + OSR, 正常情况下,所有的follower副本都应该与leader副本保持同步,即AR = ISR,OSR集合为空。
Leader选举
-
kafka启动时,会在所有的broker中选择一个controller,controller的选举由broker竞争决定。controller会负责创建topic、或者添加分区、修改副本数量之类的管理任务,包括leader的选举。controller也是高可用的,一旦某个broker崩溃,其他的broker会重新注册为controller
-
controller读取到当前分区的ISR,只要有一个Replica还幸存,就选择其中一个作为leader否则,则任意选这个一个Replica作为leader
Kafka生产、消费数据工作流程
Kafka数据写入流程
Kafka数据消费流程
消息不丢失机制
broker数据不丢失
生产者通过分区的leader写入数据后,所有在ISR中follower都会从leader中复制数据,这样,可以确保即使leader崩溃了,其他的follower的数据仍然是可用的
生产者数据不丢失
通过ACK机制来确保数据已经成功写入。
消费者数据不丢失
在消费者消费数据的时候,只要每个消费者记录好offset值即可,就能保证数据不丢失。offset值记录在zk中。
相关文章:

Kafka进阶
Kafka进阶 Kafka事务 kafka的事务机制是指kafka支持跨多个主题和分区的原子性写入,即在一个事务中发送的所有消息要么全部成功,要么全部失败。 kafka的事务机制涉及到以下几个方面: 事务生产者(transactional producer&#x…...
大数计算:e^1000/300!
1.问题:大数计算可能超出数据类型范围 当单独计算 ,因为 ,double的最大取值为1.79769e308,所以 肯定超过了double的表示范围。 同样,对于300!也是如此。 那我们应该怎么去计算和存储结果呢?…...

力扣164最大间距
1.前言 因为昨天写了一个基数排序,今天我来写一道用基数排序实现的题解,希望可以帮助你理解基数排序。 这个题本身不难,就是线性时间和线性额外空间(O(n))的算法,有点难实现 基数排序的时间复杂度是O(d*(nradix)),其中…...

聚观早报 | “百度世界2023”即将举办;2024款岚图梦想家上市
【聚观365】10月13日消息 “百度世界2023”即将举办 2024款岚图梦想家上市 腾势D9用户超10万 华为发布新一代GigaGreen Radio OpenAI拟进行重大更新 “百度世界2023”即将举办 “百度世界2023”将于10月17日在北京首钢园举办。届时,百度创始人、董事长兼首席执…...

Windows 应用程序监控重启
执行思路 1.定时关闭可执行程序,2.再通过定时监控启动可执行程序 定时启动关闭程序.bat echo off cd "D:\xxxx\" :: 可执行程序目录 Start "" /b xxxx.exe :: 可执行程序 timeout /T 600 /nobreak >nul :: 600秒 taskkill /IM xxxx.exe /…...

springboot 通过url下载文件并上传到OSS
DEMO流程 传入一个需要下载并上传的url地址下载文件上传文件并返回OSS的url地址 springboot pom文件依赖 <?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w…...

docker创建elasticsearch、elasticsearch-head部署及简单操作
elasticsearch部署 1 拉取elasticsearch镜像 docker pull elasticsearch:7.7.0 2 创建文件映射路径 mkdir /mydata/elasticsearch/data mkdir /mydata/elasticsearch/plugins mkdir /mydata/elasticsearch/config 3 文件夹授权 chmod 777 /mydata/elastic…...

竞赛选题 深度学习+python+opencv实现动物识别 - 图像识别
文章目录 0 前言1 课题背景2 实现效果3 卷积神经网络3.1卷积层3.2 池化层3.3 激活函数:3.4 全连接层3.5 使用tensorflow中keras模块实现卷积神经网络 4 inception_v3网络5 最后 0 前言 🔥 优质竞赛项目系列,今天要分享的是 🚩 *…...

Codeforces Round 903 (Div. 3)ABCDE
Codeforces Round 903 (Div. 3)ABCDE 目录 A. Dont Try to Count题目大意思路核心代码 B. Three Threadlets题目大意思路核心代码 C. Perfect Square题目大意思路核心代码 D. Divide and Equalize题目大意思路核心代码 E. Block Sequence题目大意思路核心代码 A. Don’t Try t…...

C# 与 C/C++ 的交互
什么是平台调用 (P/Invoke) P/Invoke 是可用于从托管代码访问非托管库中的结构、回调和函数的一种技术。 托管代码与非托管的区别 托管代码和非托管代码的主要区别是内存管理方式和对计算机资源的访问方式。托管代码通常运行在托管环境中,如 mono 或 java 虚拟机等…...

新版Android Studio搜索不到Lombok以及无法安装Lombok插件的问题
前言 在最近新版本的Android Studio中,使用插件时,在插件市场无法找到Lombox Plugin,具体表现如下图所示: 1、操作步骤: (1)打开Android Studio->Settings->Plugins,搜索Lom…...

BST二叉搜索树
文章目录 概述实现创建节点查找节点增加节点查找后驱值根据关键词删除找到树中所有小于key的节点的value 概述 二叉搜索树,它具有以下的特性,树节点具有一个key属性,不同节点之间key是不能重复的,对于任意一个节点,它…...

【Leetcode】211. 添加与搜索单词 - 数据结构设计
一、题目 1、题目描述 请你设计一个数据结构,支持 添加新单词 和 查找字符串是否与任何先前添加的字符串匹配 。 实现词典类 WordDictionary : WordDictionary() 初始化词典对象void addWord(word) 将 word 添加到数据结构中,之后可以对它…...

Discuz户外旅游|旅行游记模板/Discuz!旅行社、旅游行业门户网站模板
价值328的discuz户外旅游|旅行游记模板,本模板需要配套【仁天际-PC模板管理】插件使用。 模板说明 1、模板页面宽度1200px,简洁大气,较适合户外旅行、骑行、游记、摩旅、旅游、活动等类型的论坛、频道网站; 2、所优化的页面有&…...

【重拾C语言】十一、外部数据组织——文件
目录 前言 十一、外部数据组织——文件 11.1 重新考虑户籍管理问题——文件 11.2 文件概述 11.2.1 文件分类 11.2.2 文件指针、标记及文件操作 11.3 打开、关闭文件 11.4 I/O操作 11.4.1 字符读写 11.4.2 字符串读写 11.4.3 格式化读写 11.4.4 数据块读写 11.4.5 …...

dpdk/spdk/网络协议栈/存储/网关开发/网络安全/虚拟化/ 0vS/TRex/dpvs技术专家成长体系教程
课程围绕安全,网络,存储,云原生4个维度去讲解核心技术点。 6个专栏组成:dpdk网络专栏、存储技术专栏、安全与网关开发专栏、虚拟化与云原生专栏、测试工具专栏、性能测试专栏 一、dpdk网络 dpdk基础知识 多队列网卡࿰…...

树莓派玩转openwrt软路由:5.OpenWrt防火墙配置及SSH连接
1、SSH配置 打开System -> Administration,打开SSH Access将Interface配置成unspecified。 如果选中其他的接口表示仅在给定接口上侦听,如果未指定,则在所有接口上侦听。在未指定下,所有的接口均可通过SSH访问认证。 2、防火…...

Gin:获取本机IP,获取访问IP
获取本机IP func GetLocalIP() []string {var ipStr []stringnetInterfaces, err : net.Interfaces()if err ! nil {fmt.Println("net.Interfaces error:", err.Error())return ipStr}for i : 0; i < len(netInterfaces); i {if (netInterfaces[i].Flags & ne…...

缓存降级代码结构设计
缓存降级设计思想 接前文缺陷点 本地探针应该增加计数器,多次异常再设置,避免网络波动造成误判。耦合度过高,远端缓存和本地缓存应该平行关系被设计为上下游关系了。公用的远端缓存的操作方法应该私有化,避免集成方代码误操作&…...

一文深入理解高并发服务器性能优化
我们现在已经搞定了 C10K并发连接问题 ,升级一下,如何支持千万级的并发连接?你可能说,这不可能。你说错了,现在的系统可以支持千万级的并发连接,只不过所使用的那些激进的技术,并不为人所熟悉。…...

pytorch中的归一化函数
在 PyTorch 的 nn 模块中,有一些常见的归一化函数,用于在深度学习模型中进行数据的标准化和归一化。以下是一些常见的归一化函数: nn.BatchNorm1d, nn.BatchNorm2d, nn.BatchNorm3d: 这些函数用于批量归一化 (Batch Normalization…...

【管理运筹学】第 10 章 | 排队论(1,排队论的基本概念)
文章目录 引言一、基本概念1.1 排队过程1.2 排队系统的组成和特征1.3 排队模型的分类1.4 系统指标1.5 系统状态 引言 开一点排队论的内容吧,方便做题。 排队论(Queuing Theory)也称随机服务系统理论,是为解决一系列排队问题&…...

【Express】服务端渲染(模板引擎 EJS)
EJS(Embedded JavaScript)是一款流行的模板引擎,可以用于在Express中创建动态的HTML页面。它允许在HTML模板中嵌入JavaScript代码,并且能够生成基于数据的动态内容。 下面是一个详细的讲解和示例,演示如何在Express中…...

Linux CentOS8安装gitlab_ce步骤
1 下载安装包 wget --content-disposition https://packages.gitlab.com/gitlab/gitlab-ce/packages/el/8/gitlab-ce-15.0.2-ce.0.el8.x86_64.rpm/download.rpm2 安装gitlab yum install policycoreutils-python-utilsrpm -Uvh gitlab-ce-15.0.2-ce.0.el8.x86_64.rpm3 更新配…...

RabbitMq启用TLS
Windows环境 查看配置文件的位置 选择使用的节点 查看当前节点配置文件的配置 配置TLS 将证书放到同配置相同目录中 编辑配置文件添加TLS相关配置 [{ssl, [{versions, [tlsv1.2]}]},{rabbit, [{ssl_listeners, [5671]},{ssl_options, [{cacertfile,"C:/Users/17126…...

CakePHP 3.x/4.x反序列化RCE链
最近网上公开了cakephp一些反序列化链的细节,但是没有公开poc,并且网上关于cakephp的反序列化链比较少,于是自己跟一下 ,构造pop链。 CakePHP简介 CakePHP是一个运用了诸如ActiveRecord、Association Data Mapping、Front Contr…...

练习之C++[3]
文章目录 1.模板类2.模板声明3.string类 1.模板类 模板可以具有非类型参数,用于指定大小,可以根据指定的大小创建动态结构所以可用来创建动态增长和减小的数据结构模板运行时不检查数据类型,也不保证类型安全,相当于类型的宏替换…...

[MT8766][Android12] 修改WIFI热点默认名称、密码、IP地址以及默认开启热点
文章目录 开发平台基本信息问题描述解决方法 开发平台基本信息 芯片: MTK8766 版本: Android 12 kernel: msm-4.19 问题描述 最近做了一款没有屏幕显示的智能盒子,要想操控这款设备就只能通过adb投屏,如果默认不允许有线连接,那么要怎么实…...

【嵌入式】堆栈与单片机内存
堆栈 在片内RAM中,常常要指定一个专门的区域来存放某些特别的数据 它遵循顺序存取和后进先出(LIFO/FILO)的原则,这个RAM区叫堆栈。 其实堆栈就是单片机中的一些存储单元,这些存储单元被指定保存一些特殊信息,比如地址࿰…...

十大排序算法Java实现及时间复杂度
文章目录 十大排序算法选择排序冒泡排序插入排序希尔排序快速排序归并排序堆排序计数排序基数排序桶排序时间复杂度 参考资料 十大排序算法 选择排序 原理 从待排序的数据元素中找出最小或最大的一个元素,存放在序列的起始位置, 然后再从剩余的未排序元…...