当参数调优无法解决kafka消息积压时可以这么做
今天的议题是:如何快速处理kafka的消息积压
通常的做法有以下几种:
- 增加消费者数
- 增加 topic 的分区数,从而进一步增加消费者数
- 调整消费者参数,如
max.poll.records - 增加硬件资源
常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。
经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:
- 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
- kafka 的消费者是线程不安全的
- 如何提交 offset
基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:
第一步:创建一个MultipleConsumer类用于封装消费者和线程池
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}
}
理论上相较于传统的消费速率可以提升 threadNum 倍。
第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下
private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}
这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大
第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)
private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s
第四步:消费者 poll 逻辑
public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}
完整代码如下:
import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;/*** @author wjun* @date 2023/3/1 14:50* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}public void stop() {isRunning = false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
}
测试一下:
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wjun* @date 2023/3/1 16:03* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumerTest {private static final Properties properties = new Properties();private static final List<String> topics = new ArrayList<>();public static void before() {properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");topics.add("multiple_demo");}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();}
}
20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)
但是此方法的缺点:
- 只适用于业务逻辑复杂导致的处理时间长的场景
- 对数据乱序不敏感的业务场景
相关文章:
当参数调优无法解决kafka消息积压时可以这么做
今天的议题是:如何快速处理kafka的消息积压 通常的做法有以下几种: 增加消费者数增加 topic 的分区数,从而进一步增加消费者数调整消费者参数,如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使…...
Java线程池源码分析
Java 线程池的使用,是面试必问的。下面我们来从使用到源码整理一下。 1、构造线程池 通过Executors来构造线程池 1、构造一个固定线程数目的线程池,配置的corePoolSize与maximumPoolSize大小相同, 同时使用了一个无界LinkedBlockingQueue存…...
手撕八大排序(下)
目录 交换排序 冒泡排序: 快速排序 Hoare法 挖坑法 前后指针法【了解即可】 优化 再次优化(插入排序) 迭代法 其他排序 归并排序 计数排序 排序总结 结束了上半章四个较为简单的排序,接下来的难度将会大幅度上升&…...
SAP 详细解析SCC4
事务代码:SCC4,选择一个客户端,点击进入,如图: 一、客户端角色 客户控制:客户的角色(生产性,测试,...) 此属性表示 R/3 系统中的客户端角色。其中可能包括…...
java异常分类和finally代码块中return语句的影响
首先看一下java中异常相关类的继承关系: 引用 1、分类 异常可以分为受查异常和非受查异常,Error和RuntimeException及其所有的子类都是非受查异常,其他的是受查异常。 两者的区别主要在: 受检的异常是由编译器(编译…...
【链表OJ题(二)】链表的中间节点
📝个人主页:Sherry的成长之路 🏠学习社区:Sherry的成长之路(个人社区) 📖专栏链接:数据结构 🎯长路漫漫浩浩,万事皆有期待 文章目录链表OJ题(二)1. 链表…...
【强烈建议收藏:MySQL面试必问系列之并发事务锁专题】
一.知识回顾 上节课我们一起学习了MySQL面试必问系列之事务,没有学习的同学可以看一下上一篇文章,肯定对你会有帮助,学习过的同学肯定知道,上节课我们留了一个小尾巴,这个小尾巴是什么呢?就是没有详细展开…...
Linux下使用Makefile实现条件编译
在Linux系统下Makefile和C/C语言都有提供条件选择编译的语法,就是在编译源码的时候,可以选择性地编译指定的代码。这种条件选择编译的使用场合有好多,例如我们开发一个兼容标准版本与定制版本兼容的项目,那么,一些与需…...
java 应用cpu飙升(超过100%)故障排查
前言害。。。昨天刚写完一份关于jvm问题排查相关的博客,今天线上项目就遇到了一个突发问题。现象是用户反映系统非常卡,无法操作。然后登录服务器查看发现cpu 一直100%以上。具体排查步骤:1,首先top命令查看服务器cpu等情况&#…...
光学设计软件Ansys的Lumerical 2023版本下载与安装使用
文章目录前言一、许可管理工具安装二、许可管理器配置三、Lumerical安装四、工具使用配置总结前言 Lumerical是一款功能强大的软件,用于设计和分析从组件到系统阶段的光子学和电磁学。这个版本的Lumerical改进了电子和光子学设计工具,用于复杂光子学&am…...
Java 异常
文章目录1. 异常概述2. JVM 的默认处理方案3. 异常处理之 try...catch4. Throwable 的成员方法5. 编译异常和运行异常的区别6. 异常处理之 throws7. 自定义异常8. throws 和 throw 的区别1. 异常概述 异常就是程序出现了不正常的情况。 ① Error:严重问题ÿ…...
JavaSE学习笔记day17
零、 复习昨日 File: 通过路径代表一个文件或目录 方法: 创建型,查找类,判断类,其他 IO 输入& 输出字节&字符 try-catch代码 一、作业 给定路径删除该文件夹 public static void main(String[] args) {deleteDir(new File("E:\\A"));}// 删除文件夹public s…...
【项目】Vue3+TS 动态路由 面包屑 查询重置 列表
💭💭 ✨:【项目】Vue3TS 动态路由 面包屑 查询重置 列表 💟:东非不开森的主页 💜: 热烈的不是青春,而是我们💜💜 🌸: 如有错误或不足之处࿰…...
前脚背完这些接口自动化测试面试题,后脚就进了字节测试岗
1、请结合你熟悉的项目,介绍一下你是怎么做测试的? -首先要自己熟悉项目,熟悉项目的需求、项目组织架构、项目研发接口等 -功能 接口 自动化 性能 是怎么处理的? -第一步: 进行需求分析,需求评审&#…...
termux 安装centos
相关链接 centos官网rootfs制作其他人提供的安装脚本centos镜像列表其他人提供的安装脚本的说明 如果想使用老版本的centos7跟着上面链接5走就行 如果想用新系统比如centos9 stream,就跟我来 Q:为什么要装新系统? A:旧系统太多软件已过时,升级费时费…...
从菜鸟程序员到高级架构师,竟然是因为这个字final
final实现原理 简介 final关键字,实际的含义就一句话,不可改变。什么是不可改变?就是初始化完成之后就不能再做任何的修改,修饰成员变量的时候,成员变量变成一个常数;修饰方法的时候,方法不允…...
【vulhub漏洞复现】CVE-2018-2894 Weblogic任意文件上传漏洞
一、漏洞详情影响版本weblogic 10.3.6.0、weblogic 12.1.3.0、weblogic 12.2.1.2、weblogic 12.2.1.3WebLogic是美国Oracle公司出品的一个application server,确切的说是一个基于JAVAEE架构的中间件,WebLogic是用于开发、集成、部署和管理大型分布式Web应…...
函数栈帧详解
写在前面 这个模块临近C语言的边界,学起来需要一定的时间,不过当我们知道这些知识后,在C语言函数这块我们看到的不仅仅是表象了,可以真正了解函数是怎么调用的。不过我的能力有限,下面的的知识若是不当,还…...
Spring 事务(编程式事务、声明式事务@Transactional、事务隔离级别、事务传播机制)
文章目录1. 事务的定义2. Spring 中事务的实现2.1 MySQL 中使用事务2.2 Spring 中编程式事务的实现2.3 Spring 中声明式事务2.3.1 声明式事务的实现 Transactional2.3.2 Transactional 作用域2.3.3Transactional 参数设置2.3.4 Transactional 异常情况2.3.5 Transactional 工作…...
车载技术——Window Display之surface的绘制过程与原理
一、Surface 概述 OpenGL ES/Skia定义了一组绘制接口的规范,为什么能够跨平台? 本质上需要与对应平台上的本地窗口建立连接。也就是说OpenGL ES负责输入了绘制的命令,但是需要一个 “画布” 来承载输出结果,最终展示到屏幕。这个…...
C++_核心编程_多态案例二-制作饮品
#include <iostream> #include <string> using namespace std;/*制作饮品的大致流程为:煮水 - 冲泡 - 倒入杯中 - 加入辅料 利用多态技术实现本案例,提供抽象制作饮品基类,提供子类制作咖啡和茶叶*//*基类*/ class AbstractDr…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
Caliper 配置文件解析:config.yaml
Caliper 是一个区块链性能基准测试工具,用于评估不同区块链平台的性能。下面我将详细解释你提供的 fisco-bcos.json 文件结构,并说明它与 config.yaml 文件的关系。 fisco-bcos.json 文件解析 这个文件是针对 FISCO-BCOS 区块链网络的 Caliper 配置文件,主要包含以下几个部…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
ArcGIS Pro制作水平横向图例+多级标注
今天介绍下载ArcGIS Pro中如何设置水平横向图例。 之前我们介绍了ArcGIS的横向图例制作:ArcGIS横向、多列图例、顺序重排、符号居中、批量更改图例符号等等(ArcGIS出图图例8大技巧),那这次我们看看ArcGIS Pro如何更加快捷的操作。…...
LangChain知识库管理后端接口:数据库操作详解—— 构建本地知识库系统的基础《二》
这段 Python 代码是一个完整的 知识库数据库操作模块,用于对本地知识库系统中的知识库进行增删改查(CRUD)操作。它基于 SQLAlchemy ORM 框架 和一个自定义的装饰器 with_session 实现数据库会话管理。 📘 一、整体功能概述 该模块…...
pgsql:还原数据库后出现重复序列导致“more than one owned sequence found“报错问题的解决
问题: pgsql数据库通过备份数据库文件进行还原时,如果表中有自增序列,还原后可能会出现重复的序列,此时若向表中插入新行时会出现“more than one owned sequence found”的报错提示。 点击菜单“其它”-》“序列”,…...
goreplay
1.github地址 https://github.com/buger/goreplay 2.简单介绍 GoReplay 是一个开源的网络监控工具,可以记录用户的实时流量并将其用于镜像、负载测试、监控和详细分析。 3.出现背景 随着应用程序的增长,测试它所需的工作量也会呈指数级增长。GoRepl…...
C#最佳实践:为何优先使用as或is而非强制转换
C#最佳实践:为何优先使用as或is而非强制转换 在 C# 的编程世界里,类型转换是我们经常会遇到的操作。就像在现实生活中,我们可能需要把不同形状的物品重新整理归类一样,在代码里,我们也常常需要将一个数据类型转换为另…...
