整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理
大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿!
在现代分布式系统中,消息队列是实现异步通信和解耦的重要组件。Apache Pulsar作为一个分布式消息流平台,具备高吞吐、低延迟、多租户支持等优势,是很多高性能消息处理场景的理想选择。本文将介绍如何在Spring Boot项目中整合Pulsar,实现可扩展的消息处理功能。
什么是Apache Pulsar
Apache Pulsar是一个开源的分布式消息流平台,支持多租户、多主题和持久化。Pulsar的架构包括Brokers、Bookies(Apache BookKeeper的存储节点)和ZooKeeper协调服务,提供了高可用性和高性能的消息传递和存储服务。
在Spring Boot中集成Pulsar
为了在Spring Boot项目中使用Pulsar,我们需要以下几个步骤:
- 添加Maven依赖
- 配置Pulsar客户端
- 创建消息生产者
- 创建消息消费者
1. 添加Maven依赖
首先,我们需要在pom.xml中添加Pulsar的依赖:
<dependencies><dependency><groupId>org.apache.pulsar</groupId><artifactId>pulsar-client</artifactId><version>2.9.1</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter</artifactId></dependency>
</dependencies>
2. 配置Pulsar客户端
接下来,我们需要创建一个配置类来初始化Pulsar客户端。创建一个名为PulsarConfig的配置类:
package cn.juwatech.config;import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class PulsarConfig {@Beanpublic PulsarClient pulsarClient() throws PulsarClientException {return PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();}
}
3. 创建消息生产者
我们需要一个消息生产者来发送消息到Pulsar。创建一个名为PulsarProducer的生产者类:
package cn.juwatech.producer;import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.ProducerBuilder;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;@Component
public class PulsarProducer {private final PulsarClient pulsarClient;private Producer<byte[]> producer;@Autowiredpublic PulsarProducer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;initProducer();}private void initProducer() {try {ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer();this.producer = producerBuilder.topic("my-topic").create();} catch (PulsarClientException e) {e.printStackTrace();}}public void sendMessage(String message) {try {producer.send(message.getBytes());} catch (PulsarClientException e) {e.printStackTrace();}}
}
4. 创建消息消费者
我们需要一个消息消费者来接收来自Pulsar的消息。创建一个名为PulsarConsumer的消费者类:
package cn.juwatech.consumer;import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;@Component
public class PulsarConsumer {private final PulsarClient pulsarClient;private Consumer<byte[]> consumer;@Autowiredpublic PulsarConsumer(PulsarClient pulsarClient) {this.pulsarClient = pulsarClient;}@PostConstructprivate void initConsumer() {try {this.consumer = pulsarClient.newConsumer().topic("my-topic").subscriptionName("my-subscription").subscribe();startConsumer();} catch (PulsarClientException e) {e.printStackTrace();}}private void startConsumer() {new Thread(() -> {while (true) {try {Message<byte[]> msg = consumer.receive();String message = new String(msg.getData());System.out.println("Received message: " + message);consumer.acknowledge(msg);} catch (PulsarClientException e) {e.printStackTrace();}}}).start();}
}
5. 测试Pulsar生产者和消费者
最后,我们编写一个简单的测试类来验证生产者和消费者的工作。创建一个名为PulsarTest的测试类:
package cn.juwatech;import cn.juwatech.producer.PulsarProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;@SpringBootApplication
public class PulsarApplication implements CommandLineRunner {@Autowiredprivate PulsarProducer pulsarProducer;public static void main(String[] args) {SpringApplication.run(PulsarApplication.class, args);}@Overridepublic void run(String... args) throws Exception {pulsarProducer.sendMessage("Hello, Pulsar!");}
}
运行上述代码后,您应该会在控制台上看到消费者接收到的消息。
总结
通过以上步骤,我们成功地在Spring Boot项目中整合了Pulsar,实现了可扩展的消息处理功能。Pulsar的高性能和可扩展性使其非常适合分布式系统中的消息传递和流处理。在实际项目中,可以根据需求进一步优化和扩展Pulsar的使用,例如配置不同的主题和分区、实现更复杂的消息处理逻辑等。
相关文章:
整合Spring Boot和Pulsar实现可扩展的消息处理
整合Spring Boot和Pulsar实现可扩展的消息处理 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在现代分布式系统中,消息队列是实现异步通信和解耦…...
如何给WPS、Word、PPT等办公三件套添加收费字体---方正仿宋GBK
1.先下载需要的字体。 下载字体的网站比较多,基本上都是免费的。随便在网上搜索一个就可以了,下面是下载的链接。 方正仿宋GBK字体免费下载和在线预览-字体天下 www.fonts.net.cn/font-31602268591.html 注意:切记不要商用,以免…...
《重构》读书笔记【第1章 重构,第一个示例,第2章 重构原则】
文章目录 第1章 重构,第一个示例1.1 重构前1.2 重构后 第2章 重构原则2.1 何谓重构2.2 两顶帽子2.3 为何重构2.4 何时重构2.5 重构和开发过程 第1章 重构,第一个示例 我这里使用的IDE是IntelliJ IDEA 1.1 重构前 plays.js export const plays {&quo…...
学会整理电脑,基于小白用户(无关硬件升级)
如果你不想进行硬件升级,就要学会进行整理维护电脑 基于小白用户,每一个操作点我都会在后续整理出流程,软件推荐会选择占用小且实用的软件 主要从三个角度去讨论【如果有新的内容我会随时修改,也希望有补充告诉我,我…...
使用ioDraw,AI绘图只需几秒钟!
只需几秒钟,就能将文字或图片转化为精准的思维导图、流程图、折线图、柱状图、饼图等各种图表! 思维导图 思维导图工具使用入口 文字转思维导图 将文本大纲或想法转换成可视化的思维导图,以组织和结构化您的想法。 图片转思维导图 从现有…...
Websocket解析及用法(封装一个通用订阅发布主题的webSocket类)
1、什么是WebSocket? websocket的目标是通过一个长连接实现与服务器全双工,双向的通信。是一种在单个TCP连接上进行全双工通信的协议,使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在 js中创建websocket…...
Foxit Reader(福昕阅读器)详细安装和使用教程
第一部分:Foxit Reader简介和基本信息 1.1 什么是Foxit Reader? Foxit Reader(福昕阅读器)是一款功能强大的PDF阅读和编辑软件,以其快速、轻巧和丰富的功能而闻名。它不仅支持常规的PDF阅读功能,还提供了…...
c++静态成员变量和静态成员函数
1)C入门级小知识,分享给将要学习或者正在学习C开发的同学。 2)内容属于原创,若转载,请说明出处。 3)提供相关问题有偿答疑和支持。 我们可以使用 static 关键字来把类成员定义为静态的。当我们声明类的成…...
视频共享融合赋能平台LntonCVS统一视频接入平台数字化升级医疗体系
医疗健康事关国计民生,然而,当前我国医疗水平的地区发展不平衡、医疗资源分布不均和医疗信息系统老化等问题,制约了整体服务能力和水平的提升。视频融合云平台作为推动数字医疗的关键工具,在医疗领域的广泛应用和普及,…...
Gin框架基础
1、一个简单的Gin示例 下载并安装Gin: go get -u github.com/gin-gonic/gin1.1 一个简单的例子 package mainimport ("net/http""github.com/gin-gonic/gin" )func main() {// 创建一个默认的路由引擎r : gin.Default()// 当客户端以GET方式访问 /hello…...
用GPT-4纠错GPT-4 OpenAI推出CriticGPT模型
根据OpenAI周四(6月27日)发布的新闻稿,该公司新推出了一个基于GPT-4的模型——CriticGPT,用于捕获ChatGPT代码输出中的错误。CriticGPT的作用相当于让人们用GPT-4来查找GPT-4的错误。该模型可以对ChatGPT响应结果做出批评评论&…...
SQL CASE WHEN语句的使用技巧
SQL CASE WHEN语句的使用技巧 大家好,我是免费搭建查券返利机器人省钱赚佣金就用微赚淘客系统3.0的小编,也是冬天不穿秋裤,天冷也要风度的程序猿! 在SQL查询中,经常需要根据不同的条件进行分支处理,这时就…...
虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力
来源:虹科技术丨跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 原文链接:虹科技术 | 跨越距离障碍:PCAN系列网关在远程CAN网络通信的应用潜力 欢迎关注虹科,为您提供最新资讯! #PCAN #网关 #CA…...
【UE 网络】RPC远程过程调用 入门篇
目录 0 引言1 RPC基本概念1.1 定义1.2 分类 2 RPC的使用2.1 Client RPC2.2 Server RPC2.3 Multicast RPC 🙋♂️ 作者:海码007📜 专栏:UE虚幻引擎专栏💥 标题:【UE 网络】RPC远程过程调用 入门篇❣️ 寄语…...
安装maven与nexus
安装maven与nexus Maven官网下载地址:http://maven.apache.org cd /data/software/wget https://mirrors.tuna.tsinghua.edu.cn/apache/maven/maven-3/3.8.1/binaries/apache-maven-3.8.8-bin.tar.gz# 解压 tar xf apache-maven-3.8.1-bin.tar.gz -C /opt/[rooth…...
如何用DCA1000持续采集雷达数据
摘要:本文介绍一下如何通过mmwave studio软件,搭配DCA1000数据采集卡,对AWR1843BOOST进行不间断的数据采集。本文要求读者已经掌握了有关基础知识。 本文开放获取,无需关注。 到SensorConfig页面下,一步步操作…...
怎么用JavaScript写爬虫
随着互联网技术的不断发展,爬虫(web crawler)已经成为当前最热门的爬取信息方式之一。通过爬虫技术,我们可以轻松地获取互联网上的数据,并用于数据分析、挖掘、建模等多个领域。而javascript语言则因其强大的前端开发工…...
Leetcode 3203. Find Minimum Diameter After Merging Two Trees
Leetcode 3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路2. 代码实现 题目链接:3203. Find Minimum Diameter After Merging Two Trees 1. 解题思路 这一题的话算是一个拓扑树的题目?总之就是从树的叶子节点不断向上遍历ÿ…...
【抽代复习笔记】24-群(十八):循环群的两道例题
例1:证明: (1)三次交错群A3是循环群,它与(Z3,)同构,其中Z3 {[0],[1],[2]}; (2)G {1,i,-1,-i},G上的代数运算是数的乘法,则G是一个循环群&…...
Linux常见操作问题
1、登录刚创建的用户,无法操作。 注:etc/passwd文件是Linux操作系统中存储用户账户信息的文本文件,包含了系统中所有用户的基本信息,比如用户名、用户ID、用户组ID、用户家目录路径。 注:etc: 这个目录存放所有的系统…...
MySQL 隔离级别:脏读、幻读及不可重复读的原理与示例
一、MySQL 隔离级别 MySQL 提供了四种隔离级别,用于控制事务之间的并发访问以及数据的可见性,不同隔离级别对脏读、幻读、不可重复读这几种并发数据问题有着不同的处理方式,具体如下: 隔离级别脏读不可重复读幻读性能特点及锁机制读未提交(READ UNCOMMITTED)允许出现允许…...
在rocky linux 9.5上在线安装 docker
前面是指南,后面是日志 sudo dnf config-manager --add-repo https://download.docker.com/linux/centos/docker-ce.repo sudo dnf install docker-ce docker-ce-cli containerd.io -y docker version sudo systemctl start docker sudo systemctl status docker …...
YSYX学习记录(八)
C语言,练习0: 先创建一个文件夹,我用的是物理机: 安装build-essential 练习1: 我注释掉了 #include <stdio.h> 出现下面错误 在你的文本编辑器中打开ex1文件,随机修改或删除一部分,之后…...
深入理解JavaScript设计模式之单例模式
目录 什么是单例模式为什么需要单例模式常见应用场景包括 单例模式实现透明单例模式实现不透明单例模式用代理实现单例模式javaScript中的单例模式使用命名空间使用闭包封装私有变量 惰性单例通用的惰性单例 结语 什么是单例模式 单例模式(Singleton Pattern&#…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院查看报告小程序
一、开发环境准备 工具安装: 下载安装DevEco Studio 4.0(支持HarmonyOS 5)配置HarmonyOS SDK 5.0确保Node.js版本≥14 项目初始化: ohpm init harmony/hospital-report-app 二、核心功能模块实现 1. 报告列表…...
【Oracle】分区表
个人主页:Guiat 归属专栏:Oracle 文章目录 1. 分区表基础概述1.1 分区表的概念与优势1.2 分区类型概览1.3 分区表的工作原理 2. 范围分区 (RANGE Partitioning)2.1 基础范围分区2.1.1 按日期范围分区2.1.2 按数值范围分区 2.2 间隔分区 (INTERVAL Partit…...
Element Plus 表单(el-form)中关于正整数输入的校验规则
目录 1 单个正整数输入1.1 模板1.2 校验规则 2 两个正整数输入(联动)2.1 模板2.2 校验规则2.3 CSS 1 单个正整数输入 1.1 模板 <el-formref"formRef":model"formData":rules"formRules"label-width"150px"…...
OPenCV CUDA模块图像处理-----对图像执行 均值漂移滤波(Mean Shift Filtering)函数meanShiftFiltering()
操作系统:ubuntu22.04 OpenCV版本:OpenCV4.9 IDE:Visual Studio Code 编程语言:C11 算法描述 在 GPU 上对图像执行 均值漂移滤波(Mean Shift Filtering),用于图像分割或平滑处理。 该函数将输入图像中的…...
佰力博科技与您探讨热释电测量的几种方法
热释电的测量主要涉及热释电系数的测定,这是表征热释电材料性能的重要参数。热释电系数的测量方法主要包括静态法、动态法和积分电荷法。其中,积分电荷法最为常用,其原理是通过测量在电容器上积累的热释电电荷,从而确定热释电系数…...
淘宝扭蛋机小程序系统开发:打造互动性强的购物平台
淘宝扭蛋机小程序系统的开发,旨在打造一个互动性强的购物平台,让用户在购物的同时,能够享受到更多的乐趣和惊喜。 淘宝扭蛋机小程序系统拥有丰富的互动功能。用户可以通过虚拟摇杆操作扭蛋机,实现旋转、抽拉等动作,增…...
