当前位置: 首页 > news >正文

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖

        <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency>

2、配置application.yml

加入Kafka的配置

springkafka:#Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095producer:# 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认acks: 1# 发送失败时的重试次数,0表示不重试retries: 0# 批量发送时的批次大小(字节)batch-size: 30720000 # 30MB# 生产者的内存缓冲区大小(字节)buffer-memory: 33554432 # 32MB# Key的序列化器类key-serializer: org.apache.kafka.common.serialization.StringSerializer# Value的序列化器类value-serializer: org.apache.kafka.common.serialization.StringSerializerconsumer:# 消费者所属的组IDgroup-id: test-kafka# 禁用自动提交offset,改为手动提交enable-auto-commit: false# 偏移量重置策略:# earliest:从最早的记录开始消费# latest:从最新的记录开始消费auto-offset-reset: earliest# Key的反序列化器类key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# Value的反序列化器类value-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 每次poll()调用返回的最大消息条数max-poll-records: 2session:# 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)timeout:ms: 300000 # 5分钟listener:# 如果指定的主题不存在,是否让应用启动失败,false表示不会报错missing-topics-fatal: false# 消费模式:single=单条消息,batch=批量消费type: single# 消费确认模式:# manual_immediate:手动确认消息,立即提交offsetack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {private int id;private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {private int id;private String description;private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;@EnableKafka
@Configuration
public class KafkaConfig {// 单条消费监听器工厂,手动提交offset@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(ConsumerFactory<String, String> consumerFactory) {ConcurrentKafkaListenerContainerFactory<String, String> factory =new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory);factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);return factory;}}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;@SpringBootApplication
public class KafkaProducer {public static void main(String[] args) {SpringApplication.run(KafkaProducer.class, args);}@BeanCommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {return args -> {String topic = "task-topic";ObjectMapper objectMapper = new ObjectMapper();for (int i = 1; i <= 5; i++) {// 定义一个对象实例User user = User.builder().id(1).name("Alice").build();Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();//JAVA对象转化为JSON字符串String message =  objectMapper.writeValueAsString(task);kafkaTemplate.send(topic, message);System.out.println("Sent: " + message);Thread.sleep(500); // 模拟消息发送间隔}};}
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;@Service
public class SingleConsumer {@KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {String message = record.value();ObjectMapper objectMapper = new ObjectMapper();Task task = objectMapper.readValue(message,Task.class);// 取出System.out.println("User - Received: " + task.getAssignedUser());// 手动提交offsetacknowledgment.acknowledge();}
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

相关文章:

Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

1、在pom.xml中加入依赖 <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-stream-kafka</artifactId><version>3.1.6</version></dependency> 2、配置application.yml 加入Kafk…...

深度学习(1)

一、torch的安装 基于直接设备情况&#xff0c;选择合适的torch版本&#xff0c;有显卡的建议安装GPU版本&#xff0c;可以通过nvidia-smi命令来查看显卡驱动的版本&#xff0c;在官网中根据cuda版本&#xff0c;选择合适的版本号&#xff0c;下面是安装示例代码 GPU&#xff…...

golang 嵌入式armv7l压缩编译打包

编译 Go 应用程序 go build -ldflags"-s -w" -o myapp.exe . 使用 UPX 压缩可执行文件&#xff08;window下载并设置环境变量&#xff09; upx --best --lzma myapp.exe 可从10M压缩到1M 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 …...

Makefile 之 join

join $(join <list1>,<list2> ) 名称&#xff1a;连接函数——join。 功能&#xff1a;把<list2>中的单词对应地加到<list1>的单词后面。如果<list1>的单词个数要比<list2>的多&#xff0c; 那么&#xff0c;<list1>中的多出…...

集合卡尔曼滤波(Ensemble Kalman Filter),用于二维滤波(模拟平面上的目标跟踪),MATLAB代码

集合卡尔曼滤波&#xff08;Ensemble Kalman Filter&#xff09; 文章目录 引言理论基础卡尔曼滤波集合卡尔曼滤波初始化预测步骤更新步骤卡尔曼增益更新集合 MATLAB 实现运行结果3. 应用领域结论 引言 集合卡尔曼滤波&#xff08;Ensemble Kalman Filter, EnKF&#xff09;是…...

北京申请中级职称流程(2024年)

想找个完整详细点的申请流程资料真不容易&#xff0c;做个分享送给需要的人吧。 不清楚为什么说文章过度宣传&#xff0c;把链接和页面去掉了&#xff0c;网上自己找一下。 最好用windows自带的EDGE浏览器打开申请网站&#xff0c;只有在开始申请的时间内才可以进行网上申报&…...

ubuntu.24安装cuda

1.下载CUDA Toolkit https://developer.nvidia.com/cuda-toolkit-archive 2.按照命令下载&#xff0c;安装 sudo sh cuda_12.2.2_535.104.05_linux.run 3.环境变量 sudo vi /etc/profile 最后面添加 export PATH“/usr/local/cuda-12.2/bin: P A T H " e x p o r t L D L…...

unity li2cpp逆向原理是什么?

主要涉及将Unity游戏引擎中的C#代码转换为C代码&#xff0c;并进一步编译为各平台的原生&#xff08;Native&#xff09;代码的过程&#xff0c;以及逆向工程工具如何利用这一过程中的特定文件来还原和分析原始代码。以下是对Unity IL2CPP逆向原理的详细解释&#xff1a; 对惹…...

Python网络爬虫实践案例:爬取猫眼电影Top100

以下是一个Python网络爬虫的实践案例&#xff0c;该案例将演示如何使用Python爬取猫眼电影Top100的电影名称、主演和上映时间等信息&#xff0c;并将这些信息保存到TXT文件中。此案例使用了requests库来发送HTTP请求&#xff0c;使用re库进行正则表达式匹配&#xff0c;并包含详…...

卷积神经网络(CNN)中的权重(weights)和偏置项(bias)

在卷积神经网络&#xff08;CNN&#xff09;中&#xff0c;权重&#xff08;weights&#xff09;和偏置项&#xff08;bias&#xff09;是两个至关重要的参数&#xff0c;它们在网络的学习和推断过程中起着关键作用。 一、权重&#xff08;Weights&#xff09; 1. 定义&#xf…...

华为FusionCube 500-8.2.0SPC100 实施部署文档

环境&#xff1a; 产品&#xff1a;FusionCube 500版本&#xff1a;8.2.0.SPC100场景&#xff1a;虚拟化基础设施平台&#xff1a;FusionCompute两节点 MCNA * 2硬件部署&#xff08;塔式交付场景&#xff09;免交换组网&#xff08;配置AR卡&#xff09; 前置准备 组网规划 节…...

Android 网络请求(二)OKHttp网络通信

学习笔记 OkHttp 是一个非常强大且流行的 HTTP 客户端库&#xff0c;广泛用于 Android 开发中进行网络请求。与 HttpURLConnection 相比&#xff0c;OkHttp 提供了更简单、更高效的 API&#xff0c;特别是在处理复杂的 HTTP 请求时。 如何使用 OkHttp 进行网络请求 以下是使…...

npm上传自己封装的插件(vue+vite)

一、npm账号及发包删包等命令 若没有账号&#xff0c;可在npm官网&#xff1a;https://www.npmjs.com/login 进行注册。 在当前项目根目录下打开终端命令窗口&#xff0c;常见命令如下&#xff1a; 1、登录命令&#xff1a;npm login&#xff08;不用每次都重新登录&#xff0…...

如何在Word文件中设置水印以及如何禁止修改水印

在日常办公和学习中&#xff0c;我们经常需要在Word文档中设置水印&#xff0c;以保护文件的版权或标明文件的机密性。水印可以是文字形式&#xff0c;也可以是图片形式&#xff0c;能够灵活地适应不同的需求。但仅仅设置水印是不够的&#xff0c;有时我们还需要确保水印不被随…...

.NET桌面应用架构Demo与实战|WPF+MVVM+EFCore+IOC+DI+Code First+AutoMapper

目录 .NET桌面应用架构Demo与实战|WPFMVVMEFCoreIOCDICode FirstAutoPapper技术栈简述项目地址&#xff1a;功能展示项目结构项目引用1. 新建模型2. Data层&#xff0c;依赖EF Core&#xff0c;实现数据库增删改查3. Bussiness层&#xff0c;实现具体的业务逻辑4. Service层&am…...

el-table根据指定字段合并行和列+根据屏幕高度实时设置el-table的高度

文章目录 html代码script代码arraySpanMethod.js代码 html代码 <template><div class"rightBar"><cl-table ref"tableData"border :span-method"arraySpanMethod" :data"tableData" :columns"columns":max-…...

图像处理 之 凸包和最小外围轮廓生成

“ 最小包围轮廓之美” 一起来欣赏图形之美~ 1.原始图片 男人牵着机器狗 2.轮廓提取 轮廓提取 3.最小包围轮廓 最小包围轮廓 4.凸包 凸包 5.凸包和最小包围轮廓的合照 凸包和最小包围轮廓的合照 上述图片中凸包、最小外围轮廓效果为作者实现算法生成。 图形几何之美系列&#…...

萤石设备视频接入平台EasyCVR私有化视频平台视频监控系统的需求及不同场景摄像机的选择

在现代社会&#xff0c;随着安全意识的提高和技术的进步&#xff0c;安防监控视频系统已成为保障人们生活和财产安全的重要工具。EasyCVR安防监控视频系统&#xff0c;以其先进的网络传输技术和强大的功能&#xff0c;为各种规模的项目提供了一个高效、可靠的监控解决方案。以下…...

网络安全之接入控制

身份鉴别 ​ 定义:验证主题真实身份与其所声称的身份是否符合的过程&#xff0c;主体可以是用户、进程、主机。同时也可实现防重放&#xff0c;防假冒。 ​ 分类:单向鉴别、双向鉴别、三向鉴别。 ​ 主题身份标识信息:密钥、用户名和口令、证书和私钥 Internet接入控制过程 …...

Sqlite: Java使用、sqlite-devel

这里写目录标题 一、简介二、使用1. Java项目中&#xff08;1&#xff09;引入驱动&#xff08;2&#xff09;工具类&#xff08;3&#xff09;调用举例 2. sqlite-devel in linuxsqlite-devel使用 三、更多应用1. 数据类型2. 如何存储日期和时间3. 备份 一、简介 非常轻量级&…...

【杂谈】-递归进化:人工智能的自我改进与监管挑战

递归进化&#xff1a;人工智能的自我改进与监管挑战 文章目录 递归进化&#xff1a;人工智能的自我改进与监管挑战1、自我改进型人工智能的崛起2、人工智能如何挑战人类监管&#xff1f;3、确保人工智能受控的策略4、人类在人工智能发展中的角色5、平衡自主性与控制力6、总结与…...

反向工程与模型迁移:打造未来商品详情API的可持续创新体系

在电商行业蓬勃发展的当下&#xff0c;商品详情API作为连接电商平台与开发者、商家及用户的关键纽带&#xff0c;其重要性日益凸显。传统商品详情API主要聚焦于商品基本信息&#xff08;如名称、价格、库存等&#xff09;的获取与展示&#xff0c;已难以满足市场对个性化、智能…...

基于Uniapp开发HarmonyOS 5.0旅游应用技术实践

一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架&#xff0c;支持"一次开发&#xff0c;多端部署"&#xff0c;可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务&#xff0c;为旅游应用带来&#xf…...

MVC 数据库

MVC 数据库 引言 在软件开发领域,Model-View-Controller(MVC)是一种流行的软件架构模式,它将应用程序分为三个核心组件:模型(Model)、视图(View)和控制器(Controller)。这种模式有助于提高代码的可维护性和可扩展性。本文将深入探讨MVC架构与数据库之间的关系,以…...

2021-03-15 iview一些问题

1.iview 在使用tree组件时&#xff0c;发现没有set类的方法&#xff0c;只有get&#xff0c;那么要改变tree值&#xff0c;只能遍历treeData&#xff0c;递归修改treeData的checked&#xff0c;发现无法更改&#xff0c;原因在于check模式下&#xff0c;子元素的勾选状态跟父节…...

Cinnamon修改面板小工具图标

Cinnamon开始菜单-CSDN博客 设置模块都是做好的&#xff0c;比GNOME简单得多&#xff01; 在 applet.js 里增加 const Settings imports.ui.settings;this.settings new Settings.AppletSettings(this, HTYMenusonichy, instance_id); this.settings.bind(menu-icon, menu…...

vue3 定时器-定义全局方法 vue+ts

1.创建ts文件 路径&#xff1a;src/utils/timer.ts 完整代码&#xff1a; import { onUnmounted } from vuetype TimerCallback (...args: any[]) > voidexport function useGlobalTimer() {const timers: Map<number, NodeJS.Timeout> new Map()// 创建定时器con…...

自然语言处理——循环神经网络

自然语言处理——循环神经网络 循环神经网络应用到基于机器学习的自然语言处理任务序列到类别同步的序列到序列模式异步的序列到序列模式 参数学习和长程依赖问题基于门控的循环神经网络门控循环单元&#xff08;GRU&#xff09;长短期记忆神经网络&#xff08;LSTM&#xff09…...

用机器学习破解新能源领域的“弃风”难题

音乐发烧友深有体会&#xff0c;玩音乐的本质就是玩电网。火电声音偏暖&#xff0c;水电偏冷&#xff0c;风电偏空旷。至于太阳能发的电&#xff0c;则略显朦胧和单薄。 不知你是否有感觉&#xff0c;近两年家里的音响声音越来越冷&#xff0c;听起来越来越单薄&#xff1f; —…...

springboot整合VUE之在线教育管理系统简介

可以学习到的技能 学会常用技术栈的使用 独立开发项目 学会前端的开发流程 学会后端的开发流程 学会数据库的设计 学会前后端接口调用方式 学会多模块之间的关联 学会数据的处理 适用人群 在校学生&#xff0c;小白用户&#xff0c;想学习知识的 有点基础&#xff0c;想要通过项…...