Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章
【Docker安装部署Kafka+Zookeeper详细教程】_linux arm docker安装kafka-CSDN博客
Docker搭建kafka+zookeeper
打开我们的docker的镜像源配置
vim /etc/docker/daemon.json
配置
{
"registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"]
}
下面的那个insecure是我自己虚拟机的,不用理会

拉取镜像
然后开始拉取我们的zookeeper镜像和我们的kafka镜像
这个是我们的zookeeper镜像,没有指定版本默认就是拉取最新的版本
docker pull zookeeper
kafka镜像
docker pull wurstmeister/kafka
因为我们的docker不同容器之间的网络是互相隔开的,所以我们要创建一个共同使用的网络
让不同容器都加入这个网络
docker network create创建我们的网络
然后那个zookeeper_network是我们自定义的网络名称
docker network create --driver bridge zookeeper_network
kafka是依赖于zookeeper的所以我们要先安装zookeeper
我们先用run来创建一个zookeeper容器
docker run -d --name zookeeper1 --network zookeeper_network -p 2181:2181 zookeeper
-d 是后台运行
--name 是我们自定义容器的名字 我定义的名字是zookeeper1
--network
是指定我们的网络环境,我们刚刚创建的网络环境名字叫zookeeper_network,所以我们要让容器加入这个网络
-p 是指定我们的容器暴露给外部的端口 2181:2181是指虚拟机(或服务器)的2181端口与容器内部的2181端口做映射
最后面的那个zookeeper 是我们的使用的镜像源的名称
一般是zookeeper:xxx来执行使用镜像源的版本,如果不指定版本默认用的就是最新版本
查看我们创建的网络环境的地址
docker inspect zookeeper_network


那个IPv4就是我们的网络环境的地址,这是我的网络环境的地址
我的是12.21.0.2,这个ip地址是要记住方便后面使用的
创建一个kafka容器
这段代码有点长,根子自己改吧
# 启动kafka
docker run -d --name kafka1 --network zookeeper_network -p 9092:9092 -e KAFKA_BROKER_ID=0 -e KAFKA_ZOOKEEPER_CONNECT=<zookeeperIP地址>:2181 -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://<宿主机IP地址>:9092 -e KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 wurstmeister/kafka
解释
KAFKA_ZOOKEEPER_CONNECT 后面写的是我们的之前的网络的地址
KAFKA_ADVERTISED_LISTENERS=PLAINTEXT 我们的虚拟机(服务器)的本机的地址
不知道本机地址可以输入 ip addr来查看本机地址

这样子就搭建完成了
SpringBoot集成kafka
首先就是springboot和kafka的版本兼容了
Spring for Apache Kafka

然后我们引入两个kafka的依赖
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId> </dependency><dependency><groupId>org.apache.kafka</groupId><artifactId>kafka-clients</artifactId><version>3.2.0</version> </dependency>
自己对着自己的版本来
自己看b站视频,9分钟就搞定了
63-kafka-集成-Java场景-SpringBoot_哔哩哔哩_bilibili
然后开始写我们的application.yml配置文件

下面是配置文件的全部+解析
其实和普通mq差不多
也就是配置生产者和消费者和一些过期时间超时时间
重点在于那个missing-topics-fatal
主题不存在的话,我们是否还要成功启动
我自己的写的默认的主题是test,但是我还没在kafka里面创建,kafka里面还没有这个叫test的主题
所以我启动的时候,报错然后失败了
spring:kafka:bootstrap-servers: 192.168.88.130:9092 #Kafka 集群的地址和端口号producer:acks: all #生产者发送消息时, Kafka 集群需要确认的确认级别。all 表示需要所有 broker 确认消息已经写入batch-size: 16384 #生产者在发送消息时, 会先缓存一些消息, 达到 batch-size 后再批量发送。这个参数设置了批量发送的大小。buffer-memory: 33554432 #生产者用于缓存消息的内存大小key-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。value-serializer: org.apache.kafka.common.serialization.StringSerializer #定义了消息 key 和 value 的序列化方式。retries: 0consumer:group-id: test #消费者组ID#消费方式: 在有提交记录的时候,earliest与latest是一样的,从提交记录的下一条开始消费# earliest:无提交记录,表示从最早的消息开始消费#latest:无提交记录,从最新的消息的下一条开始消费auto-offset-reset: earliest #当消费者没有提交过 offset 时, 从何处开始消费消息enable-auto-commit: true #是否自动提交偏移量offsetauto-commit-interval: 1s #前提是 enable-auto-commit=true。自动提交 offset 的间隔时间key-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式value-deserializer: org.apache.kafka.common.serialization.StringDeserializer #定义了消息 key 和 value 的反序列化方式max-poll-records: 2 #一次 poll 操作最多返回的消息数量properties:#如果在这个时间内没有收到心跳,该消费者会被踢出组并触发{组再平衡 rebalance}#消费者与 Kafka 服务端的会话超时时间session.timeout.ms: 120000#最大消费时间。此决定了获取消息后提交偏移量的最大时间,超过设定的时间(默认5分钟),服务端也会认为该消费者失效。踢出并再平衡#消费者调用 poll 方法的最大间隔时间max.poll.interval.ms: 300000#消费者发送请求到 Kafka 服务端的超时时间#配置控制客户端等待请求响应的最长时间。#如果在超时之前没有收到响应,客户端将在必要时重新发送请求,#或者如果重试次数用尽,则请求失败。request.timeout.ms: 60000#订阅或分配主题时,允许自动创建主题。0.11之前,必须设置falseallow.auto.create.topics: true#消费者向协调器发送心跳的间隔时间。#poll方法向协调器发送心跳的频率,为session.timeout.ms的三分之一heartbeat.interval.ms: 40000#每个分区里返回的记录最多不超max.partitions.fetch.bytes 指定的字节#0.10.1版本后 如果 fetch 的第一个非空分区中的第一条消息大于这个限制#仍然会返回该消息,以确保消费者可以进行#每个分区最多拉取的消息字节数。#max.partition.fetch.bytes=1048576 #1Mlistener:#当enable.auto.commit的值设置为false时,该值会生效;为true时不会生效#manual_immediate:需要手动调用Acknowledgment.acknowledge()后立即提交#ack-mode: manual_immediate 手动 ACK 的方式。#如果监听的主题不存在, 是否启动失败。missing-topics-fatal: false #如果至少有一个topic不存在,true启动失败。false忽略#消费方式, single 表示单条消费, batch 表示批量消费#type: single #单条消费?批量消费? #批量消费需要配合 consumer.max-poll-recordstype: batch#并发消费的线程数concurrency: 2 #配置多少,就为为每个消费者实例创建多少个线程。多出分区的线程空闲#默认的主题名称template:default-topic: "test"#springboot启动的端口号
server:port: 9999 #这个是java项目启动的端口
基本案例
这是常量类
指定了一个topic和group
主题和分组id
groupid是消费者组的唯一标识
这个视频9分钟看懂kafka
小朋友也可以懂的Kafka入门教程,还不快来学_哔哩哔哩_bilibili
生产者

我们这个Autowired自动注入,会根据我们的配置文件的配置来自动注入

@Autowiredprivate KafkaTemplate<String, String> kafkaTemplate;

produces里面指定我们前端传的是json格式

我们往这个标题发送我们的消息,其实这个就是我们的常量类里面写的"test"

消费者

@KafkaListener(topics = SpringBootKafkaConfig.TOPIC_TEST, groupId = SpringBootKafkaConfig.GROUP_ID)public void topic_test(List<String> messages, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {for (String message : messages) {//因为这个String是Json,所以我们可以转回Object对象,其实是转成JsonObject对象final JSONObject entries = JSONUtil.parseObj(message);System.out.println(SpringBootKafkaConfig.GROUP_ID + " 消费了: Topic:" + topic + ",Message:" + entries.getStr("name"));//ack.acknowledge();}}
我们用List<String>来接收,因为可能一个消费者接收多条消息

指定消费者监听的主题topic
以及指定消费者的唯一标识GROUP_ID
这些其实都是自己在常量类里面自己写好的
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
这个是得到我们的主题topic的名字

我用apifox调试之后,成功执行了

![]()
kafka的图形化工具
这里介绍一个免费的开源项目KafkaKing
Releases · Bronya0/Kafka-King (github.com)
里面还能指定中文

相关文章:
Docker搭建kafka+zookeeper以及Springboot集成kafka快速入门
参考文章 【Docker安装部署KafkaZookeeper详细教程】_linux arm docker安装kafka-CSDN博客 Docker搭建kafkazookeeper 打开我们的docker的镜像源配置 vim /etc/docker/daemon.json 配置 { "registry-mirrors": ["https://widlhm9p.mirror.aliyuncs.com"…...
【cocos2dx】【iOS工程】如何保存用户在游戏内的绘画数据,并将数据以图像形式展示在预览界面
【cocos2dx】【iOS工程】如何保存用户在应用内的操作数据,并将数据以图像形式展示在预览界面 设备/引擎:Mac(11.6)/Mac Mini 开发工具:Xcode(15.0.1) 开发需求:如何保存用户在应用…...
拥抱应用创新,拒绝无谓的模型竞争
💝💝💝欢迎来到我的博客,很高兴能够在这里和您见面!希望您在这里可以感受到一份轻松愉快的氛围,不仅可以获得有趣的内容和知识,也可以畅所欲言、分享您的想法和见解。 推荐:kwan 的首页,持续学…...
【源码+文档+调试讲解】旅游资源网站
摘 要 本论文主要论述了如何使用JAVA语言开发一个旅游资源网站 ,本系统将严格按照软件开发流程进行各个阶段的工作,采用B/S架构,面向对象编程思想进行项目开发。在引言中,作者将论述旅游资源网站的当前背景以及系统开发的目的&…...
Monaco 多行提示的实现方式
AI 代码助手最近太火爆,国内有模型厂商都有代码助手,代码助手是个比较典型的 AI 应用,主要看前端,后端的模型都差不多,国内外都有专门的代码模型。现在都是集中在 VSCode 和 Idea的插件,本文通过 Monaco 实…...
SpringMVC的架构有什么优势?——表单和数据校验(四)
#SpringMVC的架构有什么优势?——表单和数据校验(四) 前言 关键字: 机器学习 人工智能 AI chatGPT 学习 实现 使用 搭建 深度 python 事件 远程 docker mysql安全 技术 部署 技术 自动化 代码 文章目录 - - - - - 表单数据…...
Linux实战记录
踩坑实录: day2: 最坑:安装UB居然不知道创建文件夹。 1.虚拟机上不了网:多重置几次 网卡 2.Winscp链接主机: 用户名 就是 linux terminal中的 第一个用户名!...
时间、查找、打包、行过滤与指令的运行——linux指令学习(二)
前言:本节内容标题虽然为指令,但是并不只是讲指令, 更多的是和指令相关的一些原理性的东西。 如果友友只想要查一查某个指令的用法, 很抱歉, 本节不是那种带有字典性质的文章。但是如果友友是想要来学习的,…...
android CameraX构建相机拍照
Android CameraX 是一个 Jetpack 支持库,旨在简化相机应用的开发工作。它提供了一致且易用的API接口,适用于大多数Android设备,并可向后兼容至Android 5.0(API级别21)。 CameraX解决了在多种设备上实现相机功能时所遇…...
【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示 proteus仿真+程序+设计报告+讲解视频
【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 1.主要功能:讲解视频:2.仿真3. 程序代码4. 设计报告5. 设计资料内容清单&&下载链接资料下载链接: 【普中】基于51单片机的矩阵电子密码锁LCD1602液晶显示设计 ( proteus仿真…...
工厂水电燃气表流量计等能耗计量仪表非侵入式拍照抄表的方案
在企业园区、工厂等企事业单位,传统的手动抄表方式已逐渐不能满足现代化、信息化管理的需求。为了提高抄表工作的效率,减少人工操作的误差,同时保障数据的安全性和实时性,我们提出了拍照采集抄表方案。本方案旨在通过拍照的方式&a…...
LLM大模型应用中的安全对齐的简单理解
LLM大模型应用中的安全对齐的简单理解 随着人工智能技术的不断发展,大规模语言模型(如GPT-4)的应用越来越广泛。为了保证这些大模型在实际应用中的性能和安全性,安全对齐(Safe Alignment)成为一个重要的概…...
clickhouse-jdbc-bridge rce
clickhouse-jdbc-bridge 是什么 JDBC bridge for ClickHouse. It acts as a stateless proxy passing queries from ClickHouse to external datasources. With this extension, you can run distributed query on ClickHouse across multiple datasources in real time, whic…...
java中Comparator函数的用法实例?
在Java中,Comparator接口用于比较两个对象的顺序,常用于集合的排序。自Java 8开始,Comparator接口得到了增强,提供了许多默认方法,使得排序逻辑更加灵活和强大。下面将通过几个实例来展示Comparator的用法。 示例1&am…...
mysql实战入门-基础篇
目录 1、MySQL概述 1.1、数据库相关概念 1.2、MySQL数据库 1.2.1、版本 1.2.2、下载 1.2.3、安装 输入MySQL中root用户的密码,一定记得记住该密码 1.2.4、启动停止 1.2.5、客户端连接 1.2.6、数据模型 2、SQL 2.1、SQL通用语法 2.2、SQL分类 2.3、DDL 2.3.1、数据…...
阶段三:项目开发---民航功能模块实现:任务24:航空实时监控
任务描述 内 容:地图展示、飞机飞行轨迹、扇区控制。航空实时监控,是飞机每秒发送坐标,经过终端转换实时发送给塔台,为了飞机位置的精准度,传输位置的密度很大,在地图位置显示不明显。本次为了案例展示效…...
手机容器化 安装docker
旧手机-基于Termux容器化 1、安装app 在手机上安装Termux或ZeroTermux(Termux扩展) 1.1 切换源 注:可以将termux进行换源,最好采用国内源,例如:清华源等 更新包列表和升级包(可选࿰…...
科普文:深入理解Mybatis
概叙 (1) JDBC JDBC(Java Data Base Connection,java数据库连接)是一种用于执行SQL语句的Java API,可以为多种关系数据库提供统一访问,它由一组用Java语言编写的类和接口组成.JDBC提供了一种基准,据此可以构建更高级的工具和接口,使数据库开发人员能够编写数据库应用程序。 优点…...
称重传感器有哪些种类
有关称重传感器的知识,称重传感器是众多传感器产品中的一种,也是很常用的传感器之一,那么称重传感器有哪些种类,称重传感器的分类方式是什么样的,一起来了解下。 称重传感器的分类 主要有六种称重传感器类型…...
程序员鱼皮的保姆级写简历指南第四弹,优秀简历参考
大家好,我是程序员鱼皮。做知识分享这些年来,我看过太多简历、也帮忙修改过很多的简历,发现很多同学是完全不会写简历的、会犯很多常见的问题,不能把自己的优势充分展示出来,导致措施了很多面试机会,实在是…...
【配置 YOLOX 用于按目录分类的图片数据集】
现在的图标点选越来越多,如何一步解决,采用 YOLOX 目标检测模式则可以轻松解决 要在 YOLOX 中使用按目录分类的图片数据集(每个目录代表一个类别,目录下是该类别的所有图片),你需要进行以下配置步骤&#x…...
ardupilot 开发环境eclipse 中import 缺少C++
目录 文章目录 目录摘要1.修复过程摘要 本节主要解决ardupilot 开发环境eclipse 中import 缺少C++,无法导入ardupilot代码,会引起查看不方便的问题。如下图所示 1.修复过程 0.安装ubuntu 软件中自带的eclipse 1.打开eclipse—Help—install new software 2.在 Work with中…...
有限自动机到正规文法转换器v1.0
1 项目简介 这是一个功能强大的有限自动机(Finite Automaton, FA)到正规文法(Regular Grammar)转换器,它配备了一个直观且完整的图形用户界面,使用户能够轻松地进行操作和观察。该程序基于编译原理中的经典…...
Springboot社区养老保险系统小程序
一、前言 随着我国经济迅速发展,人们对手机的需求越来越大,各种手机软件也都在被广泛应用,但是对于手机进行数据信息管理,对于手机的各种软件也是备受用户的喜爱,社区养老保险系统小程序被用户普遍使用,为方…...
安卓基础(aar)
重新设置java21的环境,临时设置 $env:JAVA_HOME "D:\Android Studio\jbr" 查看当前环境变量 JAVA_HOME 的值 echo $env:JAVA_HOME 构建ARR文件 ./gradlew :private-lib:assembleRelease 目录是这样的: MyApp/ ├── app/ …...
HarmonyOS运动开发:如何用mpchart绘制运动配速图表
##鸿蒙核心技术##运动开发##Sensor Service Kit(传感器服务)# 前言 在运动类应用中,运动数据的可视化是提升用户体验的重要环节。通过直观的图表展示运动过程中的关键数据,如配速、距离、卡路里消耗等,用户可以更清晰…...
技术栈RabbitMq的介绍和使用
目录 1. 什么是消息队列?2. 消息队列的优点3. RabbitMQ 消息队列概述4. RabbitMQ 安装5. Exchange 四种类型5.1 direct 精准匹配5.2 fanout 广播5.3 topic 正则匹配 6. RabbitMQ 队列模式6.1 简单队列模式6.2 工作队列模式6.3 发布/订阅模式6.4 路由模式6.5 主题模式…...
深度学习水论文:mamba+图像增强
🧀当前视觉领域对高效长序列建模需求激增,对Mamba图像增强这方向的研究自然也逐渐火热。原因在于其高效长程建模,以及动态计算优势,在图像质量提升和细节恢复方面有难以替代的作用。 🧀因此短时间内,就有不…...
Java数值运算常见陷阱与规避方法
整数除法中的舍入问题 问题现象 当开发者预期进行浮点除法却误用整数除法时,会出现小数部分被截断的情况。典型错误模式如下: void process(int value) {double half = value / 2; // 整数除法导致截断// 使用half变量 }此时...
STM32HAL库USART源代码解析及应用
STM32HAL库USART源代码解析 前言STM32CubeIDE配置串口USART和UART的选择使用模式参数设置GPIO配置DMA配置中断配置硬件流控制使能生成代码解析和使用方法串口初始化__UART_HandleTypeDef结构体浅析HAL库代码实际使用方法使用轮询方式发送使用轮询方式接收使用中断方式发送使用中…...
