当前位置: 首页 > news >正文

@KafkaListener指定kafka集群

基于@KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群,这对于需要访问多套kafka集群的程序来说,是有效的解决方案。这里需要注意的是,此时的消费者配置信息需使用原生kafka的配置信息格式(如:ConsumerConfig.MAX_POLL_RECORDS_CONFIG = “max.poll.records”),与自动装载KafkaConsumer时的配置信息格式不同。详情如下:

依赖项(其实spring-kafka包含了kafka-clients)

<!-- spring-kafka --> 
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.6.0</version>
</dependency>
<!-- kafka-clients --> 
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>2.6.0</version>
</dependency>

配置文件
配置参数的格式和含义,参见《spring-kafka的配置使用》

生产代码

@Component
@Slf4j
public class KafKaProducer {@Autowiredprivate KafkaTemplate kafkaTemplate;public void sendMessage(String topic, Object object) {/** 这里的 ListenableFuture 类是 spring 对 java 原生 Future 的扩展增强,是一个泛型接口,用于监听异步方法的回调 而对于* kafka send 方法返回值而言,这里的泛型所代表的实际类型就是 SendResult<K, V>,而这里 K,V 的泛型实际上 被用于* ProducerRecord<K, V> producerRecord,即生产者发送消息的 key,value 类型*/ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, object);future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {@Overridepublic void onFailure(Throwable throwable) {log.error("发送消息失败:" + throwable.getMessage());}@Overridepublic void onSuccess(SendResult<String, Object> sendResult){// log.info("发送消息成功:" + sendResult.toString());}});}
}

消费者配置类,其中可配置多个kafka集群,每个kafka集群生成一个KafkaListenerContainerFactory实例

@Data
@Slf4j
@Configuration
public class KafkaConfig {@ResourceEnvironment environment;@Beanpublic KafkaListenerContainerFactory<?> containerFactory() {Integer concurrency = environment.getProperty("kafka.concurrency", Integer.class, 1);Integer pollTimeout = environment.getProperty("kafka.poll.timeout", Integer.class, 3000);ConcurrentKafkaListenerContainerFactory<String, String> containerFactory = new ConcurrentKafkaListenerContainerFactory<>();containerFactory.setConsumerFactory(new DefaultKafkaConsumerFactory<>(this.consumerConfigs()));containerFactory.setConcurrency(concurrency); // 消费并发数量containerFactory.setBatchListener(true);      // 批量监听消息containerFactory.getContainerProperties().setAckMode(ContainerProperties.AckMode.BATCH); // 批量提交偏移containerFactory.getContainerProperties().setPollTimeout(pollTimeout); // 消息拉取时限return containerFactory;}@Beanpublic Map<String, Object> consumerConfigs() {String servers          = environment.getProperty("kafka.servers", "127.0.0.1:9092");String groupId          = environment.getProperty("kafka.groupId", "consumer-group");String sessionTimeout   = environment.getProperty("kafka.session.timeout.ms", "60000");String maxPollRecords   = environment.getProperty("kafka.max.poll.records", "100");String maxPollInterval  = environment.getProperty("kafka.max.poll.interval", "600000");String jaasConfig       = environment.getProperty("kafka.sasl.jaas.config");Map<String, Object> props = new HashMap<>();props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords);props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, maxPollInterval);props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);/// props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, sessionTimeout);props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);props.put("security.protocol", "SASL_PLAINTEXT");props.put("sasl.mechanism", "SCRAM-SHA-256");props.put("sasl.jaas.config", jaasConfig);return props;}
}

消费代码 @KafkaListener注解的containerFactory参数指定了KafkaListenerContainerFactory实例,也就指定了kafka集群

@Slf4j
@Component
public class KafkaConsumerListen implements BatchMessageListener<String, String> {@Autowiredprivate Environment environment;@Autowiredprivate KafkaMsgHandleService msgHandleService;@Autowiredprivate ThreadPoolTaskExecutor taskExecutor;/*************************      接收消息************************/@Override@KafkaListener( containerFactory = "containerFactory", groupId = "${kafka.groupId}", topics = "#{'${kafka.topics}'.split(',')}", concurrency = "${kafka.concurrency}")public void onMessage(List<ConsumerRecord<String, String>> records) {try {final List<String> msgs = records.stream().map(ConsumerRecord::value).collect(Collectors.toList());log.info("收到消息体:size={} content:{}", msgs.size(), JSON.toJSONString(msgs));/// 处理消息msgs.forEach(this::processRecord);} catch (Exception e) {log.error("KafkaListener_kafka_consume_error.", e);}}/*************************      处理消息************************/private void processRecord(String msg) {taskExecutor.submit(() -> {if (!environment.getProperty("kafka1.switch", Boolean.class,true)) {log.warn("KafkaListener_turn_off_drop_message.");return;}msgHandleService.handle(msg);});}
}

相关文章:

@KafkaListener指定kafka集群

基于KafkaListener注解的kafka监听代码可以手动指定要消费的kafka集群&#xff0c;这对于需要访问多套kafka集群的程序来说&#xff0c;是有效的解决方案。这里需要注意的是&#xff0c;此时的消费者配置信息需使用原生kafka的配置信息格式&#xff08;如&#xff1a;ConsumerC…...

什么是算法的空间复杂度?

一、问题 常常⽤算法的空间复杂度来评价算法的性能&#xff0c;那么什么是算法的空间复杂度呢&#xff1f; 二、解答 算法的空间复杂度是指在算法的执⾏过程中&#xff0c;需要的辅助空间数量。 辅助空间数量指的不是程序指令、常数、指针等所需要的存储空间&#xff0c;也不是…...

WebDav协议相关软件@简单配置局域网内的http和WebDav服务器和传输系统

文章目录 相关软件windows自带第三方软件 chfs(CuteHttpFileServer)下载软件GUI方案 补充命令行方案命令行程序定位简单创建服务站点使用配置文件配置细节 使用软连接或符号链接等手段将向共享站点的根目录添加文件开机自启服务包装nssm包装使用powershell包装 服务启动chfs服务…...

自定义数据实现SA3D

SA3D&#xff1a;Segment Anything in 3D with NeRFs 实现了3D目标分割 原理是利用SAM(segment anything) 模型和Nerf分割渲染3D目标&#xff0c; SAM只能分块&#xff0c;是没有语义标签的&#xff0c;如何做到语义连续&#xff1f; SA3D中用了self-prompt, 根据前一帧的mask…...

设计模式基础概念:探索设计模式的魅力

设计模式是软件开发中的一种指导性概念&#xff0c;它提供了一套被广泛接受的解决方案&#xff0c;用于常见的设计问题。设计模式有助于提高软件的可重用性、可扩展性和可维护性&#xff0c;并促进团队之间的沟通。 以下是一些常见的设计模式&#xff1a; 创建型模式&#xff1…...

【Leetcode】2182. 构造限制重复的字符串

文章目录 题目思路代码 题目 2182. 构造限制重复的字符串 问题&#xff1a;给你一个字符串 s 和一个整数 repeatLimit &#xff0c;用 s 中的字符构造一个新字符串 repeatLimitedString &#xff0c;使任何字母 连续 出现的次数都不超过 repeatLimit 次。你不必使用 s 中的全…...

Kubernetes(K8S)云服务器实操TKE

一、 Kubernetes(K8S)简介 Kubernetes源于希腊语,意为舵手,因为首尾字母中间正好有8个字母,简称为K8S。Kubernetes是当今最流行的开源容器管理平台,是 Google 发起并维护的基于 Docker 的开源容器集群管理系统。它是大名鼎鼎的Google Borg的开源版本。 K8s构建在 Docker …...

设置弹窗随鼠标位置移动

1.这是要移动的弹窗&#xff0c;隐藏显示逻辑、样式、展示内容自己写&#xff0c;主要就是动态设置弹窗的style&#xff0c;floatLeft和floatTop都是Vue中的data双向绑定数据&#xff1b; <div id"box" v-show"hasMove" :style"{ left: floatLeft…...

Spring Boot实现数据加密脱敏:注解 + 反射 + AOP

文章目录 1. 引言2. 数据加密和脱敏的需求3. Spring Boot项目初始化4. 敏感数据加密注解设计5. 实现加密和脱敏的工具类6. 实体类和加密脱敏注解的使用7. 利用AOP实现加密和脱敏8. 完善AOP切面9. 测试10. 拓展功能与未来展望10.1 加密算法的选择10.2 动态注解配置 11. 总结 &am…...

jmeter和meterSphere如何使用第三方jar包

工具引用jar包语言都是beanshell 问题起因&#xff1a;metersphere 接口自动化实现过程中&#xff0c;如何实现字符串加密且加密方法依赖第三方库&#xff1b; 使用语言&#xff1a;beanshell脚本语言&#xff0c;java语言 使用工具&#xff1a;idea jmeter metersphere 1.首…...

API对象上千个,有啥关联性,kubectl-tree一键搞定

关注【云原生百宝箱】公众号&#xff0c;获取更多云原生消息 "kubectl-tree 是一款强大的 kubectl 插件&#xff0c;通过 ownerReferences 实现 Kubernetes 对象之间的所有权关系探索。相较于 kubectl lineage&#xff0c;它不仅更全面理解 API 对象的逻辑关系&#xff0c…...

java自定义工具类在List快速查找相同字段值对象

根据对象某一字段名&#xff0c;获取字段值&#xff0c;将List转换为Map中包含list&#xff0c;Key为字段值&#xff0c;Value为相同字段值的对象list&#xff0c;快速定位具有相同字段值的对象&#xff0c;转换之后便于在Map中根据字段值快速查找相同字段值的对象 //List转Map…...

codeforces Hello 2024 - C - Grouping Increases --- 题解

目录 Grouping Increases 题目描述&#xff1a; 思路解析&#xff1a; 代码实现&#xff1a; Grouping Increases 题目描述&#xff1a; 给你一个大小为n的数组a&#xff0c;你可以把数组a划分为两个子序列s和t&#xff0c;a中元素&#xff0c;要么在子序列s中&#xff0c;…...

STM32H5培训(一)总览

文章目录 1. 前言2. STM32H5系列MCU的特点和新功能包括性能提升、新外设和安全功能等3. STM32H5系列型号之间的区别和关键资源对比4. 性能和功能亮点6. 开发生态参考&#xff1a; 1. 前言 本篇主要介绍STM32H5系列MCU的特点和新功能&#xff0c;包括全新的M33内核、250M主频处…...

亚马逊云科技 WAF 部署小指南(五):在客户端集成 Amazon WAF SDK 抵御 DDoS 攻击...

方案介绍 在 WAF 部署小指南&#xff08;一&#xff09;中&#xff0c;我们了解了 Amazon WAF 的原理&#xff0c;并通过创建 WEB ACL 和托管规则防护常见的攻击。也了解了通过创建自定义规则在 HTTP 请求到达应用之前判断是阻断还是允许该请求。在 Amazon WAF 自定义规则中&am…...

高光谱分类论文解读分享之基于多模态融合Transformer的遥感图像分类方法

IEEE TGRS 2023&#xff1a;基于多模态融合Transformer的遥感图像分类方法 题目 Multimodal Fusion Transformer for Remote Sensing Image Classification 作者 Swalpa Kumar Roy , Student Member, IEEE, Ankur Deria , Danfeng Hong , Senior Member, IEEE, Behnood Ras…...

Trans论文复现:基于数据驱动的新能源充电站两阶段规划方法程序代码!

适用平台&#xff1a;MatlabYalmipCplex/Gurobi&#xff1b; 文章提出了一种电动汽车充电站的两阶段规划方法&#xff0c;第一阶段通过蒙特卡洛法模拟充电车辆需求和电池充放电数据来确定充电站位置&#xff1b;第二阶段通过数据驱动的分布鲁棒优化方法优化充电站的新能源和电池…...

将抖音视频转成MP3并下载

这篇是在上一篇的基础上写的&#xff0c;这篇负责抖音作者详情页的视频转声音提取&#xff0c;这篇需要用到后端。 本地启动后端后&#xff0c;在控制台输入对应代码&#xff0c;即可实现hover在封面上&#xff0c;按d一键下载音频 控制台代码 // 获取作者的视频列表var liEle…...

C程序训练:与输入有关的错误

在录入程序时有时稍不注意就可能录入错误的字符导致程序运行结果出现错误&#xff0c;下面举例说明。 下面程序的运行结果是错的&#xff0c;但程序又没有错&#xff0c;到底问题出现在哪呢&#xff1f; #include <stdio.h> int main() {FILE *fp;int i, k, n;fpfopen(…...

制作 CentOS Stream9 的U盘系统启动盘

一、简述 注:请勿用于商用&#xff0c;如有版权纠纷&#xff0c;于博主无任何关系。&#xff08;仅用于学习研究使用&#xff09; 由于CentOs Linux 7和CentOs Stream8终止日期是2024年&#xff0c;需要将系统升级到最新版本的CentOs Stream9&#xff0c;下面是刻录系统盘的操…...

Vulnhub靶机:driftingblues 1

一、介绍 运行环境&#xff1a;Virtualbox 攻击机&#xff1a;kali&#xff08;10.0.2.15&#xff09; 靶机&#xff1a;driftingblues1&#xff08;10.0.2.17&#xff09; 目标&#xff1a;获取靶机root权限和flag 靶机下载地址&#xff1a;https://www.vulnhub.com/entr…...

CloudCompare——点云空间圆拟合

目录 1.概述2.软件实现3.完整操作4.算法源码5.相关代码 本文由CSDN点云侠原创&#xff0c;CloudCompare——点云空间圆拟合&#xff0c;爬虫自重。如果你不是在点云侠的博客中看到该文章&#xff0c;那么此处便是不要脸的爬虫与GPT生成的文章。 1.概述 CloudCompare软件中的To…...

解决POI报错POIXMLTypeLoader不存在的问题

问题&#xff1a; springframework.web.util.NestedServletException: Handler dispatch failed; nested exception is java.lang.NoClassDefFoundError: org/apache/poi/POIXMLTypeLoaderat org.springframework.web.servlet.DispatcherServlet.doDispatch(DispatcherServlet…...

关于rewriteBatchedStatements的源码分析

在之前的优惠券兑换码需求中&#xff0c;涉及批量写入问题&#xff0c;其中有一个关键的连接配置参数非常重要——rewriteBatchedStatements&#xff0c;当该值配置为true时&#xff0c;Statement将可能对批量插入sql进行重写。 何谓重写&#xff1f;原来提交的批量执行语句&a…...

自动化神器 Playwright 的 Web 自动化测试解决方案

1. 主流框架的认识 总结&#xff1a; 由于Selenium在3.x和4.x两个版本的迭代中并没有发生多大的变化&#xff0c;因此Selenium一统天下的地位可能因新框架的出现而变得不那么稳固。后续的Cypress、TestCafe、Puppeteer被誉为后Selenium时代Web UI自动化的三驾马车。但是由于这三…...

docker filebeat 将日志多级目录和多维json数据日志同步到es

注 使用的时候先调试调试配置,调试成功在尝试写入es,如果es写入失败就是es账户.密码/白名单.和index未创建的问题,细节可以留言 setup.template.priority 模板优先级 调整这个可以配置一台机器多个filebeat 容器启动 多级目录日志和多维josn日志结构 filebeat.inputs:- typ…...

【机器学习】模型参数优化工具:Optuna使用分步指南(附XGB/LGBM调优代码)

常用的调参方式和工具包 常用的调参方式包括网格搜索(Grid Search)、**随机搜索(Random Search)和贝叶斯优化(Bayesian Optimization)**等。 工具包方面&#xff0c;Scikit-learn提供了GridSearchCV和RandomizedSearchCV等用于网格搜索和随机搜索的工具。另外&#xff0c;有一…...

webview全屏处理,即插即用

去年双十一有个直播的需求&#xff0c;听起来很简单&#xff0c;技术也都很成熟&#xff0c;但是真的开始实现后&#xff0c;还是有不少坑的&#xff0c;首先第一个uc内核不支持webRTC协议&#xff0c;需要重新开发chrome内核的webview&#xff0c;其次webview全屏处理、悬浮窗…...

实录分享 | 央企大数据平台架构发展趋势与应用场景的介绍

分享嘉宾&#xff1a; 孟子涵-中国华能集团信息中心平台架构师 2021年华能就与Alluxio建立了合作&#xff0c;共同写了整个华能统一纳管的架构方案。这个方案我认为是现在我们在央企里边比较核心的一套体系&#xff0c;能让全集团所有我们认为重要的数字化资源实现真正的统一集…...

UE5 将类修改目录

有个需求&#xff0c;需要修改ue里面类的位置&#xff0c;默认在Public类下面&#xff0c;我想创建一个二级目录&#xff0c;将所有的类分好位置&#xff0c;方便查看。 上图为创建一个类所在的默认位置。 接下来&#xff0c;将其移动到一个新的目录中。 首先在资源管理器中找…...

苏州网站推广如何/东莞做网络推广的公司

文章目录一、数据结构部分1、线性表1.1 数组1.2 链表2、栈和队列3、树1&#xff09;二叉树2&#xff09;二叉搜索树3&#xff09;平衡二叉树&#xff08;AVL&#xff09;4&#xff09;红黑树4、图5、并查集二、常见算法1、回溯2、递归3、动态规划4、滑动窗口5、分治法6、贪心算…...

网上查公司怎么查/安卓手机性能优化软件

引言PHP的数组&#xff0c;说白了就是一个映射的键值对集合。那么如何从数组元素中删除元素呢&#xff1f;你肯定首先想到遍历数组&#xff0c;然后找到目标项&#xff0c;然后删除。我看到有同学将要删除的元素 null&#xff0c;这并不起作用&#xff0c;因为 null 也是作为值…...

wordpress政府网站/网店怎么推广和宣传

上一篇博文主要通过两个例子让测试新手了解一下测试思想&#xff0c;和在做测试之前应该了解人几点&#xff0c;那么我们在如何完成一次完整的性能测试呢&#xff1f; 测试报告是一次完整性能测试的体现&#xff0c;所以&#xff0c;这里我给出一个完整的性能测试报告&#xff…...

java用ssm做电商网站/公司怎么做网络营销

四个解决方案: 1、IFeatureCursor 游标查询后&#xff0c;遍历删除 2、更新游标删除IFeatureCursor.DeleteFeature() 3、ITable.DeleteSearchedRows删除 4、 IDataset.Workspace.ExecuteSQL 一、 几种删除方法的代码1. 查询结果中删除 private void Delete1(IFeatureClass P…...

邢台网站关键词优化/成都多享网站建设公司

摘要 温度作为环境监控中具有重要意义的参数之一&#xff0c;其直接影响植物生长、土壤变化&#xff0c;也密切关联着高质量农产品的栽培&#xff0c;在工业生产等诸多领域均起着至关重要的作用。温度控制系统的实现是一个非常关键的课题&#xff0c;但是目前先进的温度测量技…...

织梦网站首页空白/百度竞价品牌广告

摘要&#xff1a;隙值小和根据特点条件同装配模的结构、因特用户间的大的不冲裁&#xff0c;隙常模间凹凸采用方法控制&#xff0c;.光垫片法B。每都有独隙的模间几种有哪冷冲调整常用方法。小时内滴在半剂量一次完&#xff0c;台正2小量在次剂&#xff0c;、缓脑水%甘该A治疗肿…...