单机无锁线程安全队列-Disruptor
Disruptor
1、基本介绍
说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。
而Disruptor是一个线程安全、低延迟、吞吐量高的队列,并且解决BlockingQueue加锁带来的性能下降问题,十分适合单机使用。
Disruptor是英国外汇交易公司LMAX开发的一个高性能队列,研发的初衷是解决内存队列的延迟问题。基于Disruptor开发的系统单线程能支撑每秒600万订单。
2、与BlockingQueue对比
- 使用CAS代替锁
- 多播模式,同一事件可以交给多个消费者处理
- 基于环形数组RingBuffer,创建时就固定长度,不出现空间新分配情况,减少垃圾回收
这是官网与BlockingQueue对比的延迟直方图,可以看出,BlockingQueue出现延迟的机率比Disruptor高得多。
3、生产者消费者模式
在Disruptor中,生产者与消费者支持一对一、一对多或者多对多的关系。下面举例如何实现:
引入最新包
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>4.0.0</version></dependency>
定义一个商品
@Data
public class Goods {private String name;}
定义生产者
public class Producer {private final RingBuffer<Goods> ringBuffer;public Producer(RingBuffer<Goods> ringBuffer) {this.ringBuffer = ringBuffer;}/*** 生产货品* @param goodsName*/public void onData(String goodsName) {long sequence = ringBuffer.next();try {Goods goods = ringBuffer.get(sequence);goods.setName(goodsName);} finally {ringBuffer.publish(sequence);}}
}
定义消费者
@Data
public class Consumer implements EventHandler<Goods>{private String name;public Consumer(String name){this.name = name;}@Overridepublic void onEvent(Goods goods, long l, boolean b) {//消费者接收到货品System.out.println(name+"消费了"+goods.getName());}@Overridepublic void onBatchStart(long batchSize, long queueDepth) {EventHandler.super.onBatchStart(batchSize, queueDepth);}@Overridepublic void onStart() {EventHandler.super.onStart();}@Overridepublic void onShutdown() {EventHandler.super.onShutdown();}@Overridepublic void onTimeout(long sequence) throws Exception {EventHandler.super.onTimeout(sequence);}@Overridepublic void setSequenceCallback(Sequence sequenceCallback) {EventHandler.super.setSequenceCallback(sequenceCallback);}
}
一个生产者对一个消费者
public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16, // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.SINGLE, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//单生产者,单消费者disruptor.handleEventsWith(new Consumer("Consumer1"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}
一个生产者对多个消费者
消费者按顺序消费:
public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16, // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();//多个消费者按顺序消费disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer = new Producer(ringBuffer);while (true){producer.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}
多播模式,同一事件可以交给多个消费者处理
只需要将上述代码修改一下即可
//Consumer1、Consumer2、Consumer3先消费,Consumer4后消费disruptor.handleEventsWith(new Consumer("Consumer1"),new Consumer("Consumer2"),new Consumer("Consumer3")).then(new Consumer("Consumer4"));
多个生产者对多个消费者
public class DisruptorDemo {public static void main(String[] args) throws InterruptedException {Disruptor<Goods> disruptor = new Disruptor<>(Goods::new,16, // RingBuffer 大小,必须是 2 的 N 次方Executors.defaultThreadFactory(), //线程池ProducerType.MULTI, //指定单生产者还是多生产者new YieldingWaitStrategy() //等待策略);RingBuffer<Goods> ringBuffer = disruptor.getRingBuffer();disruptor.handleEventsWith(new Consumer("Consumer1")).then(new Consumer("Consumer2"));disruptor.start();Producer producer1 = new Producer(ringBuffer);Producer producer2 = new Producer(ringBuffer);Producer producer3 = new Producer(ringBuffer);while (true){producer1.onData("goods"+UUID.randomUUID());producer2.onData("goods"+UUID.randomUUID());producer3.onData("goods"+UUID.randomUUID());Thread.sleep(1000);}}
}
除了上述多播模式中多个消费者各自处理事件(一个event事件会同时被多个消费者处理),其实还有Disruptor另一种模式:多个消费者合作处理一批事件(一个event事件会被其中一个消费者处理),由Disruptor 的 WorkPool 支持,不过在4.0中已经被去除了
看了github的issue,作者大概意思说难以维护,并且在LMAX公司也不会用到WorkPool,所以就去除了。
4、RingBuffer原理
Disruptor内部由环形数组Ring Buffer(数组必须为2的n次方)。
1、Ring Buffer使用环形数组,有效避免线性数组index越界问题,而且数组内元素的内存地址是连续的,对CPU缓存友好,在硬件级别,数组中的元素是会被预加载的,所以RingBuffer中,CPU无需时不时去主内存加载数组中的下一个元素。通过对cursor指针的移动,可以实现数据在数组中的环形存取。
2、在多生产者场景下,多个生产者会进行竞争,防止读到还未写的元素。引入了一个与Ring Buffer大小相同的buffer:available Buffer,用来判断Ring Buffer某个元素是否已经就绪。
3、为什么available Buffer也做成圈呢?这样做是防止把上一轮的数据当成这一轮的数据,错误判断Ring Buffer元素可用。
4、为什么Ring Buffer要2的n次方,因为会涉及到二进制&运算,来算出元素位置,在源码中可以找到。
5、具体RingBuffer写数据和读数据流程,可以参考美团技术博客:https://tech.meituan.com/2016/11/18/disruptor.html
5、等待策略
生产者和消费者都可能出现速度过快的情况,比如队列满了,生产者需要等待消费者消费后才能生产,或者消费者消费过快导致队列为空,进而需要等待生产者生产。
Disruptor目前一共内置了8种等待策略。
- BlockingWaitStrategy:用了ReentrantLock的等待唤醒机制实现等待逻辑,是默认策略,对CPU的消耗最小
- BusySpinWaitStrategy: 持续自旋,会消耗大量CPU资源
- LiteBlockingWaitStrategy: 基于BlockingWaitStrategy,非重入锁的阻塞等待策略,在没有锁竞争的时候会省去唤醒操作
- TimeoutBlockingWaitStrategy: 超时等待策略,它会使消费者线程进入阻塞状态,在指定的时间内等待新的事件,如果等待超时则退出
- LiteTimeoutBlockingWaitStrategy: 基于TimeoutBlockingWaitStrategy,在没有锁竞争的时候会省去唤醒操作
- SleepingWaitStrategy: 三段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU,第三阶段睡眠执行时间,反复的睡眠
- YieldingWaitStrategy: 二段式,第一阶段自旋,第二阶段执行Thread.yield交出CPU
- PhasedBackoffWaitStrategy: 四段式,第一阶段自旋指定次数,第二阶段自旋指定时间,第三阶段执行Thread.yield交出CPU,第四阶段调用成员变量的waitFor方法,这个成员变量可以被设置为BlockingWaitStrategy、LiteBlockingWaitStrategy、SleepingWaitStrategy这三个中的一个
6、结束
Disruptor简单的介绍已经结束了,点个赞再走啦!~
相关文章:
单机无锁线程安全队列-Disruptor
Disruptor 1、基本介绍 说到队列,除了常见的mq中间件,java中也自带线程安全的BlockingQueue,但是BlockingQueue通过在入队和出队时加锁的方式避免并发操作,性能上会大打折扣。 而Disruptor是一个线程安全、低延迟、吞吐量高的队…...
好工具知多少:国内外最常用的SCADA软件
随着现代SCADA系统的发展,工业自动化取得了巨大的飞跃。如今,监控和数据采集(SCADA)系统已成为工业过程的重要组成部分。这些系统使操作员能够实时监控和控制复杂的系统。 SCADA系统正在广泛的行业中发挥着至关重要的作用&#x…...
SQL Server 2016(创建数据库)
1、实验环境。 某公司有一台已经安装了SQL Server 2016的服务器,现在需要新建数据库。 2、需求描述。 创建一个名为"db_class"的数据库,数据文件和日志文件初始大小设置为10MB,启用自动增长,数据库文件存放路径为C:\db…...
Vue学习计划--Vue2(一)简单了解vue
Vue2的终止支持时间为2023年12月31日。 在这个矛盾的时间点,还是决定先把vue2的笔记放出来,在Vue2完结后再把Vue3的笔记补上。这样呢,2和3都不落下,也算是来一个启承的作用吧。在工作中呢,旧的项目可以维护,…...
微信小程序生成二维码并保存到本地方法
微信小程序生成二维码请保存到本地方法 官方weapp-qrcode插件 github链接 功能完成样子 wxml <view class"qrcode"><canvas style"width: 275px; height: 275px;" canvas-idmyQrcode></canvas> </view> <view class" …...
shell_exec 和 exec区别
shell_exec 和 exec 都是用于在 PHP 中执行系统命令的函数,但它们之间有一些区别。 返回值类型:shell_exec 函数返回命令的输出结果作为字符串,而 exec 函数将输出结果存储在数组中。 输出结果:shell_exec 函数返回命令的完整输出…...
WPF创建进度条
使用wpf做一个原生的进度条,进度条上面有值,先看效果。 功能就是点击按钮,后台处理数据,前台显示处理数据的变化,当然还可以对进度条进行美化和关闭的操作,等待后台处理完毕数据,然后自动关闭。…...
全网最新最全面的Appium自动化:Appium常用操作之混合应用webview页面操作--待补充!
上下文操作: 在appium中,对于混合应用,需要进行WebView页面和原生应用的切换 常用的方法如下: 1、context(self) / current_context(self):返回当前会话的当前上下文,context可以理解为可进入的窗口。对于…...
基于OpenCV+YOLOv5实现车辆跟踪与计数(附源码)
导 读 本文主要介绍基于OpenCVYOLOv5实现车辆跟踪与计数的应用,并给出源码。 资源下载 基础代码和视频下载地址: https://github.com/freedomwebtech/win11vehiclecount main.py代码: import cv2import torchimport numpy as npfrom tr…...
05、pytest断言确定的异常
官方用例 # content of test_sysexit.py import pytestdef f():raise SystemExit(1)def test_mytest():with pytest.raises(SystemExit):f()解读与实操 标准python raise函数可产生异常。pytest.raises可以断言某个异常会发现。异常发生了,用例执行成功&#x…...
金蝶云星空单据编辑界面,不允许批量填充操作
文章目录 金蝶云星空单据编辑界面,不允许批量填充操作案例演示开发设计测试 金蝶云星空单据编辑界面,不允许批量填充操作 案例演示 售后单,明细信息单据体,物料编码字段禁止批量填充。 开发设计 编写表单插件,在Be…...
Springboot项目启动成功后可通过五种方式继续执行
实现CommandLineRunner接口 项目初始化完毕后,才会调用方法,提供服务 Component public class StartRunner implements CommandLineRunner {Overridepublic void run(String... args) throws Exception {System.out.println("CommandLineRunner&qu…...
什么是供应链金融分账系统?
一、供应链金融的重要性 供应链金融在很多行业都是要用到,比如在抖音,快手店铺的商家资金回笼,通常需要7-21天的回款周期,这对于商家的周转来说是一件很困难的事情,在供应链金融中,分账就扮演着至关重要的角色,不仅是金融流程中的一环,更是保…...
【测绘程序设计】——坐标换带与高程投影
测绘工程中经常遇到 “坐标换带” 与 “高程投影” 问题,前者是在改变投影的分带号——即投影的中央子午线,通过 “(x,y)->(B,L)->(x,y)” 进行;而后者则是为减小投影变形(高程投影变短、高斯投影变长,详情可参考博客《测绘综合能力》真题易错本 第(37)条)通过平…...
企业计算机服务器中了Mallox勒索病毒如何解密,Mallox勒索病毒数据恢复
随着计算机技术的不断应用与发展,网络为企业的生产运营提供了极大帮助,越来越多的企业开始利用网络办公,因此,随之而来的网络安全威胁也在不断增加。近期,云天数据恢复中心陆续接到很多企业的求助,企业的计…...
一套rk3588 rtsp服务器推流的 github 方案及记录 -01
我不生产代码,我只是代码的搬运工,相信我,看完这个文章你的图片一定能变成流媒体推出去。 诉求:使用opencv拉流,转成bgr数据,需要把处理后的数据(BGR)编码成264,然后推流…...
PyQt6 QComboBox下拉组合框控件
锋哥原创的PyQt6视频教程: 2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~_哔哩哔哩_bilibili2024版 PyQt6 Python桌面开发 视频教程(无废话版) 玩命更新中~共计34条视频,包括:2024版 PyQt6 Python桌面开发 视频教程(无废话…...
常用类与比较器
常用类 学一个类,先搞清楚继承关系,再看源码 包装类Wrapper jdk5之前是手动装箱拆箱 jdk5及之后是自动装箱拆箱(调用valueOf方法(自动默认)/创建对象的构造方法,XXXvalue方法…...
【上海大学《面向对象程序设计A》课程小项目报告】抽象向量类模板及其派生类
1 项目内容及要求 本项目通过设计一个抽象向量类模板,以及一个通用的向量类模板和一个字符串类作为其派生类,以满足各种应用场景中的数据存储和处理需求。 项目内容: 抽象向量类模板。派生向量类。派生字符串类。测试及异常处理。联合测试…...
Leetcode每日一题学习训练——Python3版(到达首都的最少油耗)
版本说明 当前版本号[20231205]。 版本修改说明20231205初版 目录 文章目录 版本说明目录到达首都的最少油耗理解题目代码思路参考代码 原题可以点击此 2477. 到达首都的最少油耗 前去练习。 到达首都的最少油耗 给你一棵 n 个节点的树(一个无向、连通、无环…...
Java面试题(每天10题)-------连载(42)
目录 Spring篇 1、Spring Bean的作用域之间有什么区别? 2、什么是Spring inner beans? 3、Spring框架中的单例Beans是线程安全的吗? 4、请举例说明如何在Spring中诸如一个Java Collection? 5、如何向Spring Bean中诸如一个J…...
netty websocket学习
【硬核】肝了一月的Netty知识点 超详细Netty入门,看这篇就够了! bzm_netty_sb netty-chat vuewebsokect实现实时聊天,可单聊、可群聊(一) vue实现聊天栏定位到最底部(超简单、可直接复制使用)…...
【数据结构】环形队列
环形队列 1. 定义 环形队列就是将队列在逻辑上看作环形结构、物理上仍是数组形式存储的一种数据结构。 其实现主要分为两种情况: 浪费空间法记录空间法 2. 实现 实现要考虑的是成员变量 2.1 记录空间法 使用used标识当前存储了多少元素,如果为空&a…...
嵌入式C编码规范
嵌入式C编码规范 编码规范,没有最好,只有最合适,有但不执行不如没有。 嵌入式C编码规范 https://mp.weixin.qq.com/s/z4u3YnF6vdQ1olsLeF-y_A 更多嵌入式信息请关注微信公众号【嵌入式系统】...
Golang 并发 — 流水线
并发模式 我们可以将流水线理解为一组由通道连接并由 goroutine 处理的阶段。每个阶段都被定义为执行特定的任务,并按顺序执行,下一个阶段在前一个阶段完成后开始执行。 流水线的另一个重要特性是,除了连接在一起,每个阶段都使用…...
Elasticsearch:什么是非结构化数据?
非结构化数据定义 非结构化数据是指未按照设计的模型或结构组织的数据。 非结构化数据通常被归类为定性数据,可以是人类或机器生成的。 非结构化数据是最丰富的可用数据类型,经过分析后,可用于指导业务决策并在许多其他用例中实现业务目标。…...
15:00的面试,15:06就出来了,问的问题过于变态了。。。
从小厂出来,没想到在另一家公司又寄了。 到这家公司开始上班,加班是每天必不可少的,看在钱给的比较多的份上,就不太计较了。没想到5月一纸通知,所有人不准加班,加班费不仅没有了,薪资还要降40%…...
Web自动化测试怎么做?Web网页测试全流程解析
1、功能测试 web网页测试中的功能测试,主要测试网页中的所有链接、数据库连接、用于在网页中提交或获取用户信息的表单、Cookie 测试等。 (1)查看所有链接: 测试从所有页面到被测特定域的传出链接。 测试所有内部链接。 测…...
MySQL数据库SQLSTATE[22007]: Invalid datetime format 日期类型不能为空值的解决办法
如果你的数据库是mysql, 如果你创建表或插入数据时遇到的BUG–它长这样: Invalid datetime format: 1292 Incorrect datetime value: ‘’ for column ‘xxx’ at row 1 或 1067 - Invalid default value for ‘xx’ 那么我将赐予你 两套剑法: &#…...
搬运工让你分分钟了解Web接口测试
01、什么是接口 百度说:接口泛指实体把自己提供给外界的一种抽象化物(可以为另一实体),用以由内部操作分离出外部沟通方法,使其能被内部修改而不影响外界其他实体与其交互的方式 上面这句有点抽象,网上的…...
html商城网站源码/湖南关键词优化首选
与 UpdatePanel 控件不兼容的控件 下面的 ASP.NET 控件与部分页更新不兼容,因此,不能用在 UpdatePanel 控件内: 在以下几种情况下的 Treeview 控件:一种是当回调不是作为异步回发的一部分启用时;一种是您直接将样式设置…...
网站开发时间进度/百度公司官网入口
笔记本电脑很难以有意义的方式进行升级。在许多情况下,处理器,母板和视频卡都是直接焊接在主板上的。如果一个组件发生故障,则不能简单地将其替换掉-您需要更换整个主板,这可能要花许多的钱。通常,您可以手动升级的唯一…...
企业网站软件/成都网站快速排名
1.安装 ActiveMQ 这里以 ActiveMQ 5.15.3 版本为例,下载地址:http://activemq.apache.org/activemq-5153-release.html 官网提供的快速开始案例:http://activemq.apache.org/getting-started.html 下载完以后,解压到用户自定义的目…...
龙江做网站/网站都有哪些
作者:张楷露、张琪 封面:自己想吧一、基本思想的异同共同点从二者表达的含义上看,主成分分析法和因子分析法都寻求少数的几个变量(或因子)来综合反映全部变量(或因子)的大部分信息,变量虽然较原始变量少,但所包含的信…...
天津网站建设 seo/深圳网站开发制作
并发知识不管在学习、面试还是工作过程中都非常非常重要,看完本文相信绝对能助你一臂之力。1、线程和进程有什么区别?线程是进程的子集,一个进程可以有很多线程。每个进程都有自己的内存空间,可执行代码和唯一进程标识符(PID)。每…...
哪些网站可以找到做海报的素材/百度的链接
🎄🎄近期,小海带在空闲之余收集整理了一批农业作物开源数据集资源供大家参考。 整理不易,小伙伴们记得一键三连喔!!!🎈🎈 一、农作物图像分类(小麦、睡到、甘…...