当前位置: 首页 > news >正文

单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>

定义一个商品

@Data
public class Goods {private String name;}

定义生产者

public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b)  {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

相关文章:

单机无锁线程安全队列-Disruptor

Disruptor 1、基本介绍 说到队列&#xff0c;除了常见的mq中间件&#xff0c;java中也自带线程安全的BlockingQueue&#xff0c;但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作&#xff0c;性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队…...

好工具知多少:国内外最常用的SCADA软件

随着现代SCADA系统的发展&#xff0c;工业自动化取得了巨大的飞跃。如今&#xff0c;监控和数据采集&#xff08;SCADA&#xff09;系统已成为工业过程的重要组成部分。这些系统使操作员能够实时监控和控制复杂的系统。 SCADA系统正在广泛的行业中发挥着至关重要的作用&#x…...

SQL Server 2016(创建数据库)

1、实验环境。 某公司有一台已经安装了SQL Server 2016的服务器&#xff0c;现在需要新建数据库。 2、需求描述。 创建一个名为"db_class"的数据库&#xff0c;数据文件和日志文件初始大小设置为10MB&#xff0c;启用自动增长&#xff0c;数据库文件存放路径为C:\db…...

Vue学习计划--Vue2(一)简单了解vue

Vue2的终止支持时间为2023年12月31日。 在这个矛盾的时间点&#xff0c;还是决定先把vue2的笔记放出来&#xff0c;在Vue2完结后再把Vue3的笔记补上。这样呢&#xff0c;2和3都不落下&#xff0c;也算是来一个启承的作用吧。在工作中呢&#xff0c;旧的项目可以维护&#xff0c…...

微信小程序生成二维码并保存到本地方法

微信小程序生成二维码请保存到本地方法 官方weapp-qrcode插件 github链接 功能完成样子 wxml <view class"qrcode"><canvas style"width: 275px; height: 275px;" canvas-idmyQrcode></canvas> </view> <view class" …...

shell_exec 和 exec区别

shell_exec 和 exec 都是用于在 PHP 中执行系统命令的函数&#xff0c;但它们之间有一些区别。 返回值类型&#xff1a;shell_exec 函数返回命令的输出结果作为字符串&#xff0c;而 exec 函数将输出结果存储在数组中。 输出结果&#xff1a;shell_exec 函数返回命令的完整输出…...

WPF创建进度条

使用wpf做一个原生的进度条&#xff0c;进度条上面有值&#xff0c;先看效果。 功能就是点击按钮&#xff0c;后台处理数据&#xff0c;前台显示处理数据的变化&#xff0c;当然还可以对进度条进行美化和关闭的操作&#xff0c;等待后台处理完毕数据&#xff0c;然后自动关闭。…...

全网最新最全面的Appium自动化:Appium常用操作之混合应用webview页面操作--待补充!

上下文操作&#xff1a; 在appium中&#xff0c;对于混合应用&#xff0c;需要进行WebView页面和原生应用的切换 常用的方法如下&#xff1a; 1、context(self) / current_context(self)&#xff1a;返回当前会话的当前上下文&#xff0c;context可以理解为可进入的窗口。对于…...

基于OpenCV+YOLOv5实现车辆跟踪与计数(附源码)

导 读 本文主要介绍基于OpenCVYOLOv5实现车辆跟踪与计数的应用&#xff0c;并给出源码。 资源下载 基础代码和视频下载地址&#xff1a; https://github.com/freedomwebtech/win11vehiclecount main.py代码:​​​​​​​ import cv2import torchimport numpy as npfrom tr…...

05、pytest断言确定的异常

官方用例 # content of test_sysexit.py import pytestdef f():raise SystemExit(1)def test_mytest():with pytest.raises(SystemExit):f()解读与实操 ​ 标准python raise函数可产生异常。pytest.raises可以断言某个异常会发现。异常发生了&#xff0c;用例执行成功&#x…...

金蝶云星空单据编辑界面,不允许批量填充操作

文章目录 金蝶云星空单据编辑界面&#xff0c;不允许批量填充操作案例演示开发设计测试 金蝶云星空单据编辑界面&#xff0c;不允许批量填充操作 案例演示 售后单&#xff0c;明细信息单据体&#xff0c;物料编码字段禁止批量填充。 开发设计 编写表单插件&#xff0c;在Be…...

Springboot项目启动成功后可通过五种方式继续执行

实现CommandLineRunner接口 项目初始化完毕后&#xff0c;才会调用方法&#xff0c;提供服务 Component public class StartRunner implements CommandLineRunner {Overridepublic void run(String... args) throws Exception {System.out.println("CommandLineRunner&qu…...

什么是供应链金融分账系统?

一、供应链金融的重要性 供应链金融在很多行业都是要用到,比如在抖音,快手店铺的商家资金回笼,通常需要7-21天的回款周期,这对于商家的周转来说是一件很困难的事情,在供应链金融中&#xff0c;分账就扮演着至关重要的角色&#xff0c;不仅是金融流程中的一环&#xff0c;更是保…...

【测绘程序设计】——坐标换带与高程投影

测绘工程中经常遇到 “坐标换带” 与 “高程投影” 问题,前者是在改变投影的分带号——即投影的中央子午线,通过 “(x,y)->(B,L)->(x,y)” 进行;而后者则是为减小投影变形(高程投影变短、高斯投影变长,详情可参考博客《测绘综合能力》真题易错本 第(37)条)通过平…...

企业计算机服务器中了Mallox勒索病毒如何解密,Mallox勒索病毒数据恢复

随着计算机技术的不断应用与发展&#xff0c;网络为企业的生产运营提供了极大帮助&#xff0c;越来越多的企业开始利用网络办公&#xff0c;因此&#xff0c;随之而来的网络安全威胁也在不断增加。近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业的计…...

一套rk3588 rtsp服务器推流的 github 方案及记录 -01

我不生产代码&#xff0c;我只是代码的搬运工&#xff0c;相信我&#xff0c;看完这个文章你的图片一定能变成流媒体推出去。 诉求&#xff1a;使用opencv拉流&#xff0c;转成bgr数据&#xff0c;需要把处理后的数据&#xff08;BGR&#xff09;编码成264&#xff0c;然后推流…...

PyQt6 QComboBox下拉组合框控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计34条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…...

常用类与比较器

常用类 学一个类&#xff0c;先搞清楚继承关系&#xff0c;再看源码 包装类Wrapper jdk5之前是手动装箱拆箱 jdk5及之后是自动装箱拆箱&#xff08;调用valueOf方法&#xff08;自动默认&#xff09;/创建对象的构造方法&#xff0c;XXXvalue方法…...

【上海大学《面向对象程序设计A》课程小项目报告】抽象向量类模板及其派生类

1 项目内容及要求 本项目通过设计一个抽象向量类模板&#xff0c;以及一个通用的向量类模板和一个字符串类作为其派生类&#xff0c;以满足各种应用场景中的数据存储和处理需求。 项目内容&#xff1a; 抽象向量类模板。派生向量类。派生字符串类。测试及异常处理。联合测试…...

Leetcode每日一题学习训练——Python3版(到达首都的最少油耗)

版本说明 当前版本号[20231205]。 版本修改说明20231205初版 目录 文章目录 版本说明目录到达首都的最少油耗理解题目代码思路参考代码 原题可以点击此 2477. 到达首都的最少油耗 前去练习。 到达首都的最少油耗 ​ 给你一棵 n 个节点的树&#xff08;一个无向、连通、无环…...

什么?连接服务器也能可视化显示界面?:基于X11 Forwarding + CentOS + MobaXterm实战指南

文章目录 什么是X11?环境准备实战步骤1️⃣ 服务器端配置(CentOS)2️⃣ 客户端配置(MobaXterm)3️⃣ 验证X11 Forwarding4️⃣ 运行自定义GUI程序(Python示例)5️⃣ 成功效果![在这里插入图片描述](https://i-blog.csdnimg.cn/direct/55aefaea8a9f477e86d065227851fe3d.pn…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

iview框架主题色的应用

1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题&#xff0c;无需引入&#xff0c;直接可…...

从 GreenPlum 到镜舟数据库:杭银消费金融湖仓一体转型实践

作者&#xff1a;吴岐诗&#xff0c;杭银消费金融大数据应用开发工程师 本文整理自杭银消费金融大数据应用开发工程师在StarRocks Summit Asia 2024的分享 引言&#xff1a;融合数据湖与数仓的创新之路 在数字金融时代&#xff0c;数据已成为金融机构的核心竞争力。杭银消费金…...

【学习笔记】erase 删除顺序迭代器后迭代器失效的解决方案

目录 使用 erase 返回值继续迭代使用索引进行遍历 我们知道类似 vector 的顺序迭代器被删除后&#xff0c;迭代器会失效&#xff0c;因为顺序迭代器在内存中是连续存储的&#xff0c;元素删除后&#xff0c;后续元素会前移。 但一些场景中&#xff0c;我们又需要在执行删除操作…...

CVPR2025重磅突破:AnomalyAny框架实现单样本生成逼真异常数据,破解视觉检测瓶颈!

本文介绍了一种名为AnomalyAny的创新框架&#xff0c;该方法利用Stable Diffusion的强大生成能力&#xff0c;仅需单个正常样本和文本描述&#xff0c;即可生成逼真且多样化的异常样本&#xff0c;有效解决了视觉异常检测中异常样本稀缺的难题&#xff0c;为工业质检、医疗影像…...

水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关

在水泥厂的生产流程中&#xff0c;工业自动化网关起着至关重要的作用&#xff0c;尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关&#xff0c;为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多&#xff0c;其中不少设备采用Devicenet协议。Devicen…...

大数据治理的常见方式

大数据治理的常见方式 大数据治理是确保数据质量、安全性和可用性的系统性方法&#xff0c;以下是几种常见的治理方式&#xff1a; 1. 数据质量管理 核心方法&#xff1a; 数据校验&#xff1a;建立数据校验规则&#xff08;格式、范围、一致性等&#xff09;数据清洗&…...

JS红宝书笔记 - 3.3 变量

要定义变量&#xff0c;可以使用var操作符&#xff0c;后跟变量名 ES实现变量初始化&#xff0c;因此可以同时定义变量并设置它的值 使用var操作符定义的变量会成为包含它的函数的局部变量。 在函数内定义变量时省略var操作符&#xff0c;可以创建一个全局变量 如果需要定义…...

文件上传漏洞防御全攻略

要全面防范文件上传漏洞&#xff0c;需构建多层防御体系&#xff0c;结合技术验证、存储隔离与权限控制&#xff1a; &#x1f512; 一、基础防护层 前端校验&#xff08;仅辅助&#xff09; 通过JavaScript限制文件后缀名&#xff08;白名单&#xff09;和大小&#xff0c;提…...