当前位置: 首页 > news >正文

Kafka之消费者客户端

1、历史上的二个版本

与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本:

  • 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端
  • 新消费者客户端(New Consumer):从Kafka 0.9.0版本之后基于Java语言开发的版本,又称为Java消费者客户端

2、必要的参数配置

  • bootstrap.servers

    用来指定连接Kafka集群所需的broker地址清单,形式为:host1:port1,host2:port2,…,多个broker之间以“,”隔开。

    不用将所有broker列出来,消费者可以根据一个broker查询到其他broker。

    建议至少配置2个或2个以上的broker,防止只有一个broker的话,宕机的时候就无法连接到Kafka集群了。

  • group.id

    消费者隶属消费组的名称。

  • key.deserializer 和 value.deserializer

    与生产者客户端 KafkaProducer中的key.serializer和value.serializer参数对应。

    用来将字节数组中的key和value反序列化还原为原来的对象格式。

3、订阅主题与分区

一个消费者可以订阅一个或多个主题。

Kafka消费者客户端提供了三种订阅方式:集合订阅subscribe(Collection)、正则表达式订阅subscribe(Pattern)、指定分区订阅assign(Collection)。

这三种订阅方式分别代表了三种不同的订阅状态,依次为AUTO_TOPICS、 AUTO_PATTERN、USER_ASSIGNED。如果没有订阅,订阅状态为NONE。

其中的集合订阅subscribe(Collection)和正则表达式订阅subscribe(Pattern)这两种订阅方式有消费者自动再均衡的功能,可以根据分区分配策略自动的为消费者分配对应的分区。而指定分区订阅assign(Collection)方式则不具备消费者自动再均衡的功能。

综上所述梳理了一张关于订阅方式、订阅状态和再均衡功能的关系表:
在这里插入图片描述

4、消费消息

消息消费一般有两种方式:

  • 推模式:服务器主动将消息推送给消费者。
  • 拉模式:消费者主动向服务器发起请求来来取信息。

Kafka采用的消息消费模式是拉模式。

在拉取消息的时候有一个超时时间参数(timeout),如果消费者的缓存区中无可用数据(即没有要消费消息),我们可以通过这个timeout参数来设置等待的时长。如果timeout=0,则不管有无数据立刻返回结果。

5、位移提交

在Kafka的分区当中,每一个消息都有一个唯一的标识offset,我们可以用它来表示消息在分区中的位置。

对于消费者而言,也有一个offset的概念,我们可以用它来表示消费到分区中某消息的位置。

对于offset这个单词,我们既可以翻译为偏移量,也可以翻译为位移,并没有什么严格的区分。但是为了更好的区分不同的使用场景,我们可以将用来表示消息在分区中位置的offset称为偏移量。对于用来表示消费者消费到的消息所处位置的offset称为位移,更明确的话称为“消费位移”

通过下图希望能够帮助大家更清晰的理解:偏移量、消费位移、位移提交。
在这里插入图片描述
通过上图我们可以了解到如下信息:

  1. 正在消费的消息下标为3。
  2. 所以对于分区来说,它的偏移量为3;对于消费者来说,它的消费位移也为3。
  3. 对于分区来说,下标4则作为下一个消息要写入的位置。
  4. 对于消费者来说,将要提交的消费位移(即位移提交)是下标4。

Kafka默认情况下,消费位移的提交方式为自动提交,提交间隔时间默认为5秒。

根据位移提交的具体情况,可能会出现重复消费和消息丢失的现象。我们通过下面一个例子更详细介绍下重复消费和消息丢失是如何出现的。让我们先来看一张图:
在这里插入图片描述
根据上图,我们假设本次拉取的消息为x+2 ~ x+7,x+2为上一次的提交的消费位移,x+8为下一次要提交的消费位移,目前正在处理x+5。

  • 消息丢失

    假设我们在处理x+5之前(即在处理x+0或x+1或x+2…)就提交了本次的消费位移(即x+8),当到处理x+5的时候出现了异常,恢复后,就要从x+8开始拉取了,此时x+5、x+6、x+7实际上并没有被消费,这样便发生了消息丢失的现象。(在消费消息出现异常之前就执行了位移提交)。

  • 重复消费

    假设我们在处理x+5的时候出现了异常,此时还没有提交本次的消费位移(即x+8),恢复后,就还需要从x+2开始拉取消息,这样x+2 ~ x+4就又得再消费一次,这种现象就是重新消费。(在消费消息出现异常之前没有执行位移提交)。

通过以上的描述我们还可以发现:拉取线程和消息处理线程完全是两个独立的线程。

6、指定位移消息

首先提出一个问题:当消费者遇到无法获取所记录的消费位移的时候该怎么办?

为了要解决这个问题,消费者客户端提供了auto.offset.reset参数,用来在遇到这种情况的时候告诉消费者客户端从哪里开始拉取消息消费,该参数的值有几种选择:

  • latest:默认值,意为从分区末尾开始消费消息(即分区中下一条消息要写入的位置)。
  • earliest:意为消费者会从起始处也就是0开始消费。
  • none:直接抛出NoOffsetForPartitionException异常。

7、再均衡

所谓再均衡就是将一个分区的所属权从一个消费者转移到另外一个消费者。

再均衡的过程中,消费组内的消费者无法读取消息。

再均衡后,可能会出现重复消费的情况。因为再均衡的时候,消费者会丢掉当前的状态。如果在上一个消费者(即具有分区所属权的消费者)正在消费消息(已消费了一部分消息了)还没有来得及提交消费位移的时候就发生了再均衡,那么新的消费者(分区所属权转移后的消费者)会重新拉取曾经消费过的消息再消费一遍。

8、消费者拦截器

我们可以通过消费者拦截器在poll返回消息之前消费位移提交之后进行一些特定的处理。

9、多线程实现

为了提高整体的消费能力,我们对消费者客户端采取多线程来实现。

有三种多线程的实现方式:

  1. 线程封闭,即为每一个线程实现一个KafkaConsumer对象,如下图: 在这里插入图片描述
  2. 多个消费线程同时消费一个分区,通过assign()、seek()等方法实现,打破了原有的消费线程的个数不能超过分区个数的限制。但是这种实现方式会使位移提交和顺序控制变得非常负责,实际场景中很少会用到。
  3. 将处理消息的逻辑改为多线程实现,也就是在一个KafkaConsumer对象中有多个处理消息的handler线程,如下图: 在这里插入图片描述
    在这种实现方式中,为了能够正确的完成位移提交,引入了一个共享变量offsets来参与提交,如下图:
    在这里插入图片描述
    基于这种实现方式提供以下两种实现方案:
    • 通过消费者拉取一个批次的消息,然后再将这些消息交给多线程去处理。
    • 基于滑动窗口来实现,将拉取的消息以批次为单位暂存起来,多个消费线程拉取暂存的消息消费,如下图: 在这里插入图片描述
      窗口滑动过程描述:上一次滑动窗口的范围是2 ~ 5,startOffset为2,当2中的消息都被消费完成后,提交2中的消费位移,窗口向前滑动一格,范围变为3 ~ 6,startOffset变为3。

上一篇:Kafka之消费组与消费者

相关文章:

Kafka之消费者客户端

1、历史上的二个版本 与生产者客户端一样,在Kafka的发展过程当中,消费者客户端主要有两个大的版本: 旧消费者客户端(Old Consumer):基于Scala语言开发的版本,又称为Scala消费者客户端。新消费…...

使用Python进行数据分析入门

文章目录 Python环境搭建安装Anaconda验证安装 必备库介绍NumPyPandasMatplotlibSciPy 数据导入与清洗导入数据清洗数据 数据探索与分析描述性统计相关性分析 数据可视化绘制直方图 高级主题机器学习深度学习 总结 随着大数据时代的到来,数据分析变得越来越重要。Py…...

ubuntu20 从源码编译升级到版本5.15.263

author: hjjdebug date: 2024年 10月 25日 星期五 15:38:48 CST description: ubuntu20 从源码编译升级到版本5.15.263 我的内核是 5.15.105, 用apt 下载源码后其版本是5.15.263 为什么要从源码编译内核. 升级内核? 目的: 练练手. 消除内核神秘性. 还可以裁减内核,也是调试内核…...

php 程序开发分层与验证思想

在PHP程序开发中,合理的层级设计可以提高代码的可维护性、可扩展性和可测试性。以下是常见的层级设计模式及建议: 1. 分层架构 通常可以将PHP应用分为以下几层: 表示层(Presentation Layer): 负责与用户交…...

关于InternVL2的单卡、多卡推理

关于InternVL2的单卡、多卡推理 前言单卡推理多卡推理总结前言 本章节将介绍如何使用上一章节微调后的模型进行推理。推理又分为单卡和多卡,这里介绍的两种方式都是Hugging Face的transformers方法进行推理。模型的话可以使用上一章微调的任意一个非lora模型进行测试。 单卡推…...

Go语言设计Web框架

如何设计一个Web框架 项目规划 在开始设计Web框架之前,我们需要对整个项目进行规划。主要包括以下几个方面: 项目结构依赖管理路由设计控制器设计日志和配置管理 项目结构 首先,我们定义项目的目录结构: ├── cmd/ │ └…...

2024年10月28日练习(双指针算法)

一.11. 盛最多水的容器 - 力扣(LeetCode) 1.题目描述: 这个题目代表的意思就是数组上每个对应的值就相当于每条垂直线的高度,就相当于短板效应,两 个高度的线会取最短的长度因为那样水才不会漏。而两条线的数组的下标…...

Objective-C 音频爬虫:实时接收数据的 didReceiveData_ 方法

在互联网技术领域,数据的获取和处理是至关重要的。尤其是对于音频内容的获取,实时性和效率是衡量一个爬虫性能的重要指标。本文将深入探讨在Objective-C中实现音频爬虫时,如何高效地使用didReceiveData:方法来实时接收数据,并通过…...

提升网站流量和自然排名的SEO基本知识与策略分析

内容概要 在当今数字化时代,SEO(搜索引擎优化)成为加强网站可见度和提升流量的重要工具。SEO的基础知识包括理解搜索引擎的工作原理,以及如何通过优化网站内容和结构来提高自然排名。白帽SEO和黑帽SEO代表了两种截然不同的策略&a…...

雷池社区版compose文件配置讲解--fvm

在现代网络安全中,选择合适的 Web 应用防火墙至关重要。雷池(SafeLine)社区版免费切好用。为网站提供全面的保护,帮助网站抵御各种网络攻击。 docker-compose.yml 文件是 Docker Compose 的核心文件,用于定义和管理多…...

基于51单片机的智能断路器proteus仿真

地址: https://pan.baidu.com/s/16lfGgrgVr9V7JehonMNVQA 提取码:1234 仿真图: 芯片/模块的特点: AT89C52/AT89C51简介: AT89C52/AT89C51是一款经典的8位单片机,是意法半导体(STMicroelectro…...

(N-154)基于springboot酒店预订管理系统

开发工具:IDEA 服务器:Tomcat9.0, jdk1.8 项目构建:maven 数据库:mysql5.7 前端技术:AdminLTEBootstrapLayUIHTMLjQuery 服务端技术:springbootmybatis-plusthymeleaf 本项目分前台和后台…...

elasticsearch 8.x 插件安装(三)之拼音插件

elasticsearch 8.x 插件安装(三)之拼音插件 elasticsearch插件安装合集 elasticsearch插件安装(一)之ik分词器安装(含MySQL更新) elasticsearch 8.x插件(二)之同义词安装如何解决…...

快速遍历包含合并单元格的Word表格

Word中的合并表格如下,现在需要根据子类(例如:果汁)查找对应的品类,如果这是Excel表格,那么即使包含合并单元格,也很容易处理,但是使用Word VBA进行查找,就需要一些技巧。…...

手机收银云进销存管理软件,商品档案Excel格式批量导入导出,一键导入Excel的商品档案

如果您有Excel的商品档案,那么就可以批量导入到我们的手机云进销存软件系统里,就不需要人工手工一个个商品的新建商品档案,大大提高工作效率。如果您看下面的步骤不会操作,可以联系我们技术支持,来帮您把商品档案导入。…...

html 中识别\n自动换行

CSS实现&#xff1a;white-space <div style"white-space: pre-wrap;" v-html"str"> </div>white-space: normal|nowrap|pre|pre-line|pre-wrap|initial|inherit;值描述换行符空格和制表符文字换行行尾空格normal默认。空白会被浏览器忽略。合…...

用QWebSocketServer写websocket服务端

1. 引入必要的头文件 #include <QCoreApplication> #include <QWebSocketServer> #include <QWebSocket> #include <QDebug> #include <QObject>QCoreApplication&#xff1a;用于创建控制台应用的事件循环。QWebSocketServer&#xff1a;提供 …...

云原生后端:现代应用架构的核心力量

云原生后端&#xff1a;现代应用架构的核心力量 云原生后端是基于云环境进行设计和开发的一种理念&#xff0c;利用云服务和云原生技术构建的服务端应用。它旨在提供灵活、高效、弹性和可扩展的解决方案&#xff0c;成为推动应用现代化的核心力量。本文将详细探讨云原生后端的…...

arcgis中dem转模型导入3dmax

文末分享素材 效果 1、准备数据 (1)DEM (2)DOM 2、打开arcscene软件 3、加载DEM、DOM数据 4、设置DOM的高度为DEM...

Python自动化测试中的Mock与单元测试实战

在软件开发过程中&#xff0c;自动化测试是确保代码质量和稳定性的关键一环。而Python作为一门灵活且强大的编程语言&#xff0c;提供了丰富的工具和库来支持自动化测试。本文将深入探讨如何结合Mock与单元测试&#xff0c;利用Python进行自动化测试&#xff0c;以提高代码的可…...

物联网海量数据下的时序数据库选型:InfluxDB、TDEngine、MongoDB与HBase对比与建议

随着物联网&#xff08;IoT&#xff09;的普及&#xff0c;各行业纷纷部署大量传感器、设备生成的数据流&#xff0c;面对如此海量的时间序列数据&#xff0c;如何高效存储、查询和分析成为关键。为此&#xff0c;时序数据库&#xff08;Time Series Database, TSDB&#xff09…...

Python中的数据可视化:Matplotlib基础与高级技巧

Python中的数据可视化&#xff1a;Matplotlib基础与高级技巧 数据可视化是数据分析和数据科学中不可或缺的一部分。通过图表&#xff0c;我们可以更直观地观察数据的分布和趋势。Matplotlib作为Python最基础、也是最广泛使用的绘图库之一&#xff0c;不仅支持多种常用图表&…...

数组名和指针数组名深度复习

#define _CRT_SECURE_NO_WARNINGS #include <stdio.h> //sizeof只关注占用内存空间的大小&#xff0c;不在乎内存中存放的是什么 //是操作符 /* int main() { char arr[] { "abcdef" }; //a b c d e f \0 printf("%d\n", sizeof(arr));//…...

Linux 诞生

目录 Linux诞生背景 Linus Torvalds的创举 Linux内核的首次发布 Linux诞生背景 在计算机操作系统的发展史上&#xff0c;Linux是一个重要的里程碑。它的诞生源于对自由、开放和协作精神的追求&#xff0c;以及对Unix操作系统的深入研究和改进。 在1991年之前&#xff0c;Un…...

借助Aspose.Email,管理受密码保护的 PST 文件

在当今的数字环境中&#xff0c;保护您的数据比以往任何时候都更加重要。确保您的电子邮件数据受到密码保护是维护安全性的关键步骤。对于使用 Microsoft Outlook 数据的开发人员来说&#xff0c;管理受密码保护的 PST&#xff08;个人存储表&#xff09;文件可能是一项关键任务…...

MySQL数据库MHA高可用

目录 一、MHA简述 二、MHA 的组成 三、MHA 的特点 四、MHA工作原理 五、MHA部署步骤 六、搭建 MySQL MHA MHA一主两从高可用集群示意图 实验环境 1. Master、Slave1、Slave2 节点上安装 mysql5.7 2. 关闭防火墙 3. 修改 Master、Slave1、Slave2 节点的主机名 4. 修…...

DevEco Studio使用技巧和插件推荐

DevEco Studio是一款强大的集成开发环境&#xff08;IDE&#xff09;&#xff0c;为开发者提供了丰富的功能和插件。以下是一些使用技巧和插件推荐&#xff1a; 使用技巧 设置中文界面&#xff1a; 打开DevEco Studio&#xff0c;选择“Configure”&#xff0c;再点击“Prefer…...

使用Node.js与Express构建RESTful API

&#x1f496; 博客主页&#xff1a;瑕疵的CSDN主页 &#x1f4bb; Gitee主页&#xff1a;瑕疵的gitee主页 &#x1f680; 文章专栏&#xff1a;《热点资讯》 使用Node.js与Express构建RESTful API 1 引言 2 Node.js与Express简介 3 安装Node.js与Express 4 创建Express项目 5…...

从0开始搭建一个生产级SpringBoot2.0.X项目(二)SpringBoot应用连接数据库集成mybatis-plus

前言 最近有个想法想整理一个内容比较完整springboot项目初始化Demo。 连接Oracle数据库集成mybatis-plus&#xff0c;自定义WrapperFactory。配置代码生成器 一、引入jar包 <!--oracle驱动 --><dependency><groupId>org.springframework.boot</groupI…...

Docker部署教程:打造流畅的斗地主网页小游戏

Docker部署教程&#xff1a;打造流畅的斗地主网页小游戏 一、项目介绍项目简介项目预览 二、系统要求环境要求环境检查Docker版本检查检查操作系统版本 三、部署斗地主网页小游戏下载镜像创建容器检查容器状态查看容器日志安全设置 四、访问斗地主网页小游戏五、总结 一、项目介…...

厦门专业做网站的公司/通过百度指数不能判断出

原文&#xff1a;Spring实现AOP的4种方式 Spring AOP 详解 Spring实现AOP的4种方式 先了解AOP的相关术语:1.通知(Advice):通知定义了切面是什么以及何时使用。描述了切面要完成的工作和何时需要执行这个工作。2.连接点(Joinpoint):程序能够应用通知的一个“时机”&#xff0c;这…...

中卫网站制作公司报价/环球网最新消息

这是本人总结的一些认为C比较经典的书籍&#xff0c;希望对大家有用 下面链接好像失效了&#xff0c;新发一个&#xff1a; www.it689.net/Webs/Books/List.aspxThinking in C C编程思想 C入门书籍&#xff0c;不多介绍 http://www.laixp.cn/soft/sort01/sort02/down-12215.h…...

wordpress 上下篇/爱站seo综合查询

1、为什么要定义函数? 定义函数&#xff08;指定它的功能和名字&#xff09;的目的就是为了使用函数。已达到精简代码的目的。 2、怎样定义函数&#xff1f; 类型标识符 函数名&#xff08;参数&#xff09;{ 声明部分&#xff1b; 语句部分 } 3、定义函数时函数后面括号中的变…...

百度可信网站/百度小说搜索风云排行榜

动态查找树主要有&#xff1a;二叉查找树&#xff0c;平衡二叉树&#xff0c;红黑树&#xff0c;B-tree/B-tree/B*-tree。前三个都是典型的二叉树结构&#xff0c;查找的时间复杂度O(log2N)和树的深度相关&#xff0c;随着树的深度降低会提高查找效率。而在现实情况中大部分数据…...

做律师咨询网站/山东疫情最新情况

简介 自.NET 4.5发布已经过了差不多1年了。但是随着最近微软大多数的发布&#xff0c;与.NET开发者交流的问题显示&#xff0c;开发者仅知道一到两个特性&#xff0c;其他的特性仅仅停留在MSDN并以简单的文档形式存在着。 比如说&#xff0c;当你问一个.NET开发者.NET框架内核中…...

wordpress 3.8页面伪静态化 html/就业培训机构有哪些

本篇文章将对一些常见的python面试题目进行整理&#xff0c;并做简要的回答&#xff0c;可供参考&#xff0c;如果你有更好更全面的答案&#xff0c;那么请在评论区评论&#xff0c;一起交流学习^0^ 1. 列举 Python2 和 Python3 的区别? 答&#xff1a; 默认编码不同&#xff…...