当前位置: 首页 > news >正文

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

相关文章:

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器&#xff0c;一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务&#xff0c;WorkHandler可以共同竞争消费同一个队列中的任务。也就是说&#xff0c;假设任务队列中有a、b、c、d三个事件&#xff0c;eventHa…...

单例设计模式双重检查的作用

先看双重校验锁的写法 public class Singleton {/*volatile 修饰&#xff0c;singleton new Singleton() 可以拆解为3步&#xff1a;1、分配对象内存(给singleton分配内存)2、调用构造器方法&#xff0c;执行初始化&#xff08;调用 Singleton 的构造函数来初始化成员变量&am…...

NGINX_十二 nginx 地址重写 rewrite

十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite&#xff0c;即URL重写&#xff0c;就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化&#xff0c;是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...

react用ECharts实现组织架构图

找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...

坚持刷题|合并有序链表

文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持刷题&#xff0c;老年痴呆追不上我&#xff0c;消失了一段时间&#xff0c;我又回来刷题啦&#xff0c;今天…...

SPI协议——对外部SPI Flash操作

目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区&#xff08;每个块有16个扇区&#xff09; 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页&#xff0c;每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...

kotlin类型检测与类型转换

一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...

【JDBC】Oracle数据库连接问题记录

Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载&#xff0c;可以先尝试使用下面方式加载检查类是否存在&#xff0c;如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...

leetcode45 跳跃游戏II

题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...

【数学】什么是方法矩估计?和最大似然估计是什么关系?

背景 方法矩估计&#xff08;Method of Moments Estimation&#xff09;和最大似然估计&#xff08;Maximum Likelihood Estimation, MLE&#xff09;是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系&#xff0c;通过样本数据计算样本矩来估计总体参数。最大似…...

C++初学者指南第一步---10.内存(基础)

C初学者指南第一步—10.内存&#xff08;基础&#xff09; 文章目录 C初学者指南第一步---10.内存&#xff08;基础&#xff09;1.内存模型1.1 纸上谈兵&#xff1a;C的抽象内存模型1.2 实践&#xff1a;内存的实际处理 2. 自动存储3.动态存储&#xff1a;std::vector3.1 动态内…...

扩散模型详细推导过程——编码与解码

符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)​第 i i i个训练数据&#xff0c;其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)​第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...

js如何实现开屏弹窗

开屏弹窗是什么&#xff0c;其实就是第一次登录后进入页面给你的一种公告提示&#xff0c;此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...

C#——文件读取Directory类详情

文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...

03-QTWebEngine中使用qtvirtualkeyboard

qt提供了 virtualKeyboard 虚拟键盘模块&#xff0c;只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题&#xff1a; 1、中文输入的时候没有输入提示 Qvirt…...

leetcode3无重复字符的最长字串(重点讲滑动窗口)

本文主要讲解无重复字符的最长字串的要点与细节&#xff0c;根据步骤一步步走更方便理解 c与java代码如下&#xff0c;末尾 具体要点&#xff1a; 1. 区分一下子串和子序列 子串&#xff1a;要求元素在母串中是连续地出现 子序列&#xff1a;不要求连续 2. 题目中有两个核心…...

Gobject tutorial 八

The GObject base class Object memory management Gobject的内存管理相关的API很复杂&#xff0c;但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下&#xff0c;与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...

DDMA信号处理以及数据处理的流程---cfar检测

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...

【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 &#x1f4d5;引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 &#x1f916;决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...

符号回归在超快磁动力学研究中的应用:从数据中挖掘物理规律

1. 项目概述&#xff1a;当机器学习遇见超快磁动力学 在自旋电子学这个前沿领域&#xff0c;我们一直在与时间赛跑。从纳秒级的磁畴翻转&#xff0c;到飞秒级的超快退磁&#xff0c;理解磁性材料在不同时间尺度下的行为&#xff0c;是设计下一代高速、高密度存储器和逻辑器件的…...

保姆级教程:在Ubuntu 22.04上从源码编译COLMAP 3.9(含6个常见Bug解决方案)

在Ubuntu 22.04上从源码编译COLMAP 3.9的终极避坑指南三维重建技术正在重塑数字世界的构建方式&#xff0c;而COLMAP作为开源领域的标杆工具&#xff0c;其强大的多视图几何算法让学术研究和工业应用都受益匪浅。但当你第一次尝试在Ubuntu系统上编译这个工具时&#xff0c;可能…...

终极指南:如何免费快速上手Method Draw在线SVG编辑器

终极指南&#xff1a;如何免费快速上手Method Draw在线SVG编辑器 【免费下载链接】Method-Draw Method Draw, the SVG Editor for Method of Action 项目地址: https://gitcode.com/gh_mirrors/me/Method-Draw 如果你正在寻找一款简单高效的在线SVG编辑器&#xff0c;那…...

解决Claude Code密钥被封与Token不足的替代接入方案

&#x1f680; 告别海外账号与网络限制&#xff01;稳定直连全球优质大模型&#xff0c;限时半价接入中。 &#x1f449; 点击领取海量免费额度 解决Claude Code密钥被封与Token不足的替代接入方案 对于频繁使用Claude Code编程助手的开发者而言&#xff0c;开发流程中突然遇到…...

面试官问我SQL怎么调优,我直接甩出这套Explain对比法

面试官问我SQL怎么调优,我直接甩出这套Explain对比法 线上系统突然变慢,用户投诉纷至沓来,运维群里消息炸了锅。排查半天,发现罪魁祸首竟然是一条不起眼的SQL语句。这条SQL在测试环境跑得飞快,一到生产环境就像老牛拉破车。相信很多开发者都遇到过这种场景——SQL写的时候…...

3分钟上手跨平台资源下载神器:轻松获取微信视频号、抖音无水印内容

3分钟上手跨平台资源下载神器&#xff1a;轻松获取微信视频号、抖音无水印内容 【免费下载链接】res-downloader 视频号、小程序、抖音、快手、小红书、直播流、m3u8、酷狗、QQ音乐等常见网络资源下载! 项目地址: https://gitcode.com/GitHub_Trending/re/res-downloader …...

TikTok客户端关键字符串追踪与ttencrypt协议解析

1. 这不是“破解”&#xff0c;而是协议层的工程化还原很多人看到“TikTok算法逆向”第一反应是&#xff1a;这得用IDA Pro硬啃SO文件、在ARM汇编里找特征码、对着混淆后的Java层反复脱壳——其实大错特错。我过去三年深度参与过5个主流短视频App的客户端通信分析项目&#xff…...

DeepSeek / Qwen 大模型在昇腾上的推理优化实战

前言 把DeepSeek-V3和Qwen2.5-72B部署到昇腾910B集群上。客户说"GPU上跑得好好的&#xff0c;换昇腾应该也行吧"。结果第一天就被砸懵——同样的模型、同样的batch&#xff0c;昇腾上吞吐只有GPU的60%。不是算力不够&#xff0c;是我根本没搞清楚CANN的优化逻辑和CUD…...

AI Agent如何在毫秒级边缘设备上自主决策?揭秘轻量化推理框架与动态资源调度的7个关键技术突破

更多请点击&#xff1a; https://kaifayun.com 第一章&#xff1a;AI Agent边缘计算应用的范式演进 随着终端设备算力持续增强与轻量化模型技术日趋成熟&#xff0c;AI Agent不再仅依赖云端协同执行决策任务&#xff0c;而是逐步下沉至网络边缘&#xff0c;形成具备感知、推理…...

AI Agent自主操作软件的“最后一公里”危机:当它成功调用API却误删生产数据库——12个真实事故根因与防御性沙箱配置模板

更多请点击&#xff1a; https://codechina.net 第一章&#xff1a;AI Agent自主操作软件的“最后一公里”危机本质 当AI Agent在模拟环境中流畅调用API、生成SQL、解析PDF时&#xff0c;它却在真实办公桌面前频频卡壳——点击错按钮、误判窗口焦点、无法处理弹窗验证码、对非…...