SpringBoot 整合 Avro 与 Kafka 详解
SpringBoot 整合 Avro 与 Kafka 详解
在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数据格式。将这两者结合,并通过 Spring Boot 进行整合,可以构建出高效、可扩展的实时数据处理系统。
一、环境准备
在开始整合之前,需要准备好以下环境:
- Java:确保已经安装了 JDK,推荐使用 JDK 8 或更高版本。
- Maven:用于管理项目的依赖和构建过程。
- Spring Boot:作为项目的框架,推荐使用较新的版本,如 Spring Boot 2.x。
- Kafka:确保 Kafka 已经安装并运行,可以使用 Docker 部署 Kafka 集群。
- Avro:Avro 依赖 JSON 定义的架构来序列化数据。
二、项目结构
一个典型的 Spring Boot 项目结构可能如下:
spring-boot-kafka-avro
├── src
│ ├── main
│ │ ├── java
│ │ │ └── com
│ │ │ └── example
│ │ │ ├── SpringBootKafkaAvroApplication.java
│ │ │ ├── config
│ │ │ │ └── KafkaConfig.java
│ │ │ ├── producer
│ │ │ │ └── KafkaProducer.java
│ │ │ ├── consumer
│ │ │ │ └── KafkaConsumer.java
│ │ │ └── model
│ │ │ └── ElectronicsPackage.java (由 Avro 自动生成)
│ │ ├── resources
│ │ │ ├── application.properties
│ │ │ └── avro
│ │ │ └── electronicsPackage.avsc (Avro 架构文件)
│ └── test
│ └── java
│ └── com
│ └── example
│ └── SpringBootKafkaAvroApplicationTests.java
└── pom.xml
三、添加依赖
在 pom.xml 文件中添加必要的依赖:
<dependencies><!-- Spring Boot Starter --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency><!-- Spring Kafka --><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.9.13</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro --><dependency><groupId>org.apache.avro</groupId><artifactId>avro</artifactId><version>1.11.0</version> <!-- 根据需要选择合适的版本 --></dependency><!-- Avro Maven Plugin --><plugin><groupId>org.apache.avro</groupId><artifactId>avro-maven-plugin</artifactId><version>${avro.version}</version><executions><execution><phase>generate-sources</phase><goals><goal>schema</goal></goals><configuration><sourceDirectory>${project.basedir}/src/main/resources/avro/</sourceDirectory><outputDirectory>${project.build.directory}/generated/avro</outputDirectory></configuration></execution></executions></plugin>
</dependencies>
四、定义 Avro 架构
在 src/main/resources/avro/ 目录下创建一个 Avro 架构文件 electronicsPackage.avsc:
{"namespace": "com.example.model","type": "record","name": "ElectronicsPackage","fields": [{"name": "package_number", "type": ["string", "null"], "default": null},{"name": "frs_site_code", "type": ["string", "null"], "default": null},{"name": "frs_site_code_type", "type": ["string", "null"], "default": null}]
}
这个架构文件定义了 ElectronicsPackage 类,包括三个字段:package_number、frs_site_code 和 frs_site_code_type。
五、生成 Avro 类
运行 Maven 构建过程,Avro Maven 插件会根据 electronicsPackage.avsc 文件生成相应的 Java 类 ElectronicsPackage.java。
六、配置 Kafka
在 application.properties 文件中配置 Kafka 的相关属性:
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=my-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=com.example.config.AvroSerializer
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=com.example.config.AvroDeserializer
注意,这里指定了自定义的 AvroSerializer 和 AvroDeserializer 类。
七、实现 Avro 序列化器和反序列化器
创建 AvroSerializer 和 AvroDeserializer 类,用于 Avro 数据的序列化和反序列化。
// AvroSerializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Serializer;import java.io.ByteArrayOutputStream;
import java.io.IOException;public class AvroSerializer<T extends SpecificRecord> implements Serializer<T> {private final DatumWriter<T> writer;public AvroSerializer(Class<T> type) {this.writer = new SpecificDatumWriter<>(type);}@Overridepublic byte[] serialize(String topic, T data) {if (data == null) {return null;}ByteArrayOutputStream out = new ByteArrayOutputStream();Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);try {writer.write(data, encoder);encoder.flush();out.close();} catch (IOException e) {throw new RuntimeException(e);}return out.toByteArray();}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op}@Overridepublic void close() {// No-op}
}// AvroDeserializer.java
package com.example.config;import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;
import org.apache.kafka.common.serialization.Deserializer;import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Map;public class AvroDeserializer<T extends SpecificRecord> implements Deserializer<T> {private final Class<T> type;private final DatumReader<T> reader;public AvroDeserializer(Class<T> type) {this.type = type;this.reader = new SpecificDatumReader<>(type);}@Overridepublic T deserialize(String topic, byte[] data) {if (data == null) {return null;}ByteArrayInputStream in = new ByteArrayInputStream(data);Decoder decoder = DecoderFactory.get().binaryDecoder(in, null);try {return reader.read(null, decoder);} catch (IOException e) {throw new RuntimeException(e);}}@Overridepublic void configure(Map<String, ?> configs, boolean isKey) {// No-op相关文章:
SpringBoot 整合 Avro 与 Kafka 详解
SpringBoot 整合 Avro 与 Kafka 详解 在大数据处理和实时数据流场景中,Apache Kafka 和 Apache Avro 是两个非常重要的工具。Kafka 作为一个分布式流处理平台,能够高效地处理大量数据,而 Avro 则是一个用于序列化数据的紧凑、快速的二进制数…...
若依 ruoyi VUE el-select 直接获取 选择option 的 label和value
1、最新在研究若依这个项目,我使用的是前后端分离的方案,RuoYi-Vue-fast(后端) RuoYi-Vue-->ruoyi-ui(前端)。RuoYi-Vue-fast是单应用版本没有区分那么多的modules 自己开发起来很方便,这个项目运行起来很方便,但是需要自定义的…...
大数据-155 Apache Druid 架构与原理详解 数据存储 索引服务 压缩机制
点一下关注吧!!!非常感谢!!持续更新!!! 目前已经更新到了: Hadoop(已更完)HDFS(已更完)MapReduce(已更完&am…...
修改MySQL存储路径
1.查看原路径 show variables like ‘%datadir%’; 2.停止MYSQL 以管理员身份运行命令提示符 net stop MySQL84 在服务中直接停止MySQL 3.编辑配置文件 可能会遇到无权限修改,可以先修改my.ini的权限。可以通过:右键my.ini → 属性 → 安全→ 编辑 …...
Git常用的命令【提交与回退】
git分布式版本控制系统 (SVN集中式版本控制系统)之间的对比 git有本地仓库和远程仓库,不同的开发人员可以分别提交自己的本地仓库并维护代码的版本控制。 然后多个人员在本地仓库协作的代码,可以提交到远程仓库中做整合。 git本…...
详解:HTTP/HTTPS协议
HTTP协议 一.HTTP是什么 HTTP,全称超文本传输协议,是一种用于分布式、协作式、超媒体信息系统的应用层协议。HTTP往往是基于传输层TCP协议实现的,采用的一问一答的模式,即发一个请求,返回一个响应。 Q:什…...
0.96寸OLED---STM32
一、简介 OLED:有机发光二极管 OLED显示屏:性能优异的新型显示屏,具有功耗低(相比LCD不需要背光源,每一个节点当度发光)、响应速度快、宽视角(自发光,从任何视角看都比较清晰&…...
保姆级教学 uniapp绘制二维码海报并保存至相册,真机正常展示图片二维码
一、获取二维码 uni.request({url: https://api.weixin.qq.com/wxa/getwxacode?access_token${getStorage("token")},responseType: "arraybuffer",method: "POST",data: {path: "/pages/index/index"},success(res) {// 转换为 Uint…...
常用Vim操作
vimrc配置 ctags -R * 生成tags文件 set number set ts4 set sw4 set autoindent set cindent set tag~/tmp/log/help/tags 自动补全: ctrln:自动补全 输入: a:从当前文字后插入i:从当前文字前插入s: 删除当前字…...
【C#】NET 9中LINQ的新特性-CountBy
前言 在 .NET 中,使用 LINQ 对元素进行分组并计算它们的出现次数时,需要通过两个步步骤。首先,使用 GroupBy方法根据特定键对元素进行分类。然后,再计算每个组元素包含个数。而随着 .NET 9 版本发布,引入了一些新特性。其中 LINQ 引入了一种新的方法 CountBy,本文一起来了…...
Trimble X9三维激光扫描仪高效应对化工厂复杂管道扫描测绘挑战【沪敖3D】
化工安全关系到国计民生,近年来随着化工厂数字化改革不断推进,数字工厂逐步成为工厂安全管理的重要手段。而化工管道作为工厂设施的重要组成部分,由于其数量多、种类繁杂,一直是企业管理的重点和难点。 传统的化工管廊往往缺乏详…...
【数据结构】文件和外部排序
外部排序 外存信息的存取 计算基本存储方式 内部存储(主存):断电后数据会丢失,访问速度快,成本高容量通常较小外部存储(辅存):断电后数据不会丢失,访问速度较慢&#x…...
新手学习:网页前端、后端、服务器Tomcat和数据库的基本介绍
首先一点,不管是那个框架开发的网页前端,最后都需要Build,构建完毕以后都是原始的HTML CSS JS 三样文件! 网页前端 目录结构 在开始开发网站之前,首先需要了解如何组织文件。一个简单的网页项目通常会有以下几个文件夹和文件&…...
机器学习贝叶斯模型原理
一、引言 在机器学习与数据分析的广袤天地中,贝叶斯模型犹如一颗璀璨的明星,闪耀着独特的光芒,为众多领域的分类、预测等任务提供了强大的理论支撑与实用解法。然而,对于许多初涉此领域的小伙伴而言,贝叶斯模型背后的…...
【C++】实现100以内素数的求解
博客主页: [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 💯前言💯代码概览💯代码结构与逻辑分析1. 包含的头文件和命名空间2. 素数判断函数 isPrime功能输入与输出核心逻辑数学背景 3. 主函数 main功能核心逻辑输出示例 &#…...
Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!
Python 浏览器自动化新利器:DrissionPage,让网页操作更简单! 文章目录 Python 浏览器自动化新利器:DrissionPage,让网页操作更简单!🚀 引言🌟 DrissionPage简介🛠️ 三大…...
Rust学习笔记_13——枚举
Rust学习笔记_10——守卫 Rust学习笔记_11——函数 Rust学习笔记_12——闭包 枚举 文章目录 枚举1. 定义1.1 无值变体1.2 有值变体1.3 枚举与泛型的结合 2. 使用2.1 和匹配模式一起使用2.2 枚举作为类型别名 3. 常用枚举类型 在Rust编程语言中,枚举(enum…...
Postgresql 格式转换笔记整理
1、数据类型有哪些 1.1 数值类型 DECIMAL/NUMERIC 使用方法 DECIMAL是PostgreSQL中的一种数值数据类型,用于存储固定精度和小数位数的数值。DECIMAL的精度是由用户指定的,可以存储任何位数的数值,而小数位数则由用户自行定义。DECIMAL类型的…...
AI开发:卷积神经网络CNN原理初识,简易例程 - 机器学习
一 、卷积神经网络是什么 (1)印象 今天说的CNN,并不是我们熟知的美国有线电视新闻网。 那什么是CNN呢? Convolutional Neural Networks, CNN)简单来说,就是用一个筛子来筛面粉的。 筛子就是卷积核&…...
详细介绍vue的递归组件(重要)
递归组件在 Vue 中是一个非常强大的概念,尤其在渲染层级结构(如树形结构、嵌套列表、评论系统等)时,能够极大地简化代码。 什么是递归组件? 递归组件就是一个组件在其模板中引用自身。这种做法通常用于渲染树形结构或…...
linux之kylin系统nginx的安装
一、nginx的作用 1.可做高性能的web服务器 直接处理静态资源(HTML/CSS/图片等),响应速度远超传统服务器类似apache支持高并发连接 2.反向代理服务器 隐藏后端服务器IP地址,提高安全性 3.负载均衡服务器 支持多种策略分发流量…...
智能在线客服平台:数字化时代企业连接用户的 AI 中枢
随着互联网技术的飞速发展,消费者期望能够随时随地与企业进行交流。在线客服平台作为连接企业与客户的重要桥梁,不仅优化了客户体验,还提升了企业的服务效率和市场竞争力。本文将探讨在线客服平台的重要性、技术进展、实际应用,并…...
Axios请求超时重发机制
Axios 超时重新请求实现方案 在 Axios 中实现超时重新请求可以通过以下几种方式: 1. 使用拦截器实现自动重试 import axios from axios;// 创建axios实例 const instance axios.create();// 设置超时时间 instance.defaults.timeout 5000;// 最大重试次数 cons…...
【JavaWeb】Docker项目部署
引言 之前学习了Linux操作系统的常见命令,在Linux上安装软件,以及如何在Linux上部署一个单体项目,大多数同学都会有相同的感受,那就是麻烦。 核心体现在三点: 命令太多了,记不住 软件安装包名字复杂&…...
JVM暂停(Stop-The-World,STW)的原因分类及对应排查方案
JVM暂停(Stop-The-World,STW)的完整原因分类及对应排查方案,结合JVM运行机制和常见故障场景整理而成: 一、GC相关暂停 1. 安全点(Safepoint)阻塞 现象:JVM暂停但无GC日志,日志显示No GCs detected。原因:JVM等待所有线程进入安全点(如…...
Swagger和OpenApi的前世今生
Swagger与OpenAPI的关系演进是API标准化进程中的重要篇章,二者共同塑造了现代RESTful API的开发范式。 本期就扒一扒其技术演进的关键节点与核心逻辑: 🔄 一、起源与初创期:Swagger的诞生(2010-2014) 核心…...
Linux 中如何提取压缩文件 ?
Linux 是一种流行的开源操作系统,它提供了许多工具来管理、压缩和解压缩文件。压缩文件有助于节省存储空间,使数据传输更快。本指南将向您展示如何在 Linux 中提取不同类型的压缩文件。 1. Unpacking ZIP Files ZIP 文件是非常常见的,要在 …...
Web中间件--tomcat学习
Web中间件–tomcat Java虚拟机详解 什么是JAVA虚拟机 Java虚拟机是一个抽象的计算机,它可以执行Java字节码。Java虚拟机是Java平台的一部分,Java平台由Java语言、Java API和Java虚拟机组成。Java虚拟机的主要作用是将Java字节码转换为机器代码&#x…...
HTML前端开发:JavaScript 获取元素方法详解
作为前端开发者,高效获取 DOM 元素是必备技能。以下是 JS 中核心的获取元素方法,分为两大系列: 一、getElementBy... 系列 传统方法,直接通过 DOM 接口访问,返回动态集合(元素变化会实时更新)。…...
QT开发技术【ffmpeg + QAudioOutput】音乐播放器
一、 介绍 使用ffmpeg 4.2.2 在数字化浪潮席卷全球的当下,音视频内容犹如璀璨繁星,点亮了人们的生活与工作。从短视频平台上令人捧腹的搞笑视频,到在线课堂中知识渊博的专家授课,再到影视平台上扣人心弦的高清大片,音…...
