当前位置: 首页 > news >正文

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka

  • 1 安装 Kafka
  • 2 创建 Topic
  • 3 Java 创建 Topic
  • 4 SpringBoot 项目
    • 4.1 pom.xml
    • 4.2 application.yml
    • 4.3 KafkaApplication.java
    • 4.4 CustomizePartitioner.java
    • 4.5 KafkaInitialConfig.java
    • 4.6 SendMessageController.java
  • 5 测试

1 安装 Kafka

Docker 安装 Kafka

2 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator> docker exec -it kafka /bin/sh$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

3 Java 创建 Topic

package com.xu.mq.demo.test.service;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4 SpringBoot 项目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.7.8</version><relativePath/> <!-- lookup parent from repository --></parent><groupId>com.xu</groupId><artifactId>kafka</artifactId><version>0.0.1-SNAPSHOT</version><name>kafka</name><description>Demo project for Spring Boot</description><properties><java.version>1.8</java.version></properties><dependencies><dependency><groupId>cn.hutool</groupId><artifactId>hutool-all</artifactId><version>5.8.12</version></dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-streams</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><optional>true</optional></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-devtools</artifactId><scope>runtime</scope><optional>true</optional></dependency></dependencies><build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><configuration><excludes><exclude><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId></exclude></excludes></configuration></plugin></plugins></build></project>

4.2 application.yml

server:port: 8001spring:application:name: hello-kafkakafka:# 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)bootstrap-servers: 192.168.1.92:9092producer:# 发生错误后,消息重发的次数。retries: 0#当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。batch-size: 16384# 设置生产者内存缓冲区的大小。buffer-memory: 33554432# 键的序列化方式key-serializer: org.apache.kafka.common.serialization.StringSerializer# 值的序列化方式value-serializer: org.apache.kafka.common.serialization.StringSerializer# acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。# acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。# acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。acks: allconsumer:# 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5Dauto-commit-interval: 1S# 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:# latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)# earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录auto-offset-reset: earliest# 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量enable-auto-commit: true# 键的反序列化方式key-deserializer: org.apache.kafka.common.serialization.StringDeserializer# 值的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializerlistener:# 在侦听器容器中运行的线程数。concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;/*** @author Administrator*/
@EnableKafka
@SpringBootApplication
public class KafkaApplication {public static void main(String[] args) {SpringApplication.run(DemoApplication.class, args);}}

4.4 CustomizePartitioner.java

package com.xu.kafka.config;import java.util.Map;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;/*** @author Administrator*/
public class CustomizePartitioner implements Partitioner {/*** 自定义分区规则** @param topic      The topic name* @param key        The key to partition on (or null if no key)* @param keyBytes   The serialized key to partition on( or null if no key)* @param value      The value to partition on or null* @param valueBytes The serialized value to partition on or null* @param cluster    The current cluster metadata* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {return 0;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}}

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import org.apache.kafka.clients.admin.NewTopic;/*** kafka 初始化配置类 创建 Topic** @author Administrator* @date 2023年2月17日11点30分*/
@Configuration
public class KafkaInitialConfig {public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";@Beanpublic NewTopic audioUploadTopic() {return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);}@Beanpublic NewTopic textUploadTopic() {return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);}}

4.6 SendMessageController.java

package com.xu.kafka.message.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;import com.xu.kafka.config.KafkaInitialConfig;import cn.hutool.json.JSONUtil;/*** @author Administrator*/
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {@Autowiredprivate KafkaTemplate template;/*** KafkaTemplate 发送消息** @param message*/@GetMapping("/test1/{message}")public void test1(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);}/*** KafkaTemplate 发送消息 有回调** @param message*/@GetMapping("/test2/{message}")public void test2(@PathVariable("message") String message) {template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(success -> {System.out.println("发送成功\t" + success);}, fail -> {System.out.println("发送失败\t" + fail);});}
}

5 测试

在这里插入图片描述

发送成功	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

相关文章:

SpringBoot 集成 Kafka

SpringBoot 集成 Kafka1 安装 Kafka2 创建 Topic3 Java 创建 Topic4 SpringBoot 项目4.1 pom.xml4.2 application.yml4.3 KafkaApplication.java4.4 CustomizePartitioner.java4.5 KafkaInitialConfig.java4.6 SendMessageController.java5 测试1 安装 Kafka Docker 安装 Kafk…...

OpenCV 图像金字塔算子

本文是OpenCV图像视觉入门之路的第14篇文章&#xff0c;本文详细的介绍了图像金字塔算子的各种操作&#xff0c;例如&#xff1a;高斯金字塔算子 、拉普拉斯金字塔算子等操作。 高斯金字塔中的较高级别&#xff08;低分辨率&#xff09;是通过先用高斯核对图像进行卷积再删除偶…...

【自学Linux】Linux一切皆文件

Linux一切皆文件 Linux一切皆文件教程 Linux 中所有内容都是以文件的形式保存和管理的&#xff0c;即一切皆文件&#xff0c;普通文件是文件&#xff0c;目录是文件&#xff0c;硬件设备&#xff08;键盘、监视器、硬盘、打印机&#xff09;是文件&#xff0c;就连套接字&…...

CUDA C++扩展的详细描述

CUDA C扩展的详细描述 文章目录CUDA C扩展的详细描述CUDA函数执行空间说明符B.1.1 \_\_global\_\_B.1.2 \_\_device\_\_B.1.3 \_\_host\_\_B.1.4 Undefined behaviorB.1.5 __noinline__ and __forceinline__B.2 Variable Memory Space SpecifiersB.2.1 \_\_device\_\_B.2.2. \_…...

为什么重写equals必须重写hashCode

关于这个问题&#xff0c;看了网上很多答案&#xff0c;感觉都参差不齐&#xff0c;没有答到要点&#xff0c;这次就记录一下&#xff01; 首先我们为什么要重写equals&#xff1f;这个方法是用来干嘛的&#xff1f; public boolean equals &#xff08;Object object&#x…...

< 每日小技巧:N个很棒的 Vue 开发技巧, 持续记录ing >

每日小技巧&#xff1a;6 个很棒的 Vue 开发技巧&#x1f449; ① Watch 妙用> watch的高级使用> 一个监听器触发多个方法> watch 监听多个变量&#x1f449; ② 自定义事件 $emit() 和 事件参数 $event&#x1f449; ③ 监听组件生命周期常规写法hook写法&#x1f44…...

数据结构与算法之二分查找分而治之思想

决定我们成为什么样人的&#xff0c;不是我们的能力&#xff0c;而是我们的选择。——《哈利波特与密室》二分查找是查找算法里面是很优秀的一个算法&#xff0c;特别是在有序的数组中&#xff0c;这种算法思想体现的淋漓尽致。一.题目描述及其要求请实现无重复数字的升序数组的…...

训练自己的中文word2vec(词向量)--skip-gram方法

训练自己的中文word2vec&#xff08;词向量&#xff09;–skip-gram方法 什么是词向量 ​ 将单词映射/嵌入&#xff08;Embedding&#xff09;到一个新的空间&#xff0c;形成词向量&#xff0c;以此来表示词的语义信息&#xff0c;在这个新的空间中&#xff0c;语义相同的单…...

ubuntu系统环境配置和常用软件安装

系统环境 修改文件夹名称为英文 参考链接 export LANGen_US xdg-user-dirs-gtk-update 常用软件安装 常用工具 ping 和ifconfig工具 sudo apt install -y net-tools inetutils-ping 截图软件 sudo apt install -y net-tools inetutils-ping flameshot 录屏 sudo apt-get i…...

【1139. 最大的以 1 为边界的正方形】

来源&#xff1a;力扣&#xff08;LeetCode&#xff09; 描述&#xff1a; 给你一个由若干 0 和 1 组成的二维网格 grid&#xff0c;请你找出边界全部由 1 组成的最大 正方形 子网格&#xff0c;并返回该子网格中的元素数量。如果不存在&#xff0c;则返回 0。 示例 1&#…...

windows11安装sqlserver2022报错

window11安装SQL Server 2022 报错 糟糕… 无法安装SQL Server (setup.exe)。此 SQL Server安装程序介质不支持此OS的语言&#xff0c;或没有SQL Server英语版本的安装文件。请使用匹配的特定语言SQL Server介质;或安装两个特定语言MUI&#xff0c;然后通过控制面板的区域设置…...

Python快速上手系列--日志模块--详解篇

前言本篇主要说说日志模块&#xff0c;在写自动化测试框架的时候我们就需要用到这个模块了&#xff0c;方便我们快速的定位错误&#xff0c;了解软件的运行情况&#xff0c;更加顺畅的调试程序。为什么要用到日志模块&#xff0c;直接print不就好了&#xff01;那得写多少print…...

【THREE.JS学习(1)】绘制一个可以旋转、放缩的立方体

学习新技能&#xff0c;做一下笔记。在使用ThreeJS的时候&#xff0c;首先创建一个场景const scene new THREE.Scene();接着&#xff0c;创建一个相机其中&#xff0c;THREE.PerspectiveCamera&#xff08;&#xff09;四个参数分别为&#xff1a;1.fov 相机视锥体竖直方向视野…...

数仓实战 - 滴滴出行

项目大致流程&#xff1a; 1、项目业务背景 1.1 目的 本案例将某出行打车的日志数据来进行数据分析&#xff0c;例如&#xff1a;我们需要统计某一天订单量是多少、预约订单与非预约订单的占比是多少、不同时段订单占比等 数据海量 – 大数据 hive比MySQL慢很多 1.2 项目架…...

python虚拟环境与环境变量

一、环境变量 1.环境变量 在命令行下&#xff0c;使用可执行文件&#xff0c;需要来到可执行文件的路径下执行 如果在任意路径下执行可执行文件&#xff0c;能够有响应&#xff0c;就需要在环境变量配置 2.设置环境变量 用户变量&#xff1a;当前用户登录到系统&#xff0c;…...

BeautifulSoup文档4-详细方法 | 用什么方法对文档树进行搜索?

4-详细方法 | 用什么方法对文档树进行搜索&#xff1f;1 过滤器1.1 字符串1.2 正则表达式1.3 列表1.4 True1.5 可以自定义方法2 find_all()2.1 参数原型2.2 name参数2.3 keyword 参数2.4 string 参数2.5 limit 参数2.6 recursive 参数3 find()4 find_parents()和find_parent()5…...

初识Tkinter界面设计

目录 前言 一、初识Tkinter 二、Label控件 三、Button控件 四、Entry控件 前言 本文简单介绍如何使用Python创建一个界面。 一、初识Tk...

软件测试面试题中的sql题目你会做吗?

目录 1.学生表 2.一道SQL语句面试题&#xff0c;关于group by表内容&#xff1a; 3.表中有A B C三列,用SQL语句实现&#xff1a;当A列大于B列时选择A列否则选择B列&#xff0c;当B列大于C列时选择B列否则选择C列 4. 5.姓名&#xff1a;name 课程&#xff1a;subject 分数&…...

VS实用调试技巧

一.什么是BUG&#x1f41b;Bug一词的原意是虫子&#xff0c;而在电脑系统或程序中隐藏着的一些未被发现的缺陷或问题&#xff0c;人们也叫它"bug"。这是为什么呢&#xff1f;这就要追溯到一个程序员与飞蛾的故事了。Bug的创始人格蕾丝赫柏&#xff08;Grace Murray H…...

通俗易懂理解三次握手、四次挥手(TCP)

文章目录1、通俗语言理解1.1 三次握手1.2 四次挥手2、进一步理解三次握手和四次挥手2.1 三次握手2.2 四次挥手1、通俗语言理解 1.1 三次握手 C:客户端 S&#xff1a;服务器端 第一次握手&#xff1a; C&#xff1a;在吗&#xff1f;我要和你建立连接。 第二次握手&#xff…...

1.1 什么是并发

1.1 什么是并发 并发&#xff1a;指两个或更多独立的活动同时发生。并发在生活中随处可见。我们可以一边走路一边说话&#xff0c;也可以两只手同时做不同的动作。 1.1.1 计算机系统中的并发 当我们提到计算机术语的“并发”&#xff0c;指的是在单个系统里同时执行多个独立…...

万字讲解你写的代码是如何跑起来的?

今天我们来思考一个简单的问题&#xff0c;一个程序是如何在 Linux 上执行起来的&#xff1f; 我们就拿全宇宙最简单的 Hello World 程序来举例。 #include <stdio.h> int main() {printf("Hello, World!\n");return 0; } 我们在写完代码后&#xff0c;进行…...

034.Solidity入门——21不可变量

Solidity 中的不可变量是在编译时就被确定的常量&#xff0c;也称为常量变量&#xff08;constant variable&#xff09;或只读变量&#xff08;read-only variable&#xff09;。这些变量在定义时必须立即初始化&#xff0c;并且在整个合约中都无法被修改&#xff0c;可以在函…...

Vulnhub 渗透练习(四)—— Acid

环境搭建 环境下载 kail 和 靶机网络适配调成 Nat 模式&#xff0c;实在不行直接把网络适配还原默认值&#xff0c;再重试。 信息收集 主机扫描 没扫到&#xff0c;那可能端口很靠后&#xff0c;把所有端口全扫一遍。 发现 33447 端口。 扫描目录&#xff0c;没什么有用的…...

C++ 在线工具

online编译器https://godbolt.org/Online C Compiler - online editor (onlinegdb.com) https://www.onlinegdb.com/online_c_compilerC Shell (cpp.sh) https://cpp.sh/在线文档Open Standards (open-std.org)Index of /afs/cs.cmu.edu/academic/class/15211/spring.96/wwwC P…...

使用MMDetection进行目标检测、实例和全景分割

MMDetection 是一个基于 PyTorch 的目标检测开源工具箱&#xff0c;它是 OpenMMLab 项目的一部分。包含以下主要特性&#xff1a; 支持三个任务 目标检测&#xff08;Object Detection&#xff09;是指分类并定位图片中物体的任务实例分割&#xff08;Instance Segmentation&a…...

使用ThreadLocal实现当前登录信息的存取

有志者&#xff0c;事竟成 文章持续更新&#xff0c;可以关注【小奇JAVA面试】第一时间阅读&#xff0c;回复【资料】获取福利&#xff0c;回复【项目】获取项目源码&#xff0c;回复【简历模板】获取简历模板&#xff0c;回复【学习路线图】获取学习路线图。 文章目录一、使用…...

高通平台开发系列讲解(Android篇)AudioTrack音频流数据传输

文章目录 一、音频流数据传输通道创建1.1、流程描述1.2、流程图解二、音频数据传输2.1、流程描述2.2、流程图解沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇章主要图解AudioTrack音频流数据传输 。 一、音频流数据传输通道创建 1.1、流程描述 AudioTrack在set函…...

BUUCTF-firmware1

题目下载&#xff1a;下载 新题型&#xff0c;记录一下 题目给出了flag形式&#xff0c;md5{网址&#xff1a;端口}&#xff0c;下载发现是一个.bin文件 二进制文件&#xff0c;其用途依系统或应用而定。一种文件格式binary的缩写。一个后缀名为".bin"的文件&#x…...

【C++之容器篇】二叉搜索树的理论与使用

目录前言一、二叉搜索树的概念二、二叉搜素树的模拟实现&#xff08;增删查非递归实现&#xff09;1. 二叉搜素树的结点2. 二叉搜索树的实现&#xff08;1&#xff09;. 二叉搜索树的基本结构&#xff08;2&#xff09;构造函数&#xff08;3&#xff09;查找函数&#xff08;4…...

可视化在线做网站/windows优化大师怎么使用

http://ardui.co/archives/738 我是潘&#xff0c;曾经是个工程师。这是为 Ardui.Co 制作的 “Arduino 公开课” 系列的入门教程。上一课介绍了I2C 协议连接1602 LCD。现在我们将屏幕升级到更强大的12864 OLED&#xff08;也称“1306”&#xff09;&#xff0c;让交互界面更加丰…...

新上线的网站怎么做优化/常德今日头条新闻

从 8i 到 11g &#xff0c;看看 Oracle 的自动化做了哪些&#xff1a;内存空间的自动管理、 UNDO 空间自动管理、存储参数自动管理、 RMAN 备份和归档存放的自动管理、统计信息自动收集、 AWR 数据自动采集&#xff0c;最多在加上一个存储自动管理 ASM 。同样看看从 8i 到 11g …...

wordpress博客 分类/网络优化工具

LinkedList &#xff1a;/** * 考虑的比较的周全&#xff0c;并且包含了全部的情况&#xff0c;代码也不乱<b></b> * * param index * 插入的位置 * param c * 插入的元素集合 */ private boolean addAll(int index, Collection<? extends E> c) { // 缺少…...

做网站需要学些什么软件/怎么查看网站的友情链接

燕雀安知鸿鹄之志?———《史记陈涉世家》 意思是燕雀怎么能知道鸿鹄的远大志向&#xff0c;比喻平凡的人哪里知道英雄人物的志向。 前文说了 Exectuor 的具体用法&#xff0c;却没有说到如何停止一个任务的执行。所以&#xff0c;本篇文章就来讲解一下 Executor 的生命周期管…...

福田公司网站建设/最新热搜新闻事件

jquery插件的插件验证每个人似乎都喜欢jQuery图书的动画效果&#xff0c;因此这里列出了几个易于使用的jQuery图书插件 &#xff0c;这些插件可提供良好的页面翻转体验并有助于创建类似于书籍的界面。 请享用&#xff01; 相关文章&#xff1a; 10个jQuery分页插件 有趣Java…...

做外贸网站选择服务器/东莞百度seo在哪里

问题描述   给定一个整数数列&#xff0c;数列中连续相同的最长整数序列算成一段&#xff0c;问数列中共有多少段&#xff1f; 输入格式   输入的第一行包含一个整数n&#xff0c;表示数列中整数的个数。   第二行包含n个整数a1, a2, …, an&#xff0c;表示给定的数列&a…...