当前位置: 首页 > news >正文

springboot集成kafka消费数据

springboot集成kafka消费数据

文章目录

  • springboot集成kafka消费数据
  • 1.引入pom依赖
  • 2.添加配置文件
    • 2.1.添加KafkaConsumerConfig.java
    • 2.2.添加KafkaIotCustomProperties.java
    • 2.3.添加application.yml配置
  • 3.消费者代码

1.引入pom依赖

        <dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.8.11</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.1.2</version></dependency>

2.添加配置文件

2.1.添加KafkaConsumerConfig.java

@Configuration
@EnableConfigurationProperties(KafkaIotCustomProperties.class)
@Slf4j
public class KafkaConsumerConfig {@AutowiredKafkaIotCustomProperties kafkaIotCustomProperties;@BeanKafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());// 并发数 多个微服务实例会均分factory.setConcurrency(3);factory.setBatchListener(true);ContainerProperties containerProperties = factory.getContainerProperties();// 是否设置手动提交containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}private ConsumerFactory<String, String> consumerFactory() {Map<String, Object> consumerConfigs = consumerConfigs();log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));return new DefaultKafkaConsumerFactory<>(consumerConfigs);}@Beanpublic Map<String, Object> consumerConfigs() {Map<String, Object> propsMap = new HashMap<>();// 服务器地址propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaIotCustomProperties.getBootstrapServers());// 是否自动提交propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaIotCustomProperties.isEnableAutoCommit());// 自动提交间隔propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getAutoCommitInterval());//会话时间propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaIotCustomProperties.getSessionTimeOut());//key序列化propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getKeyDeserializer());//value序列化propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaIotCustomProperties.getValueDeserializer());// 心跳时间propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getHeartbeatInterval());// 分组idpropsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaIotCustomProperties.getGroupId());//消费策略propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaIotCustomProperties.getAutoOffsetReset());// poll记录数propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaIotCustomProperties.getMaxPollRecords());//poll时间propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaIotCustomProperties.getMaxPollInterval());return propsMap;}}

2.2.添加KafkaIotCustomProperties.java

@Component
@ConfigurationProperties(prefix = "fxyh.realdata.kafka")
@Data
public class KafkaIotCustomProperties {private List<String> topics;private String groupId;private String sessionTimeOut;private String bootstrapServers;private String autoOffsetReset;private boolean enableAutoCommit;private String autoCommitInterval;private String fetchMinSize;private String fetchMaxWait;private String maxPollRecords;private String maxPollInterval;private String heartbeatInterval;private String keyDeserializer;private String valueDeserializer;
}

2.3.添加application.yml配置

fxyh:realdata:kafka:bootstrapServers:  192.168.80.251:9092topics: ["test1","test2"]groupId: shengtingrealdatagroup#后台的心跳线程必须在30秒之内提交心跳,否则会reBalancesessionTimeOut: 30000#      autoOffsetReset: earliest#取消自动提交,即便如此 spring会帮助我们自动提交enableAutoCommit: false#自动提交间隔autoCommitInterval: 1000#拉取的最小字节fetchMinSize: 1#拉去最小字节的最大等待时间fetchMaxWait: 500maxPollRecords: 50#300秒的提交间隔,如果程序大于300秒提交,会报错maxPollInterval: 300000#心跳间隔heartbeatInterval: 10000keyDeserializer: org.apache.kafka.common.serialization.StringDeserializervalueDeserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: latest

3.消费者代码


@Slf4j
@Component
public class DeviceDataConsumer {@Autowiredprivate KafkaIotCustomProperties kafkaIotCustomProperties;@KafkaListener(topics = {"#{@kafkaIotCustomProperties.topics}"}, groupId = "#{@kafkaIotCustomProperties.groupId}", containerFactory = "kafkaListenerContainerFactory",properties = {"#{@kafkaIotCustomProperties.autoOffsetReset}"})public void topicTest(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {for (ConsumerRecord<String, String> record : records) {log.info("topic_test 消费了: Topic:" + record.topic() + ",groupId:" + kafkaIotCustomProperties.getGroupId() + ",Message:" + record.value());//手动提交偏移量ack.acknowledge();}}
}

相关文章:

springboot集成kafka消费数据

springboot集成kafka消费数据 文章目录 springboot集成kafka消费数据1.引入pom依赖2.添加配置文件2.1.添加KafkaConsumerConfig.java2.2.添加KafkaIotCustomProperties.java2.3.添加application.yml配置 3.消费者代码 1.引入pom依赖 <dependency><groupId>org.spri…...

单例模式---JAVA

目录 “饿汉”模式 完整代码 “懒汉”模式 完整代码 单例模式&#xff1a;保证某个类在程序中只存在唯一一份实例, 而不会创建出多个实例。 单例模式可以通过实例创建的时间来分为两种&#xff1a;“饿汉”和“懒汉”模式。 “饿汉”模式 所谓的“饿汉”模式实则就是在类…...

maven管理使用

maven基本使用 一、简介二、配置文件三、项目结构maven基本标签实践(例子) 四、pom插件配置五、热部署六、maven 外部手动加载jar打包方式Maven上传私服或者本地 一、简介 基于Ant 的构建工具,Ant 有的功能Maven 都有,额外添加了其他功能.本地仓库:计算机中一个文件夹,自己定义…...

如何在一个系统中同时访问异构的多种数据库

如何在一个系统中同时访问异构的多种数据库 比如在一个系统中&#xff0c;要同时访问MySQL,H2, MsAccess, Mongodb. 要是使用Hibernate, MyBatis这些ORM&#xff0c;难度简直不敢想像。 要是MySQL还使用了分库分表&#xff0c;那更加不得了&#xff0c;一大堆的组件都要配合着…...

半监督学习 - 半监督聚类(Semi-Supervised Clustering)

什么是机器学习 半监督聚类是一种集成了有标签数据和无标签数据的聚类方法&#xff0c;其目标是在聚类的过程中利用有标签数据的信息来提高聚类性能。在半监督聚类中&#xff0c;一部分数据集有已知的标签&#xff0c;而另一部分没有标签。 以下是半监督聚类的基本思想和一些…...

实现STM32烧写程序-(3) Hex文件结构

简介 要对STM32进行更新动作, 就需要对程序文件进行解析, 大部分编译的生成程序文件是Hex或者Bin, 先来看看Hex的结构吧。 资料 Hex文件 简介 Hex文件格式最早由Intel公司于1973年创建。它最初是为了在Intel 8080微处理器上存储和传输二进制数据而设计的。随后&#xff0c;Hex…...

精品量化公式——“区域突破”,应对当下行情较好的主图看盘策略

不多说&#xff0c;直接上效果如图&#xff1a; ► 日线表现 代码评估 技术指标代码评估&#xff1a; VAR1, VAR2, VAR3&#xff1a;这些变量是通过指数移动平均&#xff08;EMA&#xff09;计算得出的。EMA是一种常用的技术分析工具&#xff0c;用于平滑价格数据并减少市场“…...

自然语言处理5——发掘隐藏规律 - Python中的关联规则挖掘

目录 写在开头1. 了解关联规则挖掘的概念和实际应用1.1 关联规则挖掘在市场分析和购物篮分析中的应用1.2 关联规则的定义和基本原理1.3 应用场景2. 使用Apriori算法和FP-growth算法进行关联规则挖掘2.1 Apriori算法的工作原理和实现步骤2.2 FP-growth算法的优势和使用方法2.3 A…...

【记录】重装系统后的软件安装

考完研重装了系统&#xff0c;安装软件乱七八糟&#xff0c;用到什么装什么。在这里记录一套标准操作&#xff0c;备用。一个个装还是很麻烦&#xff0c;我为什么不直接写个脚本直接下载安装包呢&#xff1f;奥&#xff0c;原来是我太菜了还不会写脚本啊&#xff01;先记着吧&a…...

Android 13 - Media框架(31)- ACodec(七)

之前的章节中我们解了 input buffer 是如何传递给 OMX 的&#xff0c;以及Output buffer 是如何分配并且注册给 OMX 的。这一节我们就来看ACodec是如何处理OMX的Callback的。 1、OMXNodeInstance Callback 这一节我们只大致记录Callback是如何传递给ACodec的。在之前的学习中我…...

快速了解VR全景拍摄技术运用在旅游景区的优势

豆腐脑加了糖、烤红薯加了勺&#xff0c;就连索菲亚大教堂前都有了“人造月亮”&#xff0c;在这个冬季&#xff0c;“尔滨”把各地游客宠上了天。面对更多的游客无法实地游玩&#xff0c;哈尔滨冰雪世界再添新玩法&#xff0c;借助VR全景拍摄技术对冬季经典冰雪体验项目进行全…...

分布形态的度量_峰度系数的探讨

集中趋势和离散程度是数据分布的两个重要特征,但要全面了解数据分布的特点&#xff0c;还应掌握数据分布的形态。 描述数据分布形态的度量有偏度系数和峰度系数, 其中偏度系数描述数据的对称性,峰度系数描述与正态分布的偏离程度。 峰度系数反映分布峰的尖峭程度的重要指标. 当…...

HCIP 重发布

拓扑图&IP划分如下&#xff1a; 第一步&#xff0c;配置接口IP&环回地址 以R1为例&#xff0c;R2~R4同理 interface GigabitEthernet 0/0/0 ip address 12.1.1.1 24 interface GigabitEthernet 0/0/1 ip address 13.1.1.1 24 interface LoopBack 0 ip address 1.1.1.…...

FX图中的节点代表什么操作

在 FX 图中&#xff0c;每个节点代表一个操作。这些操作可以是函数调用、方法调用、模块实例调用&#xff0c;也可以是 torch.nn.Module 实例的调用。每个节点都对应一个调用站点&#xff0c;如运算符、方法和模块。 一.节点操作 下面是一些节点可能代表的操作&#xff1a; 1…...

【Java 设计模式】创建型之单例模式

文章目录 1. 定义2. 应用场景3. 代码实现1&#xff09;懒汉式2&#xff09;饿汉式 4. 应用示例结语 在软件开发中&#xff0c;单例模式是一种常见的设计模式&#xff0c;它确保一个类只有一个实例&#xff0c;并提供一个全局访问点。单例模式在需要控制某些资源&#xff0c;如数…...

FlinkAPI开发之窗口(Window)

案例用到的测试数据请参考文章&#xff1a; Flink自定义Source模拟数据流 原文链接&#xff1a;https://blog.csdn.net/m0_52606060/article/details/135436048 窗口的概念 Flink是一种流式计算引擎&#xff0c;主要是来处理无界数据流的&#xff0c;数据源源不断、无穷无尽。…...

【Unity】Joystick Pack摇杆插件实现锁四向操作

Joystick Pack ​ 简介&#xff1a;一款Unity摇杆插件&#xff0c;非常轻量化 ​ 摇杆移动类型&#xff1a;圆形、横向、竖向 ​ 摇杆类型&#xff1a; Joystick描述Fixed固定位置Floating浮动操纵杆从用户触碰的地方开始&#xff0c;一直固定到触碰被释放。Dynamic动态操纵…...

29 旋转工具箱

效果演示 实现了一个菜单按钮的动画效果&#xff0c;当鼠标悬停在菜单按钮上时&#xff0c;菜单按钮会旋转315度&#xff0c;菜单按钮旋转的同时&#xff0c;菜单按钮旋转的8个小圆圈也会依次旋转360度&#xff0c;并且每个小圆圈的旋转方向和菜单按钮的旋转方向相反&#xff0…...

WeNet2.0:提高端到端ASR的生产力

摘要 最近&#xff0c;我们提供了 WeNet [1]&#xff0c;这是一个面向生产&#xff08;工业生产环境需求&#xff09;的端到端语音识别工具包&#xff0c;在单个模型中&#xff0c;它引入了统一的两次two-pass (U2) 框架和内置运行时&#xff08;built-in runtime&#xff09;…...

第九部分 使用函数 (四)

目录 一、foreach 函数 二、if 函数 三、call 函数 一、foreach 函数 foreach 函数和别的函数非常的不一样。因为这个函数是用来做循环用的&#xff0c;Makefile 中的 foreach 函数几乎是仿照于 Unix 标准 Shell&#xff08;/bin/sh&#xff09;中的 for 语句&#xff0c;或…...

三维GIS开发cesium智慧地铁教程(5)Cesium相机控制

一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点&#xff1a; 路径验证&#xff1a;确保相对路径.…...

无法与IP建立连接,未能下载VSCode服务器

如题&#xff0c;在远程连接服务器的时候突然遇到了这个提示。 查阅了一圈&#xff0c;发现是VSCode版本自动更新惹的祸&#xff01;&#xff01;&#xff01; 在VSCode的帮助->关于这里发现前几天VSCode自动更新了&#xff0c;我的版本号变成了1.100.3 才导致了远程连接出…...

解决Ubuntu22.04 VMware失败的问题 ubuntu入门之二十八

现象1 打开VMware失败 Ubuntu升级之后打开VMware上报需要安装vmmon和vmnet&#xff0c;点击确认后如下提示 最终上报fail 解决方法 内核升级导致&#xff0c;需要在新内核下重新下载编译安装 查看版本 $ vmware -v VMware Workstation 17.5.1 build-23298084$ lsb_release…...

解锁数据库简洁之道:FastAPI与SQLModel实战指南

在构建现代Web应用程序时&#xff0c;与数据库的交互无疑是核心环节。虽然传统的数据库操作方式&#xff08;如直接编写SQL语句与psycopg2交互&#xff09;赋予了我们精细的控制权&#xff0c;但在面对日益复杂的业务逻辑和快速迭代的需求时&#xff0c;这种方式的开发效率和可…...

用docker来安装部署freeswitch记录

今天刚才测试一个callcenter的项目&#xff0c;所以尝试安装freeswitch 1、使用轩辕镜像 - 中国开发者首选的专业 Docker 镜像加速服务平台 编辑下面/etc/docker/daemon.json文件为 {"registry-mirrors": ["https://docker.xuanyuan.me"] }同时可以进入轩…...

优选算法第十二讲:队列 + 宽搜 优先级队列

优选算法第十二讲&#xff1a;队列 宽搜 && 优先级队列 1.N叉树的层序遍历2.二叉树的锯齿型层序遍历3.二叉树最大宽度4.在每个树行中找最大值5.优先级队列 -- 最后一块石头的重量6.数据流中的第K大元素7.前K个高频单词8.数据流的中位数 1.N叉树的层序遍历 2.二叉树的锯…...

AI书签管理工具开发全记录(十九):嵌入资源处理

1.前言 &#x1f4dd; 在上一篇文章中&#xff0c;我们完成了书签的导入导出功能。本篇文章我们研究如何处理嵌入资源&#xff0c;方便后续将资源打包到一个可执行文件中。 2.embed介绍 &#x1f3af; Go 1.16 引入了革命性的 embed 包&#xff0c;彻底改变了静态资源管理的…...

使用 SymPy 进行向量和矩阵的高级操作

在科学计算和工程领域&#xff0c;向量和矩阵操作是解决问题的核心技能之一。Python 的 SymPy 库提供了强大的符号计算功能&#xff0c;能够高效地处理向量和矩阵的各种操作。本文将深入探讨如何使用 SymPy 进行向量和矩阵的创建、合并以及维度拓展等操作&#xff0c;并通过具体…...

Xen Server服务器释放磁盘空间

disk.sh #!/bin/bashcd /run/sr-mount/e54f0646-ae11-0457-b64f-eba4673b824c # 全部虚拟机物理磁盘文件存储 a$(ls -l | awk {print $NF} | cut -d. -f1) # 使用中的虚拟机物理磁盘文件 b$(xe vm-disk-list --multiple | grep uuid | awk {print $NF})printf "%s\n"…...

Java毕业设计:WML信息查询与后端信息发布系统开发

JAVAWML信息查询与后端信息发布系统实现 一、系统概述 本系统基于Java和WML(无线标记语言)技术开发&#xff0c;实现了移动设备上的信息查询与后端信息发布功能。系统采用B/S架构&#xff0c;服务器端使用Java Servlet处理请求&#xff0c;数据库采用MySQL存储信息&#xff0…...