深入Kafka核心设计与实践原理读书笔记第三章消费者
消费者
消费者与消费组
消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。
- 如果有某个主题有4个分区,P0,P1,P2,P3.有两个消费组A和B订阅了这个主题,A消费组有4个消费者,B消费组有2个消费者,那么A消费组中的4个消费者每一个都只会分配到一个分区,而B消费组中的2个消费者会分配到两个分区。

- 如果所有消费者都属于一个消费者,那么所有的消息默认会均匀分配给每一个消费者。
- 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者。
PS:再均衡动作:解释一下名词,指的是当一个主题中有6个分区时,有一个消费组,这个消费组中只有一个消费者,那么主题中的6个分区的消息都会由同一个消费者来消费,当有一个新的消费者加入这个消费组之后,6个主题中会有3个分配个新的消费者,依次类推,这个动作被称为再均衡动作
必要参数说明
kafka消费者客户端有个4个必填参数
- bootstrapp.service:该参数的释义和生产者客户端的相同,用来指定链接kafka集群所需要的broker地址清单。
- group.id:消费者隶属的消费组名称,默认为""
- key.deserializer和value.deserializer与生产者相同。
其他重要参数 - fetch.min.bytes:配置消费者在一次的poll中拉取的最小数据量 默认 1b
- fetch.max.bytes:配置消费者在一次的poll中拉取的最大数据量默认50MB.
- fetch.max.wait.ms :参数用于指定 Kafka 的等待时间,默认值为 500 )
- exclude.internal.topics:Kafka 中有两个内部的主题:一consumer_offsets tr ansaction state o exclude.internal.topics用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。
- receive.buffer.bytes:这个参数用来设置 Socket 接收消息缓冲区的大小,默认值为 65536 (B) 如果设置为 -1,则使用操作系统的默认值。
- send.buffer.bytes:,这个参数用来设置 Socket 发送消息缓冲区的大小默认值为 13 1072 (B) ’
- request.timeout.ms: 这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000 ms )。
- metadata.max.age.ms: 这个参数用来配置元数据的过期时间,默认值为 300000 ms ),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker 加入。
- reconnect.backoff.ms 这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间〉,避免频繁地连接主机,默认值为 50 ms )。
订阅主题与分区
订阅主题通过subscribe()方法来订阅一个主题,可以是集合订阅多个主题,也可以是正则。
public void subscribe(Collection<String> topics,ConsumerRebalanceListenergy listener);
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern ,ConsumerRebalanceListenergy listener);
public void subscribe(Pattern pattern);
-
如果前后调用两次 subscribe方法 那么以后一次的为准。
PS:ConsumerRebalanceListenergy listener 是用来设置相应的再均衡监听器 -
这里还可以通过assign()方法来指定主题中特定的分区来定义。
public void assign(Collection<TopicPartition> partition);
- 其中 partition是分区的集合。
- TopicPartition类有两种属性 topic和partition,分别代表分区所属的主题和自己的分区偏移量也就是编号。
- 通过partitionsFor(String topic)方法可以查询主题有多少个分区
取消订阅
- unsubscribe()方法取消订阅主题
- subscribe(new ArrayList<>());
- assign(new ArrayList<>());
以上都可
反序列化
对应生产者的序列化器相反,用来把序列化的内容反序列化,至于序列化与反序列化请自行百度,基础概念不与重复。
消息消费
Kafka中消费方式采取的拉去式消费:消息的消费一般分为两种:拉取式和推送式。
- kefka中的消息消费是一个不断轮询的过程。需要重复的效用poll方法。
public ConsumerRecords<K,V> poll(final Duration timeout);
- 其中timeOut 是用来限制poll方法的阻塞时间的
其中 Duration 也有Long的方法,Long的timeOut是毫秒值,Duration 可以通过ofMillis、ofSeconds、ofMinutes
、ofHours等方法来指定不同时间类型。
ConsumerRecords类中还会提供一个方便开发人员用来对消息进行处理的:count()等 如有兴趣自定查看。
位移提交
offset偏移量也叫位移,消费者可以通过offset来指定消费分区中的某个消息所在的位置。
- 每次调用poll方法返回的是未被消费的消息集,偏移量不仅要保存在内存中也要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移,如果有新的消费者加入,那么必然会有再均衡动作,那么新加入的消费者也无法知晓之前的消费位移
- 在旧消费者客户端中消费者偏移量存储在zk中,新版本存放在kafka的主题_consumer_offsets中,这个把偏移量存储起来的动作就时提交。
控制或关闭消费
KafkaConsumer提供了对消费速度进行控制的方法。使用pause()方法resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区想客户端返回数据的操作。
指定位置消费
对应消费位移,主要用在消费者重启之后出发了再均衡动作之后指定偏移量消费分区内消息。
消费者拦截器
对应生产者消费器,主要在消费到消息或提交消费位移的时候进行一些定制化操作。
相关文章:
深入Kafka核心设计与实践原理读书笔记第三章消费者
消费者 消费者与消费组 消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。 如…...
IDEA 中使用 Git 图文教程详解
✅作者简介:2022年博客新星 第八。热爱国学的Java后端开发者,修心和技术同步精进。 🍎个人主页:Java Fans的博客 🍊个人信条:不迁怒,不贰过。小知识,大智慧。 💞当前专栏…...
【Linux系统】进程概念
目录 1 冯诺依曼体系结构 2 操作系统(Operator System) 概念 设计OS的目的 定位 总结 系统调用和库函数概念 3 进程 3.1 基本概念 3.2 描述进程-PCB 3.2 组织进程 3.3 查看进程 3.4 通过系统调用获取进程标示符 3.5 进程状态 在了解进程概念前我们还得了解下冯诺…...
上课睡觉(2023寒假每日一题 4)
有 NNN 堆石子,每堆的石子数量分别为 a1,a2,…,aNa_1,a_2,…,a_Na1,a2,…,aN。 你可以对石子堆进行合并操作,将两个相邻的石子堆合并为一个石子堆,例如,如果 a[1,2,3,4,5]a[1,2,3,4,5]a[1,2,3,4,5],合并第 2,32…...
【Selenium学习】Selenium 中常用的基本方法
1.send_keys 方法模拟键盘键入此方法类似于模拟键盘键入。以在百度首页搜索框输入“Selenium”为例,代码如下:# _*_ coding:utf-8 _*_ """ name:zhangxingzai date:2023/2/13 form:《Selenium 3Python 3自动化测试项目实战》 …...
python练习——简化路径
项目场景: 给你一个字符串 path ,表示指向某一文件或目录的 Unix 风格 绝对路径 (以 /开头),请你将其转化为更加简洁的规范路径。在 Unix 风格的文件系统中,一个点(.)表示当前目录本…...
2023新华为OD机试题 - 火星文计算2(JavaScript) | 刷完必过
火星文计算 2 题目 已知火星人使用的运算符号为#;$ 其与地球人的等价公式如下 x#y=4*x+3*y+2 x$y=2*x+y+3 x y是无符号整数 地球人公式按照 c 语言规则进行计算 火星人公式中#符优先级高于$ 相同的运算符按从左到右的顺序运算 输入 火星人字符串表达式结尾不带回车换行 输入…...
前端插件重磅来袭
“你值得拥有”专栏系列上新啦,今日推出“手写前端插件”项目,作为一个前端中高级工程师,手写前端树形菜单插件、弹出层插件、日历插件、分页插件、选项卡插件、进度条插件等是必备的技能,让你的前端技术百尺竿头更进一步…...
深入工厂|高精密多层板是如何被智造出来的?
或许有很多人从网络上见过各种教程,告诉你单层板是什么,多层板是什么,他们该如何做出来,但是在具体制造时却全凭想象,今天,就让我们来实地看看,精密的多层板是如何被制造出来的!今天…...
代理模式动态代理
什么是代理模式? 代理模式是开发中常见的一种设计模式,使用代理模式可以很好的对程序进行横向扩展。代理,顾名思义就是一个真实对象会存在一个代理对象,并且代理对象可以替真实对象完成相应操作,外部通过代理对象来访…...
Mysql之二进制日志
目录 二进制日志 12-37 二进制日志格式 基于行的二进制日志 基于语句的二进制日志 混合格式二进制日志 复制日志 12-42 故障安全 (Crash-Safe) 复制 多源复制 二进制日志 12-37 二进制日志: • 包含数据和模式更改及其时间戳 – 基于语句 或 基于行 的日志…...
kail工具的使用--- cewl
1.介绍 Cewl是一款采用Ruby开发的应用程序,可以给他的爬虫指定URL地址和爬取深度,还可以添加外部链接,接下来Cewl会给你返回一个字典文件,你可以把字典用到类似John the Ripper这样的密码破解工具中。 2.使用 输入以下命令之后…...
【蓝桥杯集训1】前缀和专题(2 / 5)
目录 前缀和模板 !3956. 截断数组 - 前缀和枚举 前缀和模板 活动 - AcWing import java.util.*;class Main {static int N100010;static int[] anew int[N],snew int[N];public static void main(String[] args){Scanner scnew Scanner(System.in);int nsc.nex…...
基于模块联邦的微前端实现方案
一、 微前端应用案例概述 当前案例中包含三个微应用,分别为 Marketing、Authentication 和 Dashboard Marketing:营销微应用,包含首页组件和价格组件 Authentication:身份验证微应用,包含登录组件 Dashboard&#x…...
【单目标优化算法】食肉植物优化算法(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
ANTLR4入门学习(四)
ANTLR4入门学习(四)一、设计语法1.语法2.ANTLR核心标记3.常见计算机语言模式4.左右递归5.识别常见的语法结构5.1 匹配标识符5.2 匹配数字5.3 匹配字符串常量5.4 匹配注释和空白字符5.5 基础的语法规则5.6 划定词法分析器和语法分析器的界线一、设计语法 …...
Android okhttp3中发送websocket消息,并通过mockwebserver将一个安卓设备模拟成服务器接发消息
websocket 提供了客户端和服务端的长链接,允许客户端和服务端双向发送消息 okhttp 提供了使用websocket 相关接口议。同时为方便单元测试,又提供了mockwebserver可以把一个安卓客户端作为服务端接受消息。 websocket使用 权限 <uses-permission an…...
MySQL系统变量和自定义变量
1 系统变量1.1 查看系统变量可以使用以下命令查看 MySQL 中所有的全局变量信息。SHOW GLOBAL VARIABLES; MySQL 中的系统变量以两个“”开头。global 仅仅用于标记全局变量;session 仅仅用于标记会话变量;首先标记会话变量,如果会话变量不存在…...
基于Python来爬取某音动态壁纸,桌面更香了!
至于小伙伴们想要这个封图,我也没有。不过继续带来一波靓丽壁纸,而且是动态的,我的桌面壁纸又换了:每天换着花样欣赏一波波动态壁纸桌面立刻拥有了高颜值,简直跟刷美女短视频一样啊。对的,这些动态壁纸就是…...
[数据库]表的约束
●🧑个人主页:你帅你先说. ●📃欢迎点赞👍关注💡收藏💖 ●📖既选择了远方,便只顾风雨兼程。 ●🤟欢迎大家有问题随时私信我! ●🧐版权:本文由[你帅…...
MPNet:旋转机械轻量化故障诊断模型详解python代码复现
目录 一、问题背景与挑战 二、MPNet核心架构 2.1 多分支特征融合模块(MBFM) 2.2 残差注意力金字塔模块(RAPM) 2.2.1 空间金字塔注意力(SPA) 2.2.2 金字塔残差块(PRBlock) 2.3 分类器设计 三、关键技术突破 3.1 多尺度特征融合 3.2 轻量化设计策略 3.3 抗噪声…...
TDengine 快速体验(Docker 镜像方式)
简介 TDengine 可以通过安装包、Docker 镜像 及云服务快速体验 TDengine 的功能,本节首先介绍如何通过 Docker 快速体验 TDengine,然后介绍如何在 Docker 环境下体验 TDengine 的写入和查询功能。如果你不熟悉 Docker,请使用 安装包的方式快…...
python打卡day49
知识点回顾: 通道注意力模块复习空间注意力模块CBAM的定义 作业:尝试对今天的模型检查参数数目,并用tensorboard查看训练过程 import torch import torch.nn as nn# 定义通道注意力 class ChannelAttention(nn.Module):def __init__(self,…...
(十)学生端搭建
本次旨在将之前的已完成的部分功能进行拼装到学生端,同时完善学生端的构建。本次工作主要包括: 1.学生端整体界面布局 2.模拟考场与部分个人画像流程的串联 3.整体学生端逻辑 一、学生端 在主界面可以选择自己的用户角色 选择学生则进入学生登录界面…...
golang循环变量捕获问题
在 Go 语言中,当在循环中启动协程(goroutine)时,如果在协程闭包中直接引用循环变量,可能会遇到一个常见的陷阱 - 循环变量捕获问题。让我详细解释一下: 问题背景 看这个代码片段: fo…...
【HTML-16】深入理解HTML中的块元素与行内元素
HTML元素根据其显示特性可以分为两大类:块元素(Block-level Elements)和行内元素(Inline Elements)。理解这两者的区别对于构建良好的网页布局至关重要。本文将全面解析这两种元素的特性、区别以及实际应用场景。 1. 块元素(Block-level Elements) 1.1 基本特性 …...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
C# 求圆面积的程序(Program to find area of a circle)
给定半径r,求圆的面积。圆的面积应精确到小数点后5位。 例子: 输入:r 5 输出:78.53982 解释:由于面积 PI * r * r 3.14159265358979323846 * 5 * 5 78.53982,因为我们只保留小数点后 5 位数字。 输…...
USB Over IP专用硬件的5个特点
USB over IP技术通过将USB协议数据封装在标准TCP/IP网络数据包中,从根本上改变了USB连接。这允许客户端通过局域网或广域网远程访问和控制物理连接到服务器的USB设备(如专用硬件设备),从而消除了直接物理连接的需要。USB over IP的…...
JS设计模式(4):观察者模式
JS设计模式(4):观察者模式 一、引入 在开发中,我们经常会遇到这样的场景:一个对象的状态变化需要自动通知其他对象,比如: 电商平台中,商品库存变化时需要通知所有订阅该商品的用户;新闻网站中࿰…...
