当前位置: 首页 > news >正文

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

相关文章:

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器&#xff0c;一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务&#xff0c;WorkHandler可以共同竞争消费同一个队列中的任务。也就是说&#xff0c;假设任务队列中有a、b、c、d三个事件&#xff0c;eventHa…...

单例设计模式双重检查的作用

先看双重校验锁的写法 public class Singleton {/*volatile 修饰&#xff0c;singleton new Singleton() 可以拆解为3步&#xff1a;1、分配对象内存(给singleton分配内存)2、调用构造器方法&#xff0c;执行初始化&#xff08;调用 Singleton 的构造函数来初始化成员变量&am…...

NGINX_十二 nginx 地址重写 rewrite

十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite&#xff0c;即URL重写&#xff0c;就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化&#xff0c;是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...

react用ECharts实现组织架构图

找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...

坚持刷题|合并有序链表

文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持刷题&#xff0c;老年痴呆追不上我&#xff0c;消失了一段时间&#xff0c;我又回来刷题啦&#xff0c;今天…...

SPI协议——对外部SPI Flash操作

目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区&#xff08;每个块有16个扇区&#xff09; 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页&#xff0c;每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...

kotlin类型检测与类型转换

一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...

【JDBC】Oracle数据库连接问题记录

Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载&#xff0c;可以先尝试使用下面方式加载检查类是否存在&#xff0c;如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...

leetcode45 跳跃游戏II

题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...

【数学】什么是方法矩估计?和最大似然估计是什么关系?

背景 方法矩估计&#xff08;Method of Moments Estimation&#xff09;和最大似然估计&#xff08;Maximum Likelihood Estimation, MLE&#xff09;是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系&#xff0c;通过样本数据计算样本矩来估计总体参数。最大似…...

C++初学者指南第一步---10.内存(基础)

C初学者指南第一步—10.内存&#xff08;基础&#xff09; 文章目录 C初学者指南第一步---10.内存&#xff08;基础&#xff09;1.内存模型1.1 纸上谈兵&#xff1a;C的抽象内存模型1.2 实践&#xff1a;内存的实际处理 2. 自动存储3.动态存储&#xff1a;std::vector3.1 动态内…...

扩散模型详细推导过程——编码与解码

符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)​第 i i i个训练数据&#xff0c;其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)​第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...

js如何实现开屏弹窗

开屏弹窗是什么&#xff0c;其实就是第一次登录后进入页面给你的一种公告提示&#xff0c;此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...

C#——文件读取Directory类详情

文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...

03-QTWebEngine中使用qtvirtualkeyboard

qt提供了 virtualKeyboard 虚拟键盘模块&#xff0c;只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题&#xff1a; 1、中文输入的时候没有输入提示 Qvirt…...

leetcode3无重复字符的最长字串(重点讲滑动窗口)

本文主要讲解无重复字符的最长字串的要点与细节&#xff0c;根据步骤一步步走更方便理解 c与java代码如下&#xff0c;末尾 具体要点&#xff1a; 1. 区分一下子串和子序列 子串&#xff1a;要求元素在母串中是连续地出现 子序列&#xff1a;不要求连续 2. 题目中有两个核心…...

Gobject tutorial 八

The GObject base class Object memory management Gobject的内存管理相关的API很复杂&#xff0c;但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下&#xff0c;与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...

DDMA信号处理以及数据处理的流程---cfar检测

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...

【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 &#x1f4d5;引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 &#x1f916;决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...

Zookeeper 集群节点故障剔除、切换、恢复原理

Zookeeper 集群节点故障剔除、切换、恢复原理 zookeeper 集群节点故障时,如何剔除节点,如果为领导节点如何处理,如何进行故障恢 复的,实现原理? 在 Zookeeper 集群中,当节点故障时,集群需要自动剔除故障节点并进行故障恢复,确保集群的高 可用性和一致性。具体来说,…...

解决帝国cms栏目管理拼音乱码的问题

帝国CMS7.5版本utf-8版网站后台增加栏目生成乱码的问题怎么解决 1、需要改一个函数&#xff0c;并且增加一个处理文件&#xff0c;方法如下&#xff1a; 修改e/class/connect.php文件&#xff0c;找到ReturnPinyinFun函数&#xff0c;如未修改文件在4533-4547行&#xff0c;将…...

Git快速入门

一 快速使用 1.1 初始化 什么是版本库呢&#xff1f;版本库又名仓库&#xff0c;可以简单理解成一个目录&#xff0c;这个目录里面的所有文件都可以被Git管理起来&#xff0c;每个文件的修改、删除&#xff0c;Git都能跟踪&#xff0c;以便任何时刻都可以追踪历史&#xff0…...

【18.0】JavaScript---事件案例

【18.0】JavaScript—事件案例 【一】开关灯事件 【介绍】设置一个按钮&#xff0c;按下按钮触发事件&#xff0c;来回切换圆形图片的颜色 【分析】 图片设置&#xff1a;设置成圆形的图片背景颜色&#xff1a;设置红绿两个颜色&#xff0c;来回切换按钮设置&#xff1a;点击…...

推荐系统三十六式学习笔记:原理篇.矩阵分解12|如果关注排序效果,那么这个模型可以帮到你

目录 矩阵分解的不足贝叶斯个性化排序AUC构造样本目标函数训练方法 总结 矩阵分解在推荐系统中的地位非常崇高。它既有协同过滤的血统&#xff0c;又有机器学习的基因&#xff0c;可以说是非常优秀了&#xff1b;但即便如此&#xff0c;传统的矩阵分解无论是在处理显式反馈&…...

Kafka之ISR机制的理解

文章目录 Kafka的基本概念什么是ISRISR的维护机制ISR的作用ISR相关配置参数同步过程示例代码总结 Kafka中的ISR&#xff08;In-Sync Replicas同步副本&#xff09;机制是确保数据高可用性和一致性的核心组件。 Kafka的基本概念 在Kafka中&#xff0c;数据被组织成主题&#xf…...

如何设计一个点赞系统

首先我们定义出一个点赞系统需要对外提供哪些接口&#xff1a; 1.用户对特定的消息进行点赞&#xff1b; 2.用户查看自己发布的某条消息点赞数量以及被哪些人赞过&#xff1b; 3.用户查看自己给哪些消息点赞过&#xff1b; 这里假设每条消息都有一个message_id, 每一个用户都…...

对象存储测试工具-s3cmd

一、环境安装 官网&#xff1a;https://s3tools.org/s3cmd 下载安装包&#xff1a;https://s3tools.org/download GitHub&#xff1a;https://github.com/s3tools/s3cmd/releases 本文安装包&#xff1a;https://github.com/s3tools/s3cmd/releases/download/v2.0.2/s3cmd-2.0…...

OpenCV--图像色彩空间及转换

图像色彩空间及转换 python代码和笔记 python代码和笔记 import cv2 色彩空间&#xff0c;基础&#xff1a;RGB或BGR OpenCV中&#xff1a; 一、HSV(HSB)&#xff1a;用的最多&#xff0c; Hue&#xff1a;色相-色彩(0-360)&#xff0c;红色&#xff1a;0&#xff0c;绿色&…...

RIP解决不连续子网问题

#交换设备 RIP解决不连续子网问题 一、不连续子网的概念 相同主网下的子网&#xff0c;被另一个主网分割&#xff0c;例如下面实验拓扑在某公司的网络整改项目中&#xff0c;原先R1 和RS 属于同一主网络 10.0.0.0/8&#xff0c;现被 R2、R3、R4 分离&#xff0c;整网采用了 …...

网站开发 发表文章/广州seo搜索

一、编辑器 1.链接 我们很多时候需要引用别人或自己以前的代码。这里可以很方便的加上链接。 一个添加链接 一个取消链接 选中要链接的文字右键就可以选中链接。 填入URL地址&#xff0c;目标一般选择在新窗口打开。 2.书签 插入书签 写入书签名 这样可以在文章开始写目录 跳到…...

epanel wordpress/seo快速工具

分析&#xff1a;题意就是要求两个多边形相交的部分的面积如果会求多边形的核&#xff0c;这题就不难了&#xff0c;可以看我写的一篇POJ 1279 求多边形的核&#xff0c;因为本题已经说了给定的是顺时针方向&#xff0c;所以把POJ 1279的逆时针改下&#xff0c;再稍微改几个地方…...

wordpress阿里百秀/重庆关键词自然排名

今天写数据库脚本&#xff0c;创建数据库时&#xff0c;有一个字段是datetime类型的&#xff0c;想要设置默认时间为当前时间&#xff0c;于是有了如下代码registerDate datetime NOT NULL DEFAULT NOW() COMMENT 注册时间执行之后报错Error Code: 1067. Invalid default value…...

海口疫情最新消息今天/seo研究中心好客站

作者&#xff1a;朱金灿 来源&#xff1a;http://blog.csdn.net/clever101 上次我谈了DLL 封装框架视图的第一种方式动态创建窗口&#xff0c;下面我谈谈第二种方式新建文档模板。实际上这种方式更为简单。&#xff08;下面所有代码的开发环境为&#xff1a;VS C 2005s…...

网站基础建设和维护/seo外链要做些什么

瀑布流布局中的图片有一个核心特点 —— 等宽不定等高&#xff0c;瀑布流布局在国内网网站都有一定规模的使用&#xff0c;比如pinterest、花瓣网等等。那么接下来就基于这个特点开始瀑布流探索之旅。 基础功能实现 首先我们定义好一个有 20 张图片的容器&#xff0c; <body…...

开源项目网站怎么做 带视频/外贸网站平台哪个好

Java中的内存管理机制Java内存的划分java把内存分为两种栈内存堆内存栈内存在函数中定义的一些基本类型的变量和对象的引用变量都是在函数的栈内存中分配&#xff0c;当在一段代码中定义一个变量时&#xff0c;java就会在栈中为这个变量分配内存空间&#xff0c;当超过变量的作…...