Kafka系列之如何提高消费者消费速度
前言
在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。
实现方案
为了提高消费者的消费速度,我们可以采取以下措施:
- 将主题的分区数量增大,如 20,通过
concurrency将消费者的消费线程数增大到 10(2个pod),提高消息处理的并发能力。 - 将每次批量拉取消息的数量
max.poll.records增大到 500,提高单次处理消息的数量。 - 将消息切分成批次,将单个批次的数据处理业务逻辑放进线程池中异步进行,提高并发处理消息的速度。
- 将异步线程池的拒绝模式调整为 CallerRunsPolicy,这个配置非常重要。当线程池的任务队列已满且所有线程都在忙碌时,新的任务将由提交任务的线程(即调用者线程)来执行。否则在消息量特别大的情况下,很可能会因为线程池任务队列满了而丢失数据。
- 将异步线程池的队列容量设置为 0,这样意味着所有任务必须立即由线程池中的线程来处理,减少在队列中的等待时间。
- 在数据上报的时候进行幂等性验证,防止重复上报数据。
@Component
public class OrderConsumer {@Resource(name = "execThreadPool")private ThreadPoolTaskExecutor execThreadPool;@KafkaListener(id = "record_consumer",topics = "record",groupId = "g_record_consumer",concurrency = "10",properties = {"max.poll.interval.ms:300000", "max.poll.records:500"})public void consume(ConsumerRecords<String, String> records, Acknowledgment ack) {execThreadPool.submit(()-> {// 业务逻辑});ack.acknowledge();}}
ThreadPoolTaskExecutor 是 Spring 框架提供的一个线程池实现,用于管理和执行多线程任务。它是 TaskExecutor 接口的实现,提供了在 Spring 应用程序中创建和配置线程池的便捷方式。
ThreadPoolTaskExecutor主要特点:
-
线程池配置: ThreadPoolTaskExecutor 允许你配置核心线程数、最大线程数、队列容量等线程池属性。
-
线程创建和销毁: 它会根据任务的需求自动创建和销毁线程,避免不必要的线程创建和销毁开销。
-
线程复用: 线程池中的线程可以被复用,从而减少线程创建的开销。
-
队列管理: 当线程池达到最大线程数时,新任务会被放入队列中等待执行。
-
拒绝策略: 当线程池已满并且队列也已满时,可以配置拒绝策略来处理新任务的方式。
RejectedExecutionHandler 是 Java 线程池的一个重要接口,用于定义当线程池已满并且无法接受新任务时,如何处理被拒绝的任务。当线程池的队列和线程都已满,新任务就会被拒绝执行,这时就会使用 RejectedExecutionHandler 来处理这些被拒绝的任务。
在 Java 中,有几种内置的 RejectedExecutionHandler 实现可供选择,每种实现都有不同的拒绝策略:
AbortPolicy(默认策略): 这是默认的拒绝策略,它会抛出一个 RejectedExecutionException 异常,表示任务被拒绝执行。
CallerRunsPolicy: 当线程池已满时,将任务返回给提交任务的调用者(Caller)。这意味着提交任务的线程会尝试执行被拒绝的任务。
DiscardPolicy: 这个策略会默默地丢弃被拒绝的任务,不会产生任何异常。
DiscardOldestPolicy: 这个策略会丢弃队列中最老的任务,然后尝试将新任务添加到队列中。除了这些内置的策略,你还可以实现自定义的 RejectedExecutionHandler 接口,以定义特定于你应用程序需求的拒绝策略。你可以根据业务需求来决定拒绝策略,比如记录日志、通知管理员、重试等。
@Configuration
public class ThreadPoolConfig {@Beanprivate ThreadPoolTaskExecutor execThreadPool() {ThreadPoolTaskExecutor pool = new ThreadPoolTaskExecutor();pool.setCorePoolSize(50); // 核心线程数pool.setMaxPoolSize(10000); // 最大线程数pool.setQueueCapacity(0); // 等待队列sizepool.setKeepAliveSeconds(60); // 线程最大空闲存活时间pool.setWaitForTasksToCompleteOnShutdown(true);pool.setAwaitTerminationSeconds(60); // 程序shutdown时最多等60秒钟让现存任务结束pool.setRejectedExecutionHandler(new CallerRunsPolicy()); // 拒绝策略return pool;}
}
通过以上方案,我们可以提高消费侧的TPS,同时杜绝重复上报的现象,极大提高数据准确性和用户体验。
相关文章:
Kafka系列之如何提高消费者消费速度
前言 在实际开发过程中,如果使用Kafka处理超大数据量(千万级、亿级)的场景,Kafka消费者的消费速度可能决定系统性能瓶颈。 实现方案 为了提高消费者的消费速度,我们可以采取以下措施: 将主题的分区数量增大,如 20&…...
mac安装Whisper
Whisper 官方git https://github.com/openai/whisper?tabreadme-ov-file 基本上参考官方的安装流程 pip3 install -U openai-whisper pip3 install githttps://github.com/openai/whisper.git pip3 install --upgrade --no-deps --force-reinstall githttps://github.com/…...
Linux:进程概述(什么是进程、进程控制块PCB、并发与并行、进程的状态、进程的相关命令)
进程概述 (1)What(什么是进程) 程序:磁盘上的可执行文件,它占用磁盘、是一个静态概念 进程:程序执行之后的状态,占用CPU和内存,是一个动态概念;每一个进程都有一个对应的进程控制块…...
Unity UGUI 之 坐标转换
本文仅作学习笔记与交流,不作任何商业用途 本文包括但不限于unity官方手册,唐老狮,麦扣教程知识,引用会标记,如有不足还请斧正 本文在发布时间选用unity 2022.3.8稳定版本,请注意分别 前置知识:…...
使用 uPlot 在 Vue 中创建交互式图表
本文由ScriptEcho平台提供技术支持 项目地址:传送门 使用 uPlot 在 Vue 中创建交互式图表 应用场景介绍 uPlot 是一个轻量级、高性能的图表库,适用于创建各种交互式图表。它具有丰富的功能,包括可自定义的轴、网格、刻度和交互性。本篇博…...
SpringBoot 项目配置文件注释乱码的问题解决方案
一、问题描述 在项目的配置文件中,我们写了一些注释,如下所示: 但是再次打开注释会变成乱码,如下所示: 那么如何解决呢? 二、解决方案 1. 点击” File→Setting" 2. 搜索“File Encodings”, 将框…...
TTS如何正确读AI缩写、金额和数字
案例:Tell me whats AI(a i), you need pay $186.30, your card Number is 1 2 3, your work Number is 5 6 7 8...
python基础知识点(蓝桥杯python科目个人复习计划75)
第一题:ip补充 题目描述: 小蓝的ip地址为192.168.*.21,其中*是一个数字,请问这个数字最大可能是多少? import os import sys# 请在此输入您的代码 print("255") 第二题:出现最多的字符 题目描…...
小技巧:如何在已知PDF密码情况下去掉PDF的密码保护
第一步,用Edge打开你的pdf,输入密码进去 第二步,点击打印 第三步,选择导出PDF,选择彩印 第四步,选择导出位置,导出成功后打开发现没有密码限制了!...
Java泛型的介绍和基本使用
什么是泛型 泛型就是将类型参数化,比如定义了一个栈,你必须在定义之前声明这个栈中存放的数据的类型,是int也好是double或者其他的引用数据类型也好,定义好了之后这个栈就无法用来存放其他类型的数据。如果这时候我们想要使用这…...
【C++】动态内存管理与模版
目录 1、关键字new: 1、用法: 2、理解: 3、与malloc的相同与不同: 1、相同: 2、不同: 2、模版初阶: 1、函数模版: 1、概念: 2、关键字:template&…...
MongoDB - 组合聚合阶段:$group、$match、$limit、$sort、$skip、$project、$count
文章目录 1. $group2. $group-> $project2.1 $group2.2 $group-> $project2.3 SpringBoot 整合 MongoDB 3. $match-> $group -> $match3.1 $match3.2 $match-> $group3.3 $match-> $group-> $match3.4 SpringBoot 整合 MongoDB 4. $match-> $group->…...
vue element-ui日期控件传参
前端:Vue element-ui <el-form-item label"过期时间" :rules"[ { required: true, message: 请选择过期时间, trigger: blur }]"><el-date-picker v-model"form.expireTime" type"date" format"yyyy-MM-dd&…...
MacOS安装SDKMan管理Java版本
文章目录 1 简介2 安装与卸载2.1 安装2.2 卸载 3 使用3.1 查看其他工具:支持 Ant, Maven 等3.2 查看Java版本3.3 安装Java,加上相关的版本3.4 设置Java版本(全局)3.5 只在当前窗口生效3.6 卸载1 默认环境无法卸载 4 jdk安装的位置5 与IDEA集成参考 1 简介…...
【网络安全的神秘世界】文件包含漏洞
🌝博客主页:泥菩萨 💖专栏:Linux探索之旅 | 网络安全的神秘世界 | 专接本 | 每天学会一个渗透测试工具 一、概述 文件包含:重复使用的函数写在文件里,需要使用某个函数时直接调用此文件,而无需再…...
并发编程--volatile
1.什么是volatile volatile是 轻 量 级 的 synchronized,它在多 处 理器开 发 中保 证 了共享 变 量的 “ 可 见 性 ” 。可 见 性的意思是当一个 线 程 修改一个共享变 量 时 ,另外一个 线 程能 读 到 这 个修改的 值 。如果 volatile 变 量修 饰 符使用…...
记录unraid docker更新的域名
背景:级联 一、安装内容 unraid更新docker,之前一直失败,修改网络后可以进行安装。 二、查看域名 查看域名,发现是走github的,怪不得有一些docker无法正常更新 三、解决方法 更改代理,这里为unraid的…...
SpringCloud+Vue3多对多,多表联查
♥️作者:小宋1021 🤵♂️个人主页:小宋1021主页 ♥️坚持分析平时学习到的项目以及学习到的软件开发知识,和大家一起努力呀!!! 🎈🎈加油! 加油!…...
麒麟系统信创改造
麒麟系统信创改造 一、查看操作系统架构下载相应的依赖,压缩包1、查看Linux系统架构、CPU(1)uname -m(2)lscpu(3)cat /proc/cpuinfo(4)arch(5)getconf LONG_BIT(6)dmidecode2、根据Linux系统架构、CPU的差异进行下载相关依赖,看第二项二、以下是根据本系统的aarc…...
【Android】ListView和RecyclerView知识总结
文章目录 ListView步骤适配器AdpterArrayAdapterSimpleAdapterBaseAdpter效率问题 RecyclerView具体实现不同布局形式的设置横向滚动瀑布流网格 点击事件 ListView ListView 是 Android 中的一种视图组件,用于显示可滚动的垂直列表。每个列表项都是一个视图对象&…...
【kafka】Golang实现分布式Masscan任务调度系统
要求: 输出两个程序,一个命令行程序(命令行参数用flag)和一个服务端程序。 命令行程序支持通过命令行参数配置下发IP或IP段、端口、扫描带宽,然后将消息推送到kafka里面。 服务端程序: 从kafka消费者接收…...
<6>-MySQL表的增删查改
目录 一,create(创建表) 二,retrieve(查询表) 1,select列 2,where条件 三,update(更新表) 四,delete(删除表…...
从零实现富文本编辑器#5-编辑器选区模型的状态结构表达
先前我们总结了浏览器选区模型的交互策略,并且实现了基本的选区操作,还调研了自绘选区的实现。那么相对的,我们还需要设计编辑器的选区表达,也可以称为模型选区。编辑器中应用变更时的操作范围,就是以模型选区为基准来…...
iPhone密码忘记了办?iPhoneUnlocker,iPhone解锁工具Aiseesoft iPhone Unlocker 高级注册版分享
平时用 iPhone 的时候,难免会碰到解锁的麻烦事。比如密码忘了、人脸识别 / 指纹识别突然不灵,或者买了二手 iPhone 却被原来的 iCloud 账号锁住,这时候就需要靠谱的解锁工具来帮忙了。Aiseesoft iPhone Unlocker 就是专门解决这些问题的软件&…...
STM32标准库-DMA直接存储器存取
文章目录 一、DMA1.1简介1.2存储器映像1.3DMA框图1.4DMA基本结构1.5DMA请求1.6数据宽度与对齐1.7数据转运DMA1.8ADC扫描模式DMA 二、数据转运DMA2.1接线图2.2代码2.3相关API 一、DMA 1.1简介 DMA(Direct Memory Access)直接存储器存取 DMA可以提供外设…...
c++ 面试题(1)-----深度优先搜索(DFS)实现
操作系统:ubuntu22.04 IDE:Visual Studio Code 编程语言:C11 题目描述 地上有一个 m 行 n 列的方格,从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子,但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
《基于Apache Flink的流处理》笔记
思维导图 1-3 章 4-7章 8-11 章 参考资料 源码: https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…...
IoT/HCIP实验-3/LiteOS操作系统内核实验(任务、内存、信号量、CMSIS..)
文章目录 概述HelloWorld 工程C/C配置编译器主配置Makefile脚本烧录器主配置运行结果程序调用栈 任务管理实验实验结果osal 系统适配层osal_task_create 其他实验实验源码内存管理实验互斥锁实验信号量实验 CMISIS接口实验还是得JlINKCMSIS 简介LiteOS->CMSIS任务间消息交互…...
成都鼎讯硬核科技!雷达目标与干扰模拟器,以卓越性能制胜电磁频谱战
在现代战争中,电磁频谱已成为继陆、海、空、天之后的 “第五维战场”,雷达作为电磁频谱领域的关键装备,其干扰与抗干扰能力的较量,直接影响着战争的胜负走向。由成都鼎讯科技匠心打造的雷达目标与干扰模拟器,凭借数字射…...
