当前位置: 首页 > news >正文

黄骅做网站|黄骅网站|黄骅百度优化|黄骅百度推广|黄骅微信|黄骅/短链接在线生成器

黄骅做网站|黄骅网站|黄骅百度优化|黄骅百度推广|黄骅微信|黄骅,短链接在线生成器,我县政府网站建设发展状况,经典语录网站做合格党员目录 1、消费者、消费组 2、心跳机制 3、消费者常见参数配置 4、订阅 5、反序列化 基本概念 自定义反序列化器 6、位移提交 6.1、自动提交 6.2、手动提交 同步提交 异步提交 7、再均衡 7.1、定义与基本概念 7.2、缺陷 7.3、如何避免再均衡 7.4、如何进行组内分…

目录

1、消费者、消费组

2、心跳机制

3、消费者常见参数配置

4、订阅

5、反序列化

基本概念

自定义反序列化器

6、位移提交

6.1、自动提交

6.2、手动提交

同步提交

异步提交

7、再均衡

7.1、定义与基本概念

7.2、缺陷

7.3、如何避免再均衡

7.4、如何进行组内分区分配

7.5、谁来执行再均衡和消费组管理

8、消费者拦截器

作用

自定义消费者拦截器


1、消费者、消费组

  • 消费者从订阅的主题消费消息,消费消息的偏移量保存在kafka中的__consumer_offsets的主题中。
  • 多个消费同一个主题的消费者,可以通过group.id配置,加入到同一个消费组中。消费组均衡地给消费者分配分区,每个分区只由消费组中的一个消费者消费,防止重复消费。
  • 同一个消费组里:一个分区只会对应一个消费者,但一个消费者可以消费多个分区。
  • group_id一半设置为应用或者业务的逻辑名称。

2、心跳机制

消费者4宕机,重新分配分区3的消费者
分区3所在broker宕机,重选分区3的leader分区

  • 消费者宕机,退出消费组,触发再平衡,重新给消费组中的消费者分配分区;
  • broker宕机,分区3重选leader副本,出发再平衡,重新分配分区3消息。

        心跳机制,就是consumer和broker之间的健康检查。consumer和broker之间保持长连接,通过心跳机制检测对方是否健康。心跳检测相关参数如下所示:

        在broker端,可配置sessionTimeoutMs参数,如果consumer心跳超期,broker会把消费者从消费组中移除,并触发再平衡,重新分配分区;

        在consumer端,可配置sessionTimeoutMs和rebalanceTimeoutMs参数,如果broker心跳超期,consumer则会告知broker主动退出消费组,并触发再平衡。

3、消费者常见参数配置

4、订阅

主题、分区(leader和follower分区)、消费者、消费组、订阅。

  • 主题:topic,用于分类管理消息的逻辑单元,可以用于区分业务类型;
  • 分区:partition,同一个topic的消息,会被分散到多个分区中,不同分区通常在不同broker上,方便水平扩展。分区可分为leader分区和follower分区,leader分区用于与生产者/消费者通信,follower分区用于备份leader分区的数据;
  • 消费者:与分区长连接,用于消费分区中的消息;
  • 消费组:消费组中可能会有多个消费者,保证一个消费组获取到特定主题的全部消息。消费组可以保证一个主题的分区只会被消费组中的一个消费者消费;
  • 订阅:消费者订阅主题,并将消费者加入到消费组中,采用pull模式,从broker分区中读取消息。kafka的消费者只有pull模式,该模式下消费者可以自主控制消费消息的速率。

5、反序列化

基本概念

  • 在Kafka中保存的数据都是字节数组。
  • 消息者接收消息后,需要将消息反序列化为指定的数据格式进行处理。
  • 消费者通过key.deserializer和value.deserializer指定key和value的序列化器。
  • Kafka使用org.apache.kafka.common.serialization.Deserializer<T>接口定义序列化器。
  • Kafka已实现的序列化器有:ByteArrayDeserializer、ByteBufferDeserializer、BytesDeserializer、DoubleDeserializer、FloatDeserializer、IntegerDeserializer、StringDeserializer、LongDeserializer、ShortDeserializer。

自定义反序列化器

        实现org.apache.kafka.common.serialization.Deserializer<T>接口,并实现其中的deserializer方法。

public class UserDeserializer implements Deserializer<User> {@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {}@Overridepublic User deserialize(String topic, byte[] data) {ByteBuffer allocate = ByteBuffer.allocate(data.length);allocate.put(data);allocate.flip();int userId = allocate.getInt();int length = allocate.getInt();System.out.println(length);String username = new String(data, 8, length);return new User(userId, username);}@Overridepublic void close() {}
}

6、位移提交

  • 位移 = kafka分区消息的偏移量。
  • kafka中有一个主题,专门用于保存消费者的偏移量。
  • 消费者与分区一一对应,消费者在消费分区消息时,需要向kafka提交自己的位移(偏移量)信息,kafka只记录该消费者在对应分区的偏移量信息。
  • 消费者向kafka提交偏移量的过程,叫做位移提交。
  • 位移提交,分为自动提交和手动提交;也分为同步提交和异步提交。

6.1、自动提交

  • 开启⾃动提交: enable.auto.commit=true,kafka默认为自动提交。
  • 配置⾃动提交间隔:Consumer端: auto.commit.interval.ms ,默认 5s。
        自动提交模式下,Kafka会保证在开始调⽤ poll ⽅法时,提交上次 poll 返回的所有消息,因此⾃动提交不会出现消息丢失,但会重复消费,比如:
  1. Consumer 5s 提交一次offset
  2. 假设提交 offset 后的 3s 发⽣了 Rebalance
  3. Rebalance 之后的所有 Consumer 从上⼀次提交的 offset 处继续消费
  4. 因此 Rebalance 发⽣前 3s 的消息会被重复消费

6.2、手动提交

同步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));process(records); // 处理消息try {consumer.commitSync();} catch (CommitFailedException e) {handle(e); // 处理提交失败异常}
}
  • 使⽤ KafkaConsumer#commitSync(),会提交 KafkaConsumer#poll() 返回的最新 offset
  • ⼿动同步提交可以控制offset提交的时机和频率
  • 调⽤ commitSync 时,Consumer 处于阻塞状态,直到 Broker 返回结果
  • 会影响 TPS
  • 如果提交间隔过长,consumer重启后,会有更多的消息被重复消费。

异步提交

while (true) {ConsumerRecords<String, String> records = consumer.poll(3_000);process(records); // 处理消息consumer.commitAsync((offsets, exception) -> {if (exception != null) {handle(exception);}});
}
  • 使⽤ KafkaConsumer#commitAsync():会提交 KafkaConsumer#poll() 返回的最新 offset
  • commitAsync出现问题不会⾃动重试,可通过异步提交与同步提交相结合的方式解决。

7、再均衡

7.1、定义与基本概念

        也叫做重平衡,主要是为了让消费组下的消费者来重新分配主题下的每一个分区。再均衡的触发条件有如下三个:

  1. 消费组内成员变更(增加和减少消费者),⽐如消费者宕机退出消费组,或者新增一个消费者。
  2. 主题的分区数发⽣变更,kafka⽬前只⽀持增加分区,当增加的时候就会触发再均衡。
  3. 订阅的主题发⽣变化,比如消费者组使⽤正则表达式订阅主题,⽽恰好⼜新建了对应的主题,就会触发再均衡。

7.2、缺陷

再均衡过程中,消费者无法从kafka消费消息。如果kafka节点过多,再均衡过程会及其耗时(数分钟甚至小时),过程中kafka基本处于不可用状态。

7.3、如何避免再均衡

完全避免,那不可能,因为你无法保证消费者不会故障。但是我们可以通过避免增加分区、增加订阅的主题、增加消费者这几种情况,减少再均衡的触发。
但有时候,kafka会错误地认为一个正常的消费者已经挂掉,从而触发再均衡。我们要做的,就是避免这种情况。
消费者和kafka之间通过心跳机制来做健康检查。当消费者宕机、网络阻塞或是消费者因负载过重没来得及发送心跳时,kafka都会认为消费者挂掉了。所以,设置合理的健康检查参数可以有效减少再均衡的发生。比较重要的参数如下:
  1. session.timout.ms:控制⼼跳超时时间,推荐设置为6s
  2. heartbeat.interval.ms:控制⼼跳发送频率,频率越高越不容易误判,但也会消耗更多资源,推荐设置为2s
  3. max.poll.interval.ms:控制poll的间隔,消费者poll数据后,需要⼀些处理,再进⾏拉取。如果两次拉取时间间隔超过这个参数设置的值,那么消费者就会被踢出消费者组。推荐为消费者处理消息最长耗时 + 1分钟。

7.4、如何进行组内分区分配

有三种分配策略:RangeAssignor和RoundRobinAssignor以及StickyAssignor。

7.5、谁来执行再均衡和消费组管理

        kafka里有一个角色,叫做Group Coordinator,用于执行消费组的管理。
        Group Coordinator——每个消费组分配一个消费组协调器⽤于组管理和位移管理。当消费组的第一个消费者启动的时候,它会去和Kafka Broker确定谁是它们组的组协调器。之后该消费组内所有消费者和该组协调器协调通信。

8、消费者拦截器

作用

  1. 消费者在拉取了分区消息后,会先通过反序列化对key和value进行处理;
  2. 然后可通过设置消费者拦截器对消息进行处理,允许更改消费者接收到的消息,或者做一些监控、日志处理
  3. 应用程序处理消费者拉取的分区消息;

自定义消费者拦截器

        ConsumerInterceptor方法抛出的异常会被捕获、记录,但是不会向下传播。如果用户配置了错误的key或value类型参数,消费者不会抛出异常,而仅仅是记录下来。

        自定义消费者拦截器需要实现org.apache.kafka.clients.consumer.ConsumerInterceptor<K, V> 接口,并实现其中的configure()、onConsume()、onCommit()、close()方法,其中:

  • onConsume():该方法在poll方法返回之前调用,调用结束后poll方法就返回消息了。可通过该方法修改消费者消息,返回新的消息。
  • onCommit():当消费者提交偏移量时,调用该方法。
  • close():用于关闭该拦截器用到的资源,如打开的文件、连接的数据库等。
  • configure():用于获取消费者的参数配置。
public class MyInterceptor implements ConsumerInterceptor<String, String> {@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {// poll方法返回结果之前最后要调用的方法System.out.println("MyInterceptor -- 开始");// 消息不做处理,直接返回return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {// 消费者提交偏移量的时候,经过该方法System.out.println("MyInterceptor -- 结束");}@Overridepublic void close() {// 用于关闭该拦截器用到的资源,如打开的文件,连接的数据库等}@Overridepublic void configure(Map<String, ?> configs) {// 用于获取消费者的设置参数configs.forEach((k, v) -> {System.out.println(k + "\t" + v);});}
}

以上内容为个人学习理解,如有问题,欢迎在评论区指出。

部分内容截取自网络,如有侵权,联系作者删除。

相关文章:

kafka学习-消费者

目录 1、消费者、消费组 2、心跳机制 3、消费者常见参数配置 4、订阅 5、反序列化 基本概念 自定义反序列化器 6、位移提交 6.1、自动提交 6.2、手动提交 同步提交 异步提交 7、再均衡 7.1、定义与基本概念 7.2、缺陷 7.3、如何避免再均衡 7.4、如何进行组内分…...

Alibaba(商品详情)API接口

为了进行电商平台 的API开发&#xff0c;首先我们需要做下面几件事情。 1&#xff09;开发者注册一个账号 2&#xff09;然后为每个alibaba应用注册一个应用程序键&#xff08;App Key) 。 3&#xff09;下载alibaba API的SDK并掌握基本的API基础知识和调用 4&#xff09;利…...

OLED透明屏触控:引领未来科技革命的创新力量

OLED透明屏触控技术作为一项颠覆性的创新&#xff0c;正在引领新一轮科技革命。它将OLED显示技术与触摸技术相结合&#xff0c;实现了透明度和触控功能的完美融合。 在这篇文章中&#xff0c;尼伽将通过引用最新的市场数据、报告和行业动态&#xff0c;详细介绍OLED透明屏触控…...

Ubuntu下QT操作Mysql数据库

本篇总结一下一下Ubuntu下QT操作Mysql数据库。 目录 1. 启动Mysql数据库服务器 2.查看QT支持的数据库驱动 3.连接数据库 4. 增加表和记录 5. 删除记录 6. 修改记录 7. 查询记录 8.完整代码和运行效果 常见错误总结&#xff1a; (1) 数据库服务没启动报错信息 (2) 有…...

sqli --【1--10】

Less-1&#xff08;联合查询&#xff09; 1.查看是否有回显 2.查看是否有报错 3.使用联合查询&#xff08;字符注入&#xff09; 3.1判断其列数 3.2 判断显示位置 3.3敏感信息查询 Less-2&#xff08;联合查询&#xff09; 1.查看是否有回显 2.查看是否有报错 3.使用…...

《自然语言处理(NLP)的最新进展:Transformers与GPT-4的浅析》

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…...

Wireshark 用命令行分析数据包

1&#xff0c;那些情况需要使用命令行 Wireshark一次性提供了太多的信息。使用命令行工具可以限制打印出的信息&#xff0c;最后只显示相关数据&#xff0c;比如用单独一行来显示IP地址。命令行工具适用于过滤数据包捕获文件&#xff0c;并提供结果给另一个支持UNIX管道的工具…...

LVS DR模式负载均衡群集部署

目录 1 LVS-DR 模式的特点 1.1 数据包流向分析 1.2 DR 模式的特点 2 DR模式 LVS负载均衡群集部署 2.1 配置负载调度器 2.1.1 配置虚拟 IP 地址 2.1.2 调整 proc 响应参数 2.1.3 配置负载分配策略 2.2 部署共享存储 2.3 配置节点服务器 2.3.1 配置虚拟 IP 地址 2.3.2…...

探讨前后端分离开发的优势、实践以及如何实现更好的用户体验?

随着互联网技术的迅猛发展&#xff0c;前后端分离开发已经成为现代软件开发的一种重要趋势。这种开发模式将前端和后端的开发工作分开&#xff0c;通过清晰的接口协议进行通信&#xff0c;旨在优化开发流程、提升团队协作效率&#xff0c;并最终改善用户体验。本文将深入探讨前…...

微博一面:JVM预热,你的方案是啥?

说在前面 在40岁老架构师 尼恩的读者社区(50)中&#xff0c;最近有小伙伴拿到了一线互联网企业如微博、阿里、汽车之家、极兔、有赞、希音、百度、网易、滴滴的面试资格&#xff0c;遇到一几个很重要的面试题&#xff1a; JVM预热&#xff0c;你的方案是啥&#xff1f;Springb…...

open与fopen的区别

1. 来源 从来源的角度看&#xff0c;两者能很好的区分开&#xff0c;这也是两者最显而易见的区别&#xff1a; open是UNIX系统调用函数&#xff08;包括LINUX等&#xff09;&#xff0c;返回的是文件描述符&#xff08;File Descriptor&#xff09;&#xff0c;它是文件在文件…...

Unity记录一些glsl和hlsl的着色器Shader逆向代码

以下内容一般基于 GLSL 300 之后 以下某些代码行&#xff0c;是“伪代码“&#xff0c;绝大部分是renderDoc 逆向产生标准代码 本人OpenlGL零基础&#xff0c;也不打算重头学 目录 Clip&#xff08;&#xff09; 剔除函数 discard; FS最终颜色输出 out 和最终颜色相加方程…...

基于Sentinel的微服务保护

前言 Sentinel是Alibaba开源的一款微服务流控组件&#xff0c;用于解决分布式应用场景下服务的稳定性问题。Sentinel具有丰富的应用场景&#xff0c;它基于流量提供一系列的服务保护措施&#xff0c;例如多线程秒杀情况下的系统承载&#xff0c;并发访问下的流量控制&#xff…...

Collectors类作用:

一、Collectors类&#xff1a; 1.1、Collectors介绍 Collectors类&#xff0c;是JDK1.8开始提供的一个的工具类&#xff0c;它专门用于对Stream操作流中的元素各种处理操作&#xff0c;Collectors类中提供了一些常用的方法&#xff0c;例如&#xff1a;toList()、toSet()、to…...

LASSO回归

LASSO回归 LASSO(Least Absolute Shrinkage and Selection Operator&#xff0c;最小绝对值收敛和选择算子算法)是一种回归分析技术&#xff0c;用于变量选择和正则化。它由Robert Tibshirani于1996年提出&#xff0c;作为传统最小二乘回归方法的替代品。 损失函数 1.线性回…...

机器学习中的 K-均值聚类算法及其优缺点。

K-均值聚类算法是一种常见的无监督学习算法&#xff0c;它可以将数据集分成 K 个簇&#xff0c;每个簇内部的数据点尽可能相似&#xff0c;而不同簇之间的数据点应尽可能不同。下面详细讲解 K-均值聚类算法的优缺点&#xff1a; 优点&#xff1a; 简单易用&#xff1a;K-均值…...

云计算与虚拟化

一、概念 什么是云计算&#xff1f; 云计算&#xff08;cloud computing&#xff09;是分布式计算的一种&#xff0c;指的是通过网络“云”将巨大的数据计算处理程序分解成无数个小程序&#xff0c;然后&#xff0c;通过多部服务器组成的系统进行处理和分析这些小程序得到结果…...

Linux常见进程类别

目录 常见进程类别 守护进程&精灵进程 任务管理 进程组 作业 作业 | 进程组 会话 w命令 守护进程 守护进程的创建 setsid()函数 daemon()函数 模拟实现daemon函数 前台进程 | 后台进程 僵尸进程 | 孤儿进程 僵尸进程的一些细节 守护进程 | 后台进程 守护…...

智能小车之蓝牙控制并测速小车、wife控制小车、4g控制小车、语音控制小车

目录 1. 蓝牙控制小车 2. 蓝牙控制并测速小车 3. wifi控制测速小车 4. 4g控制小车 5. 语音控制小车 1. 蓝牙控制小车 使用蓝牙模块&#xff0c;串口透传蓝牙模块&#xff0c;又叫做蓝牙串口模块 串口透传技术&#xff1a; 透传即透明传送&#xff0c;是指在数据的传输过…...

指针进阶(一)

指针进阶 1. 字符指针面试题 2. 指针数组3. 数组指针3.1 数组指针的定义3.2 &数组名VS数组名 3.3 数组指针的使用4. 数组传参和指针传参4.1 一维数组传参4.2 二维数组传参4.3 一级指针传参4.4 二级指针传参 前言 指针的主题&#xff0c;我们在初级阶段的《指针》章节已经接…...

c# sql 判断表中是否包含指定字段

你可以使用以下方法来判断一个 SQL 数据库中的表是否包含指定的字段。 首先&#xff0c;你需要连接到数据库&#xff0c;然后执行一条 SQL 查询语句来检查表结构。你可以使用 SELECT 语句和 INFORMATION_SCHEMA.COLUMNS 系统视图来获取表中的所有列信息。 下面是一个示例代码…...

08-JVM垃圾收集器详解

上一篇&#xff1a;07-垃圾收集算法详解 如果说收集算法是内存回收的方法论&#xff0c;那么垃圾收集器就是内存回收的具体实现。 虽然我们对各个收集器进行比较&#xff0c;但并非为了挑选出一个最好的收集器。因为直到现在为止还没有最好的垃圾收集器出现&#xff0c;更加没…...

sql_mode详解

文章目录 一、sql_mode作用二、查询sql_mode三、mysql8默认的mode配置&#xff08;6个默认配置&#xff09;四、常见mode详细解释mysql8默认配置了的mode&#xff08;6个&#xff09;需要自己配置的mode&#xff08;4个&#xff09; 五、设置sql_mode&#xff08;一旦设置了&am…...

Vue3的新特性总结

一、Vue3 里 script 的三种写法 首先&#xff0c;Vue3 新增了一个叫做组合式 api 的东西&#xff0c;英文名叫 Composition API。因此 Vue3 的 script 现在支持三种写法。 1、最基本的 Vue2 写法 <template><div>{{ count }}</div><button click"…...

【Node】Mac多版本Node切换

1、查看当前电脑是否安装node node -v或者查看当前电脑通过brew安装的node路径 ls /usr/local/Cellar/node*2、查看可安装的node brew search node3、安装其他版本node 下载需要安装的node版本 brew install node144、brew切换node版本 假设之前的版本是18&#xff0c;需…...

Apache POI

POI介绍 Apache POI是用Java编写的免费开源的跨平台的Java API&#xff0c; Apache POI提供API给Java程序对Microsoft Office格式档案读和写的功能&#xff0c; 其中使用最多的就是使用POI操作Excel文件。 maven坐标&#xff1a; <dependency><groupId>org.apa…...

个人能做股票期权吗?个人期权交易开户条件新规

个人投资者是可以交易股票期权的&#xff0c;不过期权交易通常需要投资者具备一定的投资经验和风险承受能力&#xff0c;因为期权交易涉及较高的风险和复杂性&#xff0c;下文为大家介绍个人能做股票期权吗&#xff1f;个人期权交易开户条件新规的内容。本文来自&#xff1a;期…...

Java面试整理(一)

开篇 面试,应该都是打工人需要面对的事情。我记得自己以前开始准备Java工程师面试时,都会去看那个《面试宝典》,当时这个“宝典”真的很经典,现在应该还是不少朋友会看这个。我自己经历过了找工作的面试,和企业招聘工作。所以我自己更加想从这两个不同的角度去和大家交流这…...

国家信息中心举办“数字政府建设暨数字安全技术”研讨会:海云安提出数字政府软件供应链安全解决方案

近日&#xff0c;由国家信息中心主办&#xff0c;复旦大学研究院承办的“数字政府建设暨数字安全技术研讨会”在义乌顺利召开。国家信息中心信息与网络安全部副主任禄凯&#xff0c;复旦大学党委常委、宣传部部长陈玉刚&#xff0c;义乌市委常委、常务副市长喻新贵为会议致辞。…...

uniapp 处理 分页请求

我的需求是手机上一个动态滚动列表&#xff0c;下拉到底部时&#xff0c;触发分页数据请求 uniapp上处理分页解决方案 主要看你是如何写出滚动条的。我想到的目前有三种 &#xff08;1&#xff09;页面滚动&#xff1a;直接使用onReachBottom方法&#xff0c;可以监听到达底部…...