Springboot整合Kafka消息队列服务实例
一、Kafka相关概念
1、关于Kafka的描述
Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处理。日志类的数据需要高吞吐量的性能要求,对于像Hadoop一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息。
2、关于Kafka的功能特点
- 通过磁盘数据结构提供消息的持久化,消息存储也能够保持长时间稳定性;
- 高吞吐量,即使是非常普通的硬件Kafka也可以支持每秒超高的并发量;
- 支持通过Kafka服务器和消费机集群来分区消息;
- 支持Hadoop并行数据加载;
- API包封装的非常好,简单易用,上手快 ;
- 分布式消息队列。Kafka对消息保存时根据Topic(主题)进行归类,发送消息者称为Producer(生产者),消息接受者称为Consumer(消费者);
3、Kafka消息功能
如下图所示,Kafka作为一个中间服务,代表一个broker(经纪人)角色,负责接收APP的消费与推送消息给其他相关APP。这里APP可分为Producer,Consumer。

消息的消费模式
点对点模式:点对点模式通常是一个基于拉取或者轮询的消息传递模型,消费者主动拉取数据,消息收到后从队列移除消息,这种模型不是将消息推送到客户端,而是从队列中请求消息。特点是发送到队列的消息被一个且只有一个消费者接收处理,即使有多个消费者监听队列也是如此。
发布订阅模式:订阅模式是一个基于推送的消费传送模型,消息产生后,Kafka会推送给所有订阅相关Topic的订阅者。发布订阅模型可以有多种不同的订阅者,临时订阅者只在主动监听主题时才接收消息,而持久订阅者则监听主题的所有消息,即使当前订阅者不可用,处于离线状态。
4、Kafka消息队列的作用
- 应用程序之间解耦,生产者与消费者相互独立,各自异步执行。
- 消息数据持久化存储,直到所有消息都被消费,规避消息数据丢失的风险。
- 流量削峰,使用Kafka消息队列可以帮助server承接访问压力,尽可能避免应用程序崩溃。
- 降低进程间的耦合度,系统部分应用组件发生崩溃时,不会影响到整体系统的运行。
- 保证消息顺序执行,解决特定场景业务需求。
5、Kafka相关术语介绍
- Broker
一台kafka服务器就是一个broker(经纪人)。一个集群由多个broker组成。一个broker可以容纳多个topic(消息主题)。
- Producer
消息生产者,就是向kafka broker发消息的APP客户端。
- Consumer
消息消费者,向kafka broker取消息的APP客户端。
- Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic,可以理解为一个队列。
- Consumer Group
每个Consumer属于一个特定的Consumer Group,可为每个Consumer指定group name,若不指定group name则属于默认的分组。
- Partition
一个庞大大的topic可以分布到多个broker上,一个topic可以分为多个partition,每个partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id。kafka只保证按一个partition中的顺序将消息发给consumer,不保证一个topic的整体的顺序。Partition是物理上的概念,方便在集群中扩展,提高并发。
二、liunx系统下搭建Kafka环境
--新建kafka应用目录。并下载到当前目录下
cd /usr/localmkdir kafkacd kafka
--下载wget https://downloads.apache.org/kafka/3.7.0/kafka-3.7.0-src.tgz--解压tar -zxvf kafka-3.7.0-src.tgz--启动服务cd kafka-3.7.0./bin/kafka-server-start.sh config/server.properties--查看服务ps -aux |grep kafka--开放kafka地址端口vim server.properties--添加下面注释advertised.listeners=PLAINTEXT://10.98.3.22:9092

三、Springboot2整合Kafka 服务
1、导入基础依赖
<!-- SpringBoot依赖 -->
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- kafka 依赖 -->
<dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>2.2.4.RELEASE</version>
</dependency>
2、项目目录结构

3、生产者与消费者yml文件配置
#消费者配置
spring:kafka:bootstrap-servers: 127.0.0.1:9092consumer:group-id: test-consumer-group#生产者配置spring:kafka:bootstrap-servers: 127.0.0.1:9092
4、生成消息
@RestController
@RequestMapping("/kafka")
public class ProducerController {@Resourceprivate KafkaTemplate<String, String> kafkaTemplate;@RequestMapping("/send")public HttpResult sendMsg () {MsgLog msgLog = new MsgLog(1,"消息生成",1,"消息日志",new Date()) ;String msg = JSON.toJSONString(msgLog) ;// 这里Topic如果不存在,会自动创建kafkaTemplate.send("cicada-topic", msg);return HttpResult.create(HttpStatus.SUCCESS,msg);}
}
@Component
public class ConsumerMsg {private static Logger LOGGER = LoggerFactory.getLogger(ConsumerMsg.class);//此注解是监听主题为cicada-topic的消息队列@KafkaListener(topics = "cicada-topic")public void listenMsg (ConsumerRecord<?,String> record) {String value = record.value();LOGGER.info("ConsumerMsg====>>"+value);}
}
相关文章:
Springboot整合Kafka消息队列服务实例
一、Kafka相关概念 1、关于Kafka的描述 Kafka是由Apache开源,具有分布式、分区的、多副本的、多订阅者,基于Zookeeper协调的分布式处理平台,由Scala和Java语言编写。通常用来搜集用户在应用服务中产生的动作日志数据,并高速的处…...
kotlin——MVVM框架下的大型项目优化
目录 概要 优化思路 一、重构过长的Activity 二、优化臃肿的ViewModel 示例代码: 概要 在大型项目中,随着项目越做越大,activity和viewmodel的代码会越来越多,尽量保持Activity和ViewModel的代码精简和易于维护是非常重要的。个人…...
echarts实现折线图点击添加标记
文章目录 背景一、代码示例 背景 业务场景体现在功能层面主要两点, 折线图表设置点击事件点击事件与图标渲染标记绑定 对于节点没有被添加标记的可以,弹框提示添加标记,并提供标记内容输入框,已经添加过标记的点,点…...
循环赛日程表
描述 n 2 ^ k个选手 每个选手必须与其他n-1个选手各赛一次每个选手一天赛一次比赛打n-1天 思路 k 3时的解 我们先进行假设:每个选手第一天和自己比,然后分解成4个子问题: (1)14号的第14天,对手1~4号; (2)14号的第58天&a…...
计算机网络:运输层 - 概述
计算机网络:运输层 - 概述 运输层的任务端口号复用与分用UDP协议首部格式 TCP协议面向字节流 运输层的任务 物理层、数据链路层以及网络层,他们共同解决了将主机通过网络互联起来所面临的问题,实现了主机到主机的通信。 网络层的作用范围是…...
使用阿里开源的Spring Cloud Alibaba AI开发第一个大模型应用
背景 前段时间看到Spring推出了SpringAI,可以方便快速的接入ChatGPT等国外的大模型,现在阿里巴巴也紧追脚步推出了Spring Cloud Alibaba AI,Spring Cloud Alibaba AI 目前基于 Spring AI 0.8.1 版本 API 完成通义系列大模型的接入。通义接入…...
`THREE.PointsMaterial` 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。
demo案例 THREE.PointsMaterial 是 Three.js 中用于创建粒子系统材质的类。它允许你设置粒子系统的外观属性,比如颜色、大小和透明度。下面是对其构造函数的参数、属性和方法的详细讲解。 构造函数 const material new THREE.PointsMaterial(parameters);参数&am…...
Android-Android Studio-FAQ
1 需求 2 接口 3 Android Studio xml布局代码补全功能失效问题 最终解决方案就是尝试修改compileSdk 为不同SDK版本来解决问题,将原本34修改为32测试会发现xml代码补全功能有效了! 参考资料 Android Studio xml布局代码补全功能失效问题_android studi…...
架构师指南:现代 Datalake 参考架构
这篇文章的缩写版本于 2024 年 3 月 26 日出现在 The New Stack 上。 旨在最大化其数据资产的企业正在采用可扩展、灵活和统一的数据存储和分析方法。这一趋势是由企业架构师推动的,他们的任务是制定符合不断变化的业务需求的基础设施。现代数据湖体系结构通过将数…...
通讯协议大全(UART,RS485,SPI,IIC)
参考自: 常见的通讯协议总结(USART、IIC、SPI、485、CAN)-CSDN博客 UART那么好用,为什么单片机还需要I2C和SPI?_哔哩哔哩_bilibili 5分钟看懂!串口RS232 RS485最本质的区别!_哔哩哔哩_bilibili 喜欢几位…...
基于EXCEL数据表格创建省份专题地图
1 数据源 随着西藏于5月1日发布2022年一季度经济运行情况,31省份一季度GDP数据已全部出炉。 总量方面,粤苏鲁稳居前三;增速方面,23省份高于“全国线”,新疆表现最佳,增速达到7.0%。 表格表现数据不够直观…...
基于java+springboot+vue实现的电商应用系统(文末源码+Lw)241
摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存储,归纳,集中处理数据信息的管理方式。本电商应用系统就是在这样的大环境下诞生,其可以帮助管理者在短时间内处理完毕庞大的数据信息&a…...
好文!12个策略解决 Kafka 数据丢失问题
哥们儿!有遇到Kafka数据丢失问题的问题吗,你是如何解决的?今天的文章,V哥来详细解释一下,整理了12种解决策略,希望可以帮助你解决项目中的问题:以下是一些常见的解决方案和最佳实践。 生产者确认…...
Android 第三方框架:网络:OkHttp:源码分析:拦截器
文章目录 涉及到的设计模式 责任链模式:ArrayList策略模式:Interceptor和XXXInterceptor源码分析API总结涉及到的设计模式 责任链模式:ArrayList ArrayList 用ArrayList作为保存顺序的数据结构 把系统提供的各种Interceptor和自定义的Interceptor放入ArrayList中 RealI…...
FlowUs AI的使用教程和使用体验
FlowUs AI 使用教程 FlowUs AI特点使其成为提升个人和团队生产力的有力工具,无论是在学术研究、内容创作、技术开发还是日常办公中都能发挥重要作用。现在来看看如何使用FlowUs AI吧! 注册与登录:首先,确保您已经注册并登录FlowU…...
SwiftUI 6.0(iOS 18)ScrollView 全新的滚动位置(ScrollPosition)揭秘
概览 在只有方寸之间大小的手持设备上要想体面的向用户展示海量信息,滚动视图(ScrollView)无疑是绝佳的“东牀之选”。 在 SwiftUI 历史的长河中,总觉得苹果对于 ScrollView 视图功能的升级是在“挤牙膏”。这不,在本…...
阿贝云免费虚拟主机和免费云服务器评测
阿贝云是一家提供免费虚拟主机和免费云服务器的服务商,为用户提供了一个便捷的搭建网站和应用的平台。他们的服务受到了很多用户的好评。用户可以轻松地在阿贝云上创建自己的网站,并享受免费的虚拟主机和云服务器。通过阿贝云的服务,用户可以…...
不懂就问,开通小程序地理位置接口有那么难吗?
小程序地理位置接口有什么功能? 若提审后被驳回,理由是“当前提审小程序代码包中地理位置相关接口( chooseAddress、getLocation )暂未开通,建议完成接口开通后或移除接口相关内容后再进行后续版本提审”,那么遇到这种情况&#x…...
Python 全栈系列256 异步任务与队列消息控制(填坑)
说明 每个创新都会伴随着一系列的改变。 在使用celery进行异步任务后,产生的一个问题恰好也是因为异步产生的。 内容 1 问题描述 我有一个队列 stream1, 对应的worker1需要周期性的获取数据,对输入的数据进行模式识别后分流。worker1我设施为10秒运行…...
从零开始的Ollama指南:部署私域大模型
大模型相关目录 大模型,包括部署微调prompt/Agent应用开发、知识库增强、数据库增强、知识图谱增强、自然语言处理、多模态等大模型应用开发内容 从0起步,扬帆起航。 大模型应用向开发路径:AI代理工作流大模型应用开发实用开源项目汇总大模…...
【Python】 -- 趣味代码 - 小恐龙游戏
文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...
C++实现分布式网络通信框架RPC(3)--rpc调用端
目录 一、前言 二、UserServiceRpc_Stub 三、 CallMethod方法的重写 头文件 实现 四、rpc调用端的调用 实现 五、 google::protobuf::RpcController *controller 头文件 实现 六、总结 一、前言 在前边的文章中,我们已经大致实现了rpc服务端的各项功能代…...
DAY 47
三、通道注意力 3.1 通道注意力的定义 # 新增:通道注意力模块(SE模块) class ChannelAttention(nn.Module):"""通道注意力模块(Squeeze-and-Excitation)"""def __init__(self, in_channels, reduction_rat…...
MMaDA: Multimodal Large Diffusion Language Models
CODE : https://github.com/Gen-Verse/MMaDA Abstract 我们介绍了一种新型的多模态扩散基础模型MMaDA,它被设计用于在文本推理、多模态理解和文本到图像生成等不同领域实现卓越的性能。该方法的特点是三个关键创新:(i) MMaDA采用统一的扩散架构…...
linux 下常用变更-8
1、删除普通用户 查询用户初始UID和GIDls -l /home/ ###家目录中查看UID cat /etc/group ###此文件查看GID删除用户1.编辑文件 /etc/passwd 找到对应的行,YW343:x:0:0::/home/YW343:/bin/bash 2.将标红的位置修改为用户对应初始UID和GID: YW3…...
HTML前端开发:JavaScript 常用事件详解
作为前端开发的核心,JavaScript 事件是用户与网页交互的基础。以下是常见事件的详细说明和用法示例: 1. onclick - 点击事件 当元素被单击时触发(左键点击) button.onclick function() {alert("按钮被点击了!&…...
3403. 从盒子中找出字典序最大的字符串 I
3403. 从盒子中找出字典序最大的字符串 I 题目链接:3403. 从盒子中找出字典序最大的字符串 I 代码如下: class Solution { public:string answerString(string word, int numFriends) {if (numFriends 1) {return word;}string res;for (int i 0;i &…...
[Java恶补day16] 238.除自身以外数组的乘积
给你一个整数数组 nums,返回 数组 answer ,其中 answer[i] 等于 nums 中除 nums[i] 之外其余各元素的乘积 。 题目数据 保证 数组 nums之中任意元素的全部前缀元素和后缀的乘积都在 32 位 整数范围内。 请 不要使用除法,且在 O(n) 时间复杂度…...
Java多线程实现之Thread类深度解析
Java多线程实现之Thread类深度解析 一、多线程基础概念1.1 什么是线程1.2 多线程的优势1.3 Java多线程模型 二、Thread类的基本结构与构造函数2.1 Thread类的继承关系2.2 构造函数 三、创建和启动线程3.1 继承Thread类创建线程3.2 实现Runnable接口创建线程 四、Thread类的核心…...
Web 架构之 CDN 加速原理与落地实践
文章目录 一、思维导图二、正文内容(一)CDN 基础概念1. 定义2. 组成部分 (二)CDN 加速原理1. 请求路由2. 内容缓存3. 内容更新 (三)CDN 落地实践1. 选择 CDN 服务商2. 配置 CDN3. 集成到 Web 架构 …...
