大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
点一下关注吧!!!非常感谢!!持续更新!!!
目前已经更新到了:
- Hadoop(已更完)
- HDFS(已更完)
- MapReduce(已更完)
- Hive(已更完)
- Flume(已更完)
- Sqoop(已更完)
- Zookeeper(已更完)
- HBase(已更完)
- Redis (已更完)
- Kafka(正在更新…)
章节内容
上节我们完成了如下内容:
- 消费组测试,消费者变动对消费的影响
- 消费者的心跳机制
- 消费者的相关配置参数
主题和分区
- Topic:Kafka用于分类管理消息的逻辑单元,类似于MySQL的数据库
- Partition:是Kafka下数据存储的基本单元,这个是物理上的概念,同一个Topic的数据,会被分散的存储到多个Partition中,这些Partition可以在同一台机器上,也可以在多台机器上。优势在于可以进行水平扩展,通常Partition的数量是BrokerServer数量的整数倍
- ConsumerGroup,同样是逻辑上的概念,是Kafka实现单播和广播两种消息模型的手段。保证一个消费组获取到特定主题的全部消息。在消息组内部,若干个消费者消费主题分区的消息,消费组可以保证一个主题的每个分区只被消费组中的一个消费者消费。
- Consumer 采用 PULL 模式从 Broker 中读取数据,采用PULL模式 Consumer可以自行控制消费的速度。
反序列化
- Kafka的Broker中所有的消息都是字节数组,消费者获取到消息之后,需要先对消息进行反序列化处理,然后才能交由给用户程序消费。
- 消费者的反序列化器包括Key和Value。
自定义反序列化
如果要实现自定义的反序列化器,需要实现 Deserializer 接口:
public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {Deserializer.super.configure(configs, isKey);}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer buffer = ByteBuffer.allocate(data.length);buffer.put(data);buffer.flip();int userId = buffer.getInt();int usernameLen = buffer.getInt();String username = new String(data, 8, usernameLen);int passwordLen = buffer.getInt();String password = new String(data, 8 + usernameLen, passwordLen);int age = buffer.getInt();User user = new User();user.setUserId(userId);user.setUsername(username);user.setPassword(password);user.setAge(age);return user;}@Overridepublic User deserialize(String topic, Headers headers, byte[] data) {return Deserializer.super.deserialize(topic, headers, data);}@Overridepublic void close() {Deserializer.super.close();}
}
消费者拦截器
消费者在拉取了分区消息之后,要首先经过反序列化器对Key和Value进行反序列化操作。
消费端定义消息拦截器,要实现 ConsumerInterceptor接口:
- 一个可插拔的接口,允许拦截、更改消费者接收到的消息,首要的用例在于将第三方组件引入消费者应用程序,用于定制监控、日志处理等
- 该接口的实现类通过configure方法获取消费者配置的属性,如果消费者配置中没有指定ClientID,还可以获取KafkaConsumer生成的ClientID,获取这个配置跟其他拦截器是共享的,需要保证不会在各个拦截器之间产生冲突。
- ConsumerInterceptor方法抛出异常会被捕获,但不会向下传播,如果配置了错误的参数类型,消费者不会抛出异常而是记录下来。
- ConsumerInterceptor回调发生在KafkaConsumer.poll()方法的同一个线程
public class ConsumerInterceptor01 implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {System.out.println("=== 消费者拦截器 01 onConsume ===");return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {System.out.println("=== 消费者拦截器 01 onCommit ===");}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {System.out.println("消费者设置的参数");configs.forEach((k, v) -> {System.out.println(k + ", " + v);});}
}
位移提交
相关概念
- Consumer 需要向Kafka记录自己的位移数据,这个汇报过程称为:提交位移(Committing Offsets)
- Consumer 需要为分配给它的每个分区提交各自的位移数据
- 位移提交的由Consumer端负责的,Kafka只负责保管,存到 __consumer_offsets 中
- 位移提交:自动提交和手动提交
- 位移提交:同步提交和异步提交
自动提交
Kafka Consumer后台提交
- 开启自动提交 enable.auto.commit=true
- 配置启动提交间隔:auto.commit.interval.ms,默认是5秒
位移顺序
自动提交位移的顺序:
- 配置 enable.auto.commit=true
- Kafka会保证在开始调用poll方法时,提交上次poll返回的所有消息的
- 因此自动提交不会出现消息丢失,但是会重复消费
重复消费
重复消费的场景:
- Consumer设置5秒提交offset
- 假设提交offset后3秒发生了Rebalance
- Rebalance之后所有的Consumer从上一次提交的Offset的地方继续消费
- 因为Rebalance发生前3秒的内的提交就丢失了
异步提交
- 使用 KafkaConsumer#commitSync,会提交所有poll返回的最新Offset
- 该方法为同步操作 等待直到 offset 被成功提交才返回
- 手动同步提交可以控制offset提交的时机和频率
位移管理
Kafka中,消费者根据消息的位移顺序消费消息,消费者的位移由消费者者管理,Kafka提供了消费者的API,让消费者自行管理位移。
重平衡
重平衡可以说是Kafka中诟病最厉害的一部分。
重平衡是一个协议,它规定了如何让消费者组下的所有消费者来分配Topic中每一个分区。
比如一个Topic中有100个分区,一个消费组内有20个消费者,在协调者的控制下可以让每一个消费者能分配到5个分区,这个分配过程就是重平衡。
重平衡的出发条件主要有三个:
- 消费者组内成员发生变更,这个变更包括了增加和减少消费者,比如消费者宕机退出消费组。
- 主题的分区数发生变化,Kafka目前只能增加分区数,当增加的时候就会触发重平衡
- 订阅的主题发生变化,当消费组使用正则表达式订阅主题,而恰好又新建了对应的主题,就会重平衡
为什么说重平衡让人诟病呢?因为重平衡过程中,消费者无法从Kafka消费消息,对Kafka的TPS影响极大,而如果Kafka集群内节点较多,比如数百个,重平衡耗时会很久。
避免重平衡
要完全避免重平衡做不到,但是要尽量避免重平衡。
在分布式系统中,由于网络问题没有接收到心跳,此时不确认是挂了还是负载没过来还是网络阻塞。
- session.timeout.ms 规定超时时间是多久
- heartbeat.interval.ms 规定心跳的频率 越高越不容易误判 但是会消耗更多资源
- max.poll.interval.ms 消费者poll数据后,需要处理在进行拉取,如果两次拉取时间超过间隔就会被剔除,默认是5分钟。
这里给出一些推荐参数的配置:
- session.timeout.ms 设置为6秒
- heaertbeat.interval.ms 设置2秒
- max.poll.interval.ms 推荐消费者处理消息最长耗时再加1分钟
相关文章:
大数据-61 Kafka 高级特性 消息消费02-主题与分区 自定义反序列化 拦截器 位移提交 位移管理 重平衡
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
Google Gemma2 2B:语言模型的“小时代”到来?
北京时间8月1日凌晨(当地时间7月31日下午),Google发布了其Gemma系列开源语言模型的更新,在AI领域引发了巨大的震动。Google Developer的官方博客宣布,与6月发布的27B和9B参数版本相比,新的2B参数模型在保持…...
三线程顺序打印1-100
三线程顺序打印1-100 题目 三个线程顺序打印1-100; 解题 基本思路 首先需要创建三个线程, 确定使用线程1打印 num % 3 1 的数, 线程2打印 num % 3 2 的数, 线程3打印 num % 3 0 的数;使用 synchronized 同步锁让每次只有一个线程进行打印, 每个线程打印前先判断当前数是…...
中央处理器CPU
中央处理器CPU cpu的组成(从功能方面来看)cpu的执行过程★.取指令阶段★.解码阶段★.执行阶段 重点: 1.cpu的组成 2.cpu怎么执行程序(命令) cpu的组成(从功能方面来看) 寄存器:用来临…...
用Python实现AI人脸识别
实现AI人脸识别通常涉及到使用深度学习库,如TensorFlow或PyTorch,配合预训练的人脸识别模型。以下是一个使用Python和TensorFlow框架中的tensorflow_hub模块来加载和使用一个预训练的人脸识别模型的简单示例。 步骤 1: 安装必要的库 首先,你…...
MSPM0G3507_2024电赛自动行驶小车(H题)_问题与感悟
这次电赛题目选的简单了,还规定不能使用到摄像头,这让我之前学习的Opencv 4与树莓派无用武之地了,但我当时对于三子棋题目饶有兴趣,但架不住队友想稳奖,只能选择这个H题了...... 之后我还想抽空将这个E题三子棋题目做…...
C语言:指针(2)
一.数组名 在了解数组名前我们先看一段代码 int arr[10] {1,2,3,4,5,6,7,8,9,10}; int *p &arr[0]; 根据我们上一篇学习的知识,我们知道&arr[0]是数组第一个元素的地址,这时我们再看另一段代码的运行结果。 #include <stdio.h> int ma…...
数组——二维数组
数组(中) 二维数组 定义 二维数组本质上是一个行列式的组合,也就是说二维数组是有行和列两部分构成。二维数组数据是通过行列进行解读。 二维数组可被视为一个特殊的一维数组,相当于二维数组又是一个一维数组,只不过它的元素是一维数组。 …...
深入 Vue 组件与状态管理的教程
目录 深入 Vue 组件与状态管理的教程第一部分:深入组件1. 理解插槽(Slots)的使用1.1 基础插槽示例1.2 具名插槽1.3 作用域插槽 第二部分:Vue Router1. 学习 Vue Router 的基本配置1.1 基本路由配置1.2 嵌套路由1.3 路由参数 2. 导…...
Spring Boot 实现异步处理多个并行任务
在现代Web应用开发中,异步处理和多任务并行处理对于提高系统的响应性和吞吐量至关重要。Spring Boot 提供了多种机制来实现异步任务处理,本文将介绍如何利用这些机制来优化您的应用程序性能。 1. 引言 在高负载情况下,如果所有的请求都采用…...
TiDB系列之:使用Flink TiDB CDC Connector采集数据
TiDB系列之:使用Flink TiDB CDC Connector采集数据 一、依赖项二、Maven依赖三、SQL Client JAR四、如何创建 TiDB CDC 表五、连接器选项六、可用元数据七、特征一次性处理启动阅读位置多线程读取DataStream Source 八、数据类型映射 TiDB CDC 连接器允许从 TiDB 数…...
每日一道算法题 最接近的三数之和
题目 16. 最接近的三数之和 - 力扣(LeetCode) Python class Solution:def threeSumClosest(self, nums: List[int], target: int) -> int:nums.sort()nlen(nums)ans0min_diffinf # infinite 无穷for i in range(n-2):tmpnums[i]li1rn-1while l<…...
搭建自己的金融数据源和量化分析平台(六):下载并存储沪深两市上市公司财报
基于不依赖wind、某花顺等第三方平台数据的考虑,尝试直接从财报中解析三大报表进而计算ROE等财务指标,因此需要下载沪深两市的上市公司财报数据,便于后续从pdf中解析三大报表。 深市爬虫好做,先放深市爬虫: 根据时间段…...
C语言-常见关键字详解
一、const 关键字const用于声明常量,赋值后,其值不能再被修改。 示例: const int MAX_COUNT 100; 二、static static关键字在不同情境下有不同作用: 1.函数中的静态变量:保留变量状态,仅初始化一次&a…...
异步编程之std::future(一): 使用
目录 1.概述 2.std::future的基本用法 3.使用 std::shared_future 4.std::future的使用场景 5.总结 1.概述 在编程实践中,我们常常需要使用异步调用。通过异步调用,我们可以将一些耗时、阻塞的任务交给其他线程来执行,从而保证当前线程的…...
Vue3 + JS项目配置ESLint Pretter
前言 如果在开发大型项目 同时为多人协作开发 那么 ESLint 在项目中极为重要 在使用 ESLint 的同时 也需要使用 Pretter插件 统一对代码进行格式化 二者相辅相成 缺一不可 1. 安装 VsCode 插件 在 VsCode 插件市场搜索安装 ESLint 和 Pretter 2. 安装依赖 这里直接在 pac…...
JavaScript (十四)——JavaScript typeof和类型转换
目录 JavaScript typeof, null, 和 undefined typeof 操作符 null undefined undefined 和 null 的区别 JavaScript 类型转换 JavaScript 数据类型 JavaScript 类型转换 将数字转换为字符串 将布尔值转换为字符串 将日期转换为字符串 将字符串转换为数字 一元运算符…...
CTF-web 基础
网络协议 OSI七层参考模型:一个标准的参考模型 物理层 网线,网线接口等。 数据链路层 可以处理物理层传入的信息。 网络层 比如IP地址 传输层 控制传输的内容的传输,在传输的过程中将要传输的信息分块传输完成之后再进行合并。 应用…...
CP AUTOSAR标准之ChineseV2XNetwork(AUTOSAR_SWS_ChineseV2XNetwork)(更新中……)
1 简介和功能概述 本文档指定了AUTOSAR基础软件模块中国车辆对接网络(CnV2xNet)的功能、API和配置。 中国车联网网络(CnV2xNet)与中国车联网消息(CnV2xMsg)、中国车联网管理(CnV2xMgt)、中国车联网安全(CnV2xSec)以及AUTOSAR BSW模块以太网接口(EthIf)共同构成了AUTOSAR架构…...
【hloc】 项目流程
hloc 项目流程 1. 数据集准备2. 特征提取3. 匹配特征4. 三维重建5. 定位6. 结果评估7. 示例脚本 这个项目涉及到了视觉定位和三维重建的一系列步骤,从特征提取、匹配、三维重建到定位和结果评估。通过提供的脚本文件,用户可以方便地运行整个流程。 1. 数…...
鸿蒙系统开发【应用接续】基本功能
应用接续 介绍 基于ArkTS扩展的声明式开发范式编程语言编写的一个分布式视频播放器,主要包括一个直播视频播放界面,实现视频播放时可以从一台设备迁移到另一台设备继续运行,来选择更合适的设备继续执行播放功能以及PAD视频播放时协同调用手…...
nextTick方法的作用是什么?什么时候会用到
nextTick 方法在 Vue.js 中扮演着重要的角色,它用于在下次 DOM 更新循环结束之后执行延迟回调。这主要用于确保在 Vue 完成 DOM 更新后执行依赖于 DOM 的操作。 作用 确保 DOM 更新完成:Vue 的 DOM 更新是异步的,当你修改了数据后࿰…...
多 NodeJS 环境管理
前言 对于某个项目依赖特定版本的 NodeJS,或几个项目的 NodeJS 版本冲突时,需要在系统中安装多个版本的 NodeJS,这时可以使用一些工具来进行多个 NodeJS 的管理。 有很多类似的 NodeJS 管理工具,如 nvm, nvs, n 等,接…...
解决网站被植入跳转木马病毒
概述 网站被植入跳转木马病毒是一种常见的安全威胁,它可能导致网站用户被重定向到恶意站点。本文将指导您如何检测、清除这类木马病毒以及采取预防措施。 步骤1:确认感染 首先,需要确认您的网站确实受到了跳转木马的影响。 示例ÿ…...
Node.js(6)——npm软件包管理
npm npm是Node.js标准的软件包管理器。 使用: 初始化清单文件:npm init-y(得到package.json文件,有则略过此命令)下载软件包:npm i 软件包名称使用软件包 示例: 初始状态下npm文件夹下只有server.js,下载软件包前看…...
区块链核心概念与技术架构简介
引言 区块链,一种分布式账本技术,不仅为数字货币提供了基础设施,更在金融、供应链、物联网等多个领域展现出广泛的应用前景。区块链技术被认为是继蒸汽机、电力、互联网之后,下一代颠覆性的核心技术。 如果说蒸汽机释放了人们的…...
≌图概念凸显包含射线V的直线W是比V长的线
黄小宁 x轴中:各非负数点xh≥0都变回自己即都作恒等变换,其余点x-h都变号为xh就使x轴失去负数点而变为射线V{xh≥0}。这x轴变为射线V⊂x轴是不保距变换即不是x轴的刚体运动使x轴不≌V⊂x轴(小学生都知道x轴不≌射线V)。据≌图概念…...
子路由的配置方法?
子路由的配置方法主要涉及到在Vue-router中定义嵌套路由,即一个路由内部包含多个子路由。以下是配置子路由的基本步骤: 1. 定义父路由 首先,在Vue Router中定义父路由。父路由可以像其他普通路由一样定义,但通常会有一个组件与之…...
【大模型从入门到精通2】openAI api的入门介绍2
互动对话界面的搭建 让我们来看看如何建立一个互动对话界面,用户可以在此输入查询,系统实时处理并显示响应。 import panel as pn # 用于构建图形用户界面# 初始化对话历史记录和GUI组件 conversation_history [] input_widget pn.widgets.TextInpu…...
【前端编程小白】的HTML从零入门到实战
之前有高中毕业生读了博客,想让我帮他找一些前端入门的内容,他们报的计算机专业,想利用开学前夕学习一下,我给他推荐了一些菜鸟教程呀什么的。后来想,看来还是很多人需要一些更加入门的可成的,而且很多教程…...
网站建设程序的步骤过程/百度官方客服平台
今天看到《银行主数据项目(MDM)的数据持久层,你选择hibernate还是ibatis(MyBatis)》跑到首页来了, 把我最近使用方式分享一下。Hiberante(Spring JDBC freemarker)两次结合,hibernate对简单的数据操作很方便,可以大量减少SQL语句的维护。对于…...
宁皓 wordpress/seo是什么
不知道大家有没有遇到过这样的问题,有时候电脑网络正常连接,QQ微信都能正常登陆和聊天,就是打不开网页,有些小伙伴就会以为电脑系统问题甚至去重装系统。其实不用那么麻烦,正常联网却上不去网站很可能是DNS出了问题&am…...
简述网站的设计流程/品牌营销策划包括哪些内容
小编说:在新的一年里,又到了总结成绩展望未来的时候。今天我们就一起看看在过去的2021年里,微软工业物联网相关技术都取得了怎样的发展吧~ 在全球各行各业纷纷加速数字化转型以降低运营成本、打造新服务类别并实现可持续性目标的…...
用asp做网站流程/重庆森林台词
今年的面试主要是技术面试 1、项目是怎么做的 常规问题 --略 2、判断项目的可以上线的标准是什么? ①与需求一致,符合客户的要求 ②用例执行完成 ③bug和研发、产品这边确定必须改的改完 ④进行release版本回归无bug 3、给你一个登陆功能࿰…...
温州生活网招聘信息/seo优化排名易下拉软件
用pip 更新 numpy, 提示: Operation notpermitted解决办法: 1. pipinstall --upgrade --ignore-installednumpy2. pipinstall --upgrade --user numpy原因: 是因为os(El Capitan)中加入了新的安全机制SIP(System IntegrityProtection)也可以在重启机器的时候command r 进入恢复…...
青岛做网站公司/bt磁力bt天堂
做毕设需要做目标识别的内容,需要GPU进行训练,参考了很多装Cuda的博客和教程,也大致看了一下官方的安装文档,经过了三个月断断续续的摸索,终于把实验室的电脑和自己的电脑成功装上了cuda8.0版本,并在后续装…...