当前位置: 首页 > news >正文

单机无锁线程安全队列-Disruptor

Disruptor

1、基本介绍

说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。

2、与BlockingQueue对比

  1. 使用CAS代替锁
  2. 多播模式,同一事件可以交给多个消费者处理
  3. 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收

这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。

img.png

3、生产者消费者模式

在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:

引入最新包

        <dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>

定义一个商品

@Data
public class Goods {private String name;}

定义生产者

public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}

定义消费者

@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b)  {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}

一个生产者对一个消费者

img_1.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

一个生产者对多个消费者

消费者按顺序消费:

img_2.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

多播模式,同一事件可以交给多个消费者处理

img_4.png
只需要将上述代码修改一下即可

   //Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));

多个生产者对多个消费者

img_5.png

public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16,  // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI,   //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}

除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了

img_8.png
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。

img_9.png

img_10.png

4、RingBuffer原理

Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。

image.png
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。

img_11.png
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html

5、等待策略

生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。

img_7.png

  1. BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
  2. BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
  3. LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
  4. TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
  5. LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
  6. SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
  7. YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
  8. PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个

6、结束

Disruptor简单的介绍已经结束了,点个赞再走啦!~

相关文章:

单机无锁线程安全队列-Disruptor

Disruptor 1、基本介绍 说到队列&#xff0c;除了常见的mq中间件&#xff0c;java中也自带线程安全的BlockingQueue&#xff0c;但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作&#xff0c;性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队…...

好工具知多少:国内外最常用的SCADA软件

随着现代SCADA系统的发展&#xff0c;工业自动化取得了巨大的飞跃。如今&#xff0c;监控和数据采集&#xff08;SCADA&#xff09;系统已成为工业过程的重要组成部分。这些系统使操作员能够实时监控和控制复杂的系统。 SCADA系统正在广泛的行业中发挥着至关重要的作用&#x…...

SQL Server 2016(创建数据库)

1、实验环境。 某公司有一台已经安装了SQL Server 2016的服务器&#xff0c;现在需要新建数据库。 2、需求描述。 创建一个名为"db_class"的数据库&#xff0c;数据文件和日志文件初始大小设置为10MB&#xff0c;启用自动增长&#xff0c;数据库文件存放路径为C:\db…...

Vue学习计划--Vue2(一)简单了解vue

Vue2的终止支持时间为2023年12月31日。 在这个矛盾的时间点&#xff0c;还是决定先把vue2的笔记放出来&#xff0c;在Vue2完结后再把Vue3的笔记补上。这样呢&#xff0c;2和3都不落下&#xff0c;也算是来一个启承的作用吧。在工作中呢&#xff0c;旧的项目可以维护&#xff0c…...

微信小程序生成二维码并保存到本地方法

微信小程序生成二维码请保存到本地方法 官方weapp-qrcode插件 github链接 功能完成样子 wxml <view class"qrcode"><canvas style"width: 275px; height: 275px;" canvas-idmyQrcode></canvas> </view> <view class" …...

shell_exec 和 exec区别

shell_exec 和 exec 都是用于在 PHP 中执行系统命令的函数&#xff0c;但它们之间有一些区别。 返回值类型&#xff1a;shell_exec 函数返回命令的输出结果作为字符串&#xff0c;而 exec 函数将输出结果存储在数组中。 输出结果&#xff1a;shell_exec 函数返回命令的完整输出…...

WPF创建进度条

使用wpf做一个原生的进度条&#xff0c;进度条上面有值&#xff0c;先看效果。 功能就是点击按钮&#xff0c;后台处理数据&#xff0c;前台显示处理数据的变化&#xff0c;当然还可以对进度条进行美化和关闭的操作&#xff0c;等待后台处理完毕数据&#xff0c;然后自动关闭。…...

全网最新最全面的Appium自动化:Appium常用操作之混合应用webview页面操作--待补充!

上下文操作&#xff1a; 在appium中&#xff0c;对于混合应用&#xff0c;需要进行WebView页面和原生应用的切换 常用的方法如下&#xff1a; 1、context(self) / current_context(self)&#xff1a;返回当前会话的当前上下文&#xff0c;context可以理解为可进入的窗口。对于…...

基于OpenCV+YOLOv5实现车辆跟踪与计数(附源码)

导 读 本文主要介绍基于OpenCVYOLOv5实现车辆跟踪与计数的应用&#xff0c;并给出源码。 资源下载 基础代码和视频下载地址&#xff1a; https://github.com/freedomwebtech/win11vehiclecount main.py代码:​​​​​​​ import cv2import torchimport numpy as npfrom tr…...

05、pytest断言确定的异常

官方用例 # content of test_sysexit.py import pytestdef f():raise SystemExit(1)def test_mytest():with pytest.raises(SystemExit):f()解读与实操 ​ 标准python raise函数可产生异常。pytest.raises可以断言某个异常会发现。异常发生了&#xff0c;用例执行成功&#x…...

金蝶云星空单据编辑界面,不允许批量填充操作

文章目录 金蝶云星空单据编辑界面&#xff0c;不允许批量填充操作案例演示开发设计测试 金蝶云星空单据编辑界面&#xff0c;不允许批量填充操作 案例演示 售后单&#xff0c;明细信息单据体&#xff0c;物料编码字段禁止批量填充。 开发设计 编写表单插件&#xff0c;在Be…...

Springboot项目启动成功后可通过五种方式继续执行

实现CommandLineRunner接口 项目初始化完毕后&#xff0c;才会调用方法&#xff0c;提供服务 Component public class StartRunner implements CommandLineRunner {Overridepublic void run(String... args) throws Exception {System.out.println("CommandLineRunner&qu…...

什么是供应链金融分账系统?

一、供应链金融的重要性 供应链金融在很多行业都是要用到,比如在抖音,快手店铺的商家资金回笼,通常需要7-21天的回款周期,这对于商家的周转来说是一件很困难的事情,在供应链金融中&#xff0c;分账就扮演着至关重要的角色&#xff0c;不仅是金融流程中的一环&#xff0c;更是保…...

【测绘程序设计】——坐标换带与高程投影

测绘工程中经常遇到 “坐标换带” 与 “高程投影” 问题,前者是在改变投影的分带号——即投影的中央子午线,通过 “(x,y)->(B,L)->(x,y)” 进行;而后者则是为减小投影变形(高程投影变短、高斯投影变长,详情可参考博客《测绘综合能力》真题易错本 第(37)条)通过平…...

企业计算机服务器中了Mallox勒索病毒如何解密,Mallox勒索病毒数据恢复

随着计算机技术的不断应用与发展&#xff0c;网络为企业的生产运营提供了极大帮助&#xff0c;越来越多的企业开始利用网络办公&#xff0c;因此&#xff0c;随之而来的网络安全威胁也在不断增加。近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业的计…...

一套rk3588 rtsp服务器推流的 github 方案及记录 -01

我不生产代码&#xff0c;我只是代码的搬运工&#xff0c;相信我&#xff0c;看完这个文章你的图片一定能变成流媒体推出去。 诉求&#xff1a;使用opencv拉流&#xff0c;转成bgr数据&#xff0c;需要把处理后的数据&#xff08;BGR&#xff09;编码成264&#xff0c;然后推流…...

PyQt6 QComboBox下拉组合框控件

​锋哥原创的PyQt6视频教程&#xff1a; 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计34条视频&#xff0c;包括&#xff1a;2024版 PyQt6 Python桌面开发 视频教程(无废话…...

常用类与比较器

常用类 学一个类&#xff0c;先搞清楚继承关系&#xff0c;再看源码 包装类Wrapper jdk5之前是手动装箱拆箱 jdk5及之后是自动装箱拆箱&#xff08;调用valueOf方法&#xff08;自动默认&#xff09;/创建对象的构造方法&#xff0c;XXXvalue方法…...

【上海大学《面向对象程序设计A》课程小项目报告】抽象向量类模板及其派生类

1 项目内容及要求 本项目通过设计一个抽象向量类模板&#xff0c;以及一个通用的向量类模板和一个字符串类作为其派生类&#xff0c;以满足各种应用场景中的数据存储和处理需求。 项目内容&#xff1a; 抽象向量类模板。派生向量类。派生字符串类。测试及异常处理。联合测试…...

Leetcode每日一题学习训练——Python3版(到达首都的最少油耗)

版本说明 当前版本号[20231205]。 版本修改说明20231205初版 目录 文章目录 版本说明目录到达首都的最少油耗理解题目代码思路参考代码 原题可以点击此 2477. 到达首都的最少油耗 前去练习。 到达首都的最少油耗 ​ 给你一棵 n 个节点的树&#xff08;一个无向、连通、无环…...

RestClient

什么是RestClient RestClient 是 Elasticsearch 官方提供的 Java 低级 REST 客户端&#xff0c;它允许HTTP与Elasticsearch 集群通信&#xff0c;而无需处理 JSON 序列化/反序列化等底层细节。它是 Elasticsearch Java API 客户端的基础。 RestClient 主要特点 轻量级&#xff…...

[特殊字符] 智能合约中的数据是如何在区块链中保持一致的?

&#x1f9e0; 智能合约中的数据是如何在区块链中保持一致的&#xff1f; 为什么所有区块链节点都能得出相同结果&#xff1f;合约调用这么复杂&#xff0c;状态真能保持一致吗&#xff1f;本篇带你从底层视角理解“状态一致性”的真相。 一、智能合约的数据存储在哪里&#xf…...

第19节 Node.js Express 框架

Express 是一个为Node.js设计的web开发框架&#xff0c;它基于nodejs平台。 Express 简介 Express是一个简洁而灵活的node.js Web应用框架, 提供了一系列强大特性帮助你创建各种Web应用&#xff0c;和丰富的HTTP工具。 使用Express可以快速地搭建一个完整功能的网站。 Expre…...

eNSP-Cloud(实现本地电脑与eNSP内设备之间通信)

说明&#xff1a; 想象一下&#xff0c;你正在用eNSP搭建一个虚拟的网络世界&#xff0c;里面有虚拟的路由器、交换机、电脑&#xff08;PC&#xff09;等等。这些设备都在你的电脑里面“运行”&#xff0c;它们之间可以互相通信&#xff0c;就像一个封闭的小王国。 但是&#…...

19c补丁后oracle属主变化,导致不能识别磁盘组

补丁后服务器重启&#xff0c;数据库再次无法启动 ORA01017: invalid username/password; logon denied Oracle 19c 在打上 19.23 或以上补丁版本后&#xff0c;存在与用户组权限相关的问题。具体表现为&#xff0c;Oracle 实例的运行用户&#xff08;oracle&#xff09;和集…...

日语AI面试高效通关秘籍:专业解读与青柚面试智能助攻

在如今就业市场竞争日益激烈的背景下&#xff0c;越来越多的求职者将目光投向了日本及中日双语岗位。但是&#xff0c;一场日语面试往往让许多人感到步履维艰。你是否也曾因为面试官抛出的“刁钻问题”而心生畏惧&#xff1f;面对生疏的日语交流环境&#xff0c;即便提前恶补了…...

深入剖析AI大模型:大模型时代的 Prompt 工程全解析

今天聊的内容&#xff0c;我认为是AI开发里面非常重要的内容。它在AI开发里无处不在&#xff0c;当你对 AI 助手说 "用李白的风格写一首关于人工智能的诗"&#xff0c;或者让翻译模型 "将这段合同翻译成商务日语" 时&#xff0c;输入的这句话就是 Prompt。…...

docker详细操作--未完待续

docker介绍 docker官网: Docker&#xff1a;加速容器应用程序开发 harbor官网&#xff1a;Harbor - Harbor 中文 使用docker加速器: Docker镜像极速下载服务 - 毫秒镜像 是什么 Docker 是一种开源的容器化平台&#xff0c;用于将应用程序及其依赖项&#xff08;如库、运行时环…...

CVPR 2025 MIMO: 支持视觉指代和像素grounding 的医学视觉语言模型

CVPR 2025 | MIMO&#xff1a;支持视觉指代和像素对齐的医学视觉语言模型 论文信息 标题&#xff1a;MIMO: A medical vision language model with visual referring multimodal input and pixel grounding multimodal output作者&#xff1a;Yanyuan Chen, Dexuan Xu, Yu Hu…...

【第二十一章 SDIO接口(SDIO)】

第二十一章 SDIO接口 目录 第二十一章 SDIO接口(SDIO) 1 SDIO 主要功能 2 SDIO 总线拓扑 3 SDIO 功能描述 3.1 SDIO 适配器 3.2 SDIOAHB 接口 4 卡功能描述 4.1 卡识别模式 4.2 卡复位 4.3 操作电压范围确认 4.4 卡识别过程 4.5 写数据块 4.6 读数据块 4.7 数据流…...