Kafka系列之如何提高消费者消费速度
前言
在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。
实现方案
为了提高消费者的消费速度,我们可以采取以下措施:
- 将主题的分区数量增大,如 20,通过
concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。 - 将每次批量拉取消息的数量
max.poll.records增大到 500,提高单次处理消息的数量。 - 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步进行,提高并发处理消息的速度。
- 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来执行。否则在消息量特别大的情况下,很可能会因为线程池任务队列满了而丢失数据。
- 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,减少在队列中的等待时间。
- 在数据上报的时候进行幂等性验证,防止重复上报数据。
@Component
public class OrderConsumer {@Resource(name = "execThreadPool")private ThreadPoolTaskExecutor execThreadPool;@KafkaListener(id = "record_consumer",topics = "record",groupId = "g_record_consumer",concurrency = "10",properties = {"max.poll.interval.ms:300000", "max.poll.records:500"})public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {execThreadPool.submit(()-> {// 业务逻辑});ack.acknowledge();}}
ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor主要特点:
-
线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。
-
线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。
-
线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。
-
队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。
-
拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。
@Configuration
public class ThreadPoolConfig {@Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(50); // 核心线程数pool.setMaxPoolSize(10000); // 最大线程数pool.setQueueCapacity(0); // 等待队列sizepool.setKeepAliveSeconds(60); // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60); // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy()); // 拒绝策略return pool;}
}
通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。
相关文章:
Kafka系列之如何提高消费者消费速度
前言 在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。 实现方案 为了提高消费者的消费速度,我们可以采取以下措施: 将主题的分区数量增大,如 20&…...
mac安装Whisper
Whisper 官方git https://github.com/openai/whisper?tabreadme-ov-file 基本上参考官方的安装流程 pip3 install -U openai-whisper pip3 install githttps://github.com/openai/whisper.git pip3 install --upgrade --no-deps --force-reinstall githttps://github.com/…...
Linux:进程概述(什么是进程、进程控制块PCB、并发与并行、进程的状态、进程的相关命令)
进程概述 (1)What(什么是进程) 程序:磁盘上的可执行文件,它占用磁盘、是一个静态概念 进程:程序执行之后的状态,占用CPU和内存,是一个动态概念;每一个进程都有一个对应的进程控制块…...
Unity UGUI 之 坐标转换
本文仅作学习笔记与交流,不作任何商业用途 本文包括但不限于unity官方手册,唐老狮,麦扣教程知识,引用会标记,如有不足还请斧正 本文在发布时间选用unity 2022.3.8稳定版本,请注意分别 前置知识:…...
使用 uPlot 在 Vue 中创建交互式图表
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 uPlot 在 Vue 中创建交互式图表 应用场景介绍 uPlot 是一个轻量级、高性能的图表库,适用于创建各种交互式图表。它具有丰富的功能,包括可自定义的轴、网格、刻度和交互性。本篇博…...
SpringBoot 项目配置文件注释乱码的问题解决方案
一、问题描述 在项目的配置文件中,我们写了一些注释,如下所示: 但是再次打开注释会变成乱码,如下所示: 那么如何解决呢? 二、解决方案 1. 点击” File→Setting" 2. 搜索“File Encodings”, 将框…...
TTS如何正确读AI缩写、金额和数字
案例:Tell me whats AI(a i), you need pay $186.30, your card Number is 1 2 3, your work Number is 5 6 7 8...
python基础知识点(蓝桥杯python科目个人复习计划75)
第一题:ip补充 题目描述: 小蓝的ip地址为192.168.*.21,其中*是一个数字,请问这个数字最大可能是多少? import os import sys# 请在此输入您的代码 print("255") 第二题:出现最多的字符 题目描…...
小技巧:如何在已知PDF密码情况下去掉PDF的密码保护
第一步,用Edge打开你的pdf,输入密码进去 第二步,点击打印 第三步,选择导出PDF,选择彩印 第四步,选择导出位置,导出成功后打开发现没有密码限制了!...
Java泛型的介绍和基本使用
什么是泛型 泛型就是将类型参数化,比如定义了一个栈,你必须在定义之前声明这个栈中存放的数据的类型,是int也好是double或者其他的引用数据类型也好,定义好了之后这个栈就无法用来存放其他类型的数据。如果这时候我们想要使用这…...
【C++】动态内存管理与模版
目录 1、关键字new: 1、用法: 2、理解: 3、与malloc的相同与不同: 1、相同: 2、不同: 2、模版初阶: 1、函数模版: 1、概念: 2、关键字:template&…...
MongoDB - 组合聚合阶段:$group、$match、$limit、$sort、$skip、$project、$count
文章目录 1. $group2. $group-> $project2.1 $group2.2 $group-> $project2.3 SpringBoot 整合 MongoDB 3. $match-> $group -> $match3.1 $match3.2 $match-> $group3.3 $match-> $group-> $match3.4 SpringBoot 整合 MongoDB 4. $match-> $group->…...
vue element-ui日期控件传参
前端:Vue element-ui <el-form-item label"过期时间" :rules"[ { required: true, message: 请选择过期时间, trigger: blur }]"><el-date-picker v-model"form.expireTime" type"date" format"yyyy-MM-dd&…...
MacOS安装SDKMan管理Java版本
文章目录 1 简介2 安装与卸载2.1 安装2.2 卸载 3 使用3.1 查看其他工具:支持 Ant, Maven 等3.2 查看Java版本3.3 安装Java,加上相关的版本3.4 设置Java版本(全局)3.5 只在当前窗口生效3.6 卸载1 默认环境无法卸载 4 jdk安装的位置5 与IDEA集成参考 1 简介…...
【网络安全的神秘世界】文件包含漏洞
🌝博客主页:泥菩萨 💖专栏:Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 一、概述 文件包含:重复使用的函数写在文件里,需要使用某个函数时直接调用此文件,而无需再…...
并发编程--volatile
1.什么是volatile volatile是 轻 量 级 的 synchronized,它在多 处 理器开 发 中保 证 了共享 变 量的 “ 可 见 性 ” 。可 见 性的意思是当一个 线 程 修改一个共享变 量 时 ,另外一个 线 程能 读 到 这 个修改的 值 。如果 volatile 变 量修 饰 符使用…...
记录unraid docker更新的域名
背景:级联 一、安装内容 unraid更新docker,之前一直失败,修改网络后可以进行安装。 二、查看域名 查看域名,发现是走github的,怪不得有一些docker无法正常更新 三、解决方法 更改代理,这里为unraid的…...
SpringCloud+Vue3多对多,多表联查
♥️作者:小宋1021 🤵♂️个人主页:小宋1021主页 ♥️坚持分析平时学习到的项目以及学习到的软件开发知识,和大家一起努力呀!!! 🎈🎈加油! 加油!…...
麒麟系统信创改造
麒麟系统信创改造 一、查看操作系统架构下载相应的依赖,压缩包1、查看Linux系统架构、CPU(1)uname -m(2)lscpu(3)cat /proc/cpuinfo(4)arch(5)getconf LONG_BIT(6)dmidecode2、根据Linux系统架构、CPU的差异进行下载相关依赖,看第二项二、以下是根据本系统的aarc…...
【Android】ListView和RecyclerView知识总结
文章目录 ListView步骤适配器AdpterArrayAdapterSimpleAdapterBaseAdpter效率问题 RecyclerView具体实现不同布局形式的设置横向滚动瀑布流网格 点击事件 ListView ListView 是 Android 中的一种视图组件,用于显示可滚动的垂直列表。每个列表项都是一个视图对象&…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
.Net Framework 4/C# 关键字(非常用,持续更新...)
一、is 关键字 is 关键字用于检查对象是否于给定类型兼容,如果兼容将返回 true,如果不兼容则返回 false,在进行类型转换前,可以先使用 is 关键字判断对象是否与指定类型兼容,如果兼容才进行转换,这样的转换是安全的。 例如有:首先创建一个字符串对象,然后将字符串对象隐…...
Python 包管理器 uv 介绍
Python 包管理器 uv 全面介绍 uv 是由 Astral(热门工具 Ruff 的开发者)推出的下一代高性能 Python 包管理器和构建工具,用 Rust 编写。它旨在解决传统工具(如 pip、virtualenv、pip-tools)的性能瓶颈,同时…...
处理vxe-table 表尾数据是单独一个接口,表格tableData数据更新后,需要点击两下,表尾才是正确的
修改bug思路: 分别把 tabledata 和 表尾相关数据 console.log() 发现 更新数据先后顺序不对 settimeout延迟查询表格接口 ——测试可行 升级↑:async await 等接口返回后再开始下一个接口查询 ________________________________________________________…...
20个超级好用的 CSS 动画库
分享 20 个最佳 CSS 动画库。 它们中的大多数将生成纯 CSS 代码,而不需要任何外部库。 1.Animate.css 一个开箱即用型的跨浏览器动画库,可供你在项目中使用。 2.Magic Animations CSS3 一组简单的动画,可以包含在你的网页或应用项目中。 3.An…...
【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论
路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中(图1): mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...
上位机开发过程中的设计模式体会(1):工厂方法模式、单例模式和生成器模式
简介 在我的 QT/C 开发工作中,合理运用设计模式极大地提高了代码的可维护性和可扩展性。本文将分享我在实际项目中应用的三种创造型模式:工厂方法模式、单例模式和生成器模式。 1. 工厂模式 (Factory Pattern) 应用场景 在我的 QT 项目中曾经有一个需…...
保姆级【快数学会Android端“动画“】+ 实现补间动画和逐帧动画!!!
目录 补间动画 1.创建资源文件夹 2.设置文件夹类型 3.创建.xml文件 4.样式设计 5.动画设置 6.动画的实现 内容拓展 7.在原基础上继续添加.xml文件 8.xml代码编写 (1)rotate_anim (2)scale_anim (3)translate_anim 9.MainActivity.java代码汇总 10.效果展示 逐帧…...
C++中vector类型的介绍和使用
文章目录 一、vector 类型的简介1.1 基本介绍1.2 常见用法示例1.3 常见成员函数简表 二、vector 数据的插入2.1 push_back() —— 在尾部插入一个元素2.2 emplace_back() —— 在尾部“就地”构造对象2.3 insert() —— 在任意位置插入一个或多个元素2.4 emplace() —— 在任意…...
aurora与pcie的数据高速传输
设备:zynq7100; 开发环境:window; vivado版本:2021.1; 引言 之前在前面两章已经介绍了aurora读写DDR,xdma读写ddr实验。这次我们做一个大工程,pc通过pcie传输给fpga,fpga再通过aur…...
