绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler.
EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHandler1 和eventHandler2,这两个处理器都会去这4个事件,也就是每个事件会被处理两次,每个处理器处理一次;而如果有workHandler 和workHandler2这两个处理器,那么每个任务只会被其中一个处理器处理,这两处理器的处理时间总数加起来是这4个事件。
了解了这些,就可以能理解使用EventHandler为啥能实现多线程顺序处理了。
可以给事件定义一个hash函数,根据哈希取余的槽位下标和当前处理器的下标比较,判断出此事件是否应被当前的处理器处理,不该其处理的事件直接pass,避免重复消费
定义的哈希函数接口和统一的基础父类
public interface Hashed<K> {/*** 哈希码计算所需key** @return key值*/K getKey();/*** 计算哈希码** <p>* 默认实现是使用{@link java.util.HashMap HashMap}的哈希计算公式</br>* 建议对此计算做个缓存,进一步提高性能* </p>** @param key 哈希码生成的输入key,此入参是{@link #getKey()}方法的返回值* @return 哈希码* @see #getKey()*/default int hash(K key) {if (key == null) {throw new NullPointerException("key must not be null");}int h;return (h = key.hashCode()) ^ (h >>> 16);}}public abstract class PartitionEventHandler<E extends Hashed<K>, K> implements EventHandler<E>, Cloneable {protected final Logger log = LoggerFactory.getLogger(getClass());private final static int NO_INIT_VALUE = -1;private int index;private int indexMask;public final int getIndex() {return index;}public final int getIndexMask() {return indexMask;}@Overridepublic final void onEvent(E event, long sequence, boolean endOfBatch) throws Exception {if (index == NO_INIT_VALUE || indexMask == NO_INIT_VALUE) {throw new IllegalStateException("this should be inited");}int position = event.hash(event.getKey()) & indexMask;if (index != position) {if (log.isTraceEnabled()) {log.trace("The event with key [{}] should be handled on slot [{}],but current slot is [{}], it will be ignored.",event.getKey(), position, index);}return;}if (log.isDebugEnabled()) {log.debug("The event with key [{}] is being handled on slot [{}].", event.getKey(), position);}doOnEvent(event, sequence, endOfBatch);}protected abstract void doOnEvent(E event, long sequence, boolean endOfBatch) throws Exception;@SafeVarargspublic static <E extends Hashed<K>, K> PartitionEventHandler<E, K>[] initHandlers(final PartitionEventHandler<E, K>... handlers) {if (handlers == null) {throw new NullPointerException("handlers must not be null");}if (handlers.length == 0) {throw new IllegalArgumentException("handlers length must be more than zero");}if (Integer.bitCount(handlers.length) != 1) {throw new IllegalArgumentException("handlers count must be a power of 2");}int indexMask = handlers.length - 1;for (int i = 0; i < handlers.length; i++) {PartitionEventHandler<E, K> handler = handlers[i];handler.indexMask = indexMask;handler.index = i;}return handlers;}@Overridepublic PartitionEventHandler<E, K> clone() {try {@SuppressWarnings("unchecked")PartitionEventHandler<E, K> handler = (PartitionEventHandler<E, K>) super.clone();handler.index = NO_INIT_VALUE;handler.indexMask = NO_INIT_VALUE;return handler;} catch (CloneNotSupportedException e) {//never occurthrow new RuntimeException("can not clone " + getClass(), e);}}public PartitionEventHandler<E, K>[] clones(int count) {if (count < 0) {throw new IllegalArgumentException("count must be more then zero");}@SuppressWarnings("unchecked")PartitionEventHandler<E, K>[] result = new PartitionEventHandler[count];for (int i = 0; i < count; i++) {result[i] = clone();}return result;}
}
测试代码
public class DisruptorTest05 {@Testpublic void test1() {CustomizableThreadFactory threadFactory = new CustomizableThreadFactory("event-handler-");threadFactory.setDaemon(false);Disruptor<ChatGroupEvent> disruptor = new Disruptor<>(new ChatGroupEventFactory(), 1024, threadFactory);ChatGroupEventHandler handler = new ChatGroupEventHandler();PartitionEventHandler<ChatGroupEvent, String>[] handlers = handler.clones(8);disruptor.handleEventsWith(PartitionEventHandler.initHandlers(handlers));disruptor.start();RingBuffer<ChatGroupEvent> ringBuffer = disruptor.getRingBuffer();Random rdm = new Random();long start = System.currentTimeMillis();for (int i = 0; i < 1000; i++) {long seq = ringBuffer.next();ChatGroupEvent chatGroupEvent = ringBuffer.get(seq);int rdmInt = rdm.nextInt(12);chatGroupEvent.setGroupId(String.format("groupId-%02d", rdmInt));chatGroupEvent.setGroupOwner(String.format("owner-%03d", i));String type = i % 2 == 0 ? "add" : "exit";chatGroupEvent.setChangeType(type);ringBuffer.publish(seq);}disruptor.shutdown();long spent = System.currentTimeMillis() - start;System.out.println("total time " + spent);System.out.println();}static class ChatGroupEventFactory implements EventFactory<ChatGroupEvent> {@Overridepublic ChatGroupEvent newInstance() {return new ChatGroupEvent();}}static class ChatGroupEventHandler extends PartitionEventHandler<ChatGroupEvent, String> {@Overrideprotected void doOnEvent(ChatGroupEvent event, long sequence, boolean endOfBatch) throws Exception {Thread.sleep(50);log.info("event key={},owner={},changeType={}, index={}", event.getKey(), event.getGroupOwner(), event.getChangeType(), getIndex());}}static class ChatGroupEvent implements Hashed<String> {private volatile int hashcode=-1;private String groupId;private String changeType;private String groupOwner;@Overridepublic String getKey() {return groupId;}@Overridepublic int hash(String key) {if (hashcode == -1) {synchronized (this){if (hashcode == -1) {int h;hashcode = (h = key.hashCode()) ^ (h >>> 16);return hashcode;}}}return hashcode;}public String getGroupId() {return groupId;}public void setGroupId(String groupId) {this.groupId = groupId;}public String getChangeType() {return changeType;}public void setChangeType(String changeType) {this.changeType = changeType;}public String getGroupOwner() {return groupOwner;}public void setGroupOwner(String groupOwner) {this.groupOwner = groupOwner;}}
}相关文章:
绝对全网首发,利用Disruptor EventHandler实现在多线程下顺序执行任务
disruptor有两种任务处理器,一个是EventHandler ,另一个是WorkHandler. EventHandler可以彼此独立消费同一个队列中的任务,WorkHandler可以共同竞争消费同一个队列中的任务。也就是说,假设任务队列中有a、b、c、d三个事件,eventHa…...
单例设计模式双重检查的作用
先看双重校验锁的写法 public class Singleton {/*volatile 修饰,singleton new Singleton() 可以拆解为3步:1、分配对象内存(给singleton分配内存)2、调用构造器方法,执行初始化(调用 Singleton 的构造函数来初始化成员变量&am…...
NGINX_十二 nginx 地址重写 rewrite
十二 nginx 地址重写 rewrite 1 什么是Rewrite Rewrite对称URL Rewrite,即URL重写,就是把传入Web的请求重定向到其他URL的过程。URL Rewrite最常见的应用是URL伪静态化,是将动态页面显示为静态页面方式的一种技术。比如 http://www.123.com…...
react用ECharts实现组织架构图
找到ECharts中路径图。 然后开始爆改。 <div id{org- name} style{{ width: 100%, height: 650, display: flex, justifyContent: center }}></div> // data的数据格式 interface ChartData {name: string;value: number;children: ChartData[]; } const treeDep…...
坚持刷题|合并有序链表
文章目录 题目思考代码实现迭代递归 扩展实现k个有序链表合并方法一方法二 PriorityQueue基本操作Java示例注意事项 Hello,大家好,我是阿月。坚持刷题,老年痴呆追不上我,消失了一段时间,我又回来刷题啦,今天…...
SPI协议——对外部SPI Flash操作
目录 1. W25Q32JVSSIQ背景知识 1.1 64个可擦除块 1.2 1024个扇区(每个块有16个扇区) 1.3 页 1. W25Q32JVSSIQ背景知识 W25Q32JV阵列被组织成16,384个可编程页,每页有256字节。一次最多可以编程256个字节。页面可分为16组(4KB扇区清除&…...
kotlin类型检测与类型转换
一、is与!is操作符 1、使用 is 操作符或其否定形式 !is 在运行时检测对象是否符合给定类型。 fun main() {var a "1"if(a is String) {println("a是字符串类型:${a.length}")}// 或val b a is Stringprintln(b) } 二、"不安全的"转换操作符…...
【JDBC】Oracle数据库连接问题记录
Failed to load driver class oracle.jdbc.driver.OracleDriver in either of HikariConfig class oracle驱动包未正确加载,可以先尝试使用下面方式加载检查类是否存在,如果不存在需要手动下载odbc包 try {Class.forName("oracle.jdbc.driver.Ora…...
leetcode45 跳跃游戏II
题目 给定一个长度为 n 的 0 索引整数数组 nums。初始位置为 nums[0]。 每个元素 nums[i] 表示从索引 i 向前跳转的最大长度。换句话说,如果你在 nums[i] 处,你可以跳转到任意 nums[i j] 处: 0 < j < nums[i] i j < n 返回到达 nums[n - 1]…...
【数学】什么是方法矩估计?和最大似然估计是什么关系?
背景 方法矩估计(Method of Moments Estimation)和最大似然估计(Maximum Likelihood Estimation, MLE)是两种常用的参数估计方法。方法矩估计基于样本矩与总体矩的关系,通过样本数据计算样本矩来估计总体参数。最大似…...
C++初学者指南第一步---10.内存(基础)
C初学者指南第一步—10.内存(基础) 文章目录 C初学者指南第一步---10.内存(基础)1.内存模型1.1 纸上谈兵:C的抽象内存模型1.2 实践:内存的实际处理 2. 自动存储3.动态存储:std::vector3.1 动态内…...
扩散模型详细推导过程——编码与解码
符号表 符号含义 x ( i ) z 0 ( i ) \boldsymbol{x}^{(i)}\boldsymbol{z}_0^{(i)} x(i)z0(i)第 i i i个训练数据,其为长度为 d d d的向量 z t ( i ) \boldsymbol{z}_t^{(i)} zt(i)第 i i i个训练数据在第 t t t时刻的加噪版本 ϵ t ( i ) \boldsymbol{\epsilo…...
js如何实现开屏弹窗
开屏弹窗是什么,其实就是第一次登录后进入页面给你的一种公告提示,此后再回到当前这个页面时弹窗是不会再出现的。也就是说这个弹窗只会出现一次。 <!DOCTYPE html> <html><head><meta charset"utf-8"><title>…...
C#——文件读取Directory类详情
文件读取Directory类 Durectory提供了目录以及子目录进行创建移动和列举操作方法 Directory和Directorylnfo类(主要操作文件目录属性列如文件是否隐藏的 或者只读等这些属性) Directory对目录进行复制、移动、重命名、创建和删除等操作DirectoryInfo用于对目录属性执行操作 …...
Ruby on Rails Post项目设置网站初始界面
在构建了Ruby的Web服务器后,第三步就可以去掉框架的官方页面,设置自己的网页初始页了。 Linux系统安装Ruby语言-CSDN博客 、在Ubuntu中创建Ruby on Rails项目并搭建数据库-CSDN博客、 Ruby语言建立Web服务器-CSDN博客 了解Ruby onRails项目中的主要文件…...
03-QTWebEngine中使用qtvirtualkeyboard
qt提供了 virtualKeyboard 虚拟键盘模块,只需要在在main函数中最开始加入这样一句就可以了 qputenv("QT_IM_MODULE", QByteArray("qtvirtualkeyboard")); 但是在使用的时候遇到了一些问题: 1、中文输入的时候没有输入提示 Qvirt…...
leetcode3无重复字符的最长字串(重点讲滑动窗口)
本文主要讲解无重复字符的最长字串的要点与细节,根据步骤一步步走更方便理解 c与java代码如下,末尾 具体要点: 1. 区分一下子串和子序列 子串:要求元素在母串中是连续地出现 子序列:不要求连续 2. 题目中有两个核心…...
Gobject tutorial 八
The GObject base class Object memory management Gobject的内存管理相关的API很复杂,但其目标是提供一个基于引用计数的灵活的内存管理模式。 下面我们来介绍一下,与管理引用计数相关的函数。 Reference Count 函数g_object_ref和g_object_unref的…...
DDMA信号处理以及数据处理的流程---cfar检测
Hello,大家好,我是Xiaojie,好久不见,欢迎大家能够和Xiaojie一起学习毫米波雷达知识,Xiaojie准备连载一个系列的文章—DDMA信号处理以及数据处理的流程,本系列文章将从目标生成、信号仿真、测距、测速、cfar检测、测角、目标聚类、目标跟踪这几个模块逐步介绍,这个系列的…...
【机器学习】从理论到实践:决策树算法在机器学习中的应用与实现
📝个人主页:哈__ 期待您的关注 目录 📕引言 ⛓决策树的基本原理 1. 决策树的结构 2. 信息增益 熵的计算公式 信息增益的计算公式 3. 基尼指数 4. 决策树的构建 🤖决策树的代码实现 1. 数据准备 2. 决策树模型训练 3.…...
变量 varablie 声明- Rust 变量 let mut 声明与 C/C++ 变量声明对比分析
一、变量声明设计:let 与 mut 的哲学解析 Rust 采用 let 声明变量并通过 mut 显式标记可变性,这种设计体现了语言的核心哲学。以下是深度解析: 1.1 设计理念剖析 安全优先原则:默认不可变强制开发者明确声明意图 let x 5; …...
Vue3 + Element Plus + TypeScript中el-transfer穿梭框组件使用详解及示例
使用详解 Element Plus 的 el-transfer 组件是一个强大的穿梭框组件,常用于在两个集合之间进行数据转移,如权限分配、数据选择等场景。下面我将详细介绍其用法并提供一个完整示例。 核心特性与用法 基本属性 v-model:绑定右侧列表的值&…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
抖音增长新引擎:品融电商,一站式全案代运营领跑者
抖音增长新引擎:品融电商,一站式全案代运营领跑者 在抖音这个日活超7亿的流量汪洋中,品牌如何破浪前行?自建团队成本高、效果难控;碎片化运营又难成合力——这正是许多企业面临的增长困局。品融电商以「抖音全案代运营…...
土地利用/土地覆盖遥感解译与基于CLUE模型未来变化情景预测;从基础到高级,涵盖ArcGIS数据处理、ENVI遥感解译与CLUE模型情景模拟等
🔍 土地利用/土地覆盖数据是生态、环境和气象等诸多领域模型的关键输入参数。通过遥感影像解译技术,可以精准获取历史或当前任何一个区域的土地利用/土地覆盖情况。这些数据不仅能够用于评估区域生态环境的变化趋势,还能有效评价重大生态工程…...
三体问题详解
从物理学角度,三体问题之所以不稳定,是因为三个天体在万有引力作用下相互作用,形成一个非线性耦合系统。我们可以从牛顿经典力学出发,列出具体的运动方程,并说明为何这个系统本质上是混沌的,无法得到一般解…...
在鸿蒙HarmonyOS 5中使用DevEco Studio实现企业微信功能
1. 开发环境准备 安装DevEco Studio 3.1: 从华为开发者官网下载最新版DevEco Studio安装HarmonyOS 5.0 SDK 项目配置: // module.json5 {"module": {"requestPermissions": [{"name": "ohos.permis…...
DiscuzX3.5发帖json api
参考文章:PHP实现独立Discuz站外发帖(直连操作数据库)_discuz 发帖api-CSDN博客 简单改造了一下,适配我自己的需求 有一个站点存在多个采集站,我想通过主站拿标题,采集站拿内容 使用到的sql如下 CREATE TABLE pre_forum_post_…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...
Mysql故障排插与环境优化
前置知识点 最上层是一些客户端和连接服务,包含本 sock 通信和大多数jiyukehuduan/服务端工具实现的TCP/IP通信。主要完成一些简介处理、授权认证、及相关的安全方案等。在该层上引入了线程池的概念,为通过安全认证接入的客户端提供线程。同样在该层上可…...
