Kakfa - Producer机制原理与调优
Producer是Kakfa模型中生产者组件,也就是Kafka架构中数据的生产来源,虽然其整体是比较简单的组件,但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么,怎么发送的消息?IO通讯模型是什么?在实际工作中,怎么调优来实现高效性?
简单的生产者程序:
一、客户端初始化 KafkaProducer
new KafkaProducer() 是Producer初始化过程,比如Interceptor、Serializer、Partitioner、RecordAccumulator等。
当我们使用KafkaProducer发送消息的时候,消息会经过拦截器(Interceptor
)、序列化器(Serializer
)和分区器(Partitioner
),最后会暂存到消息收集器(RecordAccumulator
)中,最终读取按批次发送。
以下跟踪比较核心的机制流程:
1、 初始化RecordAccumulator记录累加器
简单介绍:RecordAccumulator可以理解为Producer发送数据缓冲区,Producer数据发送时并不会直接连接Broker后,一条一条的发送,而是会将数据(Record)放入RecordAccumulator中按批次发送。
2、初始化Sender的Iothread,在Producer在初始化过程中,会额外的创建一个ioThread。
二、Send方法
到此位置Kafka只是做了一些初始化的工作,没有与kafka集群建立连接,更没有相关元数据信息。那继续看send中的doSend方法。
private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {TopicPartition tp = null;try {try {// waitOnMetadata更新元数据clusterAndWaitTime = this.waitOnMetadata(record.topic(),record.partition(), this.maxBlockTimeMs);} catch (KafkaException var19) {Cluster cluster = clusterAndWaitTime.cluster;........byte[] serializedKey;try {// 序列化serializedKey = this.keySerializer.serialize(record.topic(), record.headers(), record.key());} catch (ClassCastException var18) {........byte[] serializedValue;try {serializedValue = this.valueSerializer.serialize(record.topic(), record.headers(), record.value());} // 获取元数据中partition信息int partition = this.partition(record, serializedKey, serializedValue, cluster);// ..........// 数据append到accumulator中RecordAppendResult result = this.accumulator.append(tp, timestamp, serializedKey, serializedValue, headers, interceptCallback, remainingWaitMs);..........return result.future;
1、WaitonMetadat元数据更新
方法内部会优先判断当前Cluster是否存在元数据partitions,如果不存在意味着还没有建立连接获取元数据,此时它会wakeup唤醒sender线程。
注意:此时Cluster并不是完全时空的,它已经有指定的Node列表信息。
在早期版本的时候元数据是存储在zookeeper中的, 元数据指的是集群中分区信息、节点信息、以及节点、主题、分区的映射关系等。在生产者启动的时候没有元数据的支撑,是无法进行数据的发送的,等于瞎子。但是zookeeper存储元数据,在并发场景下会对zookeeper产生网卡压力,那就意味着要保障Kakfa可靠性的前提就要保障zookeeper的可靠性。
所以在1.0版本之后,Kafka将元数据维护在了Broker节点中。Producer可以通过Borker获取元数据,减少对zookeeper的依赖。只有一些核心的内容交给zookeeper做分布式协调。
2、Sender线程run方法
Sender线程中run方法,一个while(runing),这是一中Loop过程一种常见的响应式编程方式,比如Redis服务中也是一种EventLoop事件轮询过程。
其内部核心方法NetWorkClient.poll实现了客户端连接、数据发送、事件处理工作。
metadataUpdater.maybeUpdate方法在第一次被执行时,因为没有元数据节点信息,会执行this.maybeUpdate(now, node)方法,方法内部实现了initiateConnect方法用于客户端建立连接,其底层就是使用的Java Nio的Selector多路复用器。
建立连接之后,nioSelector.select()等待事件响应。
之后触发handleCompletedReceives处理器进行元数据同步过程。
注意: 在完成元数据更新以后,metadata.update会调用 this.notifyAll(),唤醒阻塞的main线程,进行数据发送工作。
到此为止主线程waitOnMetadata方法完成元数据的更新。
之后main就开始处理Serializer序列化,获取partition元数据信息,以及数据发送工作。
3、RecordAccumulator 记录累加器
生产者在发送数据时,并不是建立连接后每消息发送的,而是会将消息按批次发送。RecordAccumulator 对象中batches会为每一个TopicPartition维护一个双端队列。用于缓存record数据。
ConcurrentMap<TopicPartition, Deque<ProducerBatch>> batches
TopicPartition 主题分区:缓冲区按照主题分为不同的双端队列
Deque<ProducerBatch> 双端队列,ProducerBatch:一批次的数据(多个数据,默认容量16k)
结构如下:
生产者在往batches中添加数据时,使用了Sychronized,所以Producer在多线程场景下是线程安全的。
为什么要有RecordAccumulator ?
RecordAccumulator的主要作用是暂存Main Thread
发送过来的消息,然后Sender Thread
就可以从RecordAccumulator中批量的获取到消息,减少单个消息获取的请求次数,减少网卡IO压力,提升性能效率。
相关参数配置以及调优点:
1、RecordAccumulator buffer.memory默认大小32mb
指每一个new KafkaProducer中RecordAccumulator的batches所有承载的最大buffer.memory=32mb。
设置方式:properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG
, 3210241024);
如果RecordAccumulator 缓存空间满时,会进行阻塞,等待数据被消费,如果指定时间内消息没有发送除去,即仍然是满状态,则抛出异常,默认60s。
设置方式:properties.put(ProducerConfig.MAX_BLOCK_MS_CONFIG
, 60*1000);
优化点: 根据业务需求,如果TopicPartition较多,而数据量很大时,这是及时单个TopicPartition中batche很少,可能总的容量也超过32mb,这时可以扩大buffer_memery大小。
2、Kafka中单个batche大小默认为16k。
指每个batche大小为16k。batche用于存储数据Record。
如果record < 16k,则batche可以存储多个数据,此时batche空间是会被重复利用的。
如果record > 16k,则当前record会额外申请存储空间,使用完后销毁。
优化点:batche大小需要根据业务评估,不要有过多大record存在,确保每一个batche可以容纳record,尽量减少内存空间的频繁申请和销毁,以及内存碎片化。
3、同步阻塞和非阻塞的选择
RecordAccumulator用于支持分批次发送数据。在KafkaProducer中send方法是异步接口,通过 send.get()方法可以使其阻塞,等待数据返回。
实现同步的发送数据,需要等待kafka接收了record后响应,producer才会进行下一个record发送。此时虽然会有更高一致性,但RecordAccumulator就失去了意义。
非阻塞send情况下,当生产和消费端IO不对称时,可以通过LINGRE_MS_CONFG 30 来要求sender线程每次拉取RecordAccumulator中数据时,等待一段时间再拉取,尽量确保按批次拉取,减少更多的网络IO。
设置方式:properties.put(ProducerConfig.LINGRE_MS_C0NFG
, 0);
继续内容分析
到此当Main线程将数据append到RecordAccumulator容器后,其核心的工作就结束了,此时它也会调用sender.wakeup,告知已经有数据需要处理了,并确保sender线程不会select阻塞住。
Sender线程是一个Loop过程,在发送数据过程中,会从RecordAccumulator中拉取批次数据进行打包发送,并不是一个个batche发送。默认封装的包大小为1mb。
设置方式:pp.setProperty(ProducerConfig.MAX_REQUEST_SIZE_CONFIG , String.valueOf(1 * 1024 * 1024))
Sender线程在真正发送数据前,还额外存储了Request数据到InFilgntRequest(飞行中的包),InFilgntRequest 默认大小为5,意思是指生产者向kafka发送5个包request后,都没有回应时,则停止发送变成阻塞状态。
这种设计在同步发送过程没有作用的,因为同步过程是每请求返回的。
SEND_BUFFER_CONFIG 发送缓冲区配置、RECEIVE_BUFFER_CONFIG 接收缓冲区配置,这两个就是IO层的缓冲区配置了,不同的操作系统可能不一样。设置成-1,代表默认使用系统分配大小。
pp.setProperty(ProducerConfig.SEND_BUFFER_CONFIG , String.valueOf(32 * 1024)); pp.setProperty(ProducerConfig.RECEIVE_BUFFER_CONFIG , String.valueOf(32 * 1024));
查看内核默认配置:
到此Producer整个数据发送流程机制就清楚了,Ack的设定涉及到Broker数据同步和Consumer消费状态,这块单独再进行分析。
总结一下:
1、Producer的实现是由Main线程和Sender线程组合完成的。
Main线程核心完成了数据的输入、Producer初始化和数据append到RecordAccumulator工作,具体的元数据的更新、数据发送等IO操作都是都Sender线程完成。
Sender线程工作模式是中间件中比较常见的响应式编程模式。其在Loop过程中进行客户端连接、元数据更新、数据打包发送等工作。
2、Kakfa中IO操作封装了Java中Nio的实现(Selector),底层是多路复用器的实现,而不是netty。
3、Producer发送数据过程并不是简单的一条一条数据发送,其内部封装RecordAccumulator、Batche、Request包,可以实现按批次发送数据,减少IO次数。同时结合FilghtRequest飞行中请求大小限制,确保kafka未正常响应时,抛出异常防止数据丢失。在开发过程中,可以通过调整参数,来达到优化目的。
相关文章:
Kakfa - Producer机制原理与调优
Producer是Kakfa模型中生产者组件,也就是Kafka架构中数据的生产来源,虽然其整体是比较简单的组件,但依然有很多细节需要细品一番。比如Kafka的Producer实现原理是什么,怎么发送的消息?IO通讯模型是什么?在实…...
基于图像形态学处理和边缘提取算法的路面裂痕检测matlab仿真
目录 1.算法运行效果图预览 2.算法运行软件版本 3.部分核心程序 4.算法理论概述 5.算法完整程序工程 1.算法运行效果图预览 2.算法运行软件版本 matlab2022a 3.部分核心程序 [Rr,Cc] size(Image1);% 获取 Image1 矩阵的大小(行数和列数) % 创…...
opencv 基础(持续更新中)
1 前言 https://www.couragesteak.com/ 2 安装 3 基础属性demo 打开一张图片: import cv2img cv2.imread(./girl.jpg)print(img.shape) # (1536, 1024, 3) 数组形状 print(type(img)) # numpy 数组 print(img) # 三维数组(彩色图片&am…...
科普现场!万博智云参加第五届张江汇智科普节
9月15日,第五届张江汇智科普节在汇智国际商业中心如期开展,展会中汇集了众多信息科技领域的新兴产品,展示内容主要分为国产替代和元宇宙场景展示两个方面。展现国产化最新科技成果,践行技术普惠理念,把高、精、专的技术…...
【记录】实现从Linux下载下载文件(文件导出功能)并记录过程产生的BUG问题。
前言 导出功能的实现,主要记录总结导出过程中出现的一些问题。 代码实现导出功能 public R templateDown(HttpServletResponse response) {String fileName "template.xlsx";// 清空responseresponse.reset();response.setCharacterEncoding("UTF…...
可扩展性表设计方案
文章目录 1 使用预留字段2 使用JSON字段3 使用单表继承4 构建属性表5 直接构建新表6 适当冗余 1 使用预留字段 在表设计初期,可以预留一些命名通用的备用字段,例如field1、field2、field3。当业务需要增加新字段时,就直接使用这些预留字段,无…...
Scotch: Combining SGX and SMM to Monitor Cloud Resource Usage【TEE的应用】
目录 摘要引言贡献 背景SMMXen Credit Scheduler与资源核算SGX 威胁模型Scheduler attacksResource interference attacksVM Escape attacks 架构Resource Accounting WorkflowCost of Accounting 具体的部署和评估见论文相关工作Resource Accounting基于SMM的方法基于SGX的系统…...
腾讯mini项目-【指标监控服务重构】2023-08-19
今日已办 benchmark How can we create a configuration for gobench with -benchmem – IDEs Support (IntelliJ Platform) | JetBrains 本机进行watermill-benchmark 使用 apifox 自动化测试上报固定数量的消息 启动watermill-pub/sub的 benchmark 函数 func BenchmarkPu…...
go实现grpc-快速开始
准备工作 Go, 最新版的 如果不会安装看Getting Started. Protocol buffer compiler, protoc, version 3. 想要安装, 请读Protocol Buffer Compiler Installation. 为 protocol compiler安装Go plugins: 想要安装运行以下命令: $ go install google.golang.org/protobuf/cmd/…...
linux上的init 0-6指令作用以及一些快捷键和系统指令
目录 linux上的init 0-6指令作用 CtrlAltF1-F7作用 Linux常用系统指令 查看linux内核版本 ubuntu和centos查看系统版本信息以及硬件信息 linux上的init 0-6指令作用 在Linux系统中,运行级别(也称为init级别)用来表示系统的不同状态或操作…...
Mixin 混入
Mixin 混入 混入 (mixin) 提供了一种非常灵活的方式,来分发 Vue 组件中的可复用功能。一个混入对象可以包含任意组件选项。当组件使用混入对象时,所有混入对象的选项将被“混合”进入该组件本身的选项。 怎么理解呢,就是每一个组件都会有一…...
pycharm快捷键
CtrlAltL 代码规范化 CtrlHome 回到代码最开始 CtrlEnd 回到代码最后面 shift回车 鼠标任意位置的下一行 altj 一直按可以选中相同的变量 alt鼠标左键 可以选择多个需要修改的变量或值 将光标放在某一行,home到最前面,end到最后…...
【面试刷题】——Linux基础命令
Linux基础命令是在Linux操作系统中执行常见任务的一组命令。以下是一些常用的Linux基础命令,它们用于管理文件系统、执行系统任务、查看文件内容等。 文件和目录操作: ls: 列出目录中的文件和子目录。 pwd: 显示当前工作目录的路径。 cd: 更改当前工作…...
第四步 Vue2 配置ESLint
ESLint 是一个广泛使用的 JavaScript 代码检查工具,可以帮助开发者在编写代码时发现并修复潜在的问题和错误。 在 第一步 创建工程 时虽然已经选择了包含 ESLint 预设配置,但还需要做一些调整,让我们使用起来能够更加的丝滑。 vue.config.j…...
[.NET学习笔记] - Thread.Sleep与Task.Delay在生产中应用的性能测试
场景 有个Service类,自己在内部实现生产者/消费者模式。即多个指令输入该服务后对象后,Service内部有专门的消费线程执行传入的指令。每个指令的执行间隔为1秒。这里有两部分组成, 工作线程的载体。new Thread与Task.Run。执行等待的方法。…...
【单线图的系统级微电网仿真】基于 PQ 的可再生能源和柴油发电机组微电网仿真(Simulink)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
人脸识别技术应用安全管理规定(试行)|企业采用人脸打卡方式,这4条规定值得关注
近日,为规范人脸识别技术应用,国家互联网信息办公室起草了,并向全社会公开征求意见。该规定一共列举了25条,企业如借助人脸识别技术采集考勤打卡数据,以下4条规定值得关注。 第四条 只有在具有特定的目的和充分的必要…...
leetcode 817. 链表组件(java)
链表组件 题目描述HashSet 模拟 题目描述 给定链表头结点 head,该链表上的每个结点都有一个 唯一的整型值 。同时给定列表 nums,该列表是上述链表中整型值的一个子集。 返回列表 nums 中组件的个数,这里对组件的定义为:链表中一段…...
分布式事务基础理论
基础概念 什么是事务 什么是事务?举个生活中的例子:你去小卖铺买东西,“一手交钱,一手交货”就是一个事务的例子,交钱和交货必 须全部成功,事务才算成功,任一个活动失败,事务将撤销…...
《打造高可用PostgreSQL:策略与工具》
🌷🍁 博主猫头虎(🐅🐾)带您 Go to New World✨🍁 🐅🐾猫头虎建议程序员必备技术栈一览表📖: 🛠️ 全栈技术 Full Stack: 📚…...
【八大经典排序算法】快速排序
【八大经典排序算法】快速排序 一、概述二、思路实现2.1 hoare版本2.2 挖坑法2.3 前后指针版本 三、优化3.1 三数取中3.1.1 最终代码3.1.2 快速排序的特性总结 四、非递归实现快排 一、概述 说到快速排序就不得不提到它的创始人 hoare了。在20世纪50年代,计算机科学…...
vue 父组件给子组件传递一个函数,子组件调用父组件中的方法
vue 中父子组件通信,props的数据类型可以是 props: {title: String,likes: Number,isPublished: Boolean,commentIds: Array,author: Object,callback: Function,contactsPromise: Promise // or any other constructor }在父组件中,我们在子组件中给他…...
docker 获取Nvidia 镜像 | cuda |cudnn
本文分享如何使用docker获取Nvidia 镜像,包括cuda10、cuda11等不同版本,cudnn7、cudnn8等,快速搭建深度学习环境。 1、来到docker hub官网,查看有那些Nvidia 镜像 https://hub.docker.com/r/nvidia/cuda/tags?page2&name11.…...
uTool快捷指令
send("************"); quickcommand.sleep(200); keyTap("enter");...
R reason ‘拒绝访问‘的解决方案
Win11系统 安装rms的时候报错: Error in loadNamespace(j <- i[[1L]], c(lib.loc, .libPaths()), versionCheck vI[[j]]) : namespace Matrix 1.5-4.1 is already loaded, but > 1.6.0 is required## 安装rms的时候报错,显示Matrix的版本太低…...
许战海战略文库|品类缩量时代:制造型企业如何跨品类打造份额产品?
所有商业战略的本质是围绕着竞争优势与竞争效率展开的。早期,所有品牌立足于从局部竞争优势出发。因此,品牌创建初期大多立足于单个品类。后期增长受限,就要跨品类持续扩大竞争优势,将局部竞争优势转化为长期竞争优势,如果固化不前很难获得增…...
BIT-4-数组
一维数组的创建和初始化一维数组的使用 一维数组在内存中的存储 二维数组的创建和初始化二维数组的使用二维数组在内存中的存储 数组越界数组作为函数参数数组的应用实例1:三子棋 数组的应用实例2:扫雷游戏 1. 一维数组的创建和初始化 1.1 数组的创建 …...
L9945的H桥续流模式
在H桥的配置中,包括两种续流模式:主动续流和被动续流。 一个L9945可输出两个H桥驱动。HB1在CMD3中配置,HB2在CMD7中配置。 主动续流:通过Q3的MOS的二极管来续流 被动续流:通过Q3外部的二极管来续流...
Ubuntu20.04安装Nvidia显卡驱动、CUDA11.3、CUDNN、TensorRT、Anaconda、ROS/ROS2
1.更换国内源 打开终端,输入指令: wget http://fishros.com/install -O fishros && . fishros 选择【5】更换系统源,后面还有一个要输入的选项,选择【0】退出,就会自动换源。 2.安装NVIDIA驱动 这一步最痛心…...
linux下使用crontab定时器,并且设置定时不执行的情况,附:项目启动遇到的一些问题和命令
打开终端,以root用户身份登录。 运行以下命令打开cron任务编辑器: crontab -e 如果首次编辑cron任务,会提示选择编辑器。选择你熟悉的编辑器,比如nano或vi,并打开相应的配置文件。 在编辑器中,添加一行类…...
网站建设流程案例/海南快速seo排名优化
public SqlParameter(string parameterName,Object value )当使用以上SqlParameter的构造函数时,如果这个Object为整型0的话,会构造失败。比如说SqlParameter[] sqlParas {new SqlParameter("WebUserId", WebUserId),new SqlParameter("…...
五莲县财源建设网站/网站如何推广营销
价值君注:2017年最火的游戏莫过于“吃鸡”类游戏,这类游戏典型的代表《绝地大逃杀》目前总销量超过3000万份。而中国玩家贡献销量占42%,活跃用户数中国玩家更是超过一半。这其中,学生群体占了极大比例,“吃鸡”成了各大…...
网站的作用和意义/成都网络推广优化
出处未知在linux的PC上挂载jffs2根文件系统映像因为jffs2是构建于MTD设备上的文件系统,所以无法通过loop设备来挂载,但是可以通过mtdram设备来挂载。mtdram是在用RAM实现的 MTD设备,可以通过mtdblock设备来访问。使用mtdram设备很简单&#x…...
气象网站建设的自查报告/百度认证
虚拟机能在现有的系统上虚拟出多个独立的系统,尤其是开发人员测试人员经常会使用,但是大家经常对虚拟机的运行速度不甚满意,甚至经常很恼火。虚拟机速度慢有很多原因,需要根据自己的情况具体分析,本文根据笔者的使用经…...
网站建设后怎样发信息/百度优化服务
QYResearch预测:2020-2026全球与中国邮件签名生成软件市场现状及未来发展趋势【纸版价格】:RMB 18000【电子版(PDF)价格】:RMB 18000【报告篇幅】:106【报告图表数】:134【报告出版时间】:2020年2月报告摘要…...
wordpress手机apo/刚刚刚刚刚刚好痛
/*** 验证测试类*/public class ValidationTest {// 验证器对象private Validator validator;// 待验证对象private UserInfo userInfo;// 验证结果集合private Set> set;// 验证结果集合private Set> otherSet;/*** 初始化操作*/Beforepublic void init() {// 初始化验证…...