Kafka生产问题总结及性能优化实践
1、消息丢失情况
消息发送端:
(1)acks=0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高,对数据丢失不敏感的情况可以用这种。
(2)acks=1: 至少要等待leader已经成功将数据写入本地log,但是不需要等待所有follower是否成功写入。就可以继续发送下一条消息。这种情况下,如果follower没有成功备份数据,而此时leader又挂掉,则消息会丢失。
(3)acks=-1或all: 这意味着leader需要等待所有备份(min.insync.replicas配置的备份个数)都成功写入日志,这种策略会保证只要有一个备份存活就不会丢失数据。这是最强的数据保证。一般除非是金融级别,或跟钱打交道的场景才会使用这种配置。当然如果min.insync.replicas配置的是1则也可能丢消息,跟acks=1情况类似。
消息消费端:
如果消费这边配置的是自动提交,万一消费到数据还没处理完,就自动提交offset了,但是此时你consumer直接宕机了,未处理完的数据丢失了,下次也消费不到了。
2、消息重复消费
消息发送端:
发送消息如果配置了重试机制,比如网络抖动时间过长导致发送端发送超时,实际broker可能已经接收到消息,但发送方会重新发送消息。
消息消费端:
如果消费这边配置的是自动提交,刚拉取了一批数据处理了一部分,但还没来得及提交,服务挂了,下次重启又会拉取相同的一批数据重复处理。
一般消费端都是要做消费幂等处理的。
3、消息乱序
如果发送端配置了重试机制,kafka不会等之前那条消息完全发送成功才去发送下一条消息,这样可能会出现,发送了1,2,3条消息,第一条超时了,后面两条发送成功,再重试发送第1条消息,这时消息在broker端的顺序就是2,3,1了。所以,是否一定要配置重试要根据业务情况而定。也可以用同步发送的模式去发消息,当然acks不能设置为0,这样也能保证消息从发送端到消费端全链路有序。
kafka保证全链路消息顺序消费,需要从发送端开始,将所有有序消息发送到同一个分区,然后用一个消费者去消费,但是这种性能比较低,可以在消费者端接收到消息后将需要保证顺序消费的几条消费发到内存队列(可以搞多个),一个内存队列开启一个线程顺序处理消息。
4、消息积压
1)线上有时因为发送方发送消息速度过快,或者消费方处理消息过慢,可能会导致broker积压大量未消费消息。此种情况如果积压了上百万未消费消息需要紧急处理,可以修改消费端程序,让其将收到的消息快速转发到其他topic(可以设置很多分区),然后再启动多个消费者同时消费新主题的不同分区。
2)由于消息数据格式变动或消费者程序有bug,导致消费者一直消费不成功,也可能导致broker积压大量未消费消息。
此种情况可以将这些消费不成功的消息转发到其它队列里去(类似死信队列),后面再慢慢分析死信队列里的消息处理问题。
5、延时队列
延时队列存储的对象是延时消息。所谓的“延时消息”是指消息被发送以后,并不想让消费者立刻获取,而是等待特定的时间后,消费者才能获取这个消息进行消费,延时队列的使用场景有很多, 比如 :
1)在订单系统中, 一个用户下单之后通常有 30 分钟的时间进行支付,如果 30 分钟之内没有支付成功,那么这个订单将进行异常处理,这时就可以使用延时队列来处理这些订单了。
2)订单完成1小时后通知用户进行评价。
实现思路:发送延时消息时先把消息按照不同的延迟时间段发送到指定的队列中(topic_1s,topic_5s,topic_10s,…topic_2h,这个一般不能支持任意时间段的延时),然后通过定时器进行轮训消费这些topic,查看消息是否到期,如果到期就把这个消息发送到具体业务处理的topic中,队列中消息越靠前的到期时间越早,具体来说就是定时器在一次消费过程中,对消息的发送时间做判断,看下是否延迟到对应时间了,如果到了就转发,如果还没到这一次定时任务就可以提前结束了。
6、消息回溯
如果某段时间对已消费消息计算的结果觉得有问题,可能是由于程序bug导致的计算错误,当程序bug修复后,这时可能需要对之前已消费的消息重新消费,可以指定从多久之前的消息回溯消费,这种可以用consumer的offsetsForTimes、seek等方法指定从某个offset偏移的消息开始消费,参见上节课的内容。
7、分区数越多吞吐量越高吗
可以用kafka压测工具自己测试分区数不同,各种情况下的吞吐量
# 往test里发送一百万消息,每条设置1KB
# throughput 用来进行限流控制,当设定的值小于 0 时不限流,当设定的值大于 0 时,当发送的吞吐量大于该值时就会被阻塞一段时间
bin/kafka‐producer‐perf‐test.sh ‐‐topic test ‐‐num‐records 1000000 ‐‐record‐size 1024 ‐‐throughput ‐1
‐‐producer‐props bootstrap.servers=192.168.65.60:9092 acks=1
网络上很多资料都说分区数越多吞吐量越高 , 但从压测结果来看,分区数到达某个值吞吐量反而开始下降,实际上很多事情都会有一个临界值,当超过这个临界值之后,很多原本符合既定逻辑的走向又会变得不同。一般情况分区数跟集群机器数量相当就差不多了。
当然吞吐量的数值和走势还会和磁盘、文件系统、 I/O调度策略等因素相关。
注意:如果分区数设置过大,比如设置10000,可能会设置不成功,后台会报错"java.io.IOException : Too many open files"。
异常中最关键的信息是“ Too many open flies”,这是一种常见的 Linux 系统错误,通常意味着文件描述符不足,它一般发生在创建线程、创建 Socket、打开文件这些场景下 。 在 Linux系统的默认设置下,这个文件描述符的个数不是很多 ,通过 ulimit -n 命令可以查看:一般默认是1024,可以将该值增大,比如:ulimit -n 65535
8、消息传递保障
at most once(消费者最多收到一次消息,0-1次):acks = 0 可以实现。
at least once(消费者至少收到一次消息,1-多次):ack = all 可以实现。
exactly once(消费者刚好收到一次消息):at least once 加上消费者幂等性可以实现,还可以用kafka生产者的幂等性来实
现。
kafka生产者的幂等性::因为发送端重试导致的消息重复发送问题,kafka的幂等性可以保证重复发送的消息只接收一次,只需在生产者加上参数 props.put(“enable.idempotence”, true) 即可,默认是false不开启。
具体实现原理是,kafka每次发送消息会生成PID和Sequence Number,并将这两个属性一起发送给broker,broker会将PID和
Sequence Number跟消息绑定一起存起来,下次如果生产者重发相同消息,broker会检查PID和Sequence Number,如果相同不会再接收。
PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。生产者如果重启则会生成新的PID。
Sequence Number:对于每个 PID,该 Producer 发送到每个 Partition 的数据都有对应的序列号,这些序列号是从0开始单调递增的。
9、kafka的事务
Kafka的事务不同于Rocketmq,Rocketmq是保障本地事务(比如数据库)与mq消息发送的事务一致性,Kafka的事务主要是保障一次发送多条消息的事务一致性(要么同时成功要么同时失败),一般在kafka的流式计算场景用得多一点,比如,kafka需要对一个topic里的消息做不同的流式计算处理,处理完分别发到不同的topic里,这些topic分别被不同的下游系统消费(比如hbase,redis,es等),这种我们肯定希望系统发送到多个topic的数据保持事务一致性。Kafka要实现类似Rocketmq的分布式事务需要额外开发功能。
kafka的事务处理可以参考官方文档:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my‐transactional‐id");
Producer<String, String> producer = new KafkaProducer<>(props, new StringSerializer(), new StringSerializer());
//初始化事务
producer.initTransactions();try {
//开启事务
producer.beginTransaction();
for (int i = 0; i < 100; i++){
//发到不同的主题的不同分区
producer.send(new ProducerRecord<>("hdfs‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("es‐topic", Integer.toString(i), Integer.toString(i)));
producer.send(new ProducerRecord<>("redis‐topic", Integer.toString(i), Integer.toString(i)));
}
//提交事务
producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException | AuthorizationException e) {
// We can't recover from these exceptions, so our only option is to close the producer and exit.
producer.close();
} catch (KafkaException e) {
// For all other exceptions, just abort the transaction and try again.
//回滚事务
producer.abortTransaction();
}
producer.close();
10、kafka高性能的原因
磁盘顺序读写:kafka消息不能修改以及不会从文件中间删除保证了磁盘顺序读,kafka的消息写入文件都是追加在文件末尾,
不会写入文件中的某个位置(随机写)保证了磁盘顺序写。
数据传输的零拷贝。
读写数据的批量batch处理以及压缩传输。
相关文章:
Kafka生产问题总结及性能优化实践
1、消息丢失情况 消息发送端: (1)acks0: 表示producer不需要等待任何broker确认收到消息的回复,就可以继续发送下一条消息。性能最高,但是最容易丢消息。大数据统计报表场景,对性能要求很高&am…...
[MySQL]数据库原理2,Server,DataBase,Connection,latin1、UTF-8,gb2312,Encoding,Default Collation——喵喵期末不挂科
希望你开心,希望你健康,希望你幸福,希望你点赞! 最后的最后,关注喵,关注喵,关注喵,佬佬会看到更多有趣的博客哦!!! 喵喵喵,你对我真的…...
【算法集训】基础数据结构:十、矩阵
矩阵其实就是二维数组,这些题目在9日集训中已经做过,这里做的方法大致相同。 第一题 1351. 统计有序矩阵中的负数 int countNegatives(int** grid, int gridSize, int* gridColSize) {int r gridSize;int c gridColSize[0];int ret 0;for(int i 0;…...
python排序算法 直接插入排序法和折半插入排序法
最近需要使用到一些排序算法,今天主要使针对直接插入排序和折半插入排序进行讲解。 首先是直接插入排序,其排序过程主要是,针对A[a1,a2,a3,a4,a5....an],从排序的序列头部起始位置开始,将其也就是a1视为只有一个元素的…...
【flutter对抗】blutter使用+ACTF习题
最新的能很好反编译flutter程序的项目 1、安装 git clone https://github.com/worawit/blutter --depth1 然后我直接将对应的两个压缩包下载下来(通过浏览器手动下载) 不再通过python的代码来下载,之前一直卡在这个地方。 如果读者可以正…...
OpenHarmony 如何去除系统锁屏应用
前言 OpenHarmony源码版本:4.0release / 3.2 release 开发板:DAYU / rk3568 一、3.2版本去除锁屏应用 在源码根目录下:productdefine/common/inherit/rich.json 中删除screenlock_mgr组件的编译配置,在rich.json文件中搜索th…...
Python - 搭建 Flask 服务实现图像、视频修复需求
目录 一.引言 二.服务构建 1.主函数 upload_gif 2.文件接收 3.专属目录 4.图像修复 5.gif2mp4 6.mp42gif 7.图像返回 三.服务测试 1.服务启动 2.服务调用 四.总结 一.引言 前面我们介绍了如何使用 Real-ESRGAN 进行图像增强并在原始格式 jpeg、jpg、mp4 的基础上…...
C#基础——构造函数、析构函数
C#基础——构造函数、析构函数 1、构造函数 构造函数是一种特殊的方法,用于在创建类的实例时进行初始化操作。构造函数与类同名,并且没有返回类型。 构造函数在对象创建时自动调用,可以用来设置对象的初始状态、分配内存、初始化字段等操作…...
jmeter 如何循环使用接口返回的多值?
有同学在用jmeter做接口测试的时候,经常会遇到这样一种情况: 就是一个接口请求返回了多个值,然后下一个接口想循环使用前一个接口的返回值。 这种要怎么做呢? 有一定基础的人,可能第一反应就是先提取前一个接口返回…...
VLAN 详解一(VLAN 基本原理及 VLAN 划分原则)
VLAN 详解一(VLAN 基本原理及 VLAN 划分原则) 在早期的交换网络中,网络中只有 PC、终端和交换机,当某台主机发送一个广播帧或未知单播帧时,该数据帧会被泛洪,甚至传递到整个广播域。而广播域越大ÿ…...
Android - 分区存储 MediaStore、SAF
官方页面 参考文章 一、概念 分区存储(Scoped Storage)的推出是针对 APP 访问外部存储的行为(乱建乱获取文件和文件夹)进行规范和限制,以减少混乱使得用户能更好的控制自己的文件。 公有目录被分为两大类:…...
Shiro框架权限控制
首先去通过配置类的用户认证,在用户认证完成后,进行用户授权,用户通过授权之后再跳转其他的界面时,会进行一个验证,当前账号是否有权限。 前端权限控制显示的原理 在前端中,通常使用用户的角色或权限信息来…...
centOS7 安装tailscale并启用子网路由
1、在centOS7上安装Tailscale客户端 #安装命令所在官网位置:https://tailscale.com/download/linux #具体命令为: curl -fsSL https://tailscale.com/install.sh | sh #命令执行后如下图所示2、设置允许IP转发和IP伪装。 安装后,您可以启动…...
spring 项目中如何处理跨越cors问题
1.使用 CrossOrigin 注解 作用于controller 方法上 示例如下 RestController RequestMapping("/account") public class AccountController {CrossOriginGetMapping("/{id}")public Account retrieve(PathVariable Long id) {// ...}DeleteMapping(&quo…...
importlib --- import 的实现
3.1 新版功能. 源代码 Lib/importlib/__init__.py 概述 importlib 包具有三重目标。 一是在 Python 源代码中提供 import 语句的实现(并且因此而扩展 __import__() 函数)。 这提供了一个可移植到任何 Python 解释器的 import 实现。 与使用 Python 以…...
【PyTorch】现代卷积神经网络
文章目录 1. 理论介绍1.1. 深度卷积神经网络(AlexNet)1.1.1. 概述1.1.2. 模型设计 1.2. 使用块的网络(VGG)1.3. 网络中的网络(NiN)1.4. 含并行连结的网络(GoogLeNet)1.5. 批量规范化…...
用python编写九九乘法表
1 问题 我们在学习一门语言的过程中,都会练习到编写九九乘法表这个代码,下面介绍如何编写九九乘法表的流程。 2 方法 (1)打开pycharm集成开发环境,创建一个python文件,并编写第一行代码,主要构建…...
Google Gemini 模型本地可视化
Google近期发布了Gemini模型,而且开放了Gemini Pro API,Gemini Pro 可免费使用! Gemini Pro支持全球180个国家的38种语言,目前接受文本、图片作为输入并生成文本作为输出。 Gemini Pro的表现超越了其他同类模型,当前版…...
数据修复:.BlackBit勒索病毒来袭,安全应对方法解析
导言: 黑色数字罪犯的新玩具——.BlackBit勒索病毒,近来成为网络安全领域的头号威胁。这种恶意软件以其高度隐秘性和毁灭性而引起广泛关注。下面是关于.BlackBit勒索病毒的详细介绍,如不幸感染这个勒索病毒,您可添加我们的技术服…...
拓扑排序实现循环依赖判断 | 京东云技术团队
本文记录如何通过拓扑排序,实现循环依赖判断 前言 一般提到循环依赖,首先想到的就是Spring框架提供的Bean的循环依赖检测,相关文档可参考: https://blog.csdn.net/cristianoxm/article/details/113246104 本文方案脱离Spring Be…...
Java的NIO工作机制
文章目录 1. 问题引入2. NIO的工作方式3. Buffer的工作方式4. NIO数据访问方式 1. 问题引入 在网络通信中,当连接已经建立成功,服务端和客户端都会拥有一个Socket实例,每个Socket实例都有一个InputStream和OutputStream,并通过这…...
一个简单的光线追踪渲染器
前言 本文参照自raytracing in one weekend教程,地址为:https://raytracing.github.io/books/RayTracingInOneWeekend.html 什么是光线追踪? 光线追踪模拟现实中的成像原理,通过模拟一条条直线在场景内反射折射,最终…...
C++学习笔记(十二)------is_a关系(继承关系)
你好,这里是争做图书馆扫地僧的小白。 个人主页:争做图书馆扫地僧的小白_-CSDN博客 目标:希望通过学习技术,期待着改变世界。 提示:以下是本篇文章正文内容,下面案例可供参考 文章目录 前言 一、继承关系…...
DC电源模块的设计与制造技术创新
BOSHIDA DC电源模块的设计与制造技术创新 DC电源模块的设计与制造技术创新主要涉及以下几个方面: 1. 高效率设计:传统的DC电源模块存在能量转换损耗较大的问题,技术创新可通过采用高效率的电路拓扑结构、使用高性能的功率开关器件和优化控制…...
Sketch for Mac:实现你的创意绘图梦想的矢量绘图软件
随着数字时代的到来,矢量绘图软件成为了广告设计、插画创作和UI设计等领域中必不可少的工具。在众多矢量绘图软件中,Sketch for Mac(矢量绘图软件)以其强大的功能和简洁的界面脱颖而出,成为了众多设计师的首选。 Sket…...
ReactNative0.73发布,架构升级与更好的调试体验
这次更新包含了多种提升开发体验的改进,包括: 更流畅的调试体验: 通过 Hermes 引擎调试支持、控制台日志历史记录和实验性调试器,让调试过程更加高效顺畅。稳定的符号链接支持: 简化您的开发工作流程,轻松将文件或目录链接到其他…...
SVN忽略文件的两种方式
当使用版本管理工具时,提交到代码库的文档我们不希望存在把一些临时文件也推送到仓库中,这样就需要用到忽略文件。SVN的忽略相比于GIT稍显麻烦,GIT只需要在.gitignore添加忽略规则即可。而SVN有两种忽略方式,一个是全局设置&#…...
手写VUE后台管理系统10 - 封装Axios实现异常统一处理
目录 前后端交互约定安装创建Axios实例拦截器封装请求方法业务异常处理 axios 是一个易用、简洁且高效的http库 axios 中文文档:http://www.axios-js.com/zh-cn/docs/ 前后端交互约定 在本项目中,前后端交互统一使用 application/json;charsetUTF-8 的请…...
JavaScript装饰者模式
JavaScript装饰者模式 1 什么是装饰者模式2 模拟装饰者模式3 JavaScript的装饰者4 装饰函数5 AOP装饰函数6 示例:数据统计上报 1 什么是装饰者模式 在程序开发中,许多时候都我们并不希望某个类天生就非常庞大,一次性包含许多职责。那么我们就…...
C++学习笔记01
01.C概述(了解) c语言在c语言的基础上添加了面向对象编程和泛型编程的支持。 02.第一个程序helloworld(掌握) #define _CRT_SECURE_NO_WARNINGS #include<iostream> using namespace std;//标准命名空间int main() {//co…...
wordpress 安装插件 ftp/网购平台推广方案
前提条件 如果没有安装office的话,需要安装引擎 安装了office就不用安装引擎 连接数据库 Dim plMydb As Microsoft.Office.Interop.Access.Dao.DatabaseplMydb DAODBEngine_definst.OpenDatabase(ppDataBase) 备份数据库 AccessDB.CompactDatabase(iMDBFile, iTMPF…...
网站选项卡如何做自适应/杭州seo顾问
https://access.redhat.com/documentation/en-us/reference_architectures/current/ 搜索oracle即可。...
tomcat做公司网站/快速排名教程
本文实例讲述了jsp实现用于自动生成表单标签html代码的自定义表单标签。分享给大家供大家参考。具体如下:这个是自己写的一个简单的jsp表单标签,用于自动生成checkbox,select,radio等标签,传入菜单集合生成html代码,自动选中指定值…...
建设电子商务网站需要什么设备/seo的工作内容主要包括
2017年5月12日 09:57:48 星期五 最近接触了几天的composer, 不吹不黑, 简单说下用法吧 官方说要先用PHP命令行下载installer, 其实作用就是检测当前的PHP环境是否支持, 再一个就是自动下载composer.phar包 其实可以直接下载composer.phar放到某个地方 怎么跟你的PHP项目结合呢 …...
直播视频素材/seo关键词排名技术
磁盘管理:RAIDRedundent Aarry of Inexpensive DisksRedundent Aarry of Indepedent Disks目的:高性能(读、写)、可靠(冗余)级别:Level,用于描述磁盘不同组合逻辑Raid0: 条带Raid1: 镜像Raid10就是将Raid1和Raid0按某种方式连接起…...
企业网站模板文件管理/小程序怎么引流推广
发现把含main()函数放到gdu_xmp下就编译正常,放到工程根目录下编译报错,好像找不到含main()的.c文件...