当前位置: 首页 > news >正文

Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

目录

    • 一、Range分区分配策略原理
      • 1.1、Range分区分配策略原理的示例一
      • 1.2、Range分区分配策略原理的示例二
      • 1.3、Range分区分配策略原理的示例注意事项
    • 二、Range 分区分配策略代码案例
      • 2.1、创建带有4个分区的fiveTopic主题
      • 2.2、创建三个消费者 组成 消费者组
      • 2.3、创建生产者
      • 2.4、测试
      • 2.5、Range 分区分配策略代码案例说明
    • 三、Range 分区分配再平衡案例
      • 3.1、停止某一个消费者后,(45s 以内)重新发送消息示例
      • 3.2、停止某一个消费者后,(45s 以后)重新发送消息示例
      • 3.3、Range 分区分配再平衡案例说明

一、Range分区分配策略原理

  • Range 是对每个 topic 而言的。首先对同一个 topic 里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

1.1、Range分区分配策略原理的示例一

假如现在有 4 个分区,3 个消费者,排序后的分区将会是0,1,2,3;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:4/3 = 1 余 1 ,除不尽,那么消费者C0便会多消费1个分区。
    在这里插入图片描述

1.2、Range分区分配策略原理的示例二

假如现在有 5 个分区,3 个消费者,排序后的分区将会是0,1,2,3,4;消费者排序完之后将会是C0,C1,C2。

  • 通过 partitions数/consumer数 来决定每个消费者应该消费几个分区。 如果除不尽,那么前面几个消费者将会多消费 1 个分区。
  • 例如:5/3 = 1 余 2 ,除不尽,那么消费者么C0和C1分别多消费一个分区。
    在这里插入图片描述

1.3、Range分区分配策略原理的示例注意事项

  • 如果只是针对 1 个 topic 而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者 C0都将多消费 1 个分区,topic越多,C0消费的分区会比其他消费者明显多消费 N 个分区。 容易产生数据倾斜!

二、Range 分区分配策略代码案例

2.1、创建带有4个分区的fiveTopic主题

  • 在 Kafka 集群控制台,创建带有4个分区的fiveTopic主题

    bin/kafka-topics.sh --bootstrap-server 192.168.136.27:9092 --create --partitions 4 --replication-factor 1 --topic fiveTopic
    

    在这里插入图片描述

2.2、创建三个消费者 组成 消费者组

  • 复制 CustomConsumer1类,创建 CustomConsumer2和CustomConsumer3。这样可以由三个消费者组成消费者组,组名都为“test”。

    package com.xz.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
    import java.util.ArrayList;
    import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {// 0 配置Properties properties = new Properties();// 连接 bootstrap.serversproperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");// 反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());// 配置消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");// 1 创建一个消费者  "", "hello"KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);// 2 订阅主题 firstArrayList<String> topics = new ArrayList<>();topics.add("fiveTopic");kafkaConsumer.subscribe(topics);// 3 消费数据while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
    }
    

2.3、创建生产者

  • 创建CustomProducer生产者。

    package com.xz.kafka.producer;import org.apache.kafka.clients.producer.*;
    import org.apache.kafka.common.serialization.StringSerializer;
    import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) throws InterruptedException {//1、创建 kafka 生产者的配置对象Properties properties = new Properties();//2、给 kafka 配置对象添加配置信息:bootstrap.serversproperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.27:9092,192.168.136.28:9092,192.168.136.29:9092");//3、指定对应的key和value的序列化类型 key.serializer value.serializerproperties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());//4、创建 kafka 生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);//5、调用 send 方法,发送消息for (int i = 0; i < 200; i++) {kafkaProducer.send(new ProducerRecord<>("fiveTopic", "hello kafka" + i), new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception == null){System.out.println("主题: "+metadata.topic() + " 分区: "+ metadata.partition());}}});Thread.sleep(2);}// 3 关闭资源kafkaProducer.close();}
    }
    

2.4、测试

  • 首先,在 IDEA中分别启动消费者1、消费者2和消费者3代码
    在这里插入图片描述

  • 然后,在 IDEA中分别启动生产者代码
    在这里插入图片描述

  • 在 IDEA 控制台观察消费者1、消费者2和消费者3控制台接收到的数据,如下图所示:

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

2.5、Range 分区分配策略代码案例说明

  • 由上述测试输出结果截图可知: 消费者1消费2分区的数据;消费者2消费0和3分区的数据;消费者3消费2分区的数据。
  • 说明:Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

三、Range 分区分配再平衡案例

3.1、停止某一个消费者后,(45s 以内)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1号分区数据。
    在这里插入图片描述

3.2、停止某一个消费者后,(45s 以后)重新发送消息示例

  • 由下图控制台输出可知:2号消费者 消费到 0、3号分区数据。
    在这里插入图片描述
  • 由下图控制台输出可知:3号消费者 消费到 1、2号分区数据。
    在这里插入图片描述

3.3、Range 分区分配再平衡案例说明

  • 1号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。
  • 消费者1 已经被踢出消费者组,所以重新按照 range 方式分配。

相关文章:

Kafka3.0.0版本——消费者(Range分区分配策略以及再平衡)

目录 一、Range分区分配策略原理1.1、Range分区分配策略原理的示例一1.2、Range分区分配策略原理的示例二1.3、Range分区分配策略原理的示例注意事项 二、Range 分区分配策略代码案例2.1、创建带有4个分区的fiveTopic主题2.2、创建三个消费者 组成 消费者组2.3、创建生产者2.4、…...

WeiTools

目录 1.1 WeiTools 1.2 getTime 1.3 getImageView 1.4 StringEncode 1.4.1 // TODO Auto-generated catch block WeiTools package com.shrimp.xiaoweirobot.tools;...

目标检测数据集:医学图像检测数据集(自己标注)

1.专栏介绍 ✨✨✨✨✨✨目标检测数据集✨✨✨✨✨✨ 本专栏提供各种场景的数据集,主要聚焦:工业缺陷检测数据集、小目标数据集、遥感数据集、红外小目标数据集,该专栏的数据集会在多个专栏进行验证,在多个数据集进行验证mAP涨点明显,尤其是小目标、遮挡物精度提升明显的…...

【系统设计系列】数据库

系统设计系列初衷 System Design Primer&#xff1a; 英文文档 GitHub - donnemartin/system-design-primer: Learn how to design large-scale systems. Prep for the system design interview. Includes Anki flashcards. 中文版&#xff1a; https://github.com/donnemarti…...

mp4压缩视频不改变画质?跟我这样压缩视频大小

在当今数字化时代&#xff0c;视频文件变得越来越普遍&#xff0c;然而&#xff0c;这些文件通常都很大&#xff0c;给存储和传输带来了困难&#xff0c;为了解决这个问题&#xff0c;许多人都希望将视频压缩得更小&#xff0c;而又不牺牲画质&#xff0c;下面就来看看具体应该…...

AQS同步队列和等待队列的同步机制

理解AQS必须要理解同步队列和等待队列之间的同步机制&#xff0c;简单来说流程是&#xff1a; 获取锁失败的线程进入同步队列&#xff0c;成功的占用锁&#xff0c;占锁线程调用await方法进入条件等待队列&#xff0c;其他占锁线程调用signal方法&#xff0c;条件等待队列线程进…...

vue3实现无限循环滚动的方法;el-table内容无限循环滚动的实现

需求&#xff1a;vue3实现一个div内的内容无限循环滚动 方法一&#xff1a; <template><div idcontainer><div class"item" v-foritem in 5>测试内容{{{ item }}</div></div> </template><script setup> //封装一个方法…...

Windows 安装 MariaDB 数据库

之前一直使用 MySQL&#xff0c;使用 MySQL8.0 时候&#xff0c;占用内存比较大&#xff0c;储存空间好像也稍微有点大&#xff0c;看到 MariaDB 是用来代替 MySQL 的方案&#xff0c;之前用着也挺得劲&#xff0c;MySQL8.0 以上好像不能去导入低版本的 sql&#xff0c;或者需要…...

RK3568-mpp(Media Process Platform)媒体处理软件平台

第一章 MPP 介绍 1.1 概述 瑞芯微提供的媒体处理软件平台(Media Process Platform,简称 MPP)是适用于瑞芯微芯片系列的通用媒体处理软件平台。 该平台对应用软件屏蔽了芯片相关的复杂底层处理,其目的是为了屏蔽不同芯片的差异,为使用者提供统一的视频媒体处理接口(Medi…...

【ModelSim】使用终端命令行来编译、运行Verilog程序,创建脚本教程

▚ 01 ModelSim命令解说 &#x1f4e2; 这些命令是 ModelSim 中常用的命令&#xff0c;用于创建库、编译源代码和启动仿真。 &#x1f514; 在使用这些命令之前&#xff0c;你需要在 ModelSim 的命令行界面或脚本中执行 vlib 命令来创建一个库&#xff0c;然后使用 vlog 命令…...

腾讯云网站备案详细流程_审核时间说明

腾讯云网站备案流程先填写基础信息、主体信息和网站信息&#xff0c;然后提交备案后等待腾讯云初审&#xff0c;初审通过后进行短信核验&#xff0c;最后等待各省管局审核&#xff0c;前面腾讯云初审时间1到2天左右&#xff0c;最长时间是等待管局审核时间&#xff0c;网站备案…...

HTTP介绍:一文了解什么是HTTP

前言&#xff1a; 在当今数字时代&#xff0c;互联网已经成为人们生活中不可或缺的一部分。无论是浏览网页、发送电子邮件还是在线购物&#xff0c;我们都离不开超文本传输协议&#xff08;HTTP&#xff09;。HTTP作为一种通信协议&#xff0c;扮演着连接客户端和服务器的重要角…...

动态规划之子数组系列

子数组系列 1. 环形⼦数组的最⼤和2. 乘积最大子数组3. 等差数列划分4. 最长湍流子数组5. 单词拆分6. 环绕字符串中唯⼀的子字符串 1. 环形⼦数组的最⼤和 1.题目链接&#xff1a;环形⼦数组的最⼤和 2.题目描述&#xff1a;给定一个长度为 n 的环形整数数组 nums &#xff0c…...

LeetCode(力扣)332.重新安排行程Python

LeetCode332.重新安排行程 题目链接代码 题目链接 https://leetcode.cn/problems/reconstruct-itinerary/ 代码 class Solution:def backtracking(self, tickets, used, cur, result, path):if len(path) len(tickets) 1:result.append(path[:])return Truefor i, ticket…...

Pytho 从列表中创建字典 (dict.fromkeys()的问题)

问题起因&#xff1a;想在代码中通过已有的列表创建一个字典&#xff0c;但是又不想写循环&#xff0c;更不想手动填&#xff0c;所以用到了字典对象的fromkeys()方法 。 先以一个简单的例子介绍一下该方法&#xff1a; a ["A", "B", "C", &qu…...

第14节-PhotoShop基础课程-图框工具

文章目录 前言1.矩形画框2.椭圆画框 前言 图框 上面两张图&#xff0c;生成下面一幅图&#xff0c;这个就是图框工具的作用 图框工具ICON 1.矩形画框 2.椭圆画框...

使用 Nacos 在 Spring Boot 项目中实现服务注册与配置管理

&#x1f337;&#x1f341; 博主猫头虎&#xff08;&#x1f405;&#x1f43e;&#xff09;带您 Go to New World✨&#x1f341; &#x1f984; 博客首页——&#x1f405;&#x1f43e;猫头虎的博客&#x1f390; &#x1f433; 《面试题大全专栏》 &#x1f995; 文章图文…...

package.json中workspaces详解与monorepo

参考package.json配置详解&#xff0c;让你一看就会&#xff08;下&#xff09; - 掘金...

Spring Boot + Vue的网上商城之商品信息展示

Spring Boot Vue的网上商城之商品信息展示 当实现一个Spring Boot Vue的网上商城的商品信息展示时&#xff0c;可以按照以下步骤进行&#xff1a; 后端实现&#xff1a; 创建一个Spring Boot项目&#xff0c;并添加所需的依赖&#xff0c;包括Spring Web和Spring Data JPA。…...

深度优先搜索遍历与广度优先搜索遍历

目录 一.深度优先搜索遍历 1.深度优先遍历的方法 2.采用邻接矩阵表示图的深度优先搜索遍历 3.非连通图的遍历 二.广度优先搜索遍历 1.广度优先搜索遍历的方法 2.非连通图的广度遍历 3.广度优先搜索遍历的实现 4.按广度优先非递归遍历连通图 一.深度优先搜索遍历 1.深…...

java 中 返回一个空Map

原文链接&#xff1a;Map用法总结 Constructs an empty HashMap with the default initial capacity (16) &#xff08;mutable&#xff09; // Constructs an empty HashMap with the default initial capacity (16) and the default load fact // Since:1.2 Map<String, …...

sql 执行插入多条语句中 n个insert 与 一个insert+多个values 性能上有和区别 -- chatGPT

在 SQL 中&#xff0c;你可以使用多种方式来插入多条记录。其中两种常见的方式是&#xff1a; 1. **多个 INSERT 语句**&#xff1a;每个 INSERT 语句都插入一行记录。 sql INSERT INTO table_name (column1, column2, ...) VALUES (value1_1, value1_2, ...); INSERT INTO …...

数学建模国赛C蔬菜类商品的自动定价与补货决策C

数学建模国赛C蔬菜类商品的自动定价与补货决策C 完整思路和代码请私信~~~ 1.拟解决问题 这是一个关于生鲜商超蔬菜商品管理的复杂问题&#xff0c;需要综合考虑销售、补货、定价等多个方面。以下是对这些问题的总结&#xff1a; 问题 1: 蔬菜销售分析 需要分析蔬菜各品类和…...

在程序开发中,接口(interface)的重要性

开了很多人写的程序&#xff0c;都适用了接口&#xff0c;也适用了注入&#xff0c;也没有感到什么不妥。如果只是为了注入而写接口&#xff0c;其实我感觉大可不必&#xff0c;特别是把接口和实体写在一个项目项目中的。 我不知道其他人怎么看接口这一层&#xff0c;接口最大的…...

MyBatis关联关系映射详解

前言 在使用MyBatis进行数据库操作时&#xff0c;关联关系映射是一个非常重要的概念。它允许我们在数据库表之间建立关联&#xff0c;并通过对象之间的关系来进行数据查询和操作。本文将详细介绍MyBatis中的关联关系映射&#xff0c;包括一对一、一对多和多对多关系的处理方法…...

常用电子元器件基础知识

目录 一、电阻 二、电容 三、电感 四、保险丝 五、二极管 一、电阻 概念&#xff1a;顾名思义&#xff0c;就是增加电流通过的阻力的。 就像是在水渠中放入东西&#xff0c;能阻止水的顺利通过也是一个道理。 基于电阻的电气特性&#xff0c;电阻在电路中主要有以下四个…...

git撤销还未push的的提交

怎样撤销掉上图中的提交呢 使用以下代码即可提交 git reset --soft HEAD^...

MySQL--数据库基础

数据库分类 数据库大体可以分为 关系型数据库 和 非关系型数据库 常用数据类型 数值类型&#xff1a; 分为整型和浮点型&#xff1a; 字符串类型 日期类型...

Excel相关笔记

1、找出B列中A列没有的数据并放在C列 公式&#xff1a;IF(ISNA(VLOOKUP(B1,$A 1 : 1: 1:A$4,1,FALSE)),B1,“”)...

RouterOS-配置PPPoEv4v6 Server

1 接口 ether3 出接口 ether4 内网接口 2 出接口 出接口采用PPPoE拨号SLAAC获取前缀&#xff0c;手动配置后缀 2.1 选择出接口interface&#xff0c;配置PPPoE client模式 2.2 配置PPPoE client用户名和密码 2.3 从PPPoE client获取前缀地址池 2.4 给出接口选择前缀并配置…...

网站首页布局的设计/百度搜索推广操作简要流程

来自网络 傅立叶变换、拉普拉斯变换、Z变换的联系?他们的本质和区别是什么?为什么要进行这些变换。研究的都是什么?从几方面讨论下。 本文引用地址&#xff1a;http://www.eepw.com.cn/article/277444.htm 这三种变换都非常重要!任何理工学科都不可避免需要这些变换。 傅立叶…...

自学网站建设基本流程/廊坊seo推广公司

Vlookup函数&#xff0c;可以算是一个数据专员必须要会使用的基本函数了&#xff0c;确实很好用。但是你可能会注意到&#xff0c;Excel一旦数据量过大&#xff0c;打开都费劲了&#xff0c;何况打开后&#xff0c;你还要输入公式计算&#xff0c;就更费劲了&#xff0c;此时你…...

广州网站建设电话大全/百度模拟点击

题目&#xff1a;计算阶乘n!n*(n-1)*(n-2)*…3*2*1用递归函数来表示为&#xff1a;def f(x):if x1:return 1return x*f(x-1)代码截图运行结果计算5的阶乘5&#xff01;&#xff0c;运行正确。接着计算大一点的数1000&#xff01;&#xff1a;代码截图运行结果运行结果可以看到运…...

大连市网站制作电话/seo推广软件代理

有时候很实用在一些场合&#xff0c;留住备用吧复制代码 代码如下:function is_mobile_request(){$_SERVER["ALL_HTTP"] isset($_SERVER["ALL_HTTP"]) ? $_SERVER["ALL_HTTP"] : "";$mobile_browser "0";if(preg_match(&…...

乐山北京网站建设/seo手机端优化

移动、联通、电信版P6均可成功的EMUI3.0开发版/稳定版 ROOT... - P6/P6s 花粉俱乐部 http://cn.club.vmall.com/forum.php?modviewthread&tid1710032&extrapage%3D4%26filter%3Dauthor%26orderby%3Ddateline%26orderby%3Ddateline 华为 Ascend P6 联通版首个真正意义的…...

wordpress清理/黄山网站建设

http://www.cnblogs.com/leesf456/p/6063694.html zk的应用&#xff1a; https://baijiahao.baidu.com/s?id1576906164723309054&wfrspider&forpc...