Kafka之消费者客户端
1、历史上的二个版本
与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:
- 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。
- 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端。
2、必要的参数配置
-
bootstrap.servers
用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。
不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。
建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。
-
group.id
消费者隶属消费组的名称。
-
key.deserializer 和 value.deserializer
与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。
用来将字节数组中的key和value反序列化还原为原来的对象格式。
3、订阅主题与分区
一个消费者可以订阅一个或多个主题。
Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。
这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。
其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。
综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
4、消费消息
消息消费一般有两种方式:
- 推模式:服务器主动将消息推送给消费者。
- 拉模式:消费者主动向服务器发起请求来来取信息。
Kafka采用的消息消费模式是拉模式。
在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。
5、位移提交
在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。
对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。
对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”。
通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
通过上图我们可以了解到如下信息:
- 正在消费的消息下标为3。
- 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
- 对于分区来说,下标4则作为下一个消息要写入的位置。
- 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。
Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。
根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。
-
消息丢失
假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。
-
重复消费
假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。
通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。
6、指定位移消息
首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?
为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:
- latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
- earliest:意为消费者会从起始处也就是0开始消费。
- none:直接抛出NoOffsetForPartitionException异常。
7、再均衡
所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。
再均衡的过程中,消费组内的消费者无法读取消息。
再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。
8、消费者拦截器
我们可以通过消费者拦截器在poll返回消息之前和消费位移提交之后进行一些特定的处理。
9、多线程实现
为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。
有三种多线程的实现方式:
- 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图:
- 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
- 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图:
在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
基于这种实现方式提供以下两种实现方案:- 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
- 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图:
窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。
上一篇:Kafka之消费组与消费者
相关文章:

Kafka之消费者客户端
1、历史上的二个版本 与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本: 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。新消费…...

使用Python进行数据分析入门
文章目录 Python环境搭建安装Anaconda验证安装 必备库介绍NumPyPandasMatplotlibSciPy 数据导入与清洗导入数据清洗数据 数据探索与分析描述性统计相关性分析 数据可视化绘制直方图 高级主题机器学习深度学习 总结 随着大数据时代的到来,数据分析变得越来越重要。Py…...
ubuntu20 从源码编译升级到版本5.15.263
author: hjjdebug date: 2024年 10月 25日 星期五 15:38:48 CST description: ubuntu20 从源码编译升级到版本5.15.263 我的内核是 5.15.105, 用apt 下载源码后其版本是5.15.263 为什么要从源码编译内核. 升级内核? 目的: 练练手. 消除内核神秘性. 还可以裁减内核,也是调试内核…...
php 程序开发分层与验证思想
在PHP程序开发中,合理的层级设计可以提高代码的可维护性、可扩展性和可测试性。以下是常见的层级设计模式及建议: 1. 分层架构 通常可以将PHP应用分为以下几层: 表示层(Presentation Layer): 负责与用户交…...
关于InternVL2的单卡、多卡推理
关于InternVL2的单卡、多卡推理 前言单卡推理多卡推理总结前言 本章节将介绍如何使用上一章节微调后的模型进行推理。推理又分为单卡和多卡,这里介绍的两种方式都是Hugging Face的transformers方法进行推理。模型的话可以使用上一章微调的任意一个非lora模型进行测试。 单卡推…...
Go语言设计Web框架
如何设计一个Web框架 项目规划 在开始设计Web框架之前,我们需要对整个项目进行规划。主要包括以下几个方面: 项目结构依赖管理路由设计控制器设计日志和配置管理 项目结构 首先,我们定义项目的目录结构: ├── cmd/ │ └…...

2024年10月28日练习(双指针算法)
一.11. 盛最多水的容器 - 力扣(LeetCode) 1.题目描述: 这个题目代表的意思就是数组上每个对应的值就相当于每条垂直线的高度,就相当于短板效应,两 个高度的线会取最短的长度因为那样水才不会漏。而两条线的数组的下标…...

Objective-C 音频爬虫:实时接收数据的 didReceiveData_ 方法
在互联网技术领域,数据的获取和处理是至关重要的。尤其是对于音频内容的获取,实时性和效率是衡量一个爬虫性能的重要指标。本文将深入探讨在Objective-C中实现音频爬虫时,如何高效地使用didReceiveData:方法来实时接收数据,并通过…...

提升网站流量和自然排名的SEO基本知识与策略分析
内容概要 在当今数字化时代,SEO(搜索引擎优化)成为加强网站可见度和提升流量的重要工具。SEO的基础知识包括理解搜索引擎的工作原理,以及如何通过优化网站内容和结构来提高自然排名。白帽SEO和黑帽SEO代表了两种截然不同的策略&a…...
雷池社区版compose文件配置讲解--fvm
在现代网络安全中,选择合适的 Web 应用防火墙至关重要。雷池(SafeLine)社区版免费切好用。为网站提供全面的保护,帮助网站抵御各种网络攻击。 docker-compose.yml 文件是 Docker Compose 的核心文件,用于定义和管理多…...

基于51单片机的智能断路器proteus仿真
地址: https://pan.baidu.com/s/16lfGgrgVr9V7JehonMNVQA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

(N-154)基于springboot酒店预订管理系统
开发工具:IDEA 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 前端技术:AdminLTEBootstrapLayUIHTMLjQuery 服务端技术:springbootmybatis-plusthymeleaf 本项目分前台和后台…...

elasticsearch 8.x 插件安装(三)之拼音插件
elasticsearch 8.x 插件安装(三)之拼音插件 elasticsearch插件安装合集 elasticsearch插件安装(一)之ik分词器安装(含MySQL更新) elasticsearch 8.x插件(二)之同义词安装如何解决…...

快速遍历包含合并单元格的Word表格
Word中的合并表格如下,现在需要根据子类(例如:果汁)查找对应的品类,如果这是Excel表格,那么即使包含合并单元格,也很容易处理,但是使用Word VBA进行查找,就需要一些技巧。…...
手机收银云进销存管理软件,商品档案Excel格式批量导入导出,一键导入Excel的商品档案
如果您有Excel的商品档案,那么就可以批量导入到我们的手机云进销存软件系统里,就不需要人工手工一个个商品的新建商品档案,大大提高工作效率。如果您看下面的步骤不会操作,可以联系我们技术支持,来帮您把商品档案导入。…...
html 中识别\n自动换行
CSS实现:white-space <div style"white-space: pre-wrap;" v-html"str"> </div>white-space: normal|nowrap|pre|pre-line|pre-wrap|initial|inherit;值描述换行符空格和制表符文字换行行尾空格normal默认。空白会被浏览器忽略。合…...
用QWebSocketServer写websocket服务端
1. 引入必要的头文件 #include <QCoreApplication> #include <QWebSocketServer> #include <QWebSocket> #include <QDebug> #include <QObject>QCoreApplication:用于创建控制台应用的事件循环。QWebSocketServer:提供 …...
云原生后端:现代应用架构的核心力量
云原生后端:现代应用架构的核心力量 云原生后端是基于云环境进行设计和开发的一种理念,利用云服务和云原生技术构建的服务端应用。它旨在提供灵活、高效、弹性和可扩展的解决方案,成为推动应用现代化的核心力量。本文将详细探讨云原生后端的…...

arcgis中dem转模型导入3dmax
文末分享素材 效果 1、准备数据 (1)DEM (2)DOM 2、打开arcscene软件 3、加载DEM、DOM数据 4、设置DOM的高度为DEM...

Python自动化测试中的Mock与单元测试实战
在软件开发过程中,自动化测试是确保代码质量和稳定性的关键一环。而Python作为一门灵活且强大的编程语言,提供了丰富的工具和库来支持自动化测试。本文将深入探讨如何结合Mock与单元测试,利用Python进行自动化测试,以提高代码的可…...

华为OD机试-食堂供餐-二分法
import java.util.Arrays; import java.util.Scanner;public class DemoTest3 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseint a in.nextIn…...

C++使用 new 来创建动态数组
问题: 不能使用变量定义数组大小 原因: 这是因为数组在内存中是连续存储的,编译器需要在编译阶段就确定数组的大小,以便正确地分配内存空间。如果允许使用变量来定义数组的大小,那么编译器就无法在编译时确定数组的大…...

网站指纹识别
网站指纹识别 网站的最基本组成:服务器(操作系统)、中间件(web容器)、脚本语言、数据厍 为什么要了解这些?举个例子:发现了一个文件读取漏洞,我们需要读/etc/passwd,如…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践
作者:吴岐诗,杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言:融合数据湖与数仓的创新之路 在数字金融时代,数据已成为金融机构的核心竞争力。杭银消费金…...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
深入理解Optional:处理空指针异常
1. 使用Optional处理可能为空的集合 在Java开发中,集合判空是一个常见但容易出错的场景。传统方式虽然可行,但存在一些潜在问题: // 传统判空方式 if (!CollectionUtils.isEmpty(userInfoList)) {for (UserInfo userInfo : userInfoList) {…...
适应性Java用于现代 API:REST、GraphQL 和事件驱动
在快速发展的软件开发领域,REST、GraphQL 和事件驱动架构等新的 API 标准对于构建可扩展、高效的系统至关重要。Java 在现代 API 方面以其在企业应用中的稳定性而闻名,不断适应这些现代范式的需求。随着不断发展的生态系统,Java 在现代 API 方…...

何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡
何谓AI编程【02】AI编程官网以优雅草星云智控为例建设实践-完善顶部-建立各项子页-调整排版-优雅草卓伊凡 背景 我们以建设星云智控官网来做AI编程实践,很多人以为AI已经强大到不需要程序员了,其实不是,AI更加需要程序员,普通人…...
游戏开发中常见的战斗数值英文缩写对照表
游戏开发中常见的战斗数值英文缩写对照表 基础属性(Basic Attributes) 缩写英文全称中文释义常见使用场景HPHit Points / Health Points生命值角色生存状态MPMana Points / Magic Points魔法值技能释放资源SPStamina Points体力值动作消耗资源APAction…...
CppCon 2015 学习:Reactive Stream Processing in Industrial IoT using DDS and Rx
“Reactive Stream Processing in Industrial IoT using DDS and Rx” 是指在工业物联网(IIoT)场景中,结合 DDS(Data Distribution Service) 和 Rx(Reactive Extensions) 技术,实现 …...