当前位置: 首页 > news >正文

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

相关文章:

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器&#xff0c;一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务&#xff0c;WorkHandler可以共同竞争消费同一个队列中的任务。也就是说&#xff0c;假设任务队列中有a、b、c、d三个事件&#xff0c;eventHa…...

单例设计模式双重检查的作用

先看双重校验锁的写法 public class Singleton {/*volatile 修饰&#xff0c;singleton new Singleton() 可以拆解为3步&#xff1a;1、分配对象内存(给singleton分配内存)2、调用构造器方法&#xff0c;执行初始化&#xff08;调用 Singleton 的构造函数来初始化成员变量&am…...

NGINX_十二 nginx 地址重写 rewrite

十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite&#xff0c;即URL重写&#xff0c;就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化&#xff0c;是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...

react用ECharts实现组织架构图

找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...

坚持刷题|合并有序链表

文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持刷题&#xff0c;老年痴呆追不上我&#xff0c;消失了一段时间&#xff0c;我又回来刷题啦&#xff0c;今天…...

SPI协议——对外部SPI Flash操作

目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区&#xff08;每个块有16个扇区&#xff09; 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页&#xff0c;每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...

kotlin类型检测与类型转换

一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...

【JDBC】Oracle数据库连接问题记录

Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载&#xff0c;可以先尝试使用下面方式加载检查类是否存在&#xff0c;如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...

leetcode45 跳跃游戏II

题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...

【数学】什么是方法矩估计?和最大似然估计是什么关系?

背景 方法矩估计&#xff08;Method of Moments Estimation&#xff09;和最大似然估计&#xff08;Maximum Likelihood Estimation, MLE&#xff09;是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系&#xff0c;通过样本数据计算样本矩来估计总体参数。最大似…...

C++初学者指南第一步---10.内存(基础)

C初学者指南第一步—10.内存&#xff08;基础&#xff09; 文章目录 C初学者指南第一步---10.内存&#xff08;基础&#xff09;1.内存模型1.1 纸上谈兵&#xff1a;C的抽象内存模型1.2 实践&#xff1a;内存的实际处理 2. 自动存储3.动态存储&#xff1a;std::vector3.1 动态内…...

扩散模型详细推导过程——编码与解码

符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)​第 i i i个训练数据&#xff0c;其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)​第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...

js如何实现开屏弹窗

开屏弹窗是什么&#xff0c;其实就是第一次登录后进入页面给你的一种公告提示&#xff0c;此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...

C#——文件读取Directory类详情

文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...

03-QTWebEngine中使用qtvirtualkeyboard

qt提供了 virtualKeyboard 虚拟键盘模块&#xff0c;只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题&#xff1a; 1、中文输入的时候没有输入提示 Qvirt…...

leetcode3无重复字符的最长字串(重点讲滑动窗口)

本文主要讲解无重复字符的最长字串的要点与细节&#xff0c;根据步骤一步步走更方便理解 c与java代码如下&#xff0c;末尾 具体要点&#xff1a; 1. 区分一下子串和子序列 子串&#xff1a;要求元素在母串中是连续地出现 子序列&#xff1a;不要求连续 2. 题目中有两个核心…...

Gobject tutorial 八

The GObject base class Object memory management Gobject的内存管理相关的API很复杂&#xff0c;但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下&#xff0c;与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...

DDMA信号处理以及数据处理的流程---cfar检测

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...

【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 &#x1f4d5;引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 &#x1f916;决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...

Zookeeper 集群节点故障剔除、切换、恢复原理

Zookeeper 集群节点故障剔除、切换、恢复原理 zookeeper 集群节点故障时,如何剔除节点,如果为领导节点如何处理,如何进行故障恢 复的,实现原理? 在 Zookeeper 集群中,当节点故障时,集群需要自动剔除故障节点并进行故障恢复,确保集群的高 可用性和一致性。具体来说,…...

解决帝国cms栏目管理拼音乱码的问题

帝国CMS7.5版本utf-8版网站后台增加栏目生成乱码的问题怎么解决 1、需要改一个函数&#xff0c;并且增加一个处理文件&#xff0c;方法如下&#xff1a; 修改e/class/connect.php文件&#xff0c;找到ReturnPinyinFun函数&#xff0c;如未修改文件在4533-4547行&#xff0c;将…...

Git快速入门

一 快速使用 1.1 初始化 什么是版本库呢&#xff1f;版本库又名仓库&#xff0c;可以简单理解成一个目录&#xff0c;这个目录里面的所有文件都可以被Git管理起来&#xff0c;每个文件的修改、删除&#xff0c;Git都能跟踪&#xff0c;以便任何时刻都可以追踪历史&#xff0…...

【18.0】JavaScript---事件案例

【18.0】JavaScript—事件案例 【一】开关灯事件 【介绍】设置一个按钮&#xff0c;按下按钮触发事件&#xff0c;来回切换圆形图片的颜色 【分析】 图片设置&#xff1a;设置成圆形的图片背景颜色&#xff1a;设置红绿两个颜色&#xff0c;来回切换按钮设置&#xff1a;点击…...

推荐系统三十六式学习笔记:原理篇.矩阵分解12|如果关注排序效果,那么这个模型可以帮到你

目录 矩阵分解的不足贝叶斯个性化排序AUC构造样本目标函数训练方法 总结 矩阵分解在推荐系统中的地位非常崇高。它既有协同过滤的血统&#xff0c;又有机器学习的基因&#xff0c;可以说是非常优秀了&#xff1b;但即便如此&#xff0c;传统的矩阵分解无论是在处理显式反馈&…...

Kafka之ISR机制的理解

文章目录 Kafka的基本概念什么是ISRISR的维护机制ISR的作用ISR相关配置参数同步过程示例代码总结 Kafka中的ISR&#xff08;In-Sync Replicas同步副本&#xff09;机制是确保数据高可用性和一致性的核心组件。 Kafka的基本概念 在Kafka中&#xff0c;数据被组织成主题&#xf…...

如何设计一个点赞系统

首先我们定义出一个点赞系统需要对外提供哪些接口&#xff1a; 1.用户对特定的消息进行点赞&#xff1b; 2.用户查看自己发布的某条消息点赞数量以及被哪些人赞过&#xff1b; 3.用户查看自己给哪些消息点赞过&#xff1b; 这里假设每条消息都有一个message_id, 每一个用户都…...

对象存储测试工具-s3cmd

一、环境安装 官网&#xff1a;https://s3tools.org/s3cmd 下载安装包&#xff1a;https://s3tools.org/download GitHub&#xff1a;https://github.com/s3tools/s3cmd/releases 本文安装包&#xff1a;https://github.com/s3tools/s3cmd/releases/download/v2.0.2/s3cmd-2.0…...

OpenCV--图像色彩空间及转换

图像色彩空间及转换 python代码和笔记 python代码和笔记 import cv2 色彩空间&#xff0c;基础&#xff1a;RGB或BGR OpenCV中&#xff1a; 一、HSV(HSB)&#xff1a;用的最多&#xff0c; Hue&#xff1a;色相-色彩(0-360)&#xff0c;红色&#xff1a;0&#xff0c;绿色&…...

RIP解决不连续子网问题

#交换设备 RIP解决不连续子网问题 一、不连续子网的概念 相同主网下的子网&#xff0c;被另一个主网分割&#xff0c;例如下面实验拓扑在某公司的网络整改项目中&#xff0c;原先R1 和RS 属于同一主网络 10.0.0.0/8&#xff0c;现被 R2、R3、R4 分离&#xff0c;整网采用了 …...

网站策划 要求/长沙百度开户

堆是堆(heap)&#xff0c;栈是栈(stack)&#xff0c;堆栈是栈。栈中分配的是基本类型和自定义对象的引用。堆中分配的是对象&#xff0c;也就是new出来的东西。 被所有线程共享。方法区/静态区 存放的是类信息和static变量、常量。 被所有线程共享。也可以…...

wap网站制作哪家好/seo专业学校

一开始,打算用正态波松,发现不行,原因是点的个数太多. 1,将两个点云分别采样100万个点,法线估计,SIFT关键点&#xff0c;PFH特征描述子提取 2&#xff0c;点云粗配准两个PFH特征描述子&#xff0c;得到双向一致性的匹配点对 3,sac确定内部点&#xff0c;并得到变换矩阵 4&a…...

个人网站可以做淘宝客吗/代做百度关键词排名

原文地址 过去&#xff0c;box 只是 VirtualBox 导出的 tar 文件。由于 Vagrant 现在支持多个 provider 和版本控制&#xff0c;box 文件稍微复杂一些。 用于 Vagrant 1.0.x 的 Box 文件&#xff08;VirtualBox 导出的 tar 文件&#xff09;可以继续与新版本的 Vagrant 一起使用…...

溧阳网站建设/seo页面排名优化

Seata实现熔断与限流一、分布式事务问题二、Seata简介三、Seata-Server安装四、订单/库存/账户业务数据库准备1、创建业务数据库2、按业务数据库创建表3、按业务数据库创建对应的回滚日记表五、订单/库存/账户业务微服务准备1、订单业务微服务Module2、库存业务微服务Module3、…...

房地产销售平台网站建设/如何宣传推广自己的店铺

趁着最近项目稍微能放松下&#xff0c;还是赶在2018到来之前&#xff0c;写个年终总结一下。2017&#xff0c;算是经历比较多的一年&#xff0c;从纯开发质变到技术经理。 1.招聘 春节之后上班之后&#xff0c;就开启了招聘模式。第一阶段&#xff0c;大约从2月份一直到5月份&a…...

dwcs2018怎么做动态网站/微信营销的案例

作者&#xff1a;桂。 时间&#xff1a;2017-09-09 20:04:22 链接&#xff1a;http://www.cnblogs.com/xingshansi/p/7413139.html 前言 目前分析的问题&#xff0c;仍然限定在布阵的环节&#xff0c;暂不涉及后处理及硬件实现。 一、宽带处理的一般方式 前面分析的阵列信号模…...