当前位置: 首页 > news >正文

记一次Kafka warning排查过程

1、前因

在配合测试某个需求的时候,正好看到控制台打印了个报错,如下:

2023-03-06 17:05:58,565[325651ms][pool-28-thread-1][org.apache.kafka.common.utils.AppInfoParser][WARN] - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=producer-1at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:64)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:426)at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)at org.springframework.kafka.core.DefaultKafkaProducerFactory.createKafkaProducer(DefaultKafkaProducerFactory.java:406)at org.springframework.kafka.core.DefaultKafkaProducerFactory.createProducer(DefaultKafkaProducerFactory.java:392)at org.springframework.kafka.core.KafkaTemplate.getTheProducer(KafkaTemplate.java:463)at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:401)at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:216)

很明显是Kafka在创建Producer实例的时候重复了,正好趁着有空排查排查,不然谁知道后面会因为这个导致什么问题。

2、BUG定位

根据堆栈信息,找到与Kafka有关的报错代码,进到类 AppInfoParser 的 registerAppInfo方法中,代码如下:

public static synchronized void registerAppInfo(String prefix, String id, Metrics metrics, long nowMs) {try {ObjectName name = new ObjectName(prefix + ":type=app-info,id=" + Sanitizer.jmxSanitize(id));AppInfo mBean = new AppInfo(nowMs);ManagementFactory.getPlatformMBeanServer().registerMBean(mBean, name);registerMetrics(metrics, mBean); // prefix will be added later by JmxReporter} catch (JMException e) {log.warn("Error registering AppInfo mbean", e);}
}

从方法名可以推测,应当是 Kafka 在创建 Producer 实例时,会按 Producer 的 id 构造一个 AppInfo,并注册到一个公共的类似Map的东西中,而我们的代码创建了多个实例,并且 id 重复了,基于这个猜测来看Kafka的配置文件(已脱敏):

<!-- 定义producer1的参数 -->
<bean id="producerProperties1" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean><!-- 定义producer2的参数 -->
<bean id="producerProperties2" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean>

可以看到项目中配置了两个 Kafka 的 Producer,并且都未指定 Producer 的 id,符合我们的猜测,那么我们要怎么修复,如果我们指定了 id,Producer 在多线程的情况下,每个线程的 id 是否又会重复。
基于几个问题,进到类 KafkaProducer 的构造方法中,来看 AppInfoParser.registerAppInfo() 方法调用语句:

AppInfoParser.registerAppInfo(JMX_PREFIX, clientId, metrics, time.milliseconds());

可以看到前面说的 Producer 的 id 实际上是 clientId,往前找到 clientId 的赋值语句:

this.clientId = buildClientId(config.getString(ProducerConfig.CLIENT_ID_CONFIG), transactionalId);

进到 buildClientId 里面:

private static String buildClientId(String configuredClientId, String transactionalId) {if (!configuredClientId.isEmpty())return configuredClientId;if (transactionalId != null)return "producer-" + transactionalId;return "producer-" + PRODUCER_CLIENT_ID_SEQUENCE.getAndIncrement();
}

可知如果 configuredClientId 和 transactionalId 都为空,那么clientId就会自动生成,继续往上追溯,来看 transactionalId 的赋值语句:

String transactionalId = userProvidedConfigs.containsKey(ProducerConfig.TRANSACTIONAL_ID_CONFIG) ?(String) userProvidedConfigs.get(ProducerConfig.TRANSACTIONAL_ID_CONFIG) : null;

其中 ProducerConfig.TRANSACTIONAL_ID_CONFIG 值为 transactional.id,可见 transactionalId 的值取得是用户配置(userProvidedConfigs)中的 transactional.id 的值,而 configuredClientId 值并不是直接获取的用户配置(userProvidedConfigs)的 client.id,而是拿的构造方法中传入的config中的 client.id 对应的值,说明 config 很有可能是在用户配置(userProvidedConfigs)的基础上进行了些许处理。
继续往上追溯,进到 DefaultKafkaProducerFactory.createKafkaProducer 方法中:

protected Producer<K, V> createKafkaProducer() {if (this.clientIdPrefix == null) {return new KafkaProducer<>(this.configs, this.keySerializerSupplier.get(),this.valueSerializerSupplier.get());}else {Map<String, Object> newConfigs = new HashMap<>(this.configs);newConfigs.put(ProducerConfig.CLIENT_ID_CONFIG,this.clientIdPrefix + "-" + this.clientIdCounter.incrementAndGet());return new KafkaProducer<>(newConfigs, this.keySerializerSupplier.get(),this.valueSerializerSupplier.get());}
}

可以看到如果 clientIdPrefix 不为空的情况下,会在 config 中放入 client.id 的键值对,很明显这种情况下不会有我们所说的 clientId 重复的情况发生,因此我们只需要保证 clientIdPrefix 不为空即可。在 DefaultKafkaProducerFactory 构造方法中找到 clientIdPrefix 的赋值语句:

if (this.clientIdPrefix == null && configs.get(ProducerConfig.CLIENT_ID_CONFIG) instanceof String) {this.clientIdPrefix = (String) configs.get(ProducerConfig.CLIENT_ID_CONFIG);
}

其中 ProducerConfig.CLIENT_ID_CONFIG 值为 client.id,所以只需要在用户配置中添加 client.id 的值,那么 KafkaProducer 在创建时,就会在自动生成的 clientId 中添加前缀字符串,从而避免不同的 KafkaProducer 的 id 冲突。

3、BUG修复

将上述Kafka配置文件修改如下:

<!-- 定义producer1的参数 -->
<bean id="producerProperties1" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="client.id" value="a"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean><!-- 定义producer2的参数 -->
<bean id="producerProperties2" class="java.util.HashMap"><constructor-arg><map><entry key="bootstrap.servers" value="localhost:9092"/><entry key="client.id" value="b"/><entry key="retries" value="3"/><entry key="batch.size" value="4096"/><entry key="linger.ms" value="10"/><entry key="buffer.memory" value="40960"/><entry key="acks" value="all"/><entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/><entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer"/></map></constructor-arg>
</bean>

相关文章:

记一次Kafka warning排查过程

1、前因 在配合测试某个需求的时候&#xff0c;正好看到控制台打印了个报错&#xff0c;如下&#xff1a; 2023-03-06 17:05:58,565[325651ms][pool-28-thread-1][org.apache.kafka.common.utils.AppInfoParser][WARN] - Error registering AppInfo mbean javax.management.I…...

MySQL学习笔记(6.视图)

1. 视图作用 (1). 简化业务&#xff0c;将多个复杂条件&#xff0c;改为视图 (2). mysql对用户授权&#xff0c;只能控制表权限&#xff0c;通过视图可以控制用户字段权限。 (3). 可以避免基本表变更&#xff0c;影响业务。只需更改视图即可。 2. 视图&#xff08;创建&…...

java多线程与线程池-01多线程知识复习

多线程知识复习 文章目录 多线程知识复习第1章 多线程基础1.1.2 线程与进程的关系1.2 多线程启动1.2.1 线程标识1.2.2 Thread与Runnable1.2.3 run()与start()1.2.4 Thread源码分析1.3 线程状态1.3.1 NEW状态1.3.2 RUNNABLE状态1.3.3 BLOCKED状态1.3.4 WAITING状态1…...

Typescript - 将命名空间A导入另一个命名空间B作为B的子命名空间,并全局暴露命名空间B

前言 最近相统一管理 ts 中的类型声明&#xff0c;这就需要将各模块下的命名空间整合到全局的命名空间下&#xff0c;牵涉到从别的文件中引入命名空间并作为子命名空间在全局命名空间中统一暴露。 将命名空间A导入另一个命名空间B作为B的子命名空间 文件说明 assets.ts 文件中…...

Windows下实现Linux内核的Python开发(WSL2+Conda+Pycharm)

许多软件可以通过Python交互&#xff0c;但没有开发Windows版本&#xff0c;这个时候装双系统或虚拟机都很不方便&#xff0c;可以采取WSL2CondaPycharm的策略来进行基于Linux内核的Python开发。启动WSL2&#xff0c;安装Linux内核教程&#xff1a;旧版 WSL 的手动安装步骤 | M…...

新闻发布网站分析及适用场景

在当今数字时代&#xff0c;发布新闻的渠道已经不再局限于传统媒体&#xff0c;越来越多的企业、组织和个人开始使用互联网平台发布新闻稿&#xff0c;以提升品牌知名度和影响力。本文将介绍一些可以发布新闻的网站&#xff0c;并分析其特点和适用场景。一、新闻稿发布平台1.新…...

云原生时代顶流消息中间件Apache Pulsar部署实操之Pulsar IO与Pulsar SQL

文章目录Pulsar IO (Connector连接器)基础定义安装Pulsar和内置连接器连接Pulsar到Cassandra安装cassandra集群配置Cassandra接收器创建Cassandra Sink验证Cassandra Sink结果删除Cassandra Sink连接Pulsar到PostgreSQL安装PostgreSQL集群配置JDBC接收器创建JDBC Sink验证JDBC …...

Input子系统(一)启动篇

代码路径 基于AndroidS&#xff08;12.0&#xff09;代码 system/core/libutils/Threads.cppframeworks/base/services- java/com/android/server/SystemServer.java- core- java/com/android/server/input/InputManagerService.java- jni/com_android_server_input_InputMan…...

WuThreat身份安全云-TVD每日漏洞情报-2023-03-08

漏洞名称:Agilebio Lab Collector 远程命令执行 漏洞级别:高危 漏洞编号:CVE-2023-24217,CNNVD-202303-375 相关涉及:Agilebio Lab Collector 4.234 漏洞状态:EXP 参考链接:https://tvd.wuthreat.com/#/listDetail?TVD_IDTVD-2023-05536 漏洞名称:PrestaShop “Xen Forum”模…...

ABP IStringLocalizer部分场景不生效的问题

问题描述&#xff1a; 本地项目依赖注入本地化服务时候生效&#xff0c;第三方项目调用本地接口时候出现本地化失效的问题。 解决方案&#xff1a; 第三方服务封装的 GetHttp 请求的请求头中添加 语言相关信息 request.Headers.Add("accept-language", "zh-C…...

数组(四)-- LC[167] 两数之和-有序数组

1 两数之和 1.1 题目描述 题目链接&#xff1a;https://leetcode.cn/problems/two-sum/description/ 1.2 求解思路 1. 暴力枚举 最容易想到的方法是枚举数组中的每一个数 x&#xff0c;寻找数组中是否存在 target - x 参考代码 class Solution(object):def twoSum(self, n…...

Mac电脑,python+appium+安卓模拟器使用步骤

1、第一步&#xff0c;环境搭建&#xff0c;参考这位博主的文章&#xff0c;很齐全 https://blog.csdn.net/qq_44757414/article/details/128142859 我在最后一步安装appium-doctor的时候&#xff0c;提示权限不足&#xff0c;换成sudo appium-doctor即可 2、第二步&#xff0…...

Linux命令·find进阶

find是我们很常用的一个Linux命令&#xff0c;但是我们一般查找出来的并不仅仅是看看而已&#xff0c;还会有进一步的操作&#xff0c;这个时候exec的作用就显现出来了。 exec解释&#xff1a;-exec 参数后面跟的是command命令&#xff0c;它的终止是以;为结束标志的&#xff0…...

R语言ggplot2 | 用百分比格式表示数值

&#x1f4cb;文章目录Percent() 函数介绍例子1&#xff0c;在向量中格式化百分比&#xff1a;例子2&#xff0c;格式化数据框列中的百分比&#xff1a;例子3&#xff0c;格式化多个数据框列中的百分比&#xff1a;如何使用percent()函数在绘图过程展示通常在绘图时&#xff0c…...

【代码训练营】day53 | 1143.最长公共子序列 1035.不相交的线 53. 最大子序和

所用代码 java 最长公告子序列 LeetCode 1143 题目链接&#xff1a;最长公告子序列 LeetCode 1143 - 中等 思路 这个相等于上一题的不连续状态 dp[i] [j]&#xff1a;以[0, i-1]text1和以[0, j-1]text2 的最长公共子序列的长度为dp[i] [j]递推公式&#xff1a; 相同&#x…...

消息队列理解

为什么使用消息队列 使⽤消息队列主要是为了&#xff1a; 减少响应所需时间和削峰。降低系统耦合性&#xff08;解耦/提升系统可扩展性&#xff09;。 当我们不使⽤消息队列的时候&#xff0c;所有的⽤户的请求会直接落到服务器&#xff0c;然后通过数据库或者 缓存响应。假…...

【Linux内核一】在Linux系统下网口数据收发包的具体流向是什么?

在TCP/IP网络分层模型里&#xff0c;整个协议栈被分成了物理层、链路层、网络层&#xff0c;传输层和应用层。物理层对应的是网卡和网线&#xff0c;应用层对应的是我们常见的Nginx&#xff0c;FTP等等各种应用。Linux实现的是链路层、网络层和传输层这三层。 在Linux内核实现中…...

南京、西安集成电路企业和高校分布一览(附产业链主要厂商及高校名录)

前言 3月2日&#xff0c;国务院副总理刘鹤在北京调研集成电路企业发展&#xff0c;并主持召开座谈会。刘鹤指出&#xff0c;集成电路是现代化产业体系的核心枢纽&#xff0c;关系国家安全和中国式现代化进程。他表示&#xff0c;我国已形成较完整的集成电路产业链&#xff0c;也…...

后端Java随机比大小游戏实战讲解

## - 利用print打印输出提示用户 ## - 利用Scanner函数抓取数据 ## - 利用Math方法实现随机数 #### 1.首先用到的是print函数&#xff0c;对用户进行提醒进一步的操作 通过System.out.print();提示用户进行选择买大买小。 #### 2.然后利用Scanner函数&#xff0c;对用户输出…...

dolphinschedule使用shell任务结束状态研究

背景&#xff1a;配置的dolphin任务&#xff0c;使用的是shell&#xff0c;shell里包含了spark-submit 如下截图。 dolphin shell 介绍完毕&#xff0c;开始说明现象。 有天有人调整了集群的cdp配置&#xff0c;executor-cores max1 我之前这里写的是2&#xff0c;所以spark任…...

如何用postman实现接口自动化测试

postman使用 开发中经常用postman来测试接口&#xff0c;一个简单的注册接口用postman测试&#xff1a; 接口正常工作只是最基本的要求&#xff0c;经常要评估接口性能&#xff0c;进行压力测试。 postman进行简单压力测试 下面是压测数据源&#xff0c;支持json和csv两个格…...

AHRS(航姿参考系统)IMU(惯性测量单元)和INS的分析对比研究-2023-3-8

名称 AHRS俗称航姿参考系统 IMU 惯性测量单元 INS 惯性导航系统 英文 全称 &#xff08;Attitude and Heading Reference System&#xff09; &#xff08;Inertial Measurement Unit&#xff09; Inertial Navigation System&#xff09; 组成 加速度计&#xff0c;磁…...

企业管理经典书籍推荐

几乎每一位成功的商业人士都有着良好的阅读习惯。并且他们阅读涉猎的范围也大多与企业管理和领导力有关。而关于企业管理经典书籍&#xff0c;我推荐你看以下这两本。一本是《经理人参阅&#xff1a;企业管理实务》&#xff0c;另一本是《经理人参阅&#xff1a;领导力提升》。…...

JVM系列——破坏双亲委派模型的场景和应用

上文提到过双亲委派模型并不是强制性的&#xff0c;而是Java设计者推荐的类加载器实现方式。 在Java的世界中大部分的类加载器都遵循这个模型&#xff0c;但也有例外的情况&#xff0c;直到Java 模块化出现为止&#xff0c;双亲委派模型出现过几次&#xff08;3次&#xff1f;&…...

基于智能边缘和云计算的数字经济服务细粒度任务调度机制

数字经济被各国视为推动经济增长的必然选择&#xff0c;为经济高质量发展提供了新机遇、新路径。对于中国市场而言&#xff0c;云计算背后的强大基础是数字经济不可阻挡的发展趋势。在数字经济中&#xff0c;云作为基础设施成为构建数字经济金字塔的基础。为缓解数字经济服务器…...

ccc-pytorch-卷积神经网络实战(6)

文章目录一、CIFAR10 与 lenet5二、CIFAR10 与 ResNet一、CIFAR10 与 lenet5 第一步&#xff1a;准备数据集 lenet5.py import torch from torch.utils.data import DataLoader from torchvision import datasets from torchvision import transformsdef main():batchsz 128C…...

置信椭圆(误差椭圆)详解

文章目录Part.I 预备知识Chap.I 一些概念Chap.II 主成分分析Chap.III Matlab 函数 randnChap.IV Matlab 函数 pcaPart.II 置信椭圆的含义Chap.I 一个 Matlab 实例Sec.I 两个不相关变量的特征Sec.II 两个相关变量的特征Chap.II 变换阵 (解相关矩阵) 的求解ReferencePart.I 预备知…...

FreeSWITCH 智能呼叫流程设计

文章目录1. 智能呼叫流程2. 细节处理1. 呼叫字符串指定拨号计划2. 外呼的拨号计划3. 语音打断的支持1. 智能呼叫流程 用户与机器人对话通常都是以文本的形式进行&#xff0c;但是借助 ASR 和 TTS 技术&#xff0c;以语音电话为载体的智能呼叫系统成为可能。智能呼叫系统涉及到…...

什么是Restful风格

什么是RestFul风格&#xff1f; Restful就是一个资源定位及资源操作的风格。不是标准也不是协议&#xff0c;只是一种风格。基于这个风格设计的软件可以更简洁&#xff0c;更有层次&#xff0c;更易于实现缓存等机制。 REST即Representational State Transfer的缩写&#xff0…...

sumifs的交叉 表的例子

比如这样&#xff0c;那么冰箱绿山店的栏位中&#xff0c;SUMIFS($D$3:$D$10,$B$3:$B$10,$F3,$C$3:$C$10,G$2)就是把求和范围&#xff0c;条件1设置为固定列的复合引用&#xff0c;条件2设置为固定行的复合引用即可。...