SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3;
一、简介
Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内置分区、复制和容错,这使得它成为大规模消息处理应用程序的一个很好的解决方案;
二、环境搭建
1、Kafka部署
1、下载安装包:kafka_2.13-3.5.0.tgz2、配置环境变量open -e ~/.bash_profileexport KAFKA_HOME=/本地路径/kafka3.5
export PATH=$PATH:$KAFKA_HOME/binsource ~/.bash_profile3、该目录【kafka3.5/bin】启动zookeeper
zookeeper-server-start.sh ../config/zookeeper.properties4、该目录【kafka3.5/bin】启动kafka
kafka-server-start.sh ../config/server.properties
2、Kafka测试
1、生产者
kafka-console-producer.sh --broker-list localhost:9092 --topic test-topic
>id-1-message
>id-2-message2、消费者
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic
id-1-message
id-2-message3、查看topic列表
kafka-topics.sh --bootstrap-server localhost:9092 --list
test-topic4、查看消息列表
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-topic --from-beginning --partition 0
id-1-message
id-2-message
3、可视化工具
配置和部署
1、下载安装包:kafka-eagle-bin-3.0.2.tar.gz2、配置环境变量open -e ~/.bash_profileexport KE_HOME=/本地路径/efak-web-3.0.2
export PATH=$PATH:$KE_HOME/binsource ~/.bash_profile3、修改配置文件:system-config.propertiesefak.zk.cluster.alias=cluster1
cluster1.zk.list=localhost:2181
efak.url=jdbc:mysql://127.0.0.1:3306/kafka-eagle4、本地新建数据库:kafka-eagle,注意用户名和密码是否一致5、启动命令
efak-web-3.0.2/bin/ke.sh start
命令语法: ./ke.sh {start|stop|restart|status|stats|find|gc|jdk|version|sdate|cluster}6、本地访问【localhost:8048】 username:admin password:123456
KSQL语句测试
select * from `test-topic` where `partition` in (0) order by `date` desc limit 5
select * from `test-topic` where `partition` in (0) and msg like '%5%' order by `date` desc limit 3
三、工程搭建
1、工程结构
2、依赖管理
这里关于依赖的管理就比较复杂了,首先spring-kafka
组件选择与boot框架中spring相同的依赖,即6.0.10
版本,在spring-kafka
最近的版本中3.0.8
符合;
但是该版本使用的是kafka-clients
组件的3.3.2
版本,在Spring文档的kafka模块中,明确说明spring-boot:3.1
要使用kafka-clients:3.4
,所以从spring-kafka
组件中排除掉,重新依赖kafka-clients
组件;
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>${spring-kafka.version}</version><exclusions><exclusion><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId></exclusion></exclusions>
</dependency>
<dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>${kafka-clients.version}</version>
</dependency>
3、配置文件
配置kafka连接地址,监听器的消息应答机制,消费者的基础模式;
spring:# kafka配置kafka:bootstrap-servers: localhost:9092listener:missing-topics-fatal: falseack-mode: manual_immediateconsumer:group-id: boot-kafka-groupenable-auto-commit: falsemax-poll-records: 10properties:max.poll.interval.ms: 3600000
四、基础用法
1、消息生产
模板类KafkaTemplate
用于执行高级的操作,封装各种消息发送的方法,在该方法中,通过topic
和key
以及消息主体,实现消息的生产;
@RestController
public class ProducerWeb {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@GetMapping("/send/msg")public String sendMsg (){try {// 构建消息主体JsonMapper jsonMapper = new JsonMapper();String msgBody = jsonMapper.writeValueAsString(new MqMsg(7,"boot-kafka-msg"));// 发送消息kafkaTemplate.send("boot-kafka-topic","boot-kafka-key",msgBody);} catch (JsonProcessingException e) {e.printStackTrace();}return "OK" ;}
}
2、消息消费
编写消息监听类,通过KafkaListener
注解控制监听的具体信息,在实现消息生产和消费的方法测试后,使用可视化工具kafka-eagle
查看topic和消息列表;
@Component
public class ConsumerListener {private static final Logger log = LoggerFactory.getLogger(ConsumerListener.class);@KafkaListener(topics = "boot-kafka-topic")public void listenUser (ConsumerRecord<?,String> record, Acknowledgment acknowledgment) {try {String key = String.valueOf(record.key());String body = record.value();log.info("\n=====\ntopic:boot-kafka-topic,key{},body:{}\n=====\n",key,body);} catch (Exception e){e.printStackTrace();} finally {acknowledgment.acknowledge();}}
}
五、参考源码
文档仓库:
https://gitee.com/cicadasmile/butte-java-note源码仓库:
https://gitee.com/cicadasmile/butte-spring-parent
相关文章:

SpringBoot3集成Kafka
标签:Kafka3.Kafka-eagle3; 一、简介 Kafka是一个开源的分布式事件流平台,常被用于高性能数据管道、流分析、数据集成和关键任务应用,基于Zookeeper协调的处理平台,也是一种消息系统,具有更好的吞吐量、内…...

css学习1
1、样式定义如何显示元素。 2、样式通常保存至外部的css文件中。 3、样式可以使内容与表现分离。 4、css主要有两部分组成:选择器与一条或多条声明。 选择器通常为要改变的html元素,每条声明由一个属性和一个值组成。每个属性有一个值,属性…...

rust踩雷笔记(1)——切片传参和解引用赋值
最近学习rust,网上资料还是很有限,做题遇到的问题,有时需要自己试验。把自己做题过程遇到的问题,和试验的结论,做一些简单记录。 阅读下列文字和代码 用切片(的引用)做参数要非常小心ÿ…...

安全 1自测
常见对称加密算法: DES(Data Encryption Standard):数据加密标准,速度较快,适用于加密大量数据的场合; 3DES(Triple DES):是基于DES,对一块数据用…...

寻路算法小游戏
寻路算法小demo 寻路算法有两种,一种是dfs 深度优先算法,一种是 dfs 深度优先算法 深度优先搜索的步骤分为 1.递归下去 2.回溯上来。顾名思义,深度优先,则是以深度为准则,先一条路走到底,直到达到目标。这…...

CSS基础 知识点总结
一.CSS简介 1.1 CSS简介 ① CSS指的是层叠样式表,用来控制网页外观的一门技术 ② CSS发展至今,经历过CSS1.0 CSS2.0 CSS2.1 CSS3.0这几个版本,CSS3.0是CSS最新版本 1.2 CSS引入方式 ① 在一个页面引入CSS,共有三种方式 外部…...

自动执行探索性数据分析 (EDA),更快、更轻松地理解数据
一、说明 EDA是 exploratory data analysis (探索性数据分析 )的缩写。所谓EDA就是在数据分析之前需要对数据进行以此系统性研判,在这个研判后,得到基本的数据先验知识,在这个基础上进行数据分析。本文将在R语言和python语言的探索性处理。 摄…...

【自定义系统服务】【android13】添加自定义java系统服务
背景 在平时的业务开发中,我们往往需要开发自定义的系统服务来处理自己特殊的需求,这里介绍的是添加自定义的Java系统服务,可以在系统App中直接调用 定义aidl Binder默认可以传输基本类型的数据,如果要传递类对象,则这个类需要实现序列化。我们先定义一个序列化的自定义…...

【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据)
【Sklearn】基于随机梯度下降算法的数据分类预测(Excel可直接替换数据) 1.模型原理2.模型参数3.文件结构4.Excel数据5.下载地址6.完整代码7.运行结果1.模型原理 随机梯度下降(Stochastic Gradient Descent,SGD)是一种优化算法,用于训练模型的参数以最小化损失函数。在分…...

44、TCP报文(二)
接上节内容,本节我们继续TCP报文首部字段含义的学习。上节为止我们学习到“数据偏移”和“保留”字段。接下来我们学习后面的一些字段(暂不包含“检验和”的计算方法和选项字段)。 TCP首部结构(续) “数据偏移”和“保…...

目标检测(Object Detection)
文章目录 1. 目标检测1.1 目标检测简要概述及名词解释1.2 IOU1.3 TP TN FP FN1.4 precision(精确度)和recall(召回率) 2. 边框回归Bounding-Box regression3. Faster R-CNN3.1 Faster-RCNN:conv layer3.2 Faster-RCNN&…...

vue中实现文字检索时候将搜索内容标红
实现结果 html: <div class"searchBox"><span class"bt">标  题</span><div class"search"><div class"shuru"><!-- <span class"title">生产经营<…...

PCL protocol composition logic
PCL 协议组合逻辑 一 主体(principal)和线程(thread)的区分 1.主体:指 **协议的参与者,用X^来表示。**每个主体可以扮演一个或多个角色,如 InitCR和RespCR ; 2.线程:主…...

聊聊看React和Vue的区别
Vue 更适合小项目,React 更适合大公司大项目; Vue 的学习成本较低,很容易上手,但项目质量不能保证...... 真的是这样吗?借助本篇文章,我们来从一些方面的比较来客观的去看这个问题。 论文档的丰富性 从两个…...

OSPF在广播类型的网络拓扑中DR和BDR的选举
指定路由器(DR): 一个网段上的其他路由器都和指定路由器(DR)构成邻接关系,而不是它们互相之间构成邻接关系。 备份指定路由器(BDR): 当DR出现问题,由BDR接…...

系统学习Linux-Mariadb高可用MHA
概念 MHA(MasterHigh Availability)是一套优秀的MySQL高可用环境下故障切换和主从复制的软件。 MHA 的出现就是解决MySQL 单点的问题。 MySQL故障切换过程中,MHA能做到0-30秒内自动完成故障切换操作。 MHA能在故障切换的过程中最大程度上…...

慢SQL的原因
如何排查慢SQL问题 识别慢SQL:使用数据库性能监控工具,如慢SQL日志,识别耗时较长的查询。执行计划分析:使用数据库提供的分析工具,例如EXPLAIN来查看查询的执行计划,判断是否存在全表扫描,索引…...

php正则替换文章的图片
要使用正则表达式替换文章中的图片链接,可以按照以下步骤进行操作: 1. 获取文章内容:首先,你需要获取包含图片链接的文章内容。你可以从文件中读取文章,或者从数据库中检索文章内容。 2. 使用正则表达式匹配图片链接…...

57 | TAPTAP客户端分析
TAPTAP客户端分析 一、用户群分析 首先,TapTap用户群可分为三大类: 游戏爱好者游戏发烧者游戏开发者(次要用户,有开发者后台,可以显示数据,不重点分析)注:爱好者与发烧者区别在于,前者是用空余时间来玩游戏,时间不如后者充足,且后者更执着于游戏,游戏种类更多。 …...

开源了一套基于springboot+vue+uniapp的商城,包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发
RuoYi-Mall-JAVA商城-电商系统简介 开源了一套基于若依框架,SringBoot2MybatisPlusSpringSecurityjwtredisVueUniapp的前后端分离的商城系统, 包含分类、sku、商户管理、分销、会员、适合企业或个人二次开发。 前端采用Vue、Element UI(ant…...

Android进阶之多级列表
遇到一个需求需要显示多级列表,因为界面是在平板上的,所以层级是从左向右往下排的,类似于 我当时的写法是在xml布局里一个个RecyclerView往下排的 当然前提是已经规定好最大的层级我才敢如此去写界面,如果已经明确规定只有两级或…...

Stochastic: Distribution-Expectation-Inequalities
见:https://www.math.hkust.edu.hk/~makchen/MATH5411/Chap1Sec2.pdf...

Java算法_ 二叉树的最大深度(LeetCode_Hot100)
题目描述:给定一个二叉树 ,返回其最大深度。root 二叉树的 最大深度 是指从根节点到最远叶子节点的最长路径上的节点数。 获得更多?算法思路:代码文档,算法解析的私得。 运行效果 完整代码 /*** 2 * Author: LJJ* 3 * Date: 2023/…...

行业追踪,2023-08-18
自动复盘 2023-08-18 凡所有相,皆是虚妄。若见诸相非相,即见如来。 k 线图是最好的老师,每天持续发布板块的rps排名,追踪板块,板块来开仓,板块去清仓,丢弃自以为是的想法,板块去留让…...

js将项目中的图片上传到服务器
项目上有时候会有奇怪的需求,比如前端有一些示例,想点击按钮就能上传图片,而这个图片是在前端的项目中的,如果不上传吧,又获取不到一些业务数据的id,但后端又不想为这块功能做特殊的处理,这时想通过前端直接上传到后端,需要file对象才可以。 这个时候我们需要将img转换…...

【C语言】指针的进阶
目录 一、字符指针 二、指针数组 三、数组指针 1.数组指针的定义 2.&数组名和数组名区别 3.数组指针的使用 四、数组参数与指针参数 1.一维数组传参 2.二维数组传参 3.一级指针传参 4.二级指针传参 五、函数指针 六、函数指针数组 七、指向函数指针数组的指针…...

【Windows系统编程】03.远线程注入ShellCode
shellcode:本质上也是一段普通的代码,只不过特殊的编程手法,可以在任意环境下,不依赖于原有的依赖库执行。 远程线程 #include <iostream> #include <windows.h> #include <TlHelp32.h>int main(){HANDLE hPr…...

第1天----验证一个字符串是否是另一个字符串的子串
本文我们将学习如何去验证一个字符串是否是另一个字符串的子串。 一、小试牛刀: 题目描述 输入两个字符串,验证其中一个串是否为另一个串的子串。 输入格式 两行,每行一个字符串。 输出格式 若第一个串 s 1 是第二个串 s 2 的子串,…...

项目实战第四十三讲:使用模版模式优雅实现财务编辑费用
项目实战第四十三讲:财务编辑费用 本文是项目实战第43讲:使用模版模式优雅实现财务编辑费用。支持查看司机填写费用信息,并且附件管理支持展示司机上传费用照片。 文章目录 项目实战第四十三讲:财务编辑费用1、项目背景2、主要技术3、项目职责4、项目实现4.1、表结构4.2、流…...

[JavaWeb]【六】web后端开发-请求响应
前言:请求响应 目录 一 引子 二 请求 2.1 Postman 2.1.1 安装 2.1.2 创建工作空间 2.1.3 添加接口 2.2 简单参数 2.2.1 原始方式(不推荐) 2.2.2 SpringBoot方式-GET(参数名与形参变量名相同) 2.2.3 SpringBoot方式-POST(参数名与形参…...