当前位置: 首页 > news >正文

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandlerworkHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费

定义的哈希函数接口和统一的基础父类

public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>*     默认实现是使用{@link  java.util.HashMap HashMap}的哈希计算公式</br>*     建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}

测试代码

public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile  int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}

相关文章:

绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务

disruptor有两种任务处理器&#xff0c;一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务&#xff0c;WorkHandler可以共同竞争消费同一个队列中的任务。也就是说&#xff0c;假设任务队列中有a、b、c、d三个事件&#xff0c;eventHa…...

单例设计模式双重检查的作用

先看双重校验锁的写法 public class Singleton {/*volatile 修饰&#xff0c;singleton new Singleton() 可以拆解为3步&#xff1a;1、分配对象内存(给singleton分配内存)2、调用构造器方法&#xff0c;执行初始化&#xff08;调用 Singleton 的构造函数来初始化成员变量&am…...

NGINX_十二 nginx 地址重写 rewrite

十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite&#xff0c;即URL重写&#xff0c;就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化&#xff0c;是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...

react用ECharts实现组织架构图

找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...

坚持刷题|合并有序链表

文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello&#xff0c;大家好&#xff0c;我是阿月。坚持刷题&#xff0c;老年痴呆追不上我&#xff0c;消失了一段时间&#xff0c;我又回来刷题啦&#xff0c;今天…...

SPI协议——对外部SPI Flash操作

目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区&#xff08;每个块有16个扇区&#xff09; 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页&#xff0c;每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...

kotlin类型检测与类型转换

一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...

【JDBC】Oracle数据库连接问题记录

Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载&#xff0c;可以先尝试使用下面方式加载检查类是否存在&#xff0c;如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...

leetcode45 跳跃游戏II

题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说&#xff0c;如果你在 nums[i] 处&#xff0c;你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...

【数学】什么是方法矩估计?和最大似然估计是什么关系?

背景 方法矩估计&#xff08;Method of Moments Estimation&#xff09;和最大似然估计&#xff08;Maximum Likelihood Estimation, MLE&#xff09;是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系&#xff0c;通过样本数据计算样本矩来估计总体参数。最大似…...

C++初学者指南第一步---10.内存(基础)

C初学者指南第一步—10.内存&#xff08;基础&#xff09; 文章目录 C初学者指南第一步---10.内存&#xff08;基础&#xff09;1.内存模型1.1 纸上谈兵&#xff1a;C的抽象内存模型1.2 实践&#xff1a;内存的实际处理 2. 自动存储3.动态存储&#xff1a;std::vector3.1 动态内…...

扩散模型详细推导过程——编码与解码

符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)​第 i i i个训练数据&#xff0c;其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)​第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...

js如何实现开屏弹窗

开屏弹窗是什么&#xff0c;其实就是第一次登录后进入页面给你的一种公告提示&#xff0c;此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...

C#——文件读取Directory类详情

文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...

Ruby on Rails Post项目设置网站初始界面

在构建了Ruby的Web服务器后&#xff0c;第三步就可以去掉框架的官方页面&#xff0c;设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...

03-QTWebEngine中使用qtvirtualkeyboard

qt提供了 virtualKeyboard 虚拟键盘模块&#xff0c;只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题&#xff1a; 1、中文输入的时候没有输入提示 Qvirt…...

leetcode3无重复字符的最长字串(重点讲滑动窗口)

本文主要讲解无重复字符的最长字串的要点与细节&#xff0c;根据步骤一步步走更方便理解 c与java代码如下&#xff0c;末尾 具体要点&#xff1a; 1. 区分一下子串和子序列 子串&#xff1a;要求元素在母串中是连续地出现 子序列&#xff1a;不要求连续 2. 题目中有两个核心…...

Gobject tutorial 八

The GObject base class Object memory management Gobject的内存管理相关的API很复杂&#xff0c;但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下&#xff0c;与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...

DDMA信号处理以及数据处理的流程---cfar检测

Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...

【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现

&#x1f4dd;个人主页&#xff1a;哈__ 期待您的关注 目录 &#x1f4d5;引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 &#x1f916;决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...

XCTF-web-easyupload

试了试php&#xff0c;php7&#xff0c;pht&#xff0c;phtml等&#xff0c;都没有用 尝试.user.ini 抓包修改将.user.ini修改为jpg图片 在上传一个123.jpg 用蚁剑连接&#xff0c;得到flag...

C++初阶-list的底层

目录 1.std::list实现的所有代码 2.list的简单介绍 2.1实现list的类 2.2_list_iterator的实现 2.2.1_list_iterator实现的原因和好处 2.2.2_list_iterator实现 2.3_list_node的实现 2.3.1. 避免递归的模板依赖 2.3.2. 内存布局一致性 2.3.3. 类型安全的替代方案 2.3.…...

智慧工地云平台源码,基于微服务架构+Java+Spring Cloud +UniApp +MySql

智慧工地管理云平台系统&#xff0c;智慧工地全套源码&#xff0c;java版智慧工地源码&#xff0c;支持PC端、大屏端、移动端。 智慧工地聚焦建筑行业的市场需求&#xff0c;提供“平台网络终端”的整体解决方案&#xff0c;提供劳务管理、视频管理、智能监测、绿色施工、安全管…...

理解 MCP 工作流:使用 Ollama 和 LangChain 构建本地 MCP 客户端

&#x1f31f; 什么是 MCP&#xff1f; 模型控制协议 (MCP) 是一种创新的协议&#xff0c;旨在无缝连接 AI 模型与应用程序。 MCP 是一个开源协议&#xff0c;它标准化了我们的 LLM 应用程序连接所需工具和数据源并与之协作的方式。 可以把它想象成你的 AI 模型 和想要使用它…...

Reasoning over Uncertain Text by Generative Large Language Models

https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829https://ojs.aaai.org/index.php/AAAI/article/view/34674/36829 1. 概述 文本中的不确定性在许多语境中传达,从日常对话到特定领域的文档(例如医学文档)(Heritage 2013;Landmark、Gulbrandsen 和 Svenevei…...

JVM虚拟机:内存结构、垃圾回收、性能优化

1、JVM虚拟机的简介 Java 虚拟机(Java Virtual Machine 简称:JVM)是运行所有 Java 程序的抽象计算机,是 Java 语言的运行环境,实现了 Java 程序的跨平台特性。JVM 屏蔽了与具体操作系统平台相关的信息,使得 Java 程序只需生成在 JVM 上运行的目标代码(字节码),就可以…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S &#xff08;client/server 客户端/服务器&#xff09;&#xff1a;由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序&#xff0c;负责提供用户界面和交互逻辑 &#xff0c;接收用户输入&#xff0c;向服务器发送请求&#xff0c;并展示服务…...

mac 安装homebrew (nvm 及git)

mac 安装nvm 及git 万恶之源 mac 安装这些东西离不开Xcode。及homebrew 一、先说安装git步骤 通用&#xff1a; 方法一&#xff1a;使用 Homebrew 安装 Git&#xff08;推荐&#xff09; 步骤如下&#xff1a;打开终端&#xff08;Terminal.app&#xff09; 1.安装 Homebrew…...

解决:Android studio 编译后报错\app\src\main\cpp\CMakeLists.txt‘ to exist

现象&#xff1a; android studio报错&#xff1a; [CXX1409] D:\GitLab\xxxxx\app.cxx\Debug\3f3w4y1i\arm64-v8a\android_gradle_build.json : expected buildFiles file ‘D:\GitLab\xxxxx\app\src\main\cpp\CMakeLists.txt’ to exist 解决&#xff1a; 不要动CMakeLists.…...

Bean 作用域有哪些?如何答出技术深度?

导语&#xff1a; Spring 面试绕不开 Bean 的作用域问题&#xff0c;这是面试官考察候选人对 Spring 框架理解深度的常见方式。本文将围绕“Spring 中的 Bean 作用域”展开&#xff0c;结合典型面试题及实战场景&#xff0c;帮你厘清重点&#xff0c;打破模板式回答&#xff0c…...