Spring Boot 使用 Disruptor 做内部高性能消息队列
这里写自定义目录标题
- 一 、背景
- 二 、Disruptor介绍
- 三 、Disruptor 的核心概念
- 3.1 Ring Buffer
- 3.2 Sequence Disruptor
- 3.3 Sequencer
- 3.4 Sequence Barrier
- 3.5 Wait Strategy
- 3.6 Event
- 3.7 EventProcessor
- 3.8 EventHandler
- 3.9 Producer
- 四、案例-demo
- 五、总结
一 、背景
工作中遇到项目使用Disruptor做消息队列,对你没看错,不是Kafka也不是rabbitmq。Disruptor有个最大的优点就是快,还有一点它是开源的哦,下面做个简单的记录。
二 、Disruptor介绍
- Disruptor 是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于 Disruptor 开发的系统单线程能支撑每秒 600 万订单,2010 年在 QCon 演讲后,获得了业界关注。;
- Disruptor是一个开源的Java框架,它被设计用于在生产者—消费者(producer-consumer problem,简称PCP)问题上获得尽量高的吞吐量(TPS)和尽量低的延迟;
- 从功能上来看,Disruptor 是实现了“队列”的功能,而且是一个有界队列。那么它的应用场景自然就是“生产者-消费者”模型的应用场合了;
- Disruptor是LMAX在线交易平台的关键组成部分,LMAX平台使用该框架对订单处理速度能达到600万TPS,除金融领域之外,其他一般的应用中都可以用到Disruptor,它可以带来显著的性能提升;
- 其实Disruptor与其说是一个框架,不如说是一种设计思路,这个设计思路对于存在“并发、缓冲区、生产者—消费者模型、事务处理”这些元素的程序来说,Disruptor提出了一种大幅提升性能(TPS)的方案;
Disruptor的github主页
三 、Disruptor 的核心概念
先从了解 Disruptor 的核心概念开始,来了解它是如何运作的。下面介绍的概念模型,既是领域对象,也是映射到代码实现上的核心对象。
3.1 Ring Buffer
如其名,环形的缓冲区。曾经 RingBuffer 是 Disruptor 中的最主要的对象,但从3.0版本开始,其职责被简化为仅仅负责对通过 Disruptor 进行交换的数据(事件)进行存储和更新。在一些更高级的应用场景中,Ring Buffer 可以由用户的自定义实现来完全替代。
3.2 Sequence Disruptor
通过顺序递增的序号来编号管理通过其进行交换的数据(事件),对数据(事件)的处理过程总是沿着序号逐个递增处理。一个 Sequence 用于跟踪标识某个特定的事件处理者( RingBuffer/Consumer )的处理进度。虽然一个 AtomicLong 也可以用于标识进度,但定义 Sequence 来负责该问题还有另一个目的,那就是防止不同的 Sequence 之间的CPU缓存伪共享(Flase Sharing)问题。
3.3 Sequencer
Sequencer 是 Disruptor 的真正核心。此接口有两个实现类 SingleProducerSequencer、MultiProducerSequencer ,它们定义在生产者和消费者之间快速、正确地传递数据的并发算法。
3.4 Sequence Barrier
用于保持对RingBuffer的 main published Sequence 和Consumer依赖的其它Consumer的 Sequence 的引用。Sequence Barrier 还定义了决定 Consumer 是否还有可处理的事件的逻辑。
3.5 Wait Strategy
定义 Consumer 如何进行等待下一个事件的策略。(注:Disruptor 定义了多种不同的策略,针对不同的场景,提供了不一样的性能表现)
3.6 Event
在 Disruptor 的语义中,生产者和消费者之间进行交换的数据被称为事件(Event)。它不是一个被 Disruptor 定义的特定类型,而是由 Disruptor 的使用者定义并指定。
3.7 EventProcessor
EventProcessor 持有特定消费者(Consumer)的 Sequence,并提供用于调用事件处理实现的事件循环(Event Loop)。
3.8 EventHandler
Disruptor 定义的事件处理接口,由用户实现,用于处理事件,是 Consumer 的真正实现。
3.9 Producer
即生产者,只是泛指调用 Disruptor 发布事件的用户代码,Disruptor 没有定义特定接口或类型。

四、案例-demo
通过下面8个步骤,你就能将Disruptor Get回家啦:
1、添加pom.xml依赖
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version>
</dependency>
2、消息体Model
/*** 消息体*/
@Data
public class MessageModel {private String message;
}
3、构造EventFactory
public class HelloEventFactory implements EventFactory<MessageModel> {@Overridepublic MessageModel newInstance() {return new MessageModel();}
}
4、构造EventHandler-消费者
@Slf4j
public class HelloEventHandler implements EventHandler<MessageModel> {@Overridepublic void onEvent(MessageModel event, long sequence, boolean endOfBatch) {try {//这里停止1000ms是为了确定消费消息是异步的Thread.sleep(1000);log.info("消费者处理消息开始");if (event != null) {log.info("消费者消费的信息是:{}",event);}} catch (Exception e) {log.info("消费者处理消息失败");}log.info("消费者处理消息结束");}
}
5、构造BeanManager
/*** 获取实例化对象*/
@Component
public class BeanManager implements ApplicationContextAware {private static ApplicationContext applicationContext = null;@Overridepublic void setApplicationContext(ApplicationContext applicationContext) throws BeansException {this.applicationContext = applicationContext;}public static ApplicationContext getApplicationContext() { return applicationContext; }public static Object getBean(String name) {return applicationContext.getBean(name);}public static <T> T getBean(Class<T> clazz) {return applicationContext.getBean(clazz);}
}
6、构造MQManager
@Configuration
public class MQManager {@Bean("messageModel")public RingBuffer<MessageModel> messageModelRingBuffer() {//定义用于事件处理的线程池, Disruptor通过java.util.concurrent.ExecutorSerivce提供的线程来触发consumer的事件处理ExecutorService executor = Executors.newFixedThreadPool(2);//指定事件工厂HelloEventFactory factory = new HelloEventFactory();//指定ringbuffer字节大小,必须为2的N次方(能将求模运算转为位运算提高效率),否则将影响效率int bufferSize = 1024 * 256;//单线程模式,获取额外的性能Disruptor<MessageModel> disruptor = new Disruptor<>(factory, bufferSize, executor,ProducerType.SINGLE, new BlockingWaitStrategy());//设置事件业务处理器---消费者disruptor.handleEventsWith(new HelloEventHandler());// 启动disruptor线程disruptor.start();//获取ringbuffer环,用于接取生产者生产的事件RingBuffer<MessageModel> ringBuffer = disruptor.getRingBuffer();return ringBuffer;}
7、构造Mqservice和实现类-生产者
public interface DisruptorMqService {/*** 消息* @param message*/void sayHelloMq(String message);
}@Slf4j
@Component
@Service
public class DisruptorMqServiceImpl implements DisruptorMqService {@Autowiredprivate RingBuffer<MessageModel> messageModelRingBuffer;@Overridepublic void sayHelloMq(String message) {log.info("record the message: {}",message);//获取下一个Event槽的下标long sequence = messageModelRingBuffer.next();try {//给Event填充数据MessageModel event = messageModelRingBuffer.get(sequence);event.setMessage(message);log.info("往消息队列中添加消息:{}", event);} catch (Exception e) {log.error("failed to add event to messageModelRingBuffer for : e = {},{}",e,e.getMessage());} finally {//发布Event,激活观察者去消费,将sequence传递给改消费者//注意最后的publish方法必须放在finally中以确保必须得到调用;如果某个请求的sequence未被提交将会堵塞后续的发布操作或者其他的producermessageModelRingBuffer.publish(sequence);}}
}
8、构造测试类及方法
@Slf4j
@RunWith(SpringRunner.class)
@SpringBootTest(classes = DemoApplication.class)
public class DemoApplicationTests {@Autowiredprivate DisruptorMqService disruptorMqService;/*** 项目内部使用Disruptor做消息队列* @throws Exception*/@Testpublic void sayHelloMqTest() throws Exception{disruptorMqService.sayHelloMq("消息到了,Hello world!");log.info("消息队列已发送完毕");//这里停止2000ms是为了确定是处理消息是异步的Thread.sleep(2000);}
}
测试运行结果
2020-04-05 14:31:18.543 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : record the message: 消息到了,Hello world!
2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.u.d.d.s.Impl.DisruptorMqServiceImpl : 往消息队列中添加消息:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:18.545 INFO 7274 --- [ main] c.e.utils.demo.DemoApplicationTests : 消息队列已发送完毕
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息开始
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者消费的信息是:MessageModel(message=消息到了,Hello world!)
2020-04-05 14:31:19.547 INFO 7274 --- [pool-1-thread-1] c.e.u.d.disrupMq.mq.HelloEventHandler : 消费者处理消息结束
五、总结
其实 生成者 -> 消费者 模式是很常见的,通过一些消息队列也可以轻松做到上述的效果。不同的地方在于,Disruptor 是在内存中以队列的方式去实现的,而且是无锁的。这也是 Disruptor 为什么高效的原因。
相关文章:
Spring Boot 使用 Disruptor 做内部高性能消息队列
这里写自定义目录标题 一 、背景二 、Disruptor介绍三 、Disruptor 的核心概念3.1 Ring Buffer3.2 Sequence Disruptor3.3 Sequencer3.4 Sequence Barrier3.5 Wait Strategy3.6 Event3.7 EventProcessor3.8 EventHandler3.9 Producer 四、案例-demo五、总结 一 、背景 工作中遇…...
一、灵动mm32单片机_开发环境的搭建(Keil)
1、安装Keil MDK。 略。 2、安装芯片对应的Pack包。 (1)这里以MM32F0130单片机为例。 (2)进入灵动微电子官网。上海灵动微电子股份有限公司 (3)点击“支持”→“KEILPacl”。 (3)点击下载Pack包。 (4)下载后,解压下载的压缩包,找到对应的Pack包&…...
【5G PHY】5G SS/PBCH块介绍(二)
博主未授权任何人或组织机构转载博主任何原创文章,感谢各位对原创的支持! 博主链接 本人就职于国际知名终端厂商,负责modem芯片研发。 在5G早期负责终端数据业务层、核心网相关的开发工作,目前牵头6G算力网络技术标准研究。 博客…...
简单而高效:使用PHP爬虫从网易音乐获取音频的方法
概述 网易音乐是一个流行的在线音乐平台,提供了海量的音乐资源和服务。如果你想从网易音乐下载音频文件,你可能会遇到一些困难,因为网易音乐对其音频资源进行了加密和防盗链的处理。本文将介绍一种使用PHP爬虫从网易音乐获取音频的方法&…...
渗透测试工具-sqlmap使用
sqlmap是一个开源渗透测试的自动化工具,可以自动检测和利用SQL注入漏洞并接管数据库服务器。它配备了一个强大的检测引擎,许多用于终极渗透测试的利基功能,以及广泛的开关,包括数据库指纹识别、从数据库中获取数据、访问底层文件系…...
C# WPF: Imag图片填充方式有哪些?
C#和WPF中的图像填充方式 在WPF中,你可以使用Image控件来显示图像,并使用不同的填充方式来控制图像在控件中的显示方式。以下是一些常见的图像填充方式: Stretch(拉伸):这是默认的填充方式,它…...
uniapp开发小程序—根据生日日期计算年龄 周岁
0、需求 在UniApp开发小程序中,将接口返回的出生日期转化为年龄;判断接口返回的年龄是否是周岁 可以使用JavaScript的日期处理方法来实现。 一、第一种方式(示例代码): //javascript // 假设接口返回的年龄为生日的…...
windows下基于vscode的ssh服务远程连接ubuntu服务器
Ubuntu端配置 1.确保ubuntu端已启用ssh服务 首先,安装ssh服务 sudo apt-get install openssh-server 安装后,打开ssh服务 sudo service ssh start 如果显示有sshd就说明成功了。 判断是否成功打开 ps -e|grep ssh 同时也可以通过如下方式确保ss…...
OpenCV学习(二)——OpenCV中绘图功能
2. OpenCV中绘图功能2.1 画线2.2 画矩形2.3 画圆2.4 画多边形2.5 添加文本 2. OpenCV中绘图功能 绘图可以实现画线、画矩形、画圆、画多边形和添加文本等操作。 import cv2 import numpy as np# 读取图像 img cv2.imread(lena.jpg)# 画直线 cv2.line(img, (0, 0), (512, 512…...
业务架构、应用架构、技术架构、数据架构
架构规划的重要性 如果没有进行合理的架构规划,将会引发一系列的问题。为了避免这些问题的发生,企业需要进行业务架构、应用架构、技术架构和数据架构的全面规划和设计,以构建一个清晰、可持续发展的企业架构。 https://www.zhihu.com/que…...
独创改进 | RT-DETR 引入 Asymptotic Hybrid Encoder | 渐进混合特征解码结构
本专栏内容均为博主独家全网首发,未经授权,任何形式的复制、转载、洗稿或传播行为均属违法侵权行为,一经发现将采取法律手段维护合法权益。我们对所有未经授权传播行为保留追究责任的权利。请尊重原创,支持创作者的努力,共同维护网络知识产权。 文章目录 网络结构实验结果…...
SpringCloudAlibaba实战-nacos集群部署
写在前面:在学习阶段,我们想快速学习SpringCloudAlibaba功能,但总是花费大量时间跟着视频或博客做组件配置。由于版本的更迭,我们学习时的组件版本很可能和作者的不一致,又或者是各自环境不一,只能一坑又一…...
Elasticsearch安装IK分词器
ik分词包 参考博客、参考博客 将下载好的zip包解压,生成一个ik文件夹 将ik文件夹移动到ES安装目录下的plugins文件夹下(每台ES节点都要执行相同的操作) 重启ES集群 坑...
『51单片机』 DS1302时钟
🚩 WRITE IN FRONT 🚩 🔎 介绍:"謓泽"正在路上朝着"攻城狮"方向"前进四" 🔎🏅 荣誉:2021|2022年度博客之星物联网与嵌入式开发TOP5|TOP4、2021|2222年获评百大…...
ubuntu部署个人网盘nextCloud使用docker-compose方式
概述 当下各大网盘的容量都是有限制的,而且xx云不开会员网速就拉跨。 所以就想搭建一个自己的盘,并且可以控制用户的权限分组; nextCloud就很合适 我这边都是自己用偶尔给其他人使用下,所以直接docker部署了。 ubuntu版本&…...
【ChatGPT 01】ChatGPT基础科普
1. 从图灵测试到ChatGPT 1950年,艾伦•图灵(Alan Turing)发表论文**《计算机器与智能》( Computing Machinery and Intelligence),提出并尝试回答“机器能否思考”这一关键问题。在论文中,图灵提出了“模仿游戏”&…...
2317.操作后的最大异或和
非常好的一个位运算推公式题目 首先num[i]^x可以知道 这里可以变成任意一个数字 又有num[i]&上上面的数字 所以我们可以扣掉任意位的1把它变成0 答案让我们求异或和 所以只要这一位有1 答案的这一位就有1 我们发现这就是一个按位或运算 class Solution { public:int maxi…...
Python爬虫-经典案例详解
爬虫一般指从网络资源的抓取,通过Python语言的脚本特性,配置字符的处理非常灵活,Python有丰富的网络抓取模块,因而两者经常联系在一起Python就被叫作爬虫。爬虫可以抓取某个网站或者某个应用的内容提取有用的价值信息。有时还可以…...
【信创】银河麒麟V10 安装postgis
安装postGis步骤 1、安装 proj4 #tar -zxvf proj-4.8.0.tar.gz #cd proj-4.8.0 #mkdir -p /opt/proj-4.8.0 #./configure --prefix=/opt/proj-4.8.0 #make && make install #vi /etc/ld.so.conf.d/proj-4.8.0.conf #ldconfig 2、安装 geos #tar -xjf geos-3.6.1.tar.b…...
OpenCV常用功能——灰度处理和图像二值化处理
文章目录 一、灰度处理1.1 cvtColor函数 二、图像二值化处理2.1 全局阈值2.2 自适应阈值 一、灰度处理 1.1 cvtColor函数 函数原型: cv2.cvtColor(src, code[, dst[, dstCn]]) -> dst功能:转换图像颜色空间。 参数: src: 输入图像。co…...
【大模型RAG】拍照搜题技术架构速览:三层管道、两级检索、兜底大模型
摘要 拍照搜题系统采用“三层管道(多模态 OCR → 语义检索 → 答案渲染)、两级检索(倒排 BM25 向量 HNSW)并以大语言模型兜底”的整体框架: 多模态 OCR 层 将题目图片经过超分、去噪、倾斜校正后,分别用…...
Linux链表操作全解析
Linux C语言链表深度解析与实战技巧 一、链表基础概念与内核链表优势1.1 为什么使用链表?1.2 Linux 内核链表与用户态链表的区别 二、内核链表结构与宏解析常用宏/函数 三、内核链表的优点四、用户态链表示例五、双向循环链表在内核中的实现优势5.1 插入效率5.2 安全…...
三维GIS开发cesium智慧地铁教程(5)Cesium相机控制
一、环境搭建 <script src"../cesium1.99/Build/Cesium/Cesium.js"></script> <link rel"stylesheet" href"../cesium1.99/Build/Cesium/Widgets/widgets.css"> 关键配置点: 路径验证:确保相对路径.…...
PHP和Node.js哪个更爽?
先说结论,rust完胜。 php:laravel,swoole,webman,最开始在苏宁的时候写了几年php,当时觉得php真的是世界上最好的语言,因为当初活在舒适圈里,不愿意跳出来,就好比当初活在…...
【单片机期末】单片机系统设计
主要内容:系统状态机,系统时基,系统需求分析,系统构建,系统状态流图 一、题目要求 二、绘制系统状态流图 题目:根据上述描述绘制系统状态流图,注明状态转移条件及方向。 三、利用定时器产生时…...
现代密码学 | 椭圆曲线密码学—附py代码
Elliptic Curve Cryptography 椭圆曲线密码学(ECC)是一种基于有限域上椭圆曲线数学特性的公钥加密技术。其核心原理涉及椭圆曲线的代数性质、离散对数问题以及有限域上的运算。 椭圆曲线密码学是多种数字签名算法的基础,例如椭圆曲线数字签…...
NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
UR 协作机器人「三剑客」:精密轻量担当(UR7e)、全能协作主力(UR12e)、重型任务专家(UR15)
UR协作机器人正以其卓越性能在现代制造业自动化中扮演重要角色。UR7e、UR12e和UR15通过创新技术和精准设计满足了不同行业的多样化需求。其中,UR15以其速度、精度及人工智能准备能力成为自动化领域的重要突破。UR7e和UR12e则在负载规格和市场定位上不断优化…...
【Linux】Linux安装并配置RabbitMQ
目录 1. 安装 Erlang 2. 安装 RabbitMQ 2.1.添加 RabbitMQ 仓库 2.2.安装 RabbitMQ 3.配置 3.1.启动和管理服务 4. 访问管理界面 5.安装问题 6.修改密码 7.修改端口 7.1.找到文件 7.2.修改文件 1. 安装 Erlang 由于 RabbitMQ 是用 Erlang 编写的,需要先安…...
篇章二 论坛系统——系统设计
目录 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 1. 数据库设计 1.1 数据库名: forum db 1.2 表的设计 1.3 编写SQL 2.系统设计 2.1 技术选型 2.2 设计数据库结构 2.2.1 数据库实体 通过需求分析获得概念类并结合业务实现过程中的技术需要&#x…...
