Kafka消费者 TCP管理
Kafka消费者 TCP管理
- 创建 TCP
- FindCoordinator
- 连接协调者
- 消费数据
- TCP 连接数
- 关闭 TCP 连接
消费者的程序入口类是 KafkaConsumer
- 构建 KafkaConsumer 时 ,不会创建任何 TCP 连接
- TCP 连接是用
KafkaConsumer.poll创建
创建 TCP
poll 创建 TCP 的地方 :
- 发起 FindCoordinator 请求时
- 连接协调者时
- 消费数据时
FindCoordinator
协调者 (Coordinator) : 驻留在 Broker 的内存中
- 负责消费组的组成员管理和各个消费者的位移提交管理
- 当消费者首次用 poll 时,发送
FindCoordinator请求到任意个Broker (负载最小) 发送请求,并告知 Broker 的协调者
负载评估 : 消费者连接所有 Broker 中,待发送请求最少
连接协调者
Broker 处理完 FindCoordinator 请求后,会返回 Broker 的协调者
- 消费者知道协调者后,就对该 Broker 进行 Socket 连接
- 成功连接协调者后,就能组协调操作,如 : 加入组、等待组分配方案、心跳请求处理、位移获取、位移提交
消费数据
消费者给每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP
- 例子 : 消费者要消费 5 个分区的数据,这 5 个分区的领导者副本分布在 4 台 Broker 上,那消费者在消费时 ,会与这 4 台 Broker 的创建 Socket 连接
TCP 连接数
消费者创建 3 类 TCP 连接:
- 确定协调者和获取集群元数据
- 连接协调者,令其执行组成员管理操作
- 执行实际的消息获取
Kafka 日志:
# 消费者程序创建的第一个 TCP 连接,用于发送 FindCoordinator 请求
# 消费者创建第一个连接,它连接的 Broker 节点的 ID 是 -1 :
# 消费者不知道 Kafka Broker 的任何信息
[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者复用上次创建 Socket 连接
# 向 Kafka 发送元数据请求,获取整个集群的信息
[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)# 消费者开始发送 FindCoordinator 请求里的 Broker
# 即 localhost:9092,nodeId = -1
[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)# 消费者成功协调者的 Broker 信息(node_id = 2) 后,
# 消费者就知道协调者 Broker 的连接信息
[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)# 发第二个 Socket 连接,TCP连接 localhost:9094
# 只有连接协调者后,消费者才能开启消费组的各种功能
[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者要创建新 TCP 连接,用于实际的消息获取
# 消费分区的领导者副本在哪台 Broker,消费者连接那个 Broker
# 消费者创建 3 个 TCP 连接:
# localhost:9092,localhost:9093 和 localhost:9094
[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
ID = -1 原因 :
- 消费者程序(其实也不光是消费者,生产者也是这样的机制)首次启动时,对 Kafka 集群一无所知,因此用 -1 来表示尚未获取到 Broker 数据
ID = 2147483645 原因 :
Integer.MAX_VALUE- 协调者的 Broker ID- 协调者 ID 是 2,Socket 连接节点 ID =
Integer.MAX_VALUE - 2= 2147483645 - 这种节点 ID 目的 : 让组协调请求和真正的数据获取请求使用不同的 Socket 连接
关闭 TCP 连接
消费者关闭 Socket :
- 主动关闭 : 调用
KafkaConsumer.close(),或执行kill - Kafka 自动关闭 : 由
connection.max.idle.ms控制 (默认值: 9 分钟),当某个 Socket 连续 9 分钟都没有任何请求,消费者杀掉该 Socket 连接
相关文章:
Kafka消费者 TCP管理
Kafka消费者 TCP管理创建 TCPFindCoordinator连接协调者消费数据TCP 连接数关闭 TCP 连接消费者的程序入口类是 KafkaConsumer 构建 KafkaConsumer 时 ,不会创建任何 TCP 连接TCP 连接是用 KafkaConsumer.poll 创建 创建 TCP poll 创建 TCP 的地方 : 发起 FindC…...
软考高级备考哪一个类型好些?
软考高级是比中级和初级难,科目就要考三科,选择题基础知识简答题案例分析写作论文 软考高级科目有:信息系统项目管理师、系统分析师、系统架构设计师、网络规划师、系统规划与管理师。如下: 软考高级中高项信息系统项目管理师师比…...
2023 HBU 天梯赛第一次测试 题目集
目录 1 建校日期 2 发射小球 3 背上书包去旅行 4 吉利的数字 5 向前走 6 热水器 7 走方格 8 朋友圈 9 交保护费 10 走方格 11 和与积 12 缩短字符串 13 买木棒 1 建校日期 在2022 ICPC沈阳站上,东北大学命题组给参赛的选手们出了一道签到题࿰…...
华为OD机试题,用 Java 解【子序列长度】问题
华为Od必看系列 华为OD机试 全流程解析+经验分享,题型分享,防作弊指南)华为od机试,独家整理 已参加机试人员的实战技巧华为od 2023 | 什么是华为od,od 薪资待遇,od机试题清单华为OD机试真题大全,用 Python 解华为机试题 | 机试宝典使用说明 参加华为od机试,一定要注意不…...
内网环境解决SSL证书问题
本来这个没什么好写的,但是坑实在有点多,不得不写个文章记录下来。 创建证书看这里!!! 很多知识点要结合这个页面内容来看。 创建证书已经看过相关文章,然后用unity跑的时候发现连不上,完全没…...
数据分析方法01对比分析法
对比分析法 1、概念 基于相同的数据标准下,把两个及以上相互联系的指标数据进行比较,准确量化的分析他们的差异,说明研究对象在规模大小,水平高低,速度快慢等的不同表现,目的是为了找到差异的原因&#x…...
基于SMOKE多模式排放清单处理技术及EDGAR/MEIC清单制作与VOCs排放量核算
查看原文>>>基于SMOKE多模式排放清单处理技术及EDGAR/MEIC清单制作与VOCs排放量核算 (qq.com)随着我国经济快速发展,我国面临着日益严重的大气污染问题。近年来,严重的大气污染问题已经明显影响国计民生,引起政府、学界和人们越来越…...
CSS流动布局-页面自适应
项目中经常会碰到页面自适应的问题,例如:商城的列表展示、分类列表展示等页面,如下: 该页面会随着页面的放大缩小而随之发生变化,这种自适应的页面布局在大屏幕、小屏幕、不同的浏览器设备上都应该呈现出与设计匹配的…...
3.Elasticsearch初步进阶
3.Elasticsearch初步进阶[toc]1.文档批量操作批量获取文档数据批量获取文档数据是通过_mget的API来实现的在URL中不指定index和type请求方式:GET请求地址:_mget功能说明:可以通过ID批量获取不同index和type的数据请求参数docs:文档数组参数_index:指定index_type:指定type_id:指…...
优思学院|六西格玛管理的核心理念是什么?
六西格玛管理是一种基于数据分析的质量管理方法,旨在通过降低过程的变异性来达到质量稳定和优化的目的。该方法以希腊字母“σ”为名,代表标准差,是衡量过程变异性的重要指标。 六西格玛管理的核心理念是“以客户为中心、以数据为基础、追求…...
第十七节 多态
多态 什么是多态? ●同类型的对象,执行同一个行为,会表现出不同的行为特征。 多态的常见形式 父类类型 对象名称new子类构造器; 接口 对象名称new 实现类构造器; 多态中成员访问特点 ●方法调用:编译看左边,运行看右边。 ●变量调用:编译看…...
[vue]提供一种网站底部备案号样式代码
演示 vue组件型(可直接用) 组件代码:copyright-icp.vue <template><div class"icp">{{© ${year} ${author} }}<a href"http://beian.miit.gov.cn/" target"_blank">{{ record }}</a…...
python第四天作业~函数练习
目录 作业4、判断以下哪些不能作为标识符 A、a B、¥a C、_12 D、$a12 E、false F、False 作业5: 输入数,判断这个数是否是质数(要求使用函数 for循环) 作业6:求50~150之间的质数是…...
linux安装influxdb-rpmyum方式
一、influxdb的安装InfluxDB简介时序数据库InfluxDB版是一款专门处理高写入和查询负载的时序数据库,用于存储大规模的时序数据并进行实时分析,包括来自DevOps监控、应用指标和IoT传感器上的数据主要特点:专为时间序列数据量身订造高性能数据存…...
死锁
1.死锁的定义 多线程以及多进程改善了系统资源的利用率并提高了系统 的处理能力。然而,并发执行也带来了新的问题——死锁。所谓死锁是指多个线程因竞争资源而造成的一种僵局(互相等待),若无外力作用,这些进程都将无法…...
C++基础了解-05-C++常量
C常量 一、C常量 常量是固定值,在程序执行期间不会改变。这些固定的值,又叫做字面量。 常量可以是任何的基本数据类型,可分为整型数字、浮点数字、字符、字符串和布尔值。 常量就像是常规的变量,只不过常量的值在定义后不能进…...
深度学习笔记-2.自动梯度问题
通过反向传播进行自动求梯度1-requires_grad问题2-梯度3- detach() 和 with torch.no_grad()4- Tensor.data.requires_gradPyTorch提供的autograd包能够根据输入和前向传播过程自动构建计算图,并执行反向传播. 1-requires_grad问题 requires_gradTrue …...
一文读懂倒排序索引涉及的核心概念
基础概念相信对于第一次接触Elasticsearch的同学来说,最难理解的概念就是倒排序索引(也叫反向索引),因为这个概念跟我们之前在传统关系型数据库中的索引概念是完全不同的!在这里我就重点给大家介绍一下倒排序索引&…...
Java基础算法题
以创作之名致敬节日 胜固欣然,败亦可喜。 --苏轼 目录 练习1 : 优化代码 扩展 : CRTL Alt M 自动抽取方法 练习2: 方法一: 方法二: 方法三: Math : 顾名思义,Math类就是用来进行数学计算的,它提供了大量的静态方法来便于我们实…...
「SAP ABAP」你真的了解OPEN SQL的DML语句吗 (附超详细案例讲解)
💂作者简介: THUNDER王,一名热爱财税和SAP ABAP编程以及热爱分享的博主。目前于江西师范大学本科在读,同时任汉硕云(广东)科技有限公司ABAP开发顾问。在学习工作中,我通常使用偏后端的开发语言A…...
深度学习在微纳光子学中的应用
深度学习在微纳光子学中的主要应用方向 深度学习与微纳光子学的结合主要集中在以下几个方向: 逆向设计 通过神经网络快速预测微纳结构的光学响应,替代传统耗时的数值模拟方法。例如设计超表面、光子晶体等结构。 特征提取与优化 从复杂的光学数据中自…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
Python如何给视频添加音频和字幕
在Python中,给视频添加音频和字幕可以使用电影文件处理库MoviePy和字幕处理库Subtitles。下面将详细介绍如何使用这些库来实现视频的音频和字幕添加,包括必要的代码示例和详细解释。 环境准备 在开始之前,需要安装以下Python库:…...
华为云Flexus+DeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建
华为云FlexusDeepSeek征文|DeepSeek-V3/R1 商用服务开通全流程与本地部署搭建 前言 如今大模型其性能出色,华为云 ModelArts Studio_MaaS大模型即服务平台华为云内置了大模型,能助力我们轻松驾驭 DeepSeek-V3/R1,本文中将分享如何…...
在WSL2的Ubuntu镜像中安装Docker
Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包: for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...
tree 树组件大数据卡顿问题优化
问题背景 项目中有用到树组件用来做文件目录,但是由于这个树组件的节点越来越多,导致页面在滚动这个树组件的时候浏览器就很容易卡死。这种问题基本上都是因为dom节点太多,导致的浏览器卡顿,这里很明显就需要用到虚拟列表的技术&…...
Angular微前端架构:Module Federation + ngx-build-plus (Webpack)
以下是一个完整的 Angular 微前端示例,其中使用的是 Module Federation 和 npx-build-plus 实现了主应用(Shell)与子应用(Remote)的集成。 🛠️ 项目结构 angular-mf/ ├── shell-app/ # 主应用&…...
九天毕昇深度学习平台 | 如何安装库?
pip install 库名 -i https://pypi.tuna.tsinghua.edu.cn/simple --user 举个例子: 报错 ModuleNotFoundError: No module named torch 那么我需要安装 torch pip install torch -i https://pypi.tuna.tsinghua.edu.cn/simple --user pip install 库名&#x…...
HashMap中的put方法执行流程(流程图)
1 put操作整体流程 HashMap 的 put 操作是其最核心的功能之一。在 JDK 1.8 及以后版本中,其主要逻辑封装在 putVal 这个内部方法中。整个过程大致如下: 初始判断与哈希计算: 首先,putVal 方法会检查当前的 table(也就…...
C++.OpenGL (14/64)多光源(Multiple Lights)
多光源(Multiple Lights) 多光源渲染技术概览 #mermaid-svg-3L5e5gGn76TNh7Lq {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mermaid-svg-3L5e5gGn76TNh7Lq .error-icon{fill:#552222;}#mermaid-svg-3L5e5gGn76TNh7Lq .erro…...
