JCTools Mpsc源码详解(二) MpscArrayQueue
MpscArrayQueue是一个固定大小的环形数组队列,继承自ConcurrentCircularArrayQueue
MpscArrayQueue的特点:
- 环形队列
- 底层数据结构为数组
- 有界
看一下MpscArrayQueue的属性(填充类除外)---
//生产者索引
private volatile long producerIndex;
//生产者边界
private volatile long producerLimit;
//消费者索引
private volatile long consumerIndex;//继承自父类属性
//下标运算符
protected final long mask;
//消息存储数组
protected final E[] buffer;
首先,系统理解一下MpscArrayQueue的实现,MpscArrayQueue是一个循环数组队列,支持多生产者并发提交消息,重点依赖三个参数----pIndex,cIndex,producerLimit,看一下这三个参数在MpscArrayQueue中的作用,然后一步步深入他们是如何保证Mpsc运行机制的--
上图是普通情况下,cIndex,pIndex和producerlimit的关系,如图可以很容易得理解这三个参数的意义-
- cIndex表示消费者的消费索引,当消费者调用poll()方法出队的时候,获取到的便是cIndex索引指向的消息元素
- pIndex表示的是生产者的生产索引,当生产者调用offer方法入队的时候,将新的消息写入到pIndex指向的位置;
- producerlimit表示的队列边界,或者说pIndex和producerlimit之间表示的是队列空白位置,而cIndex和pIndex之间表示的是队列已经写入的位置
- 他们三者的关系也就很清晰了---cIndex<pIndex,pIndex<producerlimit----但是这里要注意的是,这三者的关系是在有界数组中的位置关系,而实际上MpscArrayQueue使用的是循环数组,所以,这三者的关系重新定义为:cIndex在pIndex位置之前,pIndex 在producerlimit位置之前,而producerlimit一般位于cIndex的前一个位置;
- 这里重点是为了解释逻辑上他们三者之间的位置关系,并非实际实现上的数组Index大小关系,另外,由于MpscArrayQueue的无锁实现,实际上在多线程并发的情况下有可能会短暂的出现这三者逻辑位置错误的情况,但是MpscArrayQueue通过自旋和内存屏障保证了最终数据一致性,以及生产消费之间的逻辑正确性,具体实现继续往下看;
由上图我们大概可以理解MpscArrayQueue的工作机制:
- 初始化时,pIndex指向数组头,也就是array[0]的位置,而producerlimit指向数组尾的位置----array[length-1],而cIndex可以理解为空,因为此时队列中没有消息数据;
- 生产消息---每次生产者提交消息的时候先获取当前pIndex的值,判断pIndex是否大于producerlimit,
- 如果pIndex小于等于producerlimit,则表明当前队列中还有空闲位置,则在pIndex的位置上写入新消息,并将pIndex自增;
- 如果此时pIndex大于producerlimit则说明队列已满,返回错误即可
- 消费消息----这里消费者先获取Index的值,并将cindex加一,然后判断是否大于pIndex
- 如果小于等于pIndex,则说明该位置上存在可消费的消息,则获取该消息的值即可
- 如果大于pIndex,说明消息队列此时是空的,返回错误即可
- 数组循环----数组循环主要体现在对producerlimit的维护上,主要在于一点:
- 当producerlimit到达数组尾的时候,这时候要判断数组头到cIndex之间是否有空闲位置,这个逻辑在概念上其实很好理解-----也就是说,每次消费者消费之后,要想办法及时将producerlimit的位置更新到cIndex的前一个位置,从而保证了Index可以循环使用数组中的空闲位置,如图--
- 当producerlimit到达数组尾的时候,这时候要判断数组头到cIndex之间是否有空闲位置,这个逻辑在概念上其实很好理解-----也就是说,每次消费者消费之后,要想办法及时将producerlimit的位置更新到cIndex的前一个位置,从而保证了Index可以循环使用数组中的空闲位置,如图--
当消费者对val1出队之后,此时producerlimit应该指向原来val1 的位置,也就是数组的头,从而保证生产者在数组满的时候可以从数组头的空闲位置生产消息;
上边主要从概念上解释了循环数组队列的生产消费实现,以及cIndex,pIndex,producerlimit这三个参数之间的逻辑关系,下边看一下MpscArrayQueue具体是怎么实现队列循环以及保证多生产者线程并发安全的--
先看一下三个参数的初始化----由于填充和继承的关系,略去了无关代码,只体现具体初始化的逻辑
- producerlimit初始化为数组容量capacity
- cIndex和pIndex初始化为0----没有显式初始化赋值;
然后看一下offer方法---
- 首先获取mask和producerlimit的值,mask可以理解为一个固定值,在队列初始化的时候就指定了mask值的大小,主要用来计算位移偏移量和producerlimit的值,这里的数学逻辑稍后解释
然后进入do-while循环,注意这里是do-while,先判断producerlimit和pIndex的大小关系,然后更新producerlimit的值,while循环的条件是cas自旋更新抢占pIndex的值是否正确看一下代码
---
do{//注意这里的do-While循环,循环体内主要获取并更新producerlimit,然后cas自旋获取pIndex锁,获取成功后再一次更新producerlimit;//这也是官方代码设计巧妙的一点,正常逻辑在生产者提交消息之前首先抢占PIndex的索引即可,但是这里更新producerlimit保证了在抢占pindex前pindex是有效的//获取producer indexpIndex = lvProducerIndex();if (pIndex >= producerLimit){//获取consumer indexfinal long cIndex = lvConsumerIndex();//计算producer limitproducerLimit = cIndex + mask + 1;if (pIndex >= producerLimit){return false;}else{//将生产者限制更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的//这里的意思是---上边的if判断成运行到这里之后,可能有其他线程在此处并发修改了producerlimit,所以这里producerlimit的set是有竞争的,是线程不安全的,// 但是这种不安全最终对消息的提交和生产不会造成并发错误---这里有一个非常巧妙的设计,下边解释里会详细解释soProducerLimit(producerLimit);//这里对producerlimit的修改也是lazyset---底层调用了Unsafe类的putLongVolatile()方法}}}while (!casProducerIndex(pIndex, pIndex + 1));
这个循环可以理解为循环获取最新pIndex和producerlimit的值,然后比较二者的大小,最后cas抢占pIndex索引位置;当然producerlimit计算出新值之后要lazyset写回,这里有两点需要注意--
- 为什么使用do-while循环----这里主要是为了保证在抢占索引之前,这个索引是有效的,即pIndex<=producerlimit;否则就发生错误了,正常的while循环逻辑--
while(true){//死循环,自旋尝试抢占pIndexpIndex = lvProducerIndex();if (pIndex >= producerLimit){final long cIndex = lvConsumerIndex();producerLimit = cIndex + mask + 1;if (pIndex >= producerLimit){return false;}else{soProducerLimit(producerLimit);}}if(casProducerIndex(pIndex, pIndex + 1))break;}
- 为什么第一次判断pIndex>=producerlimit之后还要重新判断一次而不是直接退出循环----这里就是MpscArrayQueue的核心点之一----多生产者的实现了,非常关键!!!!(另外一个核心点就是循环数组的实现)
- 首先看一下第一次判断的producerlimit来自哪里----第一次producerlimit是在循环开始之前获取到的值,在循环开始之后判断第一次pIndex>=producerlimit直接退出是没有问题的---逻辑上不会产生错误,此时代码应该是这样子的--
long producerLimit = lvProducerLimit();while(true){//死循环,自旋尝试抢占pIndex//注意这里pIndex必须在循环内更新,每次抢占失败说明pIndex被其他线程更新了,所以要重新获取pIndex;pIndex = lvProducerIndex();if (pIndex >= producerLimit){return false;}if(casProducerIndex(pIndex, pIndex + 1))break;}
- 这个逻辑下,假设循环开始的时候producerlimit的值是10,pIndex第一次获取到的值是5,这时有两种情况--
- 在pIndex为8的时候抢占成功了,开始执行数组写入的逻辑,没有问题,
- 在pIndex为10的时候都没有抢占陈工,也就是走了return false的逻辑,生产者此次提交消息失败了,只能等待下次提交;
- 但是这里有个问题就是,在cas自旋的时候,producerlimit的值可能会更新(这里先不考虑在哪里更新的问题),所以当pIndex从5抢占到10这个过程中,上面的逻辑没有办法及时获取到最新的producerlimit的值,只能用循环开始前获取到的producerlimit的值做判断---这里逻辑上是没有问题的,producerlimit只会往后移,我们拿到的值不会产生错误;
- 所以为了解决3中的问题就应该将producerlimit每次在循环内更新,所以代码变成了这样--
while(true){//死循环,自旋尝试抢占pIndex//注意这里pIndex必须在循环内更新,每次抢占失败说明pIndex被其他线程更新了,所以要重新获取pIndex;pIndex = lvProducerIndex();//重新计算producerlimit的值final long cIndex = lvConsumerIndex();producerLimit = cIndex + mask + 1;if (pIndex >= producerLimit){return false;}soProducerLimit(producerLimit);if(casProducerIndex(pIndex, pIndex + 1))break;}
- 这样的话逻辑上就完全没有问题了,每次cas自旋都能获取到最新的pIndex和producerlimit的值,判断队列是否满了,队列没满的情况下就写入;否则失败;
- 但是这样处理的话会有另一个问题---性能问题,每次pIndex和producerlimit判断的时候都要重新计算producer的值,而producerlimit是volatile修饰的,读写都会加上很多内存屏障,另外producerlimit本身表示的是队列可写容量,在可写容量没满的情况下我们其实不需要获取或者更新最新的producerlimit的值,比如在上述情况1中,当cas自旋到8的时候就抢占成功了,这时我们保证了8是可以用的(producerlimit只会后移),但实际上producerlimit更新到了15还是20本次消息提交是不需要关心的,为此做的内存操作显然是没有太大必要的,我们只需要一开始的一个缓存值就可以了;为了保证更新的问题,只有当当前线程一直到producerlimit的时候都没有对pIndex抢占成功才需要重新计算producerlimit,然后判断是否需要继续自旋;
- 所以问题又回到了MpscArrayQueue中的实现---一开始获取一个producerlimit的快照,当快照容量内的pIndex都抢占失败之后才获取计算新的producerlimit的值,同时更新这个新的producerlimit的值-----这样一来,每个生产者对producerlimit的值的获取和更新都变成了懒加载机制----真的很巧妙的实现!!!!
- 这里再采用一下官方的注释--
"使用consumer index的缓存视图,可能在循环中更新"
- 首先看一下第一次判断的producerlimit来自哪里----第一次producerlimit是在循环开始之前获取到的值,在循环开始之后判断第一次pIndex>=producerlimit直接退出是没有问题的---逻辑上不会产生错误,此时代码应该是这样子的--
- 然后再看看关于producerlimit的更新的问题----producerlimit真的是一直后移的吗?这里直接贴出来官方注释------"将producerlimit更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的"
- 怎么理解
public boolean offer(final E e){if (null == e){throw new NullPointerException();}//官方注释--- 使用consumer index的缓存视图可能在循环中更新---// 这句话怎么理解--producer limit计算方式--// producerLimit = cIndex + mask + 1//可以看出producer limit的值跟mask和consumer limit有关,//首先看一下mask的值--// int actualCapacity = Pow2.roundToPowerOfTwo(capacity);// mask = actualCapacity - 1;//可以看出mask的值是跟capacity有关,而容量是我们初始化队列的时候就定义好的值,所以运行过程中producerlimit的值的变化取决于consumer index,//而这里获取到producerlimit的值之后才进入cas自旋更新pIndex,所以在自旋的过程中consumerindex的值可能会发生改变,导致producerlimit的值发生改变//所以这里在自旋开始前获取到的producer limit的值是源于cIndex的一个缓存值--在自旋成功后该值未必是正确的--这也导致了自旋成功后第二次判断producerlimit的值;//其实这里可以先不获取producerlimit的值,自旋跟新pIndex的值之后再获取,而官方这里提前获取可能是为了优化性能---断定这里cIndex被修改的可能性小(因为是单消费者)final long mask = this.mask;long producerLimit = lvProducerLimit();long pIndex;do{//注意这里的do-While循环,循环体内主要获取并更新producerlimit,然后cas自旋获取pIndex锁,获取成功后再一次更新producerlimit;//这也是官方代码设计巧妙的一点,正常逻辑在生产者提交消息之前首先抢占PIndex的索引即可,但是这里更新producerlimit保证了在抢占pindex前pindex是有效的//获取producer indexpIndex = lvProducerIndex();if (pIndex >= producerLimit){//获取consumer indexfinal long cIndex = lvConsumerIndex();//计算producer limitproducerLimit = cIndex + mask + 1;if (pIndex >= producerLimit){return false;}else{//将生产者限制更新为下一个指数,我们必须重新检查消费者指数这很激烈,但竞争是良性的//这里的意思是---上边的if判断成运行到这里之后,可能有其他线程在此处并发修改了producerlimit,所以这里producerlimit的set是有竞争的,是线程不安全的,// 但是这种不安全最终对消息的提交和生产不会造成并发错误---这里有一个非常巧妙的设计,下边解释里会详细解释soProducerLimit(producerLimit);//这里对producerlimit的修改也是lazyset---底层调用了Unsafe类的putLongVolatile()方法}}}while (!casProducerIndex(pIndex, pIndex + 1));/*注意:新的pindex在数组中的元素之前可见。如果我们依赖poll()的索引可见性,我们将需要处理元素不可见的情况。* ---这句官方注释怎么理解:* 多生产者为了实现生产者并发生产消息,每个生产者在抢占到Pindex之后会先将pindex暴露出去,提供其他的生产者抢占,之后才对具体的消息进行lazyset,这里就有一个问题了,消费者也能看到这个pIndex,所以消费者想* 消费这个pIndex对应的消息的时候有可能这个时候生产者还未实际进行写入,或者写入不可见,所以在消费者poll的时候要处理这中情况* */// Won CAS, move on to storingfinal long offset = calcCircularRefElementOffset(pIndex, mask);// REF_ARRAY_BASE + ((index & mask) << REF_ELEMENT_SHIFT);//REF_ARRAY_BASE表示数组初始位置,相当于0,REF_ELEMENT_SHIFT为2,这里左移两位相当于×4//重点看一下index&mask, index 表示当前生产者线程获取到的index,mask的计算方式--// int actualCapacity = Pow2.roundToPowerOfTwo(capacity)---从中可以找到下一个二次幂的值。//返回下一个2的正幂,如果是2的幂,则返回该值。负值映射到1。// mask = actualCapacity - 1;soRefElement(buffer, offset, e);return true; // AWESOME :)}
相关文章:

JCTools Mpsc源码详解(二) MpscArrayQueue
MpscArrayQueue是一个固定大小的环形数组队列,继承自ConcurrentCircularArrayQueue MpscArrayQueue的特点: 环形队列底层数据结构为数组有界 看一下MpscArrayQueue的属性(填充类除外)--- //生产者索引 private volatile long producerIndex; //生产者边界 private volatile…...

前端面试的性能优化部分(13)每天10个小知识点
目录 系列文章目录前端面试的性能优化部分(1)每天10个小知识点前端面试的性能优化部分(2)每天10个小知识点前端面试的性能优化部分(3)每天10个小知识点前端面试的性能优化部分(4)每天…...

C++ STL无序关联式容器(详解)
STL无序关联式容器 继 map、multimap、set、multiset 关联式容器之后,从本节开始,再讲解一类“特殊”的关联式容器,它们常被称为“无序容器”、“哈希容器”或者“无序关联容器”。 注意,无序容器是 C 11 标准才正式引入到 STL 标…...

Python爬虫解析工具之xpath使用详解
文章目录 一、数据解析方式二、xpath介绍三、环境安装1. 插件安装2. 依赖库安装 四、xpath语法五、xpath语法在Python代码中的使用 一、数据解析方式 爬虫抓取到整个页面数据之后,我们需要从中提取出有价值的数据,无用的过滤掉。这个过程称为数据解析&a…...

Linux防火墙报错:Failed to start firewalld.service Unit is masked
Linux防火墙报错:Failed to start firewalld.service: Unit is masked. 1、故障现象: 启动防火墙失败,报错情况如下: systemctl start firewalld # 报错: Failed to start firewalld.service: Unit is masked.原因是…...

前端面试:【Vuex】Vue.js的状态管理利器
嗨,亲爱的Vuex探险家!在Vue.js开发的旅程中,有一个强大的状态管理库,那就是Vuex。Vuex是Vue.js的官方状态管理工具,通过State、Mutation、Action和Module等核心概念,协助你轻松管理应用的状态。 1. 什么是V…...
Kotlin协程runBlocking并发launch,Semaphore同步1个launch任务运行
Kotlin协程runBlocking并发launch,Semaphore同步1个launch任务运行 <dependency><groupId>org.jetbrains.kotlinx</groupId><artifactId>kotlinx-coroutines-core</artifactId><version>1.7.3</version><type>pom&…...

c++ Union之妙用
union的作用基本是它里面的变量都用了同一块内存,跟起了别名一样,类型不一样的别名。 基本用法: struct Union{union {float a;int b;};};Union u;u.a 2.0f;std::cout << u.a << "," << u.b << std::endl…...

JSON的处理
1、JSON JSON(JavaScript Object Notation):是一种轻量级的数据交换格式。 它是基于 ECMAScript 规范的一个子集,采用完全独立于编程语言的文本格式来存储和表示数据。 简洁和清晰的层次结构使得 JSON 成为理想的数据交换语言。易于人阅读和编写&#…...

matlab使用教程(20)—插值基础
1.网格和散点样本数据 插值是在位于一组样本数据点域中的查询位置进行函数值估算的方法。函数值是根据最接近查询点的样本数据点计算的。MATLAB 根据样本数据的结构,可以执行两种插值。样本数据可以形成网格,也可以是分散的。 网格化的样本数据使得插值…...

Python功能制作之简单的3D特效
需要导入的库: pygame: 这是一个游戏开发库,用于创建多媒体应用程序,提供了处理图形、声音和输入的功能。 from pygame.locals import *: 导入pygame库中的常量和函数,用于处理事件和输入。 OpenGL.GL: 这是OpenGL的Python绑定…...

leetcode-5-最长回文串
题目描述 给你一个字符串 s,找到 s 中最长的回文子串。 如果字符串的反序与原始字符串相同,则该字符串称为回文字符串。 示例 1: 输入:s “babad” 输出:“bab” 解释:“aba” 同样是符合题意的答案。 示…...

二、Oracle 数据库安装集
一、CentOS 安装 OCI下载地址 1. 启动 # 1. 登录服务器,切换到oracle用户,或者以oracle用户登录 su - oracle# 2. 打开监听服务 lsnrctl start# 3. 查看Oracle监听器运行状况 lsnrctl status# 4. 以sys用户身份登录 sqlplus /nolog# 5. 切换用户conn 用…...

【Python】Python中的常用函数及用法
目录 输入输出类型转换引用哈希字符串常用操作判断类型查找替换大小写转换文本对齐去除空白字符拆分和连接 列表常用操作增删改查增删改统计排序 元组常用操作 字典常用操作 范围随机数学比较常用函数三角函数数学常量 输入 input():从键盘等待用户的输入࿰…...

基于JavaEE的ssm公司员工信息管理系统的设计与实现
基于JavaEE的ssm公司员工信息管理系统的设计与实现043 开发工具:idea 数据库mysql5.7 数据库链接工具:navcat,小海豚等 技术:ssm 摘 要 现代经济快节奏发展以及不断完善升级的信息化技术,让传统数据信息的管理升级为软件存…...

cornerstoneJS加载图片(base、矩阵)
cornerstoneJS默认加载dicom影像数据,将识别到的dicom数据转换成imageData数据,在界面上展示。故,cornerstoneJS也可直接加载imageData。 imageData数据的data是一个数组,每四个元素代表一个点,四个元素分别表示R、G、…...

3.Trunc截断函数用法
TRUNC函数用于对值进行截断 用法有两种:TRUNC(NUMBER)表示截断数字,TRUNC(date)表示截断日期 (1)截断数字 格式:TRUNC(n1,n2),n1表示被截断的数字…...

腾讯云 CODING 荣获 TiD 质量竞争力大会 2023 软件研发优秀案例
点击链接了解详情 8 月 13-16 日,由中关村智联软件服务业质量创新联盟主办的第十届 TiD 2023 质量竞争力大会在北京国家会议中心召开。本次大会以“聚焦数字化转型 探索智能软件研发”为主题,聚焦智能化测试工程、数据要素、元宇宙、数字化转型、产融合作…...

VSCode如何为远程安装预设(固定)扩展
背景 在使用VSCode进行远程开发时(python开发之远程开发工具选择_CodingInCV的博客-CSDN博客),特别是远程的机器经常变化时(如机器来源于动态分配),每次连接新的远程时,都不得不手动安装一些开…...

一文解析HTTP与HTTPS,它们的区别和联系
一文解析HTTP与HTTPS,它们的区别和联系 HTTP和HTTPS之间不同点 尽管HTTP和HTTPS在安全性方面存在差异,但它们仍然共享许多相同的基本特征和功能。这些相同点使得HTTP成为广泛应用的标准协议,并且HTTPS作为更安全的替代方案被广泛采用。HTTP…...

Faster RCNN网络数据流总结
前言 在学习Faster RCNN时,看了许多别人写的博客。看了以后,对Faster RCNN整理有了一个大概的了解,但是对训练时网络内部的数据流还不是很清楚,所以在结合这个版本的faster rcnn代码情况下,对网络数据流进行总结。以便…...

拒绝摆烂!C语言练习打卡第五天
🔥博客主页:小王又困了 📚系列专栏:每日一练 🌟人之为学,不日近则日退 ❤️感谢大家点赞👍收藏⭐评论✍️ 目录 一、选择题 📝1.第一题 📝2.第二题 Ὅ…...

关于LambdaQueryWrapper.or()导致错误
这个是原始的代码,到导致一个问题,后面所有的内容,都在这个or的右边,也就是整个查询语句就这一个or,而很明显( xxx or xxx)and()这才是我们要的,所以需要将这…...

Day17-Node后端身份认证-JWT
Day17-Node后端身份验证 一 密码加密 1 MD5加密 创建MD5.js//node提供了一个内置模块crypto用于密码加密 const crypto = require("crypto")module.exports.getMd5 = function(password){const md5...

onvif中imaging setting图像画质总结!
前言: 大家好,今天给大家来分享一篇关于图像质量的内容,这个内容是我在做onvif中的imaging setting的时候,关注到里面有关于: brightness(亮度)color saturation(色彩饱和度)contrast(对比度)sharpness(锐度)white balance(白平衡…...

not in效率低(MYSQL的Not IN、not EXISTS如何优化)
【版权所有,文章允许转载,但须以链接方式注明源地址,否则追究法律责任】【创作不易,点个赞就是对我最大的支持】 前言 仅作为学习笔记,供大家参考 总结的不错的话,记得点赞收藏关注哦! 目录 …...

微信小程序拉起支付报: 调用支付JSAPI缺少参数: total_fee
1. 调用支付JSAPI缺少参数: total_fee 2. 检查返回给前端调起支付的参数是否正确 一开始是params.put("package", prepay_id); 回来改回params.put("package", "prepay_id"prepay_id);...

Thinkphp6 如何 生成二维码
最近需要用到使用到二维码,需要将对应的网址输出生成二维码,Thinkphp6实现还是比较简单的: 第一步:安装 think-qrcode composer require dh2y/think-qrcode第二步:在对应的控制器使用 use dh2y\qrcode\QRcode;第三步&a…...

01.机器学习引言
1.机器学习的步骤 1. 数据搜集 其中数据划分,是将数据集分为训练集、验证集和测试集(通常不考虑时间) 2. 数据清洗 3. 特征工程 提取对象:原始数据(特征提取一般在特征选择之前) 提取目的:…...

结构型(二) - 桥接模式
一、概念 桥接模式(Bridge Pattern):是用于把抽象化与实现化解耦,使得二者可以独立变化。它通过提供抽象化和实现化之间的桥接结构,来实现二者的解耦。 另一种理解方式:一个类存在两个(或多个…...