舞蹈网站模板/他达拉非片的作用及功效副作用
SpringBoot 整合 Avro 与 Kafka 详解
在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合,并通过 Spring Boot 进行整合,可以构建出高效、可扩展的实时数据处理系统。
一、环境准备
在开始整合之前,需要准备好以下环境:
- Java:确保已经安装了 JDK,推荐使用 JDK 8 或更高版本。
- Maven:用于管理项目的依赖和构建过程。
- Spring Boot:作为项目的框架,推荐使用较新的版本,如 Spring Boot 2.x。
- Kafka:确保 Kafka 已经安装并运行,可以使用 Docker 部署 Kafka 集群。
- Avro:Avro 依赖 JSON 定义的架构来序列化数据。
二、项目结构
一个典型的 Spring Boot 项目结构可能如下:
spring-boot-kafka-avro
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ ├── SpringBootKafkaAvroApplication.java
│ │ │ ├── config
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ └── model
│ │ │ └── ElectronicsPackage.java (由 Avro 自动生成)
│ │ ├── resources
│ │ │ ├── application.properties
│ │ │ └── avro
│ │ │ └── electronicsPackage.avsc (Avro 架构文件)
│ └── test
│ └── java
│ └── com
│ └── example
│ └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml
三、添加依赖
在 pom.xml
文件中添加必要的依赖:
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.0</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro Maven Plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin>
</dependencies>
四、定义 Avro 架构
在 src/main/resources/avro/
目录下创建一个 Avro 架构文件 electronicsPackage.avsc
:
{"namespace": "com.example.model","type": "record","name": "ElectronicsPackage","fields": [{"name": "package_number", "type": ["string", "null"], "default": null},{"name": "frs_site_code", "type": ["string", "null"], "default": null},{"name": "frs_site_code_type", "type": ["string", "null"], "default": null}]
}
这个架构文件定义了 ElectronicsPackage
类,包括三个字段:package_number
、frs_site_code
和 frs_site_code_type
。
五、生成 Avro 类
运行 Maven 构建过程,Avro Maven 插件会根据 electronicsPackage.avsc
文件生成相应的 Java 类 ElectronicsPackage.java
。
六、配置 Kafka
在 application.properties
文件中配置 Kafka 的相关属性:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.config.AvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.config.AvroDeserializer
注意,这里指定了自定义的 AvroSerializer
和 AvroDeserializer
类。
七、实现 Avro 序列化器和反序列化器
创建 AvroSerializer
和 AvroDeserializer
类,用于 Avro 数据的序列化和反序列化。
// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializer<T extends SpecificRecord> implements Serializer<T> {private final DatumWriter<T> writer;public AvroSerializer(Class<T> type) {this.writer = new SpecificDatumWriter<>(type);}@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}ByteArrayOutputStream out = new ByteArrayOutputStream();Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op}@Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {private final Class<T> type;private final DatumReader<T> reader;public AvroDeserializer(Class<T> type) {this.type = type;this.reader = new SpecificDatumReader<>(type);}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteArrayInputStream in = new ByteArrayInputStream(data);Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op
相关文章:

SpringBoot 整合 Avro 与 Kafka 详解
SpringBoot 整合 Avro 与 Kafka 详解 在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数…...

若依 ruoyi VUE el-select 直接获取 选择option 的 label和value
1、最新在研究若依这个项目,我使用的是前后端分离的方案,RuoYi-Vue-fast(后端) RuoYi-Vue-->ruoyi-ui(前端)。RuoYi-Vue-fast是单应用版本没有区分那么多的modules 自己开发起来很方便,这个项目运行起来很方便,但是需要自定义的…...

大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...

修改MySQL存储路径
1.查看原路径 show variables like ‘%datadir%’; 2.停止MYSQL 以管理员身份运行命令提示符 net stop MySQL84 在服务中直接停止MySQL 3.编辑配置文件 可能会遇到无权限修改,可以先修改my.ini的权限。可以通过:右键my.ini → 属性 → 安全→ 编辑 …...

Git常用的命令【提交与回退】
git分布式版本控制系统 (SVN集中式版本控制系统)之间的对比 git有本地仓库和远程仓库,不同的开发人员可以分别提交自己的本地仓库并维护代码的版本控制。 然后多个人员在本地仓库协作的代码,可以提交到远程仓库中做整合。 git本…...

详解:HTTP/HTTPS协议
HTTP协议 一.HTTP是什么 HTTP,全称超文本传输协议,是一种用于分布式、协作式、超媒体信息系统的应用层协议。HTTP往往是基于传输层TCP协议实现的,采用的一问一答的模式,即发一个请求,返回一个响应。 Q:什…...

0.96寸OLED---STM32
一、简介 OLED:有机发光二极管 OLED显示屏:性能优异的新型显示屏,具有功耗低(相比LCD不需要背光源,每一个节点当度发光)、响应速度快、宽视角(自发光,从任何视角看都比较清晰&…...

保姆级教学 uniapp绘制二维码海报并保存至相册,真机正常展示图片二维码
一、获取二维码 uni.request({url: https://api.weixin.qq.com/wxa/getwxacode?access_token${getStorage("token")},responseType: "arraybuffer",method: "POST",data: {path: "/pages/index/index"},success(res) {// 转换为 Uint…...

常用Vim操作
vimrc配置 ctags -R * 生成tags文件 set number set ts4 set sw4 set autoindent set cindent set tag~/tmp/log/help/tags 自动补全: ctrln:自动补全 输入: a:从当前文字后插入i:从当前文字前插入s: 删除当前字…...

【C#】NET 9中LINQ的新特性-CountBy
前言 在 .NET 中,使用 LINQ 对元素进行分组并计算它们的出现次数时,需要通过两个步步骤。首先,使用 GroupBy方法根据特定键对元素进行分类。然后,再计算每个组元素包含个数。而随着 .NET 9 版本发布,引入了一些新特性。其中 LINQ 引入了一种新的方法 CountBy,本文一起来了…...

Trimble X9三维激光扫描仪高效应对化工厂复杂管道扫描测绘挑战【沪敖3D】
化工安全关系到国计民生,近年来随着化工厂数字化改革不断推进,数字工厂逐步成为工厂安全管理的重要手段。而化工管道作为工厂设施的重要组成部分,由于其数量多、种类繁杂,一直是企业管理的重点和难点。 传统的化工管廊往往缺乏详…...

【数据结构】文件和外部排序
外部排序 外存信息的存取 计算基本存储方式 内部存储(主存):断电后数据会丢失,访问速度快,成本高容量通常较小外部存储(辅存):断电后数据不会丢失,访问速度较慢&#x…...

新手学习:网页前端、后端、服务器Tomcat和数据库的基本介绍
首先一点,不管是那个框架开发的网页前端,最后都需要Build,构建完毕以后都是原始的HTML CSS JS 三样文件! 网页前端 目录结构 在开始开发网站之前,首先需要了解如何组织文件。一个简单的网页项目通常会有以下几个文件夹和文件&…...

机器学习贝叶斯模型原理
一、引言 在机器学习与数据分析的广袤天地中,贝叶斯模型犹如一颗璀璨的明星,闪耀着独特的光芒,为众多领域的分类、预测等任务提供了强大的理论支撑与实用解法。然而,对于许多初涉此领域的小伙伴而言,贝叶斯模型背后的…...

【C++】实现100以内素数的求解
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯代码概览💯代码结构与逻辑分析1. 包含的头文件和命名空间2. 素数判断函数 isPrime功能输入与输出核心逻辑数学背景 3. 主函数 main功能核心逻辑输出示例 &#…...

Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!
Python 浏览器自动化新利器:DrissionPage,让网页操作更简单! 文章目录 Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!🚀 引言🌟 DrissionPage简介🛠️ 三大…...

Rust学习笔记_13——枚举
Rust学习笔记_10——守卫 Rust学习笔记_11——函数 Rust学习笔记_12——闭包 枚举 文章目录 枚举1. 定义1.1 无值变体1.2 有值变体1.3 枚举与泛型的结合 2. 使用2.1 和匹配模式一起使用2.2 枚举作为类型别名 3. 常用枚举类型 在Rust编程语言中,枚举(enum…...

Postgresql 格式转换笔记整理
1、数据类型有哪些 1.1 数值类型 DECIMAL/NUMERIC 使用方法 DECIMAL是PostgreSQL中的一种数值数据类型,用于存储固定精度和小数位数的数值。DECIMAL的精度是由用户指定的,可以存储任何位数的数值,而小数位数则由用户自行定义。DECIMAL类型的…...

AI开发:卷积神经网络CNN原理初识,简易例程 - 机器学习
一 、卷积神经网络是什么 (1)印象 今天说的CNN,并不是我们熟知的美国有线电视新闻网。 那什么是CNN呢? Convolutional Neural Networks, CNN)简单来说,就是用一个筛子来筛面粉的。 筛子就是卷积核&…...

详细介绍vue的递归组件(重要)
递归组件在 Vue 中是一个非常强大的概念,尤其在渲染层级结构(如树形结构、嵌套列表、评论系统等)时,能够极大地简化代码。 什么是递归组件? 递归组件就是一个组件在其模板中引用自身。这种做法通常用于渲染树形结构或…...

【单片机基础知识】基础知识(CortexM系列、STM32系统框架、存储器映射、寄存器映射)
1. CortexM系列介绍 ARM官方资料: 📎Arm Cortex-M4 Processor Datasheet.pdf📎Arm-Cortex-M7-Processor-Datasheet.pdf📎Arm Cortex-M Comparison Table_v3.pdf📎Arm Cortex-M3 Processor Datasheet.pdf 课程资料&a…...

yolov5导出命令
python export.py --weights yolov5s.pt --img-size 640 --batch-size 1 --device cpu --include onnx 关闭动态输入,cpu导出 检测onnx模型能否加载成功指令: python detect.py --weights yolov5s.onnx --dnn 终端调用detect.py检测图片命令&…...

RabbitMQ的常用术语介绍
出版商 “出版商”一词在不同的上下文中有不同的含义。一般来说,在消息传递中 发布者(也称为“生成者”)是应用程序(或应用程序实例) 发布 (生成) 消息。同一应用程序也可以使用消息 因此同时也…...

Docker魔法:用docker run -p轻松开通容器服务大门
前言 “容器”与“虚拟化”作为现代软件开发和运维中的关键概念,已经广泛应用于各个技术领域。然而,在使用 Docker 部署应用时,常常会遇到这样的问题:容器正常运行,却无法让外界访问其内部服务?即使容器内的应用顺利启动,外部无法通过浏览器或 API 进行连接。此时,doc…...

【后端面试总结】Redis过期删除策略
Redis会将每个设置了过期时间的key放入一个独立的字典中,以后会定时遍历这个字典来删除到期的key。除了定时遍历之外,它还会使用惰性策略来删除过期的key。所谓惰性策略就是在客户端访问这个key的时候,Redis对key的过期时间进行检查ÿ…...

数字图像处理(15):图像平移
(1)图像平移的基本原理:计算每个像素点的移动向量,并将这些像素按照指定的方向和距离进行移动。 (2)平移向量包括水平和垂直分量,可以表示为(dx,dy)ÿ…...

高级java每日一道面试题-2024年12月08日-JVM篇-什么是类加载器?
如果有遗漏,评论区告诉我进行补充 面试官: 什么是类加载器? 我回答: 在Java高级面试中,类加载器(ClassLoader)是一个重要的概念,它涉及到Java类的加载和初始化机制。以下是对类加载器的详细解释: 定义与作用 类加…...

JAVA子类的无参构造器中第一行的super
在 Java 中,子类的构造器是否需要显式调用 super 取决于父类(超类)的构造器。 如果父类有一个无参构造器: 如果父类有一个无参构造器,那么子类的构造器可以不显式调用 super。在这种情况下,如果子类构造器的…...

mysql程序介绍,选项介绍(常用选项,指定选项的方式,特性),命令介绍(查看,部分命令),从sql文件执行sql语句的两种方法
目录 mysql程序 介绍 选项 介绍 常用选项 指定选项的方式 编辑配置文件 环境变量 选项特性 指定选项 选项名 选项值 命令 介绍 查看客户端命令 tee/notee prompt source system help contents 从.sql文件执行sql语句 介绍 方式 source 从外部直接导入…...

Unity教程(十九)战斗系统 受击反馈
Unity开发2D类银河恶魔城游戏学习笔记 Unity教程(零)Unity和VS的使用相关内容 Unity教程(一)开始学习状态机 Unity教程(二)角色移动的实现 Unity教程(三)角色跳跃的实现 Unity教程&…...