当前位置: 首页 > news >正文

深入Kafka核心设计与实践原理读书笔记第三章消费者

消费者

消费者与消费组

消费者Consumer负责定于kafka中的主题Topic,并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组,当消息发布到主题后,只会被投递给订阅它的消费组的一个消费者。

  • 如果有某个主题有4个分区,P0,P1,P2,P3.有两个消费组A和B订阅了这个主题,A消费组有4个消费者,B消费组有2个消费者,那么A消费组中的4个消费者每一个都只会分配到一个分区,而B消费组中的2个消费者会分配到两个分区。

在这里插入图片描述

  • 如果所有消费者都属于一个消费者,那么所有的消息默认会均匀分配给每一个消费者。
  • 如果所有的消费者都隶属于不同的消费组,那么所有的消息都会被广播给所有的消费者。
    PS:再均衡动作:解释一下名词,指的是当一个主题中有6个分区时,有一个消费组,这个消费组中只有一个消费者,那么主题中的6个分区的消息都会由同一个消费者来消费,当有一个新的消费者加入这个消费组之后,6个主题中会有3个分配个新的消费者,依次类推,这个动作被称为再均衡动作

必要参数说明

kafka消费者客户端有个4个必填参数

  1. bootstrapp.service:该参数的释义和生产者客户端的相同,用来指定链接kafka集群所需要的broker地址清单。
  2. group.id:消费者隶属的消费组名称,默认为""
  3. key.deserializer和value.deserializer与生产者相同。
    其他重要参数
  4. fetch.min.bytes:配置消费者在一次的poll中拉取的最小数据量 默认 1b
  5. fetch.max.bytes:配置消费者在一次的poll中拉取的最大数据量默认50MB.
  6. fetch.max.wait.ms :参数用于指定 Kafka 的等待时间,默认值为 500 )
  7. exclude.internal.topics:Kafka 中有两个内部的主题:一consumer_offsets tr ansaction state o exclude.internal.topics用来指定 Kafka 中的内部主题是否可以向消费者公开,默认值为 true 。如果设置 true ,那么只能使用 subscribe( Collection)的方式而不能使用subscribe(Pattern)的方式来订阅内部主题,设置为false 则没有这个限制。
  8. receive.buffer.bytes:这个参数用来设置 Socket 接收消息缓冲区的大小,默认值为 65536 (B) 如果设置为 -1,则使用操作系统的默认值。
  9. send.buffer.bytes:,这个参数用来设置 Socket 发送消息缓冲区的大小默认值为 13 1072 (B) ’
  10. request.timeout.ms: 这个参数用来配置 Consumer 等待请求响应的最长时间,默认值为 30000 ms )。
  11. metadata.max.age.ms: 这个参数用来配置元数据的过期时间,默认值为 300000 ms ),即5分钟。如果元数据在此参数所限定的时间范围内没有进行更新,则会被强制更新,即使没有任何分区变化或有新的broker 加入。
  12. reconnect.backoff.ms 这个参数用来配置尝试重新连接指定主机之前的等待时间(也称为退避时间〉,避免频繁地连接主机,默认值为 50 ms )。

订阅主题与分区

订阅主题通过subscribe()方法来订阅一个主题,可以是集合订阅多个主题,也可以是正则。

public void subscribe(Collection<String> topics,ConsumerRebalanceListenergy listener);
public void subscribe(Collection<String> topics);
public void subscribe(Pattern pattern ,ConsumerRebalanceListenergy listener);
public void subscribe(Pattern pattern);
  • 如果前后调用两次 subscribe方法 那么以后一次的为准。
    PS:ConsumerRebalanceListenergy listener 是用来设置相应的再均衡监听器

  • 这里还可以通过assign()方法来指定主题中特定的分区来定义。

public void assign(Collection<TopicPartition> partition);
  • 其中 partition是分区的集合。
  • TopicPartition类有两种属性 topic和partition,分别代表分区所属的主题和自己的分区偏移量也就是编号。
  • 通过partitionsFor(String topic)方法可以查询主题有多少个分区

取消订阅

  1. unsubscribe()方法取消订阅主题
  2. subscribe(new ArrayList<>());
  3. assign(new ArrayList<>());
    以上都可

反序列化

对应生产者的序列化器相反,用来把序列化的内容反序列化,至于序列化与反序列化请自行百度,基础概念不与重复。

消息消费

Kafka中消费方式采取的拉去式消费:消息的消费一般分为两种:拉取式和推送式。

  1. kefka中的消息消费是一个不断轮询的过程。需要重复的效用poll方法。
public ConsumerRecords<K,V> poll(final Duration timeout);
  • 其中timeOut 是用来限制poll方法的阻塞时间的
    其中 Duration 也有Long的方法,Long的timeOut是毫秒值,Duration 可以通过ofMillis、ofSeconds、ofMinutes
    、ofHours等方法来指定不同时间类型。
    ConsumerRecords类中还会提供一个方便开发人员用来对消息进行处理的:count()等 如有兴趣自定查看。

位移提交

offset偏移量也叫位移,消费者可以通过offset来指定消费分区中的某个消息所在的位置。

  • 每次调用poll方法返回的是未被消费的消息集,偏移量不仅要保存在内存中也要做持久化保存,否则消费者重启之后就无法知晓之前的消费位移,如果有新的消费者加入,那么必然会有再均衡动作,那么新加入的消费者也无法知晓之前的消费位移
  • 在旧消费者客户端中消费者偏移量存储在zk中,新版本存放在kafka的主题_consumer_offsets中,这个把偏移量存储起来的动作就时提交。

控制或关闭消费

KafkaConsumer提供了对消费速度进行控制的方法。使用pause()方法resume()方法来分别实现暂停某些分区在拉取操作时返回数据给客户端和恢复某些分区想客户端返回数据的操作。

指定位置消费

对应消费位移,主要用在消费者重启之后出发了再均衡动作之后指定偏移量消费分区内消息。

消费者拦截器

对应生产者消费器,主要在消费到消息或提交消费位移的时候进行一些定制化操作。

相关文章:

深入Kafka核心设计与实践原理读书笔记第三章消费者

消费者 消费者与消费组 消费者Consumer负责定于kafka中的主题Topic&#xff0c;并且从订阅的主题上拉取消息。与其他消息中间件不同的在于它有一个消费组。每个消费者对应一个消费组&#xff0c;当消息发布到主题后&#xff0c;只会被投递给订阅它的消费组的一个消费者。 如…...

IDEA 中使用 Git 图文教程详解

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…...

【Linux系统】进程概念

目录 1 冯诺依曼体系结构 2 操作系统(Operator System) 概念 设计OS的目的 定位 总结 系统调用和库函数概念 3 进程 3.1 基本概念 3.2 描述进程-PCB 3.2 组织进程 3.3 查看进程 3.4 通过系统调用获取进程标示符 3.5 进程状态 在了解进程概念前我们还得了解下冯诺…...

上课睡觉(2023寒假每日一题 4)

有 NNN 堆石子&#xff0c;每堆的石子数量分别为 a1,a2,…,aNa_1,a_2,…,a_Na1​,a2​,…,aN​。 你可以对石子堆进行合并操作&#xff0c;将两个相邻的石子堆合并为一个石子堆&#xff0c;例如&#xff0c;如果 a[1,2,3,4,5]a[1,2,3,4,5]a[1,2,3,4,5]&#xff0c;合并第 2,32…...

【Selenium学习】Selenium 中常用的基本方法

1&#xff0e;send_keys 方法模拟键盘键入此方法类似于模拟键盘键入。以在百度首页搜索框输入“Selenium”为例&#xff0c;代码如下&#xff1a;# _*_ coding:utf-8 _*_ """ name:zhangxingzai date:2023/2/13 form:《Selenium 3Python 3自动化测试项目实战》 …...

python练习——简化路径

项目场景&#xff1a; 给你一个字符串 path &#xff0c;表示指向某一文件或目录的 Unix 风格 绝对路径 &#xff08;以 /开头&#xff09;&#xff0c;请你将其转化为更加简洁的规范路径。在 Unix 风格的文件系统中&#xff0c;一个点&#xff08;.&#xff09;表示当前目录本…...

2023新华为OD机试题 - 火星文计算2(JavaScript) | 刷完必过

火星文计算 2 题目 已知火星人使用的运算符号为#;$ 其与地球人的等价公式如下 x#y=4*x+3*y+2 x$y=2*x+y+3 x y是无符号整数 地球人公式按照 c 语言规则进行计算 火星人公式中#符优先级高于$ 相同的运算符按从左到右的顺序运算 输入 火星人字符串表达式结尾不带回车换行 输入…...

前端插件重磅来袭

“你值得拥有”专栏系列上新啦&#xff0c;今日推出“手写前端插件”项目&#xff0c;作为一个前端中高级工程师&#xff0c;手写前端树形菜单插件、弹出层插件、日历插件、分页插件、选项卡插件、进度条插件等是必备的技能&#xff0c;让你的前端技术百尺竿头更进一步&#xf…...

深入工厂|高精密多层板是如何被智造出来的?

或许有很多人从网络上见过各种教程&#xff0c;告诉你单层板是什么&#xff0c;多层板是什么&#xff0c;他们该如何做出来&#xff0c;但是在具体制造时却全凭想象&#xff0c;今天&#xff0c;就让我们来实地看看&#xff0c;精密的多层板是如何被制造出来的&#xff01;今天…...

代理模式动态代理

什么是代理模式&#xff1f; 代理模式是开发中常见的一种设计模式&#xff0c;使用代理模式可以很好的对程序进行横向扩展。代理&#xff0c;顾名思义就是一个真实对象会存在一个代理对象&#xff0c;并且代理对象可以替真实对象完成相应操作&#xff0c;外部通过代理对象来访…...

Mysql之二进制日志

目录 二进制日志 12-37 二进制日志格式 基于行的二进制日志 基于语句的二进制日志 混合格式二进制日志 复制日志 12-42 故障安全 (Crash-Safe) 复制 多源复制 二进制日志 12-37 二进制日志&#xff1a; • 包含数据和模式更改及其时间戳 – 基于语句 或 基于行 的日志…...

kail工具的使用--- cewl

1.介绍 Cewl是一款采用Ruby开发的应用程序&#xff0c;可以给他的爬虫指定URL地址和爬取深度&#xff0c;还可以添加外部链接&#xff0c;接下来Cewl会给你返回一个字典文件&#xff0c;你可以把字典用到类似John the Ripper这样的密码破解工具中。 2.使用 输入以下命令之后…...

【蓝桥杯集训1】前缀和专题(2 / 5)

目录 前缀和模板 &#xff01;3956. 截断数组 - 前缀和枚举 前缀和模板 活动 - AcWing import java.util.*;class Main {static int N100010;static int[] anew int[N],snew int[N];public static void main(String[] args){Scanner scnew Scanner(System.in);int nsc.nex…...

基于模块联邦的微前端实现方案

一、 微前端应用案例概述 当前案例中包含三个微应用&#xff0c;分别为 Marketing、Authentication 和 Dashboard Marketing&#xff1a;营销微应用&#xff0c;包含首页组件和价格组件 Authentication&#xff1a;身份验证微应用&#xff0c;包含登录组件 Dashboard&#x…...

【单目标优化算法】食肉植物优化算法(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…...

ANTLR4入门学习(四)

ANTLR4入门学习&#xff08;四&#xff09;一、设计语法1.语法2.ANTLR核心标记3.常见计算机语言模式4.左右递归5.识别常见的语法结构5.1 匹配标识符5.2 匹配数字5.3 匹配字符串常量5.4 匹配注释和空白字符5.5 基础的语法规则5.6 划定词法分析器和语法分析器的界线一、设计语法 …...

Android okhttp3中发送websocket消息,并通过mockwebserver将一个安卓设备模拟成服务器接发消息

websocket 提供了客户端和服务端的长链接&#xff0c;允许客户端和服务端双向发送消息 okhttp 提供了使用websocket 相关接口议。同时为方便单元测试&#xff0c;又提供了mockwebserver可以把一个安卓客户端作为服务端接受消息。 websocket使用 权限 <uses-permission an…...

MySQL系统变量和自定义变量

1 系统变量1.1 查看系统变量可以使用以下命令查看 MySQL 中所有的全局变量信息。SHOW GLOBAL VARIABLES; MySQL 中的系统变量以两个“”开头。global 仅仅用于标记全局变量&#xff1b;session 仅仅用于标记会话变量&#xff1b;首先标记会话变量&#xff0c;如果会话变量不存在…...

基于Python来爬取某音动态壁纸,桌面更香了!

至于小伙伴们想要这个封图&#xff0c;我也没有。不过继续带来一波靓丽壁纸&#xff0c;而且是动态的&#xff0c;我的桌面壁纸又换了&#xff1a;每天换着花样欣赏一波波动态壁纸桌面立刻拥有了高颜值&#xff0c;简直跟刷美女短视频一样啊。对的&#xff0c;这些动态壁纸就是…...

[数据库]表的约束

●&#x1f9d1;个人主页:你帅你先说. ●&#x1f4c3;欢迎点赞&#x1f44d;关注&#x1f4a1;收藏&#x1f496; ●&#x1f4d6;既选择了远方&#xff0c;便只顾风雨兼程。 ●&#x1f91f;欢迎大家有问题随时私信我&#xff01; ●&#x1f9d0;版权&#xff1a;本文由[你帅…...

VisualGDB 5.6R9 FOR WINDOWS

Go cross-platform with comfort VisualGDB 是 Visual Studio 的一个非常强大的扩展&#xff0c;它允许您调试或调试嵌入式系统。这个程序有一个非常有吸引力的用户界面&#xff0c;它有许多调试或调试代码的功能。VisualGDB 还有一个向导可以帮助您调试程序&#xff0c;为您提…...

Yolov8的多目标跟踪实现

Yolov8_tracking 2023年2月&#xff0c;Yolov5发展到yolov8&#xff0c;这世界变得真快哦。Yolov8由ultralytics公司发布&#xff0c;yolov6-美团&#xff0c;yolov7-Alexey Bochkovskiy和Chien-Yao Wang&#xff0c;其各有高招&#xff0c;对yolov5均有提升。mikel-brostrom在…...

28--Django-后端开发-drf之自定义全局异常、接口文档生成以及三大认证源码分析

一、django请求的整个生命周期 旅程: drf处于的位置:路由匹配成功,进视图类之前 1、包装了新的request 2、处理了编码(urlencoded,formdata,json) 3、三大认证 4、进了视图类(GenericAPIView+ListModelMixin) 进行了过滤和排序去模型中取数据分页序列化返回5、处理了…...

【MyBatis】动态SQL

9、动态SQL Mybatis框架的动态SQL技术是一种根据特定条件动态拼装SQL语句的功能&#xff0c;它存在的意义是为了解决拼接SQL语句字符串时的痛点问题。 9.1、if if标签可通过test属性的表达式进行判断&#xff0c;若表达式的结果为true&#xff0c;则标签中的内容会执行&…...

LeetCode(剑指offer) Day1

1.用两个栈实现一个队列。队列的声明如下&#xff0c;请实现它的两个函数 appendTail 和 deleteHead &#xff0c;分别完成在队列尾部插入整数和在队列头部删除整数的功能。(若队列中没有元素&#xff0c;deleteHead 操作返回 -1 ) 解题过程记录&#xff1a;本题就是用两个栈&…...

1、MyBatis框架——JDBC代码回顾与分析、lombok插件的安装与使用

目录 一、JDBC基本操作步骤 二、JDBC代码 三、lombok插件的安装与使用 1、lombok插件的安装 2、lombok常用注解 Data Getter Setter ToString AllArgsConstructor NoArgsConstructor 3、lombok的使用 四、JDBC代码分析 一、JDBC基本操作步骤 1、导包mysql-connect…...

笔记-GPS设备定位方式

1. 背景 最近接触到的GPS设备有点多&#xff0c;逐渐明白大家定位的机理&#xff0c;也结合网上的文章《GPS、WiFi、基站、AGPS几种定位原理介绍与区别》 来做一个简单的总结。 2. 基于GPS定位 这是最基本的定位能力&#xff0c;它主要就是寻找卫星&#xff0c;利用光传播速度…...

2023秋招携程SRE算法岗面试经验分享

本专栏分享 计算机小伙伴秋招春招找工作的面试经验和面试的详情知识点 专栏首页:秋招算法类面经分享 主要分享计算机算法类在面试互联网公司时候一些真实的经验 面试code学习参考请看:...

4.9 内部类

文章目录1.内部类概述2.特点3.练习 : 内部类入门案例4.成员内部类4.1 练习 : 被private修饰4.2 练习 : 被static修饰5.局部内部类6.匿名内部类1.内部类概述 如果一个类存在的意义就是为指定的另一个类&#xff0c;可以把这个类放入另一个类的内部。 就是把类定义在类的内部的情…...

ncnn模型精度验证

验证ncnn模型的精度 1、进行pth模型的验证 得到ncnn模型的顺序为&#xff1a;.pth–>.onnx–>ncnn .pth的精度验证如下&#xff1a; 如进行的是二分类&#xff1a; model init_model(model, data_cfg, devicedevice, modeeval)###.pth转.onnx模型# #---# input_names …...

wordpress自定义内容管理/天眼查企业查询

Zookeeper的目录整理如下 1. 【分布式】分布式架构 2. 【分布式】一致性协议 3. 【分布式】Chubby与Paxos 4. 【分布式】Zookeeper与Paxos 5. 【分布式】Zookeeper使用--命令行 6. 【分布式】Zookeeper使用--Java API 7. 【分布式】Zookeeper使用--开源客户端 8. 【分布式】Zoo…...

东莞高端做网站公司/爱站seo

直线AB与北方向的夹角&#xff0c;按顺时针方向算 坐标北方向起顺时针与某一方向夹角...

免费网页注册/上海百度推广排名优化

1、down状态&#xff1a;路由器不与其他任何路由器交换任何ospf信息 2、init状态&#xff1a; 接收方路由器已经收到对端路由器的hello包&#xff0c;但是并没有从hello包看到自己的信息&#xff0c;此时通信是单向的 3、two-way状态&#xff1a;从收到的hello包中发现了自己…...

wordpress显示关闭评论框/seo广告平台

参考&#xff1a;http://www.w3school.com.cn/css/css_outline.asp CSS 边框属性 "CSS" 列中的数字指示哪个 CSS 版本定义了该属性。 属性描述CSSoutline在一个声明中设置所有的轮廓属性。2outline-color设置轮廓的颜色。2outline-style设置轮廓的样式。2outline-wid…...

单页网站怎么做/今日热搜第一名

1&#xff09;掌握jQuery常用AJAX-API 2&#xff09;掌握Java调用MySQL / Oracle过程与函数 一&#xff09;jQuery常用AJAX-API 目的&#xff1a;简化客户端与服务端进行局部刷新的异步通讯 &#xff08;1&#xff09;取得服务端当前时间 简单形式&#xff1a;jQuery对象.load…...

wordpress编辑主页/网站申请流程

非常简单的发送邮件实现&#xff0c;网上有很多啦&#xff0c;但还是自己写写记录下吧。 package cn.jmail.test;import java.util.Properties;import javax.mail.*; import javax.mail.internet.*;public class FirstMail {/*** 发送简单邮件方法* param host 发送邮件服务…...