搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布
系列文章目录
文章目录
- 系列文章目录
- 前言
- MultiKafkaStarter [V2.2]
- 一、功能特性
- 二、快速开始(生产端)
- 三、快速开始(消费端)
- 四、其它特性
- 五、变更记录
- 六、参考文章
前言
在分布式服务的架构演进中,消息队列作为核心组件之一,承载着解耦、异步、削峰填谷等关键职责。Apache Kafka 作为业界广泛使用的分布式流处理平台,因其高吞吐、低延迟的特性被大量应用在各类大数据场景中。然而,随着业务的复杂度不断提升,如何在 SpringBoot 中高效地整合并管理多个 Kafka 数据源,成为了一个值得探讨的问题。
在过去的一段时间里,我们通过系列文章详细阐述了如何在 SpringBoot 中以零代码或极低的代码侵入方式,实现多 Kafka 数据源的整合。从基础的配置到高级特性如 protobuf 支持、Aware 模式以及亿级消息生产者的优化,我们希望通过这些内容帮助开发者更加便捷地应对复杂的业务场景。
今天,我们将这些内容凝练成一个全新的 SpringBoot 插件——MultiKafkaStarter,旨在进一步降低开发者整合多 Kafka 数据源的门槛,提升系统的可维护性和扩展性。
核心特点
- 无代码侵入:通过 SpringBoot 的自动配置机制,无需修改业务代码即可实现多 Kafka 数据源的整合。
- 灵活配置:支持动态配置多个 Kafka 数据源,包括 bootstrap servers、group id、security protocol 等关键参数。
- 全面特性支持:不仅支持基础的消息消费和生产功能,还提供了对 protobuf 序列化/反序列化的支持,以及对 Aware 模式的适配。
- 亿级消息处理:针对高并发场景,提供了包括批量发送、分区策略优化等在内的多项性能优化措施,确保系统能够稳定处理亿级消息量。
- 易用性与可维护性:插件采用模块化的设计思想,易于集成和升级,同时提供了丰富的文档和社区支持
国籍惯例,先上源码:Github源码
MultiKafkaStarter [V2.2]
SpringBoot 零代码方式整合多个kafka数据源,支持任意kafka集群,已封装为一个小模块,集成所有kafka配置,让注意力重新回归业务本身。
一、功能特性
- SpringBoot无编程方式整合多个kafka数据源
- 支持批量消费kafka并对单批次消息根据条件去重
- 支持消费kafka消息类型为pb格式
- 支持任意数量生产者
1、引入最新依赖包,如果找不到依赖包,请到工程目录mvn clean package install
执行一下命令。
<dependency><groupId>io.github.vipjoey</groupId><artifactId>multi-kafka-starter</artifactId><version>2.2</version>
</dependency>
二、快速开始(生产端)
2、添加kafka地址等相关配置。
## json消息生产者
spring.kafka.four.enabled=true
spring.kafka.four.producer.count=1 ## 生产者数量,默认为1个
spring.kafka.four.producer.name=fourKafkaSender ## 设置bean的名称,方便后续引用。如果没有设置,默认值为xxxKafkaSender
spring.kafka.four.producer.bootstrap-servers=${spring.embedded.kafka.brokers} ## 必须设置
spring.kafka.four.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.four.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer## pb 消息生产者
spring.kafka.five.enabled=true
spring.kafka.five.producer.name=fiveKafkaSender
spring.kafka.five.producer.bootstrap-servers=${spring.embedded.kafka.brokers}
spring.kafka.five.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.five.producer.value-serializer=org.apache.kafka.common.serialization.ByteArraySerializer
3、根据名称注入生产者MmcKafkaMultiSender
,就可以发送kafka消息。
@Resource(name = "fourKafkaSender")private MmcKafkaMultiSender mmcKafkaMultiSender;@Resource(name = "fiveKafkaSender")private MmcKafkaMultiSender mmcKafkaMultiSender;@Resourceprivate MmcKafkaOutputContainer mmcKafkaOutputContainer;// 方式一void produceMessage() {for (int i = 0; i < 10; i++) {DemoAwareMsg msg = new DemoAwareMsg();msg.setRoutekey("routekey" + i);msg.setName("name" + i);msg.setTimestamp(System.currentTimeMillis());String json = JsonUtil.toJsonStr(msg);mmcKafkaMultiSender.sendStringMessage(topicOne, "aaa", json);}}// 方式二void produceMessage() {MmcKafkaSender sender = mmcKafkaOutputContainer.getOutputs().get("xxxKafkaSender");sender.sendStringMessage(topic, sku.getRoutekey(), message);}
三、快速开始(消费端)
2、添加kafka地址等相关配置。
## topic1的kafka配置
spring.kafka.one.enabled=true
spring.kafka.one.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.one.topic=mmc-topic-one
spring.kafka.one.group-id=group-consumer-one
spring.kafka.one.processor=你的处理类bean名称(例如:oneProcessor)
spring.kafka.one.dupicate=true ## 如果为true表示对批次内的kafka消息去重,需要实现MmcKafkaMsg接口,默认为false
spring.kafka.one.consumer.auto-offset-reset=latest
spring.kafka.one.consumer.max-poll-records=10
spring.kafka.one.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.one.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## topic2的kafka配置
spring.kafka.two.enabled=true
spring.kafka.two.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.two.topic=mmc-topic-two
spring.kafka.two.group-id=group-consumer-two
spring.kafka.two.processor=你的处理类bean名称
spring.kafka.two.consumer.auto-offset-reset=latest
spring.kafka.two.consumer.max-poll-records=10
spring.kafka.two.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.two.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer## protobuf类型的消息的kafka配置
spring.kafka.pb.enabled=true
spring.kafka.pb.consumer.bootstrapServers=${spring.embedded.kafka.brokers}
spring.kafka.pb.topic=mmc-topic-pb
spring.kafka.pb.group-id=group-consumer-pb
spring.kafka.pb.processor=pbProcessor
spring.kafka.pb.consumer.auto-offset-reset=latest
spring.kafka.pb.consumer.max-poll-records=10
spring.kafka.pb.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.pb.consumer.value-deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
3、新建kafka消息对应的实体类,可以选择实现MmcMsgDistinctAware
接口,例如
@Data
class DemoMsg implements MmcMsgDistinctAware {private String routekey;private String name;private Long timestamp;}如果你配置了spring.kafka.xxx.duplicate=fale,则不需要实现MmcMsgDistinctAware接口。PS:如果实现MmcMsgDistinctAware接口,就自动具备了消息去重能力
4、新建kafka消息处理类,要求继承MmcKafkaKafkaAbastrctProcessor
,然后就可以愉快地编写你的业务逻辑了。
@Slf4j
@Service("oneProcessor") // 你的处理类bean名称,如果没有定义bean名称,那么默认就是首字母缩写的类名称
public class OneProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Overrideprotected void dealMessage(List<DemoMsg> datas) {// 下面开始编写你的业务代码}}@Slf4j
@Service("pbProcessor")
public class PbProcessor extends MmcKafkaKafkaAbastrctProcessor<DemoMsg> {@Overrideprotected Stream<DemoMsg> doParseProtobuf(byte[] record) {try {DemoPb.PbMsg msg = DemoPb.PbMsg.parseFrom(record);DemoMsg demo = new DemoMsg();BeanUtils.copyProperties(msg, demo);return Stream.of(demo);} catch (InvalidProtocolBufferException e) {log.error("parssPbError", e);return Stream.empty();}}@Overrideprotected void dealMessage(List<DemoMsg> datas) {System.out.println("PBdatas: " + datas);}
}
四、其它特性
1、支持单次拉取kafka的batch消息里去重,需要实现MmcMsgDistinctAware
的getRoutekey()和getTimestamp()方法;如果为false,则不要实现MmcMsgDistinctAware
接口。
spring.kafka.xxx.duplicate=true
2、支持字符串kafka消息,json是驼峰或者下划线
# 默认为支持驼峰的kafka消息,为ture则支持下划线的消息
spring.kafka.xxx.snakeCase=false
3、支持pb的kafka消息,需要自行重写父类的doParseProtobuf
方法;
@Overrideprotected Stream<DemoMsg> doParseProtobuf(byte[] record) {try {DemoMsg msg = new DemoMsg();DemoPb.PbMsg pb = DemoPb.PbMsg.parseFrom(record);BeanUtils.copyProperties(pb, msg);return Stream.of(msg);} catch (InvalidProtocolBufferException e) {log.error("doParseProtobuf error: {}", e.getMessage());return Stream.empty();}}
4、支持获取kafka的topic、offset属性,注入到实体类中,需要实现MmcMsgKafkaAware
接口
@Data
class DemoAwareMsg implements MmcKafkaAware {private String routekey;private String name;private Long timestamp;private String topic;private long offset;}
五、变更记录
- 20240623 v2.2 支持Kafka生产者,并对MultiKafkaConsumerStarter项目重命名为MultiKafkaStarter
- 20240602 v2.1 支持获取kafka消息中topic、offset属性
- 20240602 v2.0 支持protobuf、json格式
- 20240430 v1.1 取消限定符
- 20231111 v1.0 初始化
六、参考文章
- 《搭建大型分布式服务(三十六)SpringBoot 零代码方式整合多个kafka数据源》
- 《搭建大型分布式服务(三十七)SpringBoot 整合多个kafka数据源-取消限定符》
- 《搭建大型分布式服务(三十八)SpringBoot 整合多个kafka数据源-支持protobuf》
- 《搭建大型分布式服务(三十九)SpringBoot 整合多个kafka数据源-支持Aware模式》
- 《搭建大型分布式服务(四十)SpringBoot 整合多个kafka数据源-支持生产者》
- 《搭建大型分布式服务(四十一)SpringBoot 整合多个kafka数据源-支持亿级消息生产者》
- 《搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布》
- 《搭建大型分布式服务(四十三)SpringBoot 多Kafka数据源发布到Maven中央仓库:让世界看到你的作品!》
加我加群一起交流学习!更多干货下载、项目源码和大厂内推等着你
相关文章:
搭建大型分布式服务(四十二)SpringBoot 无代码侵入实现多Kafka数据源整合插件发布
系列文章目录 文章目录 系列文章目录前言MultiKafkaStarter [V2.2]一、功能特性二、快速开始(生产端)三、快速开始(消费端)四、其它特性五、变更记录六、参考文章 前言 在分布式服务的架构演进中,消息队列作为核心组件…...
Python 学习路线及技巧
一、学习路线 1. 基础阶段 ● 学习 Python 的语法基础,如变量、数据类型、运算符、控制流等。 ● 掌握常用的 Python 标准库,如 os、sys、re、datetime 等。 ● 通过编写简单的程序来巩固基础,如计算器、字符串处理等。 2. 进阶阶段 ● 深入…...
计算机网络知识整理笔记
目录 1.对网络协议的分层? 2.TCP/IP和UDP之间的区别? 3.建立TCP连接的三次握手? 4.断开TCP连接的四次挥手? 5.TCP协议如何保证可靠性传输? 6.什么是TCP的拥塞控制? 7.什么是HTTP协议? 8…...
练习 String翻转 注册处理 字符串统计
p493 将字符串中指定部分进行翻转 package chapter;public class reverse {public static void main(String[] args) {String str "abcdef";str reverseMethod(str,0,3);System.out.println(str);}public static String reverseMethod(String str, int start, in…...
linux的常用系统维护命令
1.ps显示某个时间点的程序运行情况 -a :显示所有用户的进程 -u :显示用户名和启动时间 -x :显示 没有控制终端的进程 -e :显示所有进程,包括没有控制终端的进程 -l :长格式显示 -w :宽…...
java:aocache 0.4.0 缓存控制机制
aoocache发布第一个版本0.1.0时,没有考虑到使用aocache的项目对方法缓存的控制需求。 场景 给同事做培训时,同事提到这个需求,他希望能够有方法主动去清理指定方法的缓存: 他的数据是由其他服务启动时提供的,他的方法…...
试析C#编程语言的特点及功能
行步骤,而不必创建新方法。其声明方法是在实例化委托基础上,加一对花括号以代表执行范围,再加一个分号终止语句。 2.3.3 工作原理 C#编译器在“匿名”委托时会自动把执行代码转换成惟一命名类里的惟一命名函数。再对存储代码块的委托进行设…...
Textual Learning2 -- 使用时的小问题
1、出现的问题: 在vscode里面直接运行函数会显示报错: 我尝试在vscode中含textual库的环境下运行,但仍然报错 2、解决方案: 在命令行中运行: 首先按winR,输入cmd打开命令行 或在已经安装的conda环境&a…...
CST--如何在PCB三维模型中自由创建离散端口
在使用CST电磁仿真软件进行PCB的三维建模时,经常会遇到不能自动创建离散端口的问题,原因有很多,比如:缺少元器件封装、开路端口、多端子模型等等,这个时候,很多人会选择手动进行端口创建,但是&a…...
C++中的虚函数表结构框架
一.虚函数表介绍 Virtual Table虚函数表是实现多态的 每个有虚函数的类的实现,都有个指向虚函数的指针表(不管是父类还是子类) 指向虚表的指针是作为数据成员存在实例对象中 当调用虚函数时,就去查找对象的虚表中指向整顿派生类函…...
【ES】--Elasticsearch的高亮模式
目录 一、高亮策略1、Fast Vector Highlighter(快速向量高亮器)2、Posting Highlighter(帖子高亮器)3、Unified Highlighter(统一高亮器)4、Plain Highlighter(普通高亮器)5、总结二、高亮参数三、高亮案例解析1、words_one配置解析2、words_two配置解析3、words_three…...
使用matlab开发stm32总结,stm32-matlab常见的问题处理以及报错合集
1,问题:本来是好的,突然编译运行报错,说是确少包, 解决方案:重启以后好了 2,有完美的马鞍波,为什么不能够转动呢? 原因是我这里模型的问题,我计算出来的是占…...
落石滑坡监测报警系统:创新保障高速公路安全
在现代交通建设中,高速公路的安全性和稳定性至关重要。特别是易发生落石区域,如何有效预防和应对落石滑坡带来的事故成为了一项关键性挑战。为此,落石滑坡监测报警系统应运而生,它通过先进的技术手段,为高速…...
Linux开发讲课20--- QSPI
SPI 是英语 Serial Peripheral interface 的缩写,顾名思义就是串行外围设备接口,一种高速的,全双工,同步的通信总线,并且在芯片的管脚上只占用四根线,节约了芯片的管脚,为 PCB 的布局上节省空间…...
VMware ESXi 8.0U3 macOS Unlocker OEM BIOS 集成驱动版,新增 12 款 I219 网卡驱动
VMware ESXi 8.0U3 macOS Unlocker & OEM BIOS 集成驱动版,新增 12 款 I219 网卡驱动 VMware ESXi 8.0U3 macOS Unlocker & OEM BIOS 集成网卡驱动和 NVMe 驱动 (集成驱动版) 发布 ESXi 8.0U3 集成驱动版,在个人电脑上运行企业级工作负载 请访…...
vuepress使用简介及个人博客搭建
目录 一、介绍二、环境准备三、安装运行vuepress四、目录结构五、配置文件六、导航栏配置七、导航栏logo八、浏览器图标九、侧边栏配置十、添加 Git 仓库和编辑链接十一、部署到GitHub十二、搭建成功 一、介绍 VuePress 是 Vuejs 官方提供的一个是Vue驱动的静态网站生成器&…...
c#文件读写
1.1读取文件 方法说明File.ReadAllText(FilePath);读取指定路径的文件File.ReadAllText(FilePath, Encoding);通过指定编码格式来读取指定文件File.ReadAllBytes();读取二进制文件,并把内容读取到一个字节数组File.ReadAllLines();以行的形式读取文…...
WIFI 企业级认证手段 EAP-TLS介绍
EAP-TLS(EAP-Transport Layer Security)被认为是WLAN网络里最安全的认证方法,因此被企业广泛采用。本文会针对EAP-TLS的基本原理进行介绍。 在介绍原理之前,先介绍下WLAN网络里认证加密手段涉及到的一些基本概念。 1 802.1x IEE…...
【网络架构】keepalive
目录 一、keepalive基础 1.1 作用 1.2 原理 1.3 功能 二、keepalive安装 2.1 yum安装 2.2 编译安装 三、配置文件 3.1 keepalived相关文件 3.2 主配置的组成 3.2.1 全局配置 3.2.2 配置虚拟路由器 四、实际操作 4.1 lvskeepalived高可用群集 4.2 keepalivedngi…...
【Dison夏令营 Day 03】使用 Python 创建我们自己的 21 点游戏
21 点(英文:Blackjack)是一种在赌场玩的纸牌游戏。这种游戏的参与者不是互相竞争,而是与赌场指定的庄家竞争。在本文中,我们将从头开始创建可在终端上玩的玩家与庄家之间的二十一点游戏。 二十一点规则 我们将为从未玩过二十一点的读者提供…...
Workbench密码登录登录失败
Workbench密码登录登录失败操作系统禁用了密码登录方式,会导致使用了正确的用户名和密码仍无法登录 sudo vim /etc/ssh/sshd_config 输入O进入编辑 改完后重启 systemctl restart sshd.service 登录报错 有试了几遍登上了 可能是改完还要等一会儿...
哈尔滨高校大学智能制造实验室数字孪生可视化系统平台项目的验收
哈尔滨高校大学智能制造实验室数字孪生可视化系统平台项目的验收,标志着这一技术在教育领域的应用取得了新的突破。项目旨在开发一个数字孪生可视化系统平台,用于哈尔滨高校大学智能制造实验室的设备模拟、监测与数据分析。项目的主要目标包括࿱…...
009、MongoDB的分片策略
目录 MongoDB的分片策略:范围分片vs哈希分片 1. 范围分片(Range Sharding) 1.1 工作原理 1.2 优点 1.3 缺点 1.4 研究支持 2. 哈希分片(Hash Sharding) 2.1 工作原理 2.2 优点 2.3 缺点 2.4 研究支持 3. 选择合适的分片策略 4. 实践案例 4.1 电子商务平台 4.2 社…...
go~缓存设计配合singleFlight
一个缓存设计,配合go的singleFlight 最开始的设计如下 添加分布式缓存 上线后分布式缓存上涨的流量并不等于下游下降的流量,而是下游下降的流量 * 2~3 究其原因,就是采用了go的singleFlight,假定请求缓存时长10ms&a…...
多线程引发的安全问题
前言👀~ 上一章我们介绍了线程的一些基础知识点,例如创建线程、查看线程、中断线程、等待线程等知识点,今天我们讲解多线程下引发的安全问题 线程安全(最复杂也最重要) 产生线程安全问题的原因 锁(重要…...
在晋升受阻或遭受不公待遇申诉时,这样写是不是好一些?
在晋升受阻或遭受不公待遇申诉时,这样写是不是好一些? 在职场中,晋升受阻或遭受不公待遇是员工可能面临的问题之一。面对这样的情况,如何撰写一份有效的申诉材料,以维护自己的合法权益,就显得尤为重要。#李…...
LeetCode 2710.移除字符串中的尾随零:模拟
【LetMeFly】2710.移除字符串中的尾随零:模拟 力扣题目链接:https://leetcode.cn/problems/remove-trailing-zeros-from-a-string/ 给你一个用字符串表示的正整数 num ,请你以字符串形式返回不含尾随零的整数 num 。 示例 1: 输…...
代码随想录训练营第二十三天 39组合总和 40组合总和II 131分割回文串
第一题: 原题链接:39. 组合总和 - 力扣(LeetCode) 思路: 终止条件: 用一个sum值来记录当前组合中元素的总和。当sum的值大于target的时候证明该组合不合适,直接return。当sum的值等于target的…...
【C++】数组、字符串
六、数组、字符串 讨论数组离不开指针,指针基本上就是数组的一切的基础,数组和指针的相关内容参考我的C系列博文:【C语言学习笔记】四、指针_通过变量名访问内存单元中的数据缺点-CSDN博客【C语言学习笔记】三、数组-CSDN博客 1、数组就是&…...
MySQL InnoDB支持几种行格式
数据库表的行格式决定了一行数据是如何进行物理存储的,进而影响查询和DML操作的性能。 在InnoDB中,常见的行格式有4种: 1、COMPACT:是MySQL 5.0之前的默认格式,除了保存字段值外,还会利用空值列表保存null…...
佛山做网站公司哪家好/电子商务软文写作
昨天安装了最新版本XAMPP for Windows 1.8.3。 今天早上打开XAMPP双击mysql Start按钮报错,如下(部分截取): 2013-09-17 10:12:02 9012 [ERROR] InnoDB: Attempted to open a previously opened tablespace. Previous tablespace mysql/slave_relay_log_…...
dede英文网站/网上销售哪些平台免费
引言 大数据平台是对海量结构化、非结构化、半机构化数据进行采集、存储、计算、统计、分析处理的一系列技术平台。大数据平台处理的数据量通常是TB级,甚至是PB或EB级的数据,这是传统数据仓库工具无法处理完成的,其涉及的技术有分布式计算、…...
做网站简单还是做app简单/网络销售怎么找客源
海信POS机可编程键值定义:Q:档键设置,即右仙K0\K1\K2\K3\K4\K5键的设置,其对应键上的L\R\S\X\Z键,如何将其第一层键值放到R档上,第二层键值放到S档上?A:将对应R档的K1键设为空,对应…...
专业做简历的网站/app线上推广是什么工作
第三章 囹圄小学四年级了,但是“在魂”还未萌芽,只是在一个黑暗的地方慢慢酝酿着,等待着解开封印的那一天。可是就在这一年,“在魂”终于浮出水面探出了头。来的时候没打过招呼,反正就这样莫名其妙的产生了.记得有一天…...
wordpress进入中国市场/电子商务主要学什么就业方向
一 总结 文章第4节,访问资源服务的时候,还要对token进行通过调用远程认证服务进行校验 文章第5节,使用jwt的token,在不需要调用认证服务器,因为自身就是携带了用户的信息。 二 校验token 令牌申请成功可以使用/uaa/oa…...
网站后台链接怎么做/seo课培训
在讲述这些之前我们需要一些预备知识:Java的内存结构我们可以通过两个方面去看待它。从该角度看的话Java内存结构包含以下部分:该部分内容可以结合:JVM简介(更加详细深入的介绍)1、栈区:由编译器自动分配释放,具体方法…...