当前位置: 首页 > news >正文

Kafka收发消息核心参数详解

文章目录

  • 1、从基础的客户端说起
    • 1.1、消息发送者主流程
    • 1.2、消息消费者主流程
  • 2、从客户端属性来梳理客户端工作机制
    • 2.1、消费者分组消费机制

1、从基础的客户端说起

Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可:

  <dependency><groupId>org.apache.kafka</groupId><artifactId>kafka_2.13</artifactId><version>3.4.0</version></dependency>

1.1、消息发送者主流程

​ 然后可以使用Kafka提供的Producer类,快速发送消息。

public class MyProducer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) throws ExecutionException, InterruptedException {//PART1:设置发送者相关属性Properties props = new Properties();// 此处配置的是kafka的端口props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);// 配置key的序列化类props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");// 配置value的序列化类props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");Producer<String,String> producer = new KafkaProducer<>(props);CountDownLatch latch = new CountDownLatch(5);for(int i = 0; i < 5; i++) {//Part2:构建消息ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, Integer.toString(i), "MyProducer" + i);//Part3:发送消息//单向发送:不关心服务端的应答。producer.send(record);System.out.println("message "+i+" sended");//同步发送:获取服务端应答消息前,会阻塞当前线程。RecordMetadata recordMetadata = producer.send(record).get();String topic = recordMetadata.topic();int partition = recordMetadata.partition();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+"; partition:"+partition+ ";offset:"+offset);//异步发送:消息发送后不阻塞,服务端有应答后会触发回调函数producer.send(record, new Callback() {@Overridepublic void onCompletion(RecordMetadata recordMetadata, Exception e) {if(null != e){System.out.println("消息发送失败,"+e.getMessage());e.printStackTrace();}else{String topic = recordMetadata.topic();long offset = recordMetadata.offset();String message = recordMetadata.toString();System.out.println("message:["+ message+"] sended with topic:"+topic+";offset:"+offset);}latch.countDown();}});}//消息处理完才停止发送者。latch.await();producer.close();}
}

​ 整体来说,构建Producer分为三个步骤:

  • 设置Producer核心属性 :Producer可选的属性都可以由ProducerConfig类管理。比如ProducerConfig.BOOTSTRAP_SERVERS_CONFIG属性,显然就是指发送者要将消息发到哪个Kafka集群上。这是每个Producer必选的属性。在ProducerConfig中,对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。
  • 构建消息:Kafka的消息是一个Key-Value结构的消息。其中,key和value都可以是任意对象类型。其中,key主要是用来进行Partition分区的,业务上更关心的是value。
  • 使用Producer发送消息:通常用到的就是单向发送、同步发送和异步发送者三种发送方式。

1.2、消息消费者主流程

​ 接下来可以使用Kafka提供的Consumer类,快速消费消息。

public class MyConsumer {private static final String BOOTSTRAP_SERVERS = "worker1:9092,worker2:9092,worker3:9092";private static final String TOPIC = "disTopic";public static void main(String[] args) {//PART1:设置发送者相关属性Properties props = new Properties();//kafka地址props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);//每个消费者要指定一个groupprops.put(ConsumerConfig.GROUP_ID_CONFIG, "test");//key序列化类props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");//value序列化类props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");Consumer<String, String> consumer = new KafkaConsumer<>(props);consumer.subscribe(Arrays.asList(TOPIC));while (true) {//PART2:拉取消息// 100毫秒超时时间ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));//PART3:处理消息for (ConsumerRecord<String, String> record : records) {System.out.println("offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());}//提交offset,消息就不会重复推送。consumer.commitSync(); //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
//            consumer.commitAsync(); //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。}}
}

​ 整体来说,Consumer同样是分为三个步骤:

  • 设置Consumer核心属性 :可选的属性都可以由ConsumerConfig类管理。在这个类中,同样对于大部分比较重要的属性,都配置了对应的DOC属性进行描述。同样BOOTSTRAP_SERVERS_CONFIG是必须设置的属性。
  • 拉取消息:Kafka采用Consumer主动拉取消息的Pull模式。consumer主动从Broker上拉取一批感兴趣的消息。
  • 处理消息,提交位点:消费者将消息拉取完成后,就可以交由业务自行处理对应的这一批消息了。只是消费者需要向Broker提交偏移量offset。如果不提交Offset,Broker会认为消费者端消息处理失败了,还会重复进行推送。
    ​ Kafka的客户端基本就是固定的按照这三个大的步骤运行。在具体使用过程中,最大的变数基本上就是给生产者和消费者的设定合适的属性。这些属性极大的影响了客户端程序的执行方式。

2、从客户端属性来梳理客户端工作机制

​ 渔与鱼:Kafka的客户端API的重要目的就是想要简化客户端的使用方式,所以对于API的使用,尽量熟练就可以了。对于其他重要的属性,都可以通过源码中的描述去学习,并且可以设计一些场景去进行验证。其重点,是要逐步在脑海之中建立一个Message在Kafka集群中进行流转的基础模型。

​ 其实Kafka的设计精髓,是在网络不稳定,服务也随时会崩溃的这些作死的复杂场景下,如何保证消息的高并发、高吞吐,那才是Kafka最为精妙的地方。但是要理解那些复杂的问题,都是需要建立在这个基础模型基础上的。

2.1、消费者分组消费机制

​ 这是我们在使用kafka时,最为重要的一个机制,因此最先进行梳理。

​ 在Consumer中,都需要指定一个GROUP_ID_CONFIG属性,这表示当前Consumer所属的消费者组。他的描述是这样的:

  public static final String GROUP_ID_CONFIG = "group.id";public static final String GROUP_ID_DOC = "A unique string that identifies the consumer group this consumer belongs to. This property is required if the consumer uses either the group management functionality by using <code>subscribe(topic)</code> or the Kafka-based offset management strategy.";

既然这里提到了kafka-based offset management strategy,那是不是也有非Kafka管理Offset的策略呢?

另外,还有一个相关的参数GROUP_INSTANCE_ID_CONFIG,可以给组成员设置一个固定的instanceId,这个参数通常可以用来减少Kafka不必要的rebalance。

​ 从这段描述中看到,对于Consumer,如果需要在subcribe时使用组管理功能以及Kafka提供的offset管理策略,那就必须要配置GROUP_ID_CONFIG属性。这个分组消费机制简单描述就是这样的:

image.png

​ 生产者往Topic下发消息时,会尽量均匀的将消息发送到Topic下的各个Partition当中。而这个消息,会向所有订阅了该Topic的消费者推送。推送时,每个ConsumerGroup中只会推送一份。也就是同一个消费者组中的多个消费者实例,只会共同消费一个消息副本。而不同消费者组之间,会重复消费消息副本。这就是消费者组的作用。

​ 与之相关的还有Offset偏移量。这个偏移量表示每个消费者组在每个Partiton中已经消费处理的进度。在Kafka中,可以看到消费者组的Offset记录情况。

[oper@worker1 bin]$ ./kafka-consumer-groups.sh --bootstrap-server worker1:9092 --describe --group test

相关文章:

Kafka收发消息核心参数详解

文章目录 1、从基础的客户端说起1.1、消息发送者主流程1.2、消息消费者主流程 2、从客户端属性来梳理客户端工作机制2.1、消费者分组消费机制 1、从基础的客户端说起 Kafka提供了非常简单的客户端API。只需要引入一个Maven依赖即可&#xff1a; <dependency><groupId…...

Springboot中Aop的使用

Springboot中使用拦截器、过滤器、监听器-CSDN博客 相比较于拦截器&#xff0c;Spring 的aop则功能更强大&#xff0c;封装的更细致&#xff0c;需要单独引用 jar包。 <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-b…...

创建vue3项目、链式调用、setup函数、ref函数、reactive函数、计算和监听属性、vue3的生命周期、torefs的使用、vue3的setup写法

1 创建vue3项目 # 两种方式- vue-cli&#xff1a;vue脚手架---》创建vue项目---》构建vue项目--》工具链跟之前一样- vite &#xff1a;https://cn.vitejs.dev/-npm create vuelatest // 或者-npm create vitelatest一路选择即可# 运行vue3项目-vue-cli跟之前一样-vite 创建的…...

搭建好自己的PyPi服务器后怎么使用

当您成功搭建好自己的 PyPI 服务器后&#xff0c;您可以使用以下步骤来发布和使用您的包&#xff1a; 打包您的代码&#xff1a; 首先&#xff0c;将您的 Python 项目打包成一个发布包。确保您已经在项目根目录下创建了 setup.py 文件&#xff0c;并按照正确的格式填写了项目信…...

Vue3 中使用provide和reject

1、provide 和reject 可以实现一条事件线上的 父传子&#xff0c;父传孙等&#xff1b;相比较 props emits 仅限与父子传参更方便&#xff0c;相较于pinia书写更简单&#xff0c;但是需要注意使用响应式&#xff0c;如果是非响应式的会导致页面更新不及时 父组件 <templat…...

大数据flink篇之一-基础知识

一、起源 2010至2014年间&#xff0c;由柏林工业大学、柏林洪堡大学和哈索普拉特纳研究所联合发起名Stratosphere的研究项目。2014年4月&#xff0c;项目贡献给Apache基金会&#xff0c;成为孵化项目。更名为Flink2014年12月&#xff0c;成为基金会顶级项目2015年9月&#xff…...

No140.精选前端面试题,享受每天的挑战和学习

🤍 前端开发工程师(主业)、技术博主(副业)、已过CET6 🍨 阿珊和她的猫_CSDN个人主页 🕠 牛客高级专题作者、在牛客打造高质量专栏《前端面试必备》 🍚 蓝桥云课签约作者、已在蓝桥云课上架的前后端实战课程《Vue.js 和 Egg.js 开发企业级健康管理项目》、《带你从入…...

Oracle 11g_FusionOS_安装文档

同事让安装数据库&#xff0c;查询服务器信息发现操作系统是超聚变根据华为openEuler操作系统更改的自研操作系统&#xff0c;安装过程中踩坑不少&#xff0c;最后在超聚变厂商的技术支持下安装成功&#xff0c;步骤可参数该文。 一、 安装环境准备 1.1 软件下载 下载地址:…...

Linux驱动实现IO模型

在Linux系统分为内核态和用户态&#xff0c;CPU会在这两个状态之间进行切换。当进行IO操作时&#xff0c;应用程序会使用系统调用进入内核态&#xff0c;内核操作系统会准备好数据&#xff0c;把IO设备的数据加载到内核缓冲区。 然后内核操作系统会把内核缓冲区的数据从内核空…...

wsl2 更新报错问题解决记录

1、问题 win10 中安装的 wsl2&#xff0c;启动 docker desktop 时提示 wsl2 有问题&#xff1a; 于是点击推荐的地址连接到微软&#xff0c;下载 wsl2 的更新文件。之后运行&#xff0c;又报错&#xff1a; 更新被卡住。 2、解决方法 WinR 输入 cmd 打开命令行窗口&#x…...

突破算法迷宫:精选50道-算法刷题指南

前言 在计算机科学和编程领域&#xff0c;算法和数据结构是基础的概念&#xff0c;无论你是一名初学者还是有经验的开发者&#xff0c;都需要掌握它们。本文将带你深入了解一系列常见的算法和数据结构问题&#xff0c;包括二分查找、滑动窗口、链表、二叉树、TopK、设计题、动…...

玩转Mysql系列 - 第26篇:聊聊mysql如何实现分布式锁?

这是Mysql系列第26篇。 本篇我们使用mysql实现一个分布式锁。 分布式锁的功能 分布式锁使用者位于不同的机器中&#xff0c;锁获取成功之后&#xff0c;才可以对共享资源进行操作 锁具有重入的功能&#xff1a;即一个使用者可以多次获取某个锁 获取锁有超时的功能&#xff…...

linux 解压缩命令tar

Tar tar 命令的选项有很多(用 man tar 可以查看到)&#xff0c;但常用的就那么几个选项&#xff0c;下面来举例说明一下&#xff1a; tar -cf all.tar *.jpg 这条命令是将所有.jpg 的文件打成一个名为 all.tar 的包。-c 是表示产生新的包&#xff0c;-f 指 定包的文件名。 …...

OpenAI ChatGPT API 文档之 Embedding

译者注&#xff1a; Embedding 直接翻译为嵌入似乎不太恰当&#xff0c;于是问了一下 ChatGPT&#xff0c;它的回复如下&#xff1a; 在自然语言处理和机器学习领域&#xff0c;"embeddings" 是指将单词、短语或文本转换成连续向量空间的过程。这个向量空间通常被称…...

Java常用类(二)

好久不见&#xff0c;因工作原因&#xff0c;好久没发文了&#xff0c;OldWang 回来了&#xff0c;持续更新Java内容&#xff01;⭐ 不可变和可变字符序列使用陷阱⭐ 时间处理相关类⭐ Date 时间类(java.util.Date)⭐ DateFormat 类和 SimpleDateFormat 类⭐ Calendar 日历类 ⭐…...

Java获取给定月份的前N个月份和前N个季度

描述&#xff1a; 在项目开发过程中&#xff0c;遇到这样一个需求&#xff0c;即&#xff1a;给定某一月份&#xff0c;得到该月份前面的几个月份以及前面的几个季度。例如&#xff1a;给定2023-09&#xff0c;获取该月份前面的前3个月&#xff0c;即2023-08、2023-07、2023-0…...

网页资源加载过程

网页资源加载是指在浏览器中访问一个网页时&#xff0c;浏览器如何获取和显示网页内容的过程。这个过程通常分为以下几个步骤&#xff1a; DNS 解析&#xff1a; 当用户在浏览器中输入一个网址&#xff08;例如&#xff0c;https://www.example.com&#xff09;&#xff0c;浏览…...

使用git config --global设置用户名和邮件,以及git config的全局和局部配置

文章目录 1. 文章引言2. 全局配置2.1 命令方式2.2 配置文件方式 3. 局部配置3.1 命令方式3.2 配置文件方式 4. 总结 1. 文章引言 我们为什么要设置设置用户名和邮件&#xff1f; 我们在注册github&#xff0c;gitlab等时&#xff0c;一般使用用户名或邮箱&#xff1a; 这个用户…...

【C语言】21-指针-3

目录 1. 指针数组1.1 什么是指针数组1.2 如何定义指针数组1.3 如何使用指针数组2. 多重指针2.1 二重指针的定义2.2 二重指针的初始化与赋值2.3 二重指针的使用3. 指针常量、常量指针、指向常量的常指针3.1 概念3.2 const pointer3.3 pointer to a constant3.3.1 (pointer to a …...

解决craco启动react项目卡死在Starting the development server的问题

现象&#xff1a; 原因&#xff1a;craco.config.ts配置文件有问题 经过排查发现Dev开发模式下不能有splitChunk的配置&#xff0c; 解决办法&#xff1a; 加一个生产模式的判断&#xff0c;开发模式不加载splitChunk的配置&#xff0c;仅在生产模式才加载 判断条件代码&#…...

常见的密码学算法都有哪些?

密码学算法是用于保护信息安全的数学方法和技术。它们可以分为多个类别&#xff0c;包括对称加密、非对称加密、哈希函数和数字签名等。以下是一些常见的密码学算法&#xff1a; 1、对称加密算法&#xff1a; AES&#xff08;高级加密标准&#xff09;&#xff1a;一种广泛使…...

云安全【阿里云ECS攻防】

关于VPC的概念还请看&#xff1a;记录一下弹性计算云服务的一些词汇概念 - 火线 Zone-安全攻防社区 一、初始化访问 1、元数据 1.1、SSRF导致读取元数据 如果管理员给ECS配置了RAM角色&#xff0c;那么就可以获得临时凭证 如果配置RAM角色 在获取ram临时凭证的时候&#xff…...

TBSS数据分析

tbss分析基本流程&#xff1a; 步骤一&#xff0c;指标解算&#xff1a;求解出FA&#xff0c;MD&#xff0c;AD&#xff0c;RD指标 #!/bin/bash #基于体素的形态学分析VBA path/media/kui/Passport5T/DATA_help/TBSS/row_data mkdir ${path}/Results_DTI_tbss mkdir ${path}/R…...

【单调队列】 239. 滑动窗口最大值

239. 滑动窗口最大值 解题思路 计算每一个滑动窗口的最大值 关键在于借助单调队列实现窗口对于单调队列 尾部添加元素 头部删除元素添加元素操作&#xff1a;从尾部开始循环对比 删除比当前元素小的元素获取最大值元素 直接获取头部元素删除元素操作 直接删除头部元素 class…...

Spring实例化源码解析之ComponentScanAnnotationParser(四)

上一章我们分析了ConfigurationClassParser&#xff0c;配置类的解析源码分析。在ComponentScans和ComponentScan注解修饰的候选配置类的解析过程中&#xff0c;我们需要深入的了解一下ComponentScanAnnotationParser的parse执行流程&#xff0c;SpringBoot启动类为什么这么写&…...

MySQL - 外键(foreign key)约束的作用和使用

什么是外键约束&#xff1f; 外键&#xff1a;用来让两张表的数据之间建立连接&#xff0c;从而保证数据的一致性和完整性。 外键约束是用于建立两个表之间关系的一种约束&#xff0c;它定义了一个表中的列与另一个表中的列之间的关系。外键约束可以保证数据的完整性和一致性…...

前端开发之服务器的基本概念与初识Ajax

1&#xff0c;服务器的基本概念与初识Ajax 1.1 URL地址的组成部分 1.2 客户端与服务器的通信过程 1.3 网页中如何请求数据 1.4 $.get()函数 1.4.1 $.get()函数的语法 // jQuery 中 $.get() 函数的功能单一&#xff0c;专门用来发起 get 请求&#xff0c;从而将服务器上的资源…...

数据结构排序算法---八大排序复杂度及代码实现

文章目录 一、冒泡排序代码实现 二、直接插入排序代码实现 三、希尔排序代码实现 四、选择排序代码实现 五、堆排序代码实现 六、快速排序代码实现 七、归并排序代码实现 八、计数排序代码实现 稳定性&#xff1a;相同的数据排序后&#xff0c;相对位置是否发生改变 一、冒泡排…...

GMS之Launcher中去除默认Search或替换为Chrome Search

将Launcher中搜索框去除 将FeatureFlags.java文件中的QSB_ON_FIRST_SCREEN变量修改为false \system\vendor\mediatek\proprietary\packages\apps\Launcher3\src\com\android\launcher3\config\FeatureFlags.java/*** Defines a set of flags used to control various launche…...

@DateTimeFormat 和 @JsonFormat 的详细研究

关于这两个时间转化注解&#xff0c;先说结论 一、介绍 1、DateTimeFormat DateTimeFormat 并不会根据得到其属性 pattern 把前端传入的数据转换成自己想要的格式&#xff0c;而是将前端的String类型数据封装到Date类型&#xff1b;其次它的 pattern 属性是用来规范前端传入…...

政府网站建设管理经验汇报材料/成都网络营销推广

dagger2 和 RxJava butterknife 以及 Retrofit使用起来非常酸爽 代码非常干净清晰 动手尝试 配置编译 DaggerAppComponent的时候 出现问题 配置dagger2 在 Application中能够顺利编译,但是添加完 butterknife之后 Application中的 DaggerAppComponent编译失败。原来是dagger2和…...

wordpress 忘记用户名密码/在线排名优化

使用 Entity Framework 連結 MS SQL 資料庫時&#xff0c;基本上是無難度的事情。但 Oracle 又該如何做呢?安裝 Oracle.ManagedDataAccess.EntityFramework透過 NuGet 搜尋 Oracle&#xff0c;即可找到要安裝的目標「Oracle.ManagedDataAccess.EntityFramework」&#xff0c;N…...

网站广告推送怎么做/官方正版清理优化工具

一、八大排序 排序也称排序算法(Sort Algorithm)&#xff0c;排序是将一组数据&#xff0c;依指定的顺序进行排列的过程。 排序分类 总体对比 1 冒泡排序 基本思想 冒泡排序&#xff08;Bubble Sorting&#xff09;的基本思想是&#xff1a;通过对待排序序列从前向后&…...

做盗文网站/搜外网友情链接

在长久的“裸奔”之后&#xff0c;电动平衡车即将穿上“衣服”。 早先&#xff0c;北京、上海等地已经陆续出台了相关政策&#xff0c;全面禁止电动平衡车、电动滑板等违规交通工具上路。而在近日&#xff0c;首个电动平衡车标准也新鲜出炉了。 昨天&#xff0c;在国家质检总局…...

做优惠卷网站/冯耀宗seo教程

npm 初始化by Zell Liew由Zell Liew 初始化npm的最佳时间 (The best time to npm init) When should you npm init?您应该何时启动npm init &#xff1f; Most developers run npm init right after creating and navigating into a new project.大多数开发人员在创建并导航…...

c2c平台的产品类型/seo短视频网页入口引流

2019独角兽企业重金招聘Python工程师标准>>> import java.io.UnsupportedEncodingException; /** * 转换字符串的编码 */public class ChangeCharset { /** 7位ASCII字符&#xff0c;也叫作ISO646-US、Unicode字符集的基本拉丁块 */ public static final String US_…...