风车网站做花盆磨具/上海企业网站推广
高性能队列框架-Disruptor
首先介绍一下 Disruptor 框架,Disruptor是一个通用解决方案,用于解决并发编程中的难题(低延迟与高吞吐量),Disruptor 在高并发场景下性能表现很好,如果有这方面需要,可以深入研究其源码
其本质还是一个队列(环形),与其他队列类似,也是基于生产者消费者模式设计,只不过这个队列很特别是一个环形队列。这个队列能够在无锁的条件下进行并行消费,也可以根据消费者之间的依赖关系进行先后次序消费。
使用 Disruptor 框架的好处就是:速度快!
生产者向 RingBuffer 写入,消费者从 RingBuffer 中消费,基于 Disruptor 开发的系统每秒可以支持 600 万订单
下边介绍一下 Disruptor 框架中常见概念:
RingBuffer
基于数组实现的一个环,用于在不同线程间传递数据,RingBuffer 有一个 Sequencer 序号器,指向数组中下一个可用元素
Sequencer 序号器
该类是 Disruptor 核心,有两个实现类:
- SingleProducerSequencer 单生产者
- MultiProducerSequencer 多生产者
WaitStrategy 等待策略
消费者等待生产者将数据放入 RingBuffer,有不同的等待策略:
BlockingWaitStrategy
:阻塞等待策略,最低效的策略,但其对 CPU 的消耗最小并且在各种不同部署环境中能提供更加一致的性能表现。SleepingWaitStrategy
:休眠等待策略,性能表现跟 BlockingWaitStrategy 差不多,对 CPU 的消耗也类似,但其对生产者线程的影响最小,适合用于异步日志类似的场景。YieldingWaitStrategy
:产生等待策略,性能最好,适合用于低延迟的系统,在要求极高性能且事件处理线程数小于 CPU 逻辑核心数的场景中,推荐使用。是无锁并行
Disruptor 的设计中是没有锁的,在 Disruptor 中出现线程竞争的地方也就是 RingBuffer 中的下标 Sequence,Disruptor 通过 CAS 操作来代替加锁,从而提升性能,CAS 的性能大约是加锁操作性能的 8 倍,
伪共享问题
Disruptor 中还会出现伪共享问题
参考:《高性能队列——Disruptor》——美团技术团队
缓存行
Cache 是由很多个 cache line 组成,每个 cache line 通常是 64B,并且可以有效地引用主内存中的一块地址。
Java 中 long 类型变量是 8B,因此一个 cache line 可以存储 8 个 long 类型变量
CPU 每次从主存中拉取数据时,会把相邻的数据也存入同一个 cache line,那么在访问一个 long 数组时,如果数组中的一个值被加入缓存中,那么也会加载另外 7 个
伪共享问题
在 ArrayBlockingQueue 中有 3 个成员变量:
- takeIndex:需要被取走元素下标
- putIndex:可被插入元素下标
- count:队列元素数量
这 3 个变量如果在同一个 cache line 中的话,假如此时有两个线程对这 3 个变量进行操作,线程 A 修改了 takeIndex 变量,那么会导致线程 B 中这个变量所在的 cache line 失效,需要从内存重新读取
这种无法充分利用 cache line 特性的线程,成为 伪共享
解决方案就是,增大数组元素之间的间隔,使得不同线程存取的元素位于不同的 cache line 上,通过空间换时间
在jdk1.8中,有专门的注解
@Contended
来避免伪共享,更优雅地解决问题。
Disruptor 通过哪些设计来解决队列速度慢的问题了呢?
-
环形数组 RingBuffer
采用环形数组,空间重复利用,避免垃圾回收,并且数组对于缓存机制更加友好
-
元素位置定位
数组长度 2^n,通过位运算,加快定位速度
-
无锁设计
通过 CAS 代替锁来保证操作的线程安全
在美团内部,很多高并发场景借鉴了Disruptor的设计,减少竞争的强度。其设计思想可以扩展到分布式场景,通过无锁设计,来提升服务性能
Disruptor 多个生产者、多个消费者原理
在 Disruptor 中,多个生产者生产数据时,每个线程获取不同的一段数组空间再加上 CAS 操作,可以避免多个线程重复写同一个元素
在读取时,如何避免读取到未写的元素呢?
Disruptor 中新创建了一个与 RingBuffer 大小相同的 available Buffer,当某个位置写入成功,就在 available Buffer 中标记为 true,通过该标记来读取已经写好的元素
Disruptor 单生产者单消费者实战
首先引入依赖:
<dependency><groupId>com.lmax</groupId><artifactId>disruptor</artifactId><version>3.3.4</version>
</dependency>
定义订单:
/*** 订单对象,生产者要生产订单对象,消费者消费订单对象*/
public class OrderEvent {// 订单的价格private long value;public long getValue() {return value;}public void setValue(long value) {this.value = value;}
}
定义工厂类,用于创建订单对象:
/*** 建立一个工厂类,用于创建Event的实例(OrderEvent)*/
public class OrderEventFactory implements EventFactory<OrderEvent> {@Overridepublic OrderEvent newInstance() {// 生产对象return new OrderEvent();}
}
定义事件处理器,用于监听消费订单:
/*** 消费者*/
public class OrderEventHandler implements EventHandler<OrderEvent> {@Overridepublic void onEvent(OrderEvent orderEvent, long l, boolean b) {System.err.println("消费者:" + orderEvent.getValue());}
}
定义生产者,用于生产订单:
public class OrderEventProducer {// ringBuffer 用于存储数据private RingBuffer<OrderEvent> ringBuffer;public OrderEventProducer(RingBuffer<OrderEvent> ringBuffer) {this.ringBuffer = ringBuffer;}// 生产者向 ringBuffer 中生产消息public void sendData(ByteBuffer data) {// 1. 生产者先从 ringBuffer 拿到可用的序号long sequence = ringBuffer.next();try {// 2.根据这个序号找到具体的 OrderEvent 元素, 此时获取到的 OrderEvent 对象是一个没有被赋值的空对象OrderEvent event = ringBuffer.get(sequence);// 3. 设置订单价格event.setValue(data.getLong(0));} catch (Exception e) {e.printStackTrace();} finally {// 4. 提交发布操作ringBuffer.publish(sequence);}}
}
测试类:
public class Main {public static void main(String[] args) {// 初始化一些参数OrderEventFactory orderEventFactory = new OrderEventFactory();int ringBufferSize = 8;ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());/*** 参数说明:* eventFactory:消息(event)工厂对象* ringBufferSize: 容器的长度* executor:线程池,建议使用自定义的线程池,线程上限。* ProducerType:单生产者或多生产者* waitStrategy:等待策略*/// 1. 实例化disruptor对象Disruptor<OrderEvent> disruptor = new Disruptor<OrderEvent>(orderEventFactory,ringBufferSize,executor,ProducerType.SINGLE,new BlockingWaitStrategy());// 2. 向 Disruptor 中添加消费者,消费者监听到 Disruptor 的 RingBuffer 中有数据了,就会进行消费disruptor.handleEventsWith(new OrderEventHandler());// 3. 启动disruptordisruptor.start();// 4. 拿到存放数据的容器:RingBufferRingBuffer<OrderEvent> ringBuffer = disruptor.getRingBuffer();// 5. 创建生产者OrderEventProducer producer = new OrderEventProducer(ringBuffer);// 6. 通过生产者向容器 RingBuffer 中存放数据ByteBuffer bb = ByteBuffer.allocate(8);for (long i = 0; i < 100; i++) {bb.putLong(0, i);producer.sendData(bb);}// 7.关闭disruptor.shutdown();executor.shutdown();}
}
Disruptor 多生产者和多消费者实战
定义消费者,用于从 ringBuffer 中消费订单:
public class ConsumerHandler implements WorkHandler<Order> {// 每个消费者有自己的idprivate String comsumerId;// 计数统计,多个消费者,所有的消费者总共消费了多个消息。private static AtomicInteger count = new AtomicInteger(0);private Random random = new Random();public ConsumerHandler(String comsumerId) {this.comsumerId = comsumerId;}// 当生产者发布一个 sequence,ringbuffer 中一个序号,里面生产者生产出来的消息,生产者最后publish发布序号// 消费者会监听,如果监听到,就会ringbuffer去取出这个序号,取到里面消息@Overridepublic void onEvent(Order event) throws Exception {// 模拟消费者处理消息的耗时,设定1-4毫秒之间TimeUnit.MILLISECONDS.sleep(1 * random.nextInt(5));System.out.println("当前消费者:" + this.comsumerId + ", 消费信息 ID:" + event.getId());// count 计数器增加 +1,表示消费了一个消息count.incrementAndGet();}// 返回所有消费者总共消费的消息的个数。public int getCount() {return count.get();}
}
定义订单:
@Data
public class Order {private String id;private String name;private double price;public Order() {}
}
定义生产者,用于向 ringBuffer 中生产订单:
public class Producer {private RingBuffer<Order> ringBuffer;// 为生产者绑定 ringBufferpublic Producer(RingBuffer<Order> ringBuffer) {this.ringBuffer = ringBuffer;}// 发送数据public void sendData(String uuid) {// 1. 获取到可用sequencelong sequence = ringBuffer.next();try {Order order = ringBuffer.get(sequence);order.setId(uuid);} finally {// 2. 发布序号ringBuffer.publish(sequence);}}
}
测试类:
public class TestMultiDisruptor {public static void main(String[] args) throws InterruptedException {// 1. 创建 RingBuffer,Disruptor 包含 RingBufferRingBuffer<Order> ringBuffer = RingBuffer.create(ProducerType.MULTI, // 多生产者new EventFactory<Order>() {@Overridepublic Order newInstance() {return new Order();}}, 1024 * 1024, new YieldingWaitStrategy());// 2. 创建 ringBuffer 屏障SequenceBarrier sequenceBarrier = ringBuffer.newBarrier();// 3. 创建多个消费者数组ConsumerHandler[] consumers = new ConsumerHandler[10];for (int i = 0; i < consumers.length; i++) {consumers[i] = new ConsumerHandler("C" + i);}// 4. 构建多消费者工作池WorkerPool<Order> workerPool = new WorkerPool<Order>(ringBuffer, sequenceBarrier, new EventExceptionHandler(), consumers);// 5. 设置多个消费者的 sequence 序号,用于单独统计消费者的消费进度。消费进度让RingBuffer知道ringBuffer.addGatingSequences(workerPool.getWorkerSequences());// 6. 启动 workPoolworkerPool.start(Executors.newFixedThreadPool(5)); // 在实际开发,自定义线程池。final CountDownLatch latch = new CountDownLatch(1);// 100 个生产者向 ringBuffer 生产数据,每个生产者发送 100 个数据,共 10000 个数据for (int i = 0; i < 100; i ++) {final Producer producer = new Producer(ringBuffer);new Thread(new Runnable() {@Overridepublic void run() {try {// 先等待创建完 100 个生产者之后,再发送数据latch.await();} catch (Exception e) {e.printStackTrace();}// 每个生产者发送 100 个数据for (int j = 0; j < 100; j ++) {producer.sendData(UUID.randomUUID().toString());}}}).start();}// 把所有线程都创建完TimeUnit.SECONDS.sleep(2);// 唤醒线程让生产者开始发送数据,开始运行100个线程latch.countDown();// 等待数据发送完毕TimeUnit.SECONDS.sleep(10);System.out.println("任务总数:" + consumers[0].getCount());}static class EventExceptionHandler implements ExceptionHandler<Order> {//消费时出现异常@Overridepublic void handleEventException(Throwable throwable, long l, Order order) {}//启动时出现异常@Overridepublic void handleOnStartException(Throwable throwable) {}//停止时出现异常@Overridepublic void handleOnShutdownException(Throwable throwable) {}}
}
Disruptor 与 Netty 结合大幅提高数据处理性能
使用 Netty 接收处理数据时,不要在工作线程上进行处理,降低 Netty 性能,可以使用异步机制,通过线程池来处理,异步处理的话,就是用 Disruptor 来作为任务队列即可
即在 Netty 收到处理数据请求时,封装成一个事件,向 Disruptor 中推送,再通过多消费者来进行处理,可以提升 Netty 处理数据时的性能,流程图如下(绿色部分为通过 Disruptor 优化部分):
相关文章:

高性能队列框架-Disruptor使用、Netty结合Disruptor大幅提高数据处理性能
高性能队列框架-Disruptor 首先介绍一下 Disruptor 框架,Disruptor是一个通用解决方案,用于解决并发编程中的难题(低延迟与高吞吐量),Disruptor 在高并发场景下性能表现很好,如果有这方面需要,…...

Linux学习笔记3 xshell(lnmp)
xshell能连接虚拟机的前提是真机能够ping通虚拟机网址 装OpenSSL依赖文件 [rootlocalhost nginx-1.12.2]# yum -y install openssl pcre-devel 依赖检测[rootlocalhost nginx-1.12.2]# ./configure [rootlocalhost nginx-1.12.2]# yum -y install zlib [rootlocalhost n…...

分享几个可以免费使用GPT工具
1. 国产可以使用GPT3.5和4.0的网站,每日有免费的使用额度,响应速度,注册时不用使用手机号,等个人信息,注重用户隐私,好评! 一个好用的ChatGPT系统 ,可以免费使用3.5 和 4.0https://…...

一篇文章带你快速入门 Nuxt.js 服务端渲染
1. Nuxt.js 概述 1.1 我们一起做过的SPA SPA(single page web application)单页 Web 应用,Web 不再是一张张页面,而是一个整体的应用,一个由路由系统、数据系统、页面(组件)系统等等࿰…...

导入JDBC元数据到Apache Atlas
前言 前期实现了导入MySQL元数据到Apache Atlas, 由于是初步版本,且功能参照Atlas Hive Hook,实现的不够完美 本期对功能进行改进,实现了导入多种关系型数据库元数据到Apache Atlas 数据库schema与catalog 按照SQL标准的解释,…...

大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现
大数据项目——基于Django/协同过滤算法的房源可视化分析推荐系统的设计与实现 技术栈:大数据爬虫/机器学习学习算法/数据分析与挖掘/大数据可视化/Django框架/Mysql数据库 本项目基于 Django框架开发的房屋可视化分析推荐系统。这个系统结合了大数据爬虫、机器学…...

[网鼎杯 2020 朱雀组]phpweb1
提示 call_user_func()函数先通过php内置函数来进行代码审计绕过system(##不止一种方法) 拿到题目养成一个好的习惯先抓个包 从抓到的包以及它首页的报错来看,这里死活会post传输两个参数func以及p func传输函数,而p则是传输参数的…...

深度学习之注意力机制
注意力机制与外部记忆 注意力机制与记忆增强网络是相辅相成的,神经网络去从内存中或者外部记忆中选出与当前输入相关的内容时需要注意力机制,而在注意力机制的很多应用场景中,我们的外部信息也可以看作是一个外部的记忆 这是一个阅读理解任务…...

WordPress:解决xmlrpc.php被扫描爆破的风险
使用WordPress的朋友都知道,一些【垃圾渣渣】会利用xmlrpc.php文件来进行攻击,绕过WP后台错误登录次数限制进行爆破。虽然密码复杂的极难爆破,但及其占用服务器资源。 方法一、利用宝塔防火墙(收费版) 一般可以直接使…...

Fiddler抓包模拟器(雷电模拟器)
Fiddler设置 List item 打开fiddler,的options 点击OK,重启fiddler 模拟器 更改网络设置 IP可以在电脑上终端上查看 然后在模拟器浏览器中输入IP:端口 安装证书...

RepidJson将内容写入文件
使用 RapidJSON 将内容写入文件的步骤如下: 创建一个 rapidjson::Document 对象,将需要写入文件的内容存储到其中。创建一个 rapidjson::StringBuffer 对象来保存 JSON 字符串。将 rapidjson::Document 对象转换为 JSON 字符串,并将其放入 r…...

Endnote使用教程
原由 最近要进行开题报告,要求不低于60文献的阅读与引用,单独插入引入我觉得是非常繁琐的事情,所以就借助Endnote这个工具,减少我们的工作量。 使用方法 第一步:先新建一个数据库,这样子可以在这个数据库…...

java中用Thead创建线程和用Runnable创建线程的区别是什么?
在 Java 中,创建线程的两种主要方式是通过继承 Thread 类和通过实现 Runnable 接口。下面是它们之间的主要区别: 1. 继承 Thread 类: class MyThread extends Thread {public void run() {// 线程执行的代码} }// 创建并启动线程 MyThread …...

0013Java程序设计-基于Vue的上课签到系统的设计与实现
文章目录 **摘 要**目录系统设计4.2学生签到4.3 签到信息列表4.4 用户信息管理5.1系统登录5.1.1 登录5.1.2 清除用户登记记录5.1.3 登录拦截 5.2用户管理5.2.2 用户添加5.2.3 用户编辑5.2.4 用户删除5.2.5 用户分页 5.3签到信息5.3.1签到信息列表 5.4学生签到5.4.1学生签到 开发…...

2.修改列名与列的数据类型
修改字段名与字段数据类型 1.修改字段名 有时,在我们建好一张表后会突然发现,哎呀!字段名貌似写错了!怎么办?要删了表再重新建一个新表吗?还是要删了这个字段再新建一个新的字段? 都不用&…...

[Firefly-Linux] RK3568 Ubuntu固件分区详解
RK为了方便开发与产品定制,自己定义了一套固件的分区,这些分区信息存放在parameter.txt文件中,Firefly参考这个文件定义了自己的Ubuntu分区,文件为parameter-ubuntu.txt,存放于Linux_SDK的device/rockchip/rk356x目录下…...

SpringBoot项目访问resources下的静态资源
1.新建一个配置文件夹,放配置类 2.编辑 WebMvcConfig.java package com.southwind.configuration;import org.springframework.context.annotation.Configuration; import org.springframework.web.servlet.config.annotation.ResourceHandlerRegistry; import or…...

Qt之面试经验
1.恒生芸擎网络 技术没怎么问,一面问对方工作日常会涉及的一些东西(自动发布),二面公司流程,三面其他(没发offer) 2.光珀智能科技 涉及AI算法落地,问了点基础问题,比如…...

数据库基础概念与范式反范式总结
文章目录 一、基本概念1、属性2、元组3、关系4、超键5、候选键6、主键7、主属性8、外键9、函数依赖完全依赖 二、数据库范式1、第一范式(1NF)2、第二范式(2NF)3、第三范式(3NF)4、巴斯-科德范式(…...

tanstack/react-query使用手册
1. useQuery useQuery的使用一、data是后端成功返回的数据, 第一次的值为undefined 二、isLoading是指数据是否正在加载的状态,通常用于判断请求是否还在进行中。当isLoading为true时,表示数据正在加载中,当isLoading为false时&a…...

camera2对摄像头编码h264
MediaCodec编码摄像头数据 前置:保存的一些成员变量 // 摄像头开启的 handler private Handler cameraHandler; // Camera session 会话 handler private Handler sessionHandler; //这里是个Context都行 private AppCompatActivity mActivity; // 这个摄像头所有需…...

Apache solr XXE 漏洞(CVE-2017-12629)
任务一: 复现环境中的漏洞 任务二: 利用XXE漏洞发送HTTP请求,在VPS服务器端接受请求,或收到DNS记录 任务三: 利用XXE漏洞读取本地的/etc/passwd文件 1.搭建环境 2.开始看wp的时候没有看懂为什么是core,然…...

HTML代码混淆技术:原理、应用和实现方法详解
HTML代码混淆是一种常用的反爬虫技术,它可以有效地防止爬虫对网站数据的抓取。本文将详细介绍HTML代码混淆技术的原理、应用以及实现方法,帮助大家更好地了解和运用这一技术。 一、HTML代码混淆的原理 HTML代码混淆是指将HTML源码通过特定的算法进行加…...

quickapp_快应用_系统接口应用
系统接口 在项目中使用到的接口都需要在配置文件manifest.json中声明,不然会报如下警告 [WARN] 请在 manifest.json 文件里声明项目代码中用到的接口: system.storage, service.account, system.package, system.webview[1]检查某app是否在手机上安装 官方文档&a…...

sqlmap400报错问题解决
python sqlmap.py -r sql.txt --batch --techniqueB --tamperspace2comment --risk 3 --force-ssl–batch 选项全部默认 不用再手动输入 –techniqueB 使用布尔盲注,该参数是指出要求使用的注入方式 –tamperspace2comment使用特殊脚本,space2comment是把…...

【S32DS报错】-2-提示Error while launching command:arm-none-eabi-gdb –version错误
目录 1 Error错误提示 2 Error错误原因 3 如何消除Error错误 结尾 【S32K3_MCAL从入门到精通】合集: S32K3_MCAL从入门到精通https://blog.csdn.net/qfmzhu/category_12519033.html 1 Error错误提示 使用S32DSJ-LinK下载程序,在Dedug Configurati…...

Windows核心编程 HOOK
目录 HOOK概述 HOOK API SetWindowsHookExA 函数(winuser.h) UnhookWindowsHookEx 函数(winuser.h) NextHookEx 函数(winuser.h) 局部钩子 全局钩子 为什么全局钩子需要用dll作为过程函数? HOOK概述 本质:Windows消系统的消息过滤器。 全局钩子…...

P4 Qt如何添加qss样式表文件和添加图片资源
目录 前言 01 添加图片资源文件 02 添加qss文件 前言 🎬 个人主页:ChenPi 🐻推荐专栏1: 《C_ChenPi的博客-CSDN博客》✨✨✨ 🔥 推荐专栏2: 《Qt基础_ChenPi的博客-CSDN博客》✨✨✨ 🌺本篇简介 :这一章…...

【华为OD题库-085】路灯照明II-Java
题目 在一条笔直的公路上安装了N个路灯,从位置0开始安装,路灯之间间距固定为100米。 每个路灯都有自己的照明半径,请计算第一个路灯和最后一个路灯之间,无法照明的区间的长度和。输入描述 第一行为一个数N,表示路灯个数…...

附录1、vuepress中的Markdown语法
# 一、标题 # 说明: #后面跟的内容就是标题,一个#就是一级标题,有几个#就是几级标题,例如2级标题就有两个##,markdown的2级和3级标题会默认自动作为子目录, 注意:#后面必须有个空格࿰…...