绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1 和eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandler 和workHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费
定义的哈希函数接口和统一的基础父类
public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}
测试代码
public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}相关文章:
绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHa…...
单例设计模式双重检查的作用
先看双重校验锁的写法 public class Singleton {/*volatile 修饰,singleton new Singleton() 可以拆解为3步:1、分配对象内存(给singleton分配内存)2、调用构造器方法,执行初始化(调用 Singleton 的构造函数来初始化成员变量&am…...
NGINX_十二 nginx 地址重写 rewrite
十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite,即URL重写,就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化,是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...
react用ECharts实现组织架构图
找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...
坚持刷题|合并有序链表
文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello,大家好,我是阿月。坚持刷题,老年痴呆追不上我,消失了一段时间,我又回来刷题啦,今天…...
SPI协议——对外部SPI Flash操作
目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区(每个块有16个扇区) 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页,每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...
kotlin类型检测与类型转换
一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...
【JDBC】Oracle数据库连接问题记录
Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载,可以先尝试使用下面方式加载检查类是否存在,如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...
leetcode45 跳跃游戏II
题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说,如果你在 nums[i] 处,你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...
【数学】什么是方法矩估计?和最大似然估计是什么关系?
背景 方法矩估计(Method of Moments Estimation)和最大似然估计(Maximum Likelihood Estimation, MLE)是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系,通过样本数据计算样本矩来估计总体参数。最大似…...
C++初学者指南第一步---10.内存(基础)
C初学者指南第一步—10.内存(基础) 文章目录 C初学者指南第一步---10.内存(基础)1.内存模型1.1 纸上谈兵:C的抽象内存模型1.2 实践:内存的实际处理 2. 自动存储3.动态存储:std::vector3.1 动态内…...
扩散模型详细推导过程——编码与解码
符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)第 i i i个训练数据,其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...
js如何实现开屏弹窗
开屏弹窗是什么,其实就是第一次登录后进入页面给你的一种公告提示,此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...
C#——文件读取Directory类详情
文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...
Ruby on Rails Post项目设置网站初始界面
在构建了Ruby的Web服务器后,第三步就可以去掉框架的官方页面,设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...
03-QTWebEngine中使用qtvirtualkeyboard
qt提供了 virtualKeyboard 虚拟键盘模块,只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题: 1、中文输入的时候没有输入提示 Qvirt…...
leetcode3无重复字符的最长字串(重点讲滑动窗口)
本文主要讲解无重复字符的最长字串的要点与细节,根据步骤一步步走更方便理解 c与java代码如下,末尾 具体要点: 1. 区分一下子串和子序列 子串:要求元素在母串中是连续地出现 子序列:不要求连续 2. 题目中有两个核心…...
Gobject tutorial 八
The GObject base class Object memory management Gobject的内存管理相关的API很复杂,但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下,与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...
DDMA信号处理以及数据处理的流程---cfar检测
Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...
【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现
📝个人主页:哈__ 期待您的关注 目录 📕引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 🤖决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...
大模型查询质量评估新范式(Perplexity算法底层逻辑首次公开)
更多请点击: https://codechina.net 第一章:大模型查询质量评估新范式(Perplexity算法底层逻辑首次公开) Perplexity(困惑度)并非仅是语言模型训练阶段的监控指标,而是当前大模型查询质量评估中…...
Agent 一接数据大屏就开始配错指标:从维度意图识别到口径一致性校验的工程实战
一、🎯 生产痛点:大促当夜的指标错位 去年双 11 零点,某电商团队的 Agent 接到"生成实时 GMV 监控大屏"指令后产出了一套仪表盘。运营同学却发现 GMV 曲线在凌晨 1 点下跌 40%。问题在于 Agent 把"下单金额"和"退款…...
别再只用默认模型了!手把手教你用SnowNLP训练专属情感分析模型(附完整代码)
突破SnowNLP默认模型局限:打造高精度领域情感分析系统的实战指南 从"水土不服"到精准预测:为什么你需要自定义情感模型 去年夏天,我们的产品团队在分析用户反馈时遇到了一个诡异现象:明明用户留言中充斥着"卡顿严重…...
C盘告急?手把手教你用mklink命令把Fusion 360挪到D盘(Win11保姆级教程)
拯救C盘空间:用符号链接将Fusion 360迁移到D盘的完整指南 当C盘空间告急时,很多用户会发现Fusion 360默认安装在系统盘,占用了大量宝贵空间。本文将详细介绍如何利用Windows的mklink命令,在不影响软件功能的前提下,将F…...
HPM6750 BGA196封装XPI0 CA端口缺失的CB端口启动解决方案
1. 项目概述与核心挑战最近在做一个对PCB尺寸有严格限制的嵌入式项目,主控芯片选用了先楫半导体的高性能MCU HPM6750。为了压缩板子面积,我放弃了引脚更丰富的BGA289封装(HPM6750IVM2),转而选择了更紧凑的BGA196封装&a…...
深度神经网络(DNN)百科全书从“深“到“无限深“
一、开篇:深度的奇迹 2012 年 9 月 30 日。 ImageNet 挑战赛的结果在 Florence 公布。所有人都以为冠军会延续过去 3 年的传统——传统计算机视觉方法(SIFT、HOG、SVM)小幅领先。 但那一年,一个叫 AlexNet 的"怪物"出现了。8 层的卷积神经网络,Top-5 错误率 …...
移动端测试实战:App兼容性测试的全套解决方案
一、移动端App兼容性测试的核心价值与挑战在移动互联网生态中,设备碎片化、系统版本迭代加速、网络环境多样性等因素,使得App兼容性问题成为影响用户体验与产品口碑的关键变量。据行业数据统计,兼容性问题引发的用户投诉占比超过30%ÿ…...
2025最权威的AI写作方案横评
Ai论文网站排名(开题报告、文献综述、降aigc率、降重综合对比) TOP1. 千笔AI TOP2. aipasspaper TOP3. 清北论文 TOP4. 豆包 TOP5. kimi TOP6. deepseek 当人工智能技术于当下迅猛发展之际,对于企业来讲,核心挑战其中之一便…...
深入理解强化学习基础:价值函数、策略梯度与PPO算法核心原理
深入理解强化学习基础:价值函数、策略梯度与PPO算法核心原理 【免费下载链接】LLM-RL-Visualized 🌟100 原创 LLM / RL 原理图📚,《大模型算法》作者巨献!💥(100 LLM/RL Algorithm Maps &#x…...
专业级LaTeX排版:深度解析中国科学技术大学学位论文模板括号使用的最佳实践
专业级LaTeX排版:深度解析中国科学技术大学学位论文模板括号使用的最佳实践 【免费下载链接】ustcthesis LaTeX template for USTC thesis 项目地址: https://gitcode.com/gh_mirrors/us/ustcthesis 在学术论文写作中,细节决定专业水准。中国科学…...
