绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1 和eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandler 和workHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费
定义的哈希函数接口和统一的基础父类
public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}
测试代码
public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}相关文章:
绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHa…...
单例设计模式双重检查的作用
先看双重校验锁的写法 public class Singleton {/*volatile 修饰,singleton new Singleton() 可以拆解为3步:1、分配对象内存(给singleton分配内存)2、调用构造器方法,执行初始化(调用 Singleton 的构造函数来初始化成员变量&am…...
NGINX_十二 nginx 地址重写 rewrite
十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite,即URL重写,就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化,是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...
react用ECharts实现组织架构图
找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...
坚持刷题|合并有序链表
文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello,大家好,我是阿月。坚持刷题,老年痴呆追不上我,消失了一段时间,我又回来刷题啦,今天…...
SPI协议——对外部SPI Flash操作
目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区(每个块有16个扇区) 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页,每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...
kotlin类型检测与类型转换
一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...
【JDBC】Oracle数据库连接问题记录
Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载,可以先尝试使用下面方式加载检查类是否存在,如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...
leetcode45 跳跃游戏II
题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说,如果你在 nums[i] 处,你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...
【数学】什么是方法矩估计?和最大似然估计是什么关系?
背景 方法矩估计(Method of Moments Estimation)和最大似然估计(Maximum Likelihood Estimation, MLE)是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系,通过样本数据计算样本矩来估计总体参数。最大似…...
C++初学者指南第一步---10.内存(基础)
C初学者指南第一步—10.内存(基础) 文章目录 C初学者指南第一步---10.内存(基础)1.内存模型1.1 纸上谈兵:C的抽象内存模型1.2 实践:内存的实际处理 2. 自动存储3.动态存储:std::vector3.1 动态内…...
扩散模型详细推导过程——编码与解码
符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)第 i i i个训练数据,其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...
js如何实现开屏弹窗
开屏弹窗是什么,其实就是第一次登录后进入页面给你的一种公告提示,此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...
C#——文件读取Directory类详情
文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...
Ruby on Rails Post项目设置网站初始界面
在构建了Ruby的Web服务器后,第三步就可以去掉框架的官方页面,设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...
03-QTWebEngine中使用qtvirtualkeyboard
qt提供了 virtualKeyboard 虚拟键盘模块,只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题: 1、中文输入的时候没有输入提示 Qvirt…...
leetcode3无重复字符的最长字串(重点讲滑动窗口)
本文主要讲解无重复字符的最长字串的要点与细节,根据步骤一步步走更方便理解 c与java代码如下,末尾 具体要点: 1. 区分一下子串和子序列 子串:要求元素在母串中是连续地出现 子序列:不要求连续 2. 题目中有两个核心…...
Gobject tutorial 八
The GObject base class Object memory management Gobject的内存管理相关的API很复杂,但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下,与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...
DDMA信号处理以及数据处理的流程---cfar检测
Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...
【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现
📝个人主页:哈__ 期待您的关注 目录 📕引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 🤖决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...
使用Python运行VirtualLab Fusion光学仿真
摘要 VirtualLab Fusion允许Python外部访问其建模技术、求解器和结果。这个用例介绍了一种使用路径变量和Visual Studio代码将Python连接到VirtualLab Fusion的简单方法。在本示例中,我们将演示如何使用Python脚本运行光学仿真,以向用户简要概述这种跨…...
SSCOM串口助手5个隐藏技巧:多窗口同步调试效率翻倍(附配置截图)
SSCOM串口助手5个隐藏技巧:多窗口同步调试效率翻倍(附配置截图) 在嵌入式开发和硬件调试领域,串口通信工具的效率直接影响着工程师的工作节奏。SSCOM作为一款广受欢迎的串口调试助手,其简洁界面背后隐藏着许多能显著提…...
Power BI视觉对象交互设计秘籍--巧用书签按钮实现动态提示
1. 为什么需要动态提示功能? 做数据分析报表最怕什么?不是数据不准,而是看报表的人看不懂。我见过太多这样的场景:精心设计的柱状图被用户误读,复杂的折线图被理解成完全相反的趋势。这时候你会想,要是有个…...
告别Xshell!Mac上这款免费串口工具CoolTerm,固件调试日志记录真香了
告别Xshell!Mac上这款免费串口工具CoolTerm,固件调试日志记录真香了 从Windows切换到Mac平台的嵌入式开发者,最头疼的莫过于找不到趁手的串口调试工具。Xshell和SecureCRT在Windows上堪称神器,但它们的Mac版本要么收费高昂&#…...
避坑指南:uniapp调用支付宝授权时常见的5个错误及解决方案
Uniapp支付宝授权实战:5个高频错误与深度解决方案 移动应用开发中,第三方授权登录是提升用户体验的关键环节。作为国内主流支付平台,支付宝授权在电商、生活服务类App中应用广泛。但许多Uniapp开发者在实现支付宝授权功能时,总会遇…...
VeraCrypt加密卷功能解析与个性化配置指南
VeraCrypt加密卷功能解析与个性化配置指南 【免费下载链接】VeraCrypt Disk encryption with strong security based on TrueCrypt 项目地址: https://gitcode.com/GitHub_Trending/ve/VeraCrypt VeraCrypt作为一款基于TrueCrypt的开源磁盘加密工具,提供了强…...
轴,V带轮,斜齿轮,丝杠零件图CAD图纸
轴作为机械系统中的核心传动部件,承担着传递扭矩与支撑旋转的重要功能。其设计需综合考虑材料强度、刚度及热处理工艺,以确保在复杂载荷下保持稳定运行。典型结构包含阶梯轴、空心轴等类型,通过优化轴肩定位与键槽布局,可有效提升…...
Pixel Dream Workshop 在电商领域的应用:一键生成商品场景图
Pixel Dream Workshop 在电商领域的应用:一键生成商品场景图 1. 电商商品图的痛点与机遇 电商行业有个公开的秘密:商品图片的制作成本往往比想象中高得多。我们曾合作过的一家服装电商,每月仅模特拍摄费用就超过20万元,这还不包…...
Windows环境下Jaeger全链路监控系统搭建指南
1. 为什么需要全链路监控系统 在微服务架构中,一个用户请求可能会经过多个服务的处理。想象一下,你在电商网站下单时,这个操作会触发订单服务、支付服务、库存服务等多个系统的协同工作。当出现问题时,传统的日志排查就像在迷宫里…...
从 99.8% 到 14.9%:Paperxie AI 降重,让论文 AIGC 焦虑彻底成为过去式
paperxie-免费查重复率aigc检测/开题报告/毕业论文/智能排版/文献综述/AIPPThttps://www.paperxie.cn/weight?type1https://www.paperxie.cn/weight?type1 一、写在前面:被 AIGC 检测支配的论文焦虑,终于有解了 当知网、维普等平台全面升级 AIGC 检测…...
