当前位置: 首页 > news >正文

当参数调优无法解决kafka消息积压时可以这么做

今天的议题是:如何快速处理kafka的消息积压

通常的做法有以下几种:

  1. 增加消费者数
  2. 增加 topic 的分区数,从而进一步增加消费者数
  3. 调整消费者参数,如max.poll.records
  4. 增加硬件资源

常规手段不是本文的讨论重点或者当上面的手段已经使用过依然存在很严重的消息积压时该怎么办?本文给出一种增加消费者消费速率的方案。我们知道消息积压往往是因为生产速率远大于消费速率,本文的重点就是通过提高消费速率来解决消息积压。

经验判断,消费速率低下的主要原因往往都是数据处理时间长,业务逻辑复杂最终导致一次 poll 的时间被无限拉长,如果可以通过增加数据处理的线程数来降低一次 poll 的时间那么问题就解决了。但是需要注意一下几点:

  1. 业务逻辑对乱序数据不敏感,因为并行一定会导致乱序问题
  2. kafka 的消费者是线程不安全的
  3. 如何提交 offset

基于上述几点,思路就是消费者 poll 下来一批数据,交给多个线程去并行处理,消费者等待所有线程执行完后提交。为了减少线程的创建与销毁则维护一个线程池。代码如下:

第一步:创建一个MultipleConsumer类用于封装消费者和线程池

public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}
}

理论上相较于传统的消费速率可以提升 threadNum 倍。

第二步:因为需要并行处理一批 poll 数据,因此需要对数据进行切分,切分逻辑如下

private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}

这里采用轮训的方式且切分的个数与 threadNum 一致,尽可能保证每个线程处理的数据数量相差不大

第三步:定义一个静态内部类用来处理数据,并处理同步逻辑(因为需要等待所有线程执行完再提交 offset)

private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}

使用 CountDownLatch 实现线程同步逻辑,假设每条数据的业务处理时间为 1 s

第四步:消费者 poll 逻辑

public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}

完整代码如下:

import org.apache.kafka.clients.consumer.*;import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.*;
import java.util.concurrent.*;/*** @author wjun* @date 2023/3/1 14:50* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumer {private final KafkaConsumer<String, String> consumer;private final int threadNum;private final ExecutorService threadPool;private boolean isRunning = true;public MultipleConsumer(Properties properties, List<String> topics, int threadNum) {// 实例化消费者consumer = new KafkaConsumer<>(properties);// 订阅主题consumer.subscribe(topics);this.threadNum = threadNum;this.threadPool = Executors.newFixedThreadPool(threadNum);}public void start() {while (isRunning) {ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(5));if (!consumerRecords.isEmpty()) {// 分割任务Map<Integer, List<ConsumerRecord<String, String>>> splitTask = splitTask(consumerRecords);CountDownLatch countDownLatch = new CountDownLatch(threadNum);// 提交任务for (int i = 0; i < threadNum; i++) {threadPool.submit(new InnerProcess(splitTask.get(i), countDownLatch));}// 等待任务执行结束try {countDownLatch.await();} catch (InterruptedException e) {throw new RuntimeException(e);}// 提交偏移量consumer.commitAsync((map, e) -> {if (e != null) {System.out.println("提交偏移量失败");}});}}}private Map<Integer, List<ConsumerRecord<String, String>>> splitTask(ConsumerRecords<String, String> consumerRecords) {HashMap<Integer, List<ConsumerRecord<String, String>>> tasks = new HashMap<>();for (int i = 0; i < threadNum; i++) {tasks.put(i, new ArrayList<>());}int recordIndex = 0;for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {tasks.get(recordIndex % threadNum).add(consumerRecord);recordIndex++;}return tasks;}public void stop() {isRunning = false;threadPool.shutdown();}private static class InnerProcess implements Runnable {private final List<ConsumerRecord<String, String>> records;private final CountDownLatch countDownLatch;public InnerProcess(List<ConsumerRecord<String, String>> records, CountDownLatch countDownLatch) {this.records = records;this.countDownLatch = countDownLatch;}@Overridepublic void run() {try {// 处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("topic: " + record.topic() + ", partition: " + record.partition() + ", offset: " + record.offset() + ", key: " + record.key() + ", value: " + record.value());TimeUnit.SECONDS.sleep(1);}} catch (InterruptedException e) {e.printStackTrace();} finally {countDownLatch.countDown();}}}
}

测试一下:

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;import java.util.ArrayList;
import java.util.List;
import java.util.Properties;/*** @author wjun* @date 2023/3/1 16:03* @email wjunjobs@outlook.com* @describe*/
public class MultipleConsumerTest {private static final Properties properties = new Properties();private static final List<String> topics = new ArrayList<>();public static void before() {properties.put("bootstrap.servers", "localhost:9092");properties.put("group.id", "test");properties.put("enable.auto.commit", "false");properties.put("auto.commit.interval.ms", "1000");properties.put("session.timeout.ms", "30000");properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");topics.add("multiple_demo");}public static void main(String[] args) {new MultipleConsumer(properties, topics, 5).start();}
}

20 条数据的处理事件只需要 4s(threadNume = 5,即缩短 5 倍)

image-20230301172739782

但是此方法的缺点:

  1. 只适用于业务逻辑复杂导致的处理时间长的场景
  2. 对数据乱序不敏感的业务场景

相关文章:

当参数调优无法解决kafka消息积压时可以这么做

今天的议题是&#xff1a;如何快速处理kafka的消息积压 通常的做法有以下几种&#xff1a; 增加消费者数增加 topic 的分区数&#xff0c;从而进一步增加消费者数调整消费者参数&#xff0c;如max.poll.records增加硬件资源 常规手段不是本文的讨论重点或者当上面的手段已经使…...

Java线程池源码分析

Java 线程池的使用&#xff0c;是面试必问的。下面我们来从使用到源码整理一下。 1、构造线程池 通过Executors来构造线程池 1、构造一个固定线程数目的线程池&#xff0c;配置的corePoolSize与maximumPoolSize大小相同&#xff0c; 同时使用了一个无界LinkedBlockingQueue存…...

手撕八大排序(下)

目录 交换排序 冒泡排序&#xff1a; 快速排序 Hoare法 挖坑法 前后指针法【了解即可】 优化 再次优化&#xff08;插入排序&#xff09; 迭代法 其他排序 归并排序 计数排序 排序总结 结束了上半章四个较为简单的排序&#xff0c;接下来的难度将会大幅度上升&…...

SAP 详细解析SCC4

事务代码&#xff1a;SCC4&#xff0c;选择一个客户端&#xff0c;点击进入&#xff0c;如图&#xff1a; 一、客户端角色 客户控制&#xff1a;客户的角色&#xff08;生产性&#xff0c;测试&#xff0c;...&#xff09; 此属性表示 R/3 系统中的客户端角色。其中可能包括…...

java异常分类和finally代码块中return语句的影响

首先看一下java中异常相关类的继承关系&#xff1a; 引用 1、分类 异常可以分为受查异常和非受查异常&#xff0c;Error和RuntimeException及其所有的子类都是非受查异常&#xff0c;其他的是受查异常。 两者的区别主要在&#xff1a; 受检的异常是由编译器&#xff08;编译…...

【链表OJ题(二)】链表的中间节点

​ ​&#x1f4dd;个人主页&#xff1a;Sherry的成长之路 &#x1f3e0;学习社区&#xff1a;Sherry的成长之路&#xff08;个人社区&#xff09; &#x1f4d6;专栏链接&#xff1a;数据结构 &#x1f3af;长路漫漫浩浩&#xff0c;万事皆有期待 文章目录链表OJ题(二)1. 链表…...

【强烈建议收藏:MySQL面试必问系列之并发事务锁专题】

一.知识回顾 上节课我们一起学习了MySQL面试必问系列之事务&#xff0c;没有学习的同学可以看一下上一篇文章&#xff0c;肯定对你会有帮助&#xff0c;学习过的同学肯定知道&#xff0c;上节课我们留了一个小尾巴&#xff0c;这个小尾巴是什么呢&#xff1f;就是没有详细展开…...

Linux下使用Makefile实现条件编译

在Linux系统下Makefile和C/C语言都有提供条件选择编译的语法&#xff0c;就是在编译源码的时候&#xff0c;可以选择性地编译指定的代码。这种条件选择编译的使用场合有好多&#xff0c;例如我们开发一个兼容标准版本与定制版本兼容的项目&#xff0c;那么&#xff0c;一些与需…...

java 应用cpu飙升(超过100%)故障排查

前言害。。。昨天刚写完一份关于jvm问题排查相关的博客&#xff0c;今天线上项目就遇到了一个突发问题。现象是用户反映系统非常卡&#xff0c;无法操作。然后登录服务器查看发现cpu 一直100%以上。具体排查步骤&#xff1a;1&#xff0c;首先top命令查看服务器cpu等情况&#…...

光学设计软件Ansys的Lumerical 2023版本下载与安装使用

文章目录前言一、许可管理工具安装二、许可管理器配置三、Lumerical安装四、工具使用配置总结前言 Lumerical是一款功能强大的软件&#xff0c;用于设计和分析从组件到系统阶段的光子学和电磁学。这个版本的Lumerical改进了电子和光子学设计工具&#xff0c;用于复杂光子学&am…...

Java 异常

文章目录1. 异常概述2. JVM 的默认处理方案3. 异常处理之 try...catch4. Throwable 的成员方法5. 编译异常和运行异常的区别6. 异常处理之 throws7. 自定义异常8. throws 和 throw 的区别1. 异常概述 异常就是程序出现了不正常的情况。 ① Error&#xff1a;严重问题&#xff…...

JavaSE学习笔记day17

零、 复习昨日 File: 通过路径代表一个文件或目录 方法: 创建型,查找类,判断类,其他 IO 输入& 输出字节&字符 try-catch代码 一、作业 给定路径删除该文件夹 public static void main(String[] args) {deleteDir(new File("E:\\A"));}// 删除文件夹public s…...

【项目】Vue3+TS 动态路由 面包屑 查询重置 列表

&#x1f4ad;&#x1f4ad; ✨&#xff1a;【项目】Vue3TS 动态路由 面包屑 查询重置 列表   &#x1f49f;&#xff1a;东非不开森的主页   &#x1f49c;: 热烈的不是青春&#xff0c;而是我们&#x1f49c;&#x1f49c;   &#x1f338;: 如有错误或不足之处&#xff0…...

前脚背完这些接口自动化测试面试题,后脚就进了字节测试岗

1、请结合你熟悉的项目&#xff0c;介绍一下你是怎么做测试的&#xff1f; -首先要自己熟悉项目&#xff0c;熟悉项目的需求、项目组织架构、项目研发接口等 -功能 接口 自动化 性能 是怎么处理的&#xff1f; -第一步&#xff1a; 进行需求分析&#xff0c;需求评审&#…...

termux 安装centos

相关链接 centos官网rootfs制作其他人提供的安装脚本centos镜像列表其他人提供的安装脚本的说明 如果想使用老版本的centos7跟着上面链接5走就行 如果想用新系统比如centos9 stream&#xff0c;就跟我来 Q:为什么要装新系统? A:旧系统太多软件已过时&#xff0c;升级费时费…...

从菜鸟程序员到高级架构师,竟然是因为这个字final

final实现原理 简介 final关键字&#xff0c;实际的含义就一句话&#xff0c;不可改变。什么是不可改变&#xff1f;就是初始化完成之后就不能再做任何的修改&#xff0c;修饰成员变量的时候&#xff0c;成员变量变成一个常数&#xff1b;修饰方法的时候&#xff0c;方法不允…...

【vulhub漏洞复现】CVE-2018-2894 Weblogic任意文件上传漏洞

一、漏洞详情影响版本weblogic 10.3.6.0、weblogic 12.1.3.0、weblogic 12.2.1.2、weblogic 12.2.1.3WebLogic是美国Oracle公司出品的一个application server&#xff0c;确切的说是一个基于JAVAEE架构的中间件&#xff0c;WebLogic是用于开发、集成、部署和管理大型分布式Web应…...

函数栈帧详解

写在前面 这个模块临近C语言的边界&#xff0c;学起来需要一定的时间&#xff0c;不过当我们知道这些知识后&#xff0c;在C语言函数这块我们看到的不仅仅是表象了&#xff0c;可以真正了解函数是怎么调用的。不过我的能力有限&#xff0c;下面的的知识若是不当&#xff0c;还…...

Spring 事务(编程式事务、声明式事务@Transactional、事务隔离级别、事务传播机制)

文章目录1. 事务的定义2. Spring 中事务的实现2.1 MySQL 中使用事务2.2 Spring 中编程式事务的实现2.3 Spring 中声明式事务2.3.1 声明式事务的实现 Transactional2.3.2 Transactional 作用域2.3.3Transactional 参数设置2.3.4 Transactional 异常情况2.3.5 Transactional 工作…...

车载技术——Window Display之surface的绘制过程与原理

一、Surface 概述 OpenGL ES/Skia定义了一组绘制接口的规范&#xff0c;为什么能够跨平台&#xff1f; 本质上需要与对应平台上的本地窗口建立连接。也就是说OpenGL ES负责输入了绘制的命令&#xff0c;但是需要一个 “画布” 来承载输出结果&#xff0c;最终展示到屏幕。这个…...

2023年全国最新工会考试精选真题及答案10

百分百题库提供工会考试试题、工会考试预测题、工会考试真题、工会证考试题库等&#xff0c;提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助你考试轻松过关。 51.&#xff08;&#xff09;是企业工会的权力机关&#xff0c;每年召开一至两次会议。 A.会员大会 B.会…...

pytorch-复现经典深度学习模型-LeNet5

Neural Networks 使用torch.nn包来构建神经网络。nn包依赖autograd包来定义模型并求导。 一个nn.Module包含各个层和一个forward(input)方法&#xff0c;该方法返回output。 一个简单的前馈神经网络&#xff0c;它接受一个输入&#xff0c;然后一层接着一层地传递&#xff0c;…...

【C++】类和对象(上)

文章目录对象的介绍类的介绍类的两种定义方式类的访问限定符及封装访问限定符封装类的作用域类的实例化类的对象模型对象的介绍 C语言是面向过程的&#xff0c;关注的是过程&#xff0c;分析出求解问题的步骤&#xff0c;通过函数调用逐步解决问题&#xff1b;   C是基于面向…...

工作中责任链模式用法及其使用场景?

前言 笔者是金融保险行业&#xff0c;有这么一种场景&#xff0c;业务员录完单后提交核保&#xff0c;这时候系统会对保单数据进行校验&#xff0c;如不允许手续费超限校验&#xff0c;客户真实性校验、费率限额校验等等&#xff0c;当校验一多时&#xff0c;维护起来特别麻烦…...

三八女神节有哪些数码好物?2023年三八女神节数码好物清单

2023年的三八女神节就快到了&#xff0c;大家还在烦恼&#xff0c;不知道有哪些数码好物&#xff1f;在此&#xff0c;我来给大家分享几款三八女神节实用性强的数码好物&#xff0c;一起来看看吧。 一、蓝牙耳机&#xff1a;南卡小音舱 参考价&#xff1a;239 推荐理由&…...

FairGuard-Windows加固工具版本更新日志

FairGuard-Windows加固工具1.2.2版本更新日志&#xff1a; ■ 增加Unity Resources资源加密的支持; ■ 增加单独Assetbundle资源加密&#xff0c;并同时支持压缩包和文件夹作为输入的方式; ■ 增加对游戏原文件夹加固的支持; Windows加固方案介绍 FairGuard专为游戏量身定…...

基于RT-Thread完整版搭建的极简Bootloader

项目背景Agile Upgrade: 用于快速构建 bootloader 的中间件。example 文件夹提供 PC 上的示例特性适配 RT-Thread 官方固件打包工具 (图形化工具及命令行工具)使用纯 C 开发&#xff0c;不涉及任何硬件接口&#xff0c;可在任何形式的硬件上直接使用加密、压缩支持如下&#xf…...

3.flinkDateStreamAPI介绍env与source

执行环境 Flink可以在不同的环境上下文中运行.可以本地集成开发环境中运行也可以提交到远程集群环境运行. 不同的运行环境对应的flink的运行过程不同,需要首先获取flink的运行环境,才能将具体的job调度到不同的TaskManager 在flink中可以通过StreamExecutionEnvironment类获取…...

$ 2 :数据类型

1.数据类型 1.1基本类型 a、整型int b、浮点型float c、字符型char 1.2构造类型 a、数组[ ] b、结构体struct 1.3指针类型 * 1.4空类型(void) 2.关键字 autoconstdoublefloatintshortstructunsignedbreakcontinueelseforlongsignedswitchvoidcasedefaultenumgotoregistersiz…...

类和对象 - 上

本文已收录至《C语言》专栏&#xff01; 作者&#xff1a;ARMCSKGT 目录 前言 正文 面向过程与面向对象 面向过程的解决方法 面向对象的解决方法 面向对象的优势 类的引入 早期C类的实现 class定义类 class定义规则 类成员的两种定义方式 类的访问限定符及封装 访…...

去哪个网站找题目给孩子做/软件培训机构排行榜

声明&#xff1a;本文是根据英文教程 A Neural Network in 11 lines of Python&#xff08;用 11 行 Python 代码实现的神经网络&#xff09;学习总结而来&#xff0c;关于更详细的神经网络的介绍可以参考我的另一篇博客&#xff1a;从感知机到人工神经网络。 如果你读懂了下面…...

懂的建设网站/企业网站排名优化方案

实验报告课程Java语言程序设计实验名称第八章 Swing图形用户界面程序设计实验任务(三)第页专业班级学号__ __ 姓名实验日期&#xff1a;2010 年11 月9 日报告退发(订正、重做)一、实验目的?布局管理器的使用二、实验环境1、微型计算机一台2、DOS或WINDOWS操作系统&#xff0c;…...

h5响应式网站建设/怎么做线上销售

http://www.111cn.net/sys/linux/58445.htmiptables是linux系统的防火墙功能&#xff0c;今天一聚教程小编就给各位整理一下删除iptables指定某条规则的例子&#xff0c;希望此例子对你会带来帮助。系统:centos 6x1.查看要删除的那条规则命令iptables -nvL –line-number-L 查看…...

淘宝做网站/网络游戏推广平台

为了让美化上传文件框&#xff0c;设置了cursor:pointer;,然而不起作用&#xff0c;设置font-size:0&#xff0c;这样就可以了。转载于:https://www.cnblogs.com/mmykdbc/p/10531976.html...

番禺厂家关键词优化/seo外推软件

应用程序在运行过程中&#xff0c;会有大量需要处理的异常。在页面解析的一个工程中&#xff0c;会存在多个service类同时出现页面解析异常和解析结果入库异常&#xff0c;而这就表示在程序中需要一个机制&#xff0c;去统一处理这些异常&#xff0c;提供统一的异常处理。因为我…...

免费开商城网站/许昌网络推广外包

1.dockerToolBox下载 下载路径1:http://mirrors.aliyun.com/docker-toolbox/windows/docker-toolbox/下载路径2: https://docs.docker.com/toolbox/toolbox_install_windows/1.Docker溯源Docker的前身是名为dotCloud的小公司&#xff0c;主要提供的是基于 PaaS&#xff08;Plat…...