C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。
它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。
首先,我们有两个基本的约束接口:IEvent和IAsyncEventHandler<TEvent>。
IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。
其中,Publish<TEvent>和PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法Publish和PublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle。
总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。
通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。
这种机制可以提高代码的可维护性和可扩展性。
Github仓库地址:https://github.com/DonPangPang/soda-event-bus
实现一些基本约束
先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。
public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}
接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。
public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}
实现一个本地事件总线
本地事件处理
本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。
LocalEvnetBusManager
LocalEventBusManager主要在单一管道内进行处理,集中进行消费。
public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool
LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。
public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus
实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。
public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}
分布式事件总线
根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。
相关文章:
C# 如何实现一个事件总线
EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…...
Python学习路线图
防止忘记,温故知新 进阶路线...
作业2.14
chgrp: 只能修改文件的所属组 chgrp 新的组 文件名 要求:修改的目标组已经存在 chown: chown 新的用户名 文件名 sudo chown root :1 将文件1的所属组用户和所属组用户都改为root sudo chown root:ubuntu 1 将文件1的所属用户…...
基于python+django+mysql的小区物业管理系统
该系统是基于pythondjango开发的小区物业管理系统。适用场景:大学生、课程作业、毕业设计。学习过程中,如遇问题可以在github给作者留言。主要功能有:业主管理、报修管理、停车管理、资产管理、小区管理、用户管理、日志管理、系统信息。 演示…...
控制与状态机算法
控制与状态机算法是计算机科学、电子工程和自动化领域中常用的一种设计工具,它用来描述一个系统的行为,该系统在不同时间点可以处于不同的状态,并且其行为取决于当前状态以及输入的信号或事件。状态机算法的核心概念包括: 状态(State):系统的任何可能配置。每个状态代表…...
sql常用语句小结
创建表: create table 表名( 字段1 字段类型 【约束】【comment 字段1注释】, //【】里面的东西可以不用加上去 字段2 字段类型 【约束】【comment 字段2注释】 )【comment 表注释】 约束:作用于表中字段上的规则…...
云计算基础-虚拟机迁移原理
什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器(或主机)迁移到另一个物理服务器(或主机)的过程,而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移:开机状态下迁移 冷迁…...
云计算基础-云计算概念
云计算定义 云计算是一种基于互联网的计算方式,通过这种计算方式,共享的软硬件资源和信息可以按需提供给计算机和其他设备。云计算依赖资源共享以达成规模经济,类似基础设置(如电力网)。 云计算最基本的概念就是云加端,我们有一个…...
如何将阿里云服务器迁移
📑前言 本文主要是如何将阿里云服务器迁移实现数据转移的文章,如果有什么需要改进的地方还请大佬指出⛺️** 🎬作者简介:大家好,我是青衿🥇 ☁️博客首页:CSDN主页放风讲故事 🌄每日…...
如何将本地的python项目部署到linux服务器中
大家好,我是雄雄,欢迎关注微信公众号:雄雄的小课堂 。 前言 本地写好的python项目,如何部署在服务器上运行呢?今天,我们就来抽一点点时间来看看。(网上找的资料,大部分都囫囵吞枣的…...
每日五道java面试题之java基础篇(五)
目录: 第一题. final、finally、finalize 的区别?第二题. 和 equals 的区别?第三题.hashCode 与 equals?第四题. Java 是值传递,还是引⽤传递?第五题 深拷贝和浅拷贝? 第一题. final、finally、finalize 的…...
HiveSQL——用户行为路径分析
注:参考文档: SQL之用户行为路径分析--HQL面试题46【拼多多面试题】_路径分析 sql-CSDN博客文章浏览阅读2k次,点赞6次,收藏19次。目录0 问题描述1 数据分析2 小结0 问题描述已知用户行为表 tracking_log, 大概字段有&…...
专利的申请
申请发明或者实用新型专利的,应当提交请求书、说明书及其摘要和权利要求书等文件。 请求书应当写明发明或者实用新型的名称,发明人或者设计人的姓名,申请人姓名或者名称、地址,以及其他事项。 说明书应当对发明或者实用新型作出清…...
嵌入式学习 C++ Day5、6
嵌入式学习 C Day5、6 一、思维导图 二、作业 1.以下是一个简单的比喻,将多态概念与生活中的实际情况相联系: 比喻:动物园的讲解员和动物表演 想象一下你去了一家动物园,看到了许多不同种类的动物,如狮子、大象、猴…...
阿里云香港服务器cn2速度测试和租用价格表
阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品,中国电信CN2高速网络高质量、大规格BGP带宽,运营商精品公网直连中国内地,时延更低,优化海外回中国内地流量的公网线路,可以提高国际业务访问质量。阿里云服务…...
《学成在线》微服务实战项目实操笔记系列(P92~P120)【下】
史上最详细《学成在线》项目实操笔记系列【下】,跟视频的每一P对应,全系列18万字,涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳,参考这篇,相信会带给你极大启发。 四、课程发布模块 4.1 (课程发布)模块需求…...
php数据类型以及运算符、判断条件
php数据类型以及运算符 1. php数据类型2. 使用举例3. 运算符4. 判断条件if else elseif 1. php数据类型 包括 String(字符串)、Integer(整型)、Float(浮点型)、Boolean(布尔型)、Array(数组)、Object(对象)、NULL(空值) 2. 使用举例 1.字符串 2.整型 3.浮点型 4.布尔型 5.数组…...
大数据01-导论
零、文章目录 大数据01-导论 1、数据与数据分析 **数据:是事实或观察的结果,是对客观事物的逻辑归纳,是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值,比如声音、图像,称为模拟数据;也可…...
智能网卡(SmartNIC):增强网络性能
在当今的数字时代,网络性能和数据安全是各行各业面临的关键挑战。智能网卡是一项颠覆性的技术创新,对增强网络性能和加强数据安全性具有关键推动作用。本文旨在探讨智能网卡的工作原理及其在不同应用场景中的重要作用。 什么是智能网卡? 智…...
算法刷题day14
目录 引言一、平均二、三国游戏三、松散子序列 引言 今天做了三道新题,类型是贪心、枚举、DP,不是特别难,但是努力一下刚好能够够得上,还是不错的,只要能够一直坚持下去,不断刷题不断总结,就是…...
uniapp 对接腾讯云IM群组成员管理(增删改查)
UniApp 实战:腾讯云IM群组成员管理(增删改查) 一、前言 在社交类App开发中,群组成员管理是核心功能之一。本文将基于UniApp框架,结合腾讯云IM SDK,详细讲解如何实现群组成员的增删改查全流程。 权限校验…...
基于Uniapp开发HarmonyOS 5.0旅游应用技术实践
一、技术选型背景 1.跨平台优势 Uniapp采用Vue.js框架,支持"一次开发,多端部署",可同步生成HarmonyOS、iOS、Android等多平台应用。 2.鸿蒙特性融合 HarmonyOS 5.0的分布式能力与原子化服务,为旅游应用带来…...
dedecms 织梦自定义表单留言增加ajax验证码功能
增加ajax功能模块,用户不点击提交按钮,只要输入框失去焦点,就会提前提示验证码是否正确。 一,模板上增加验证码 <input name"vdcode"id"vdcode" placeholder"请输入验证码" type"text&quo…...
转转集团旗下首家二手多品类循环仓店“超级转转”开业
6月9日,国内领先的循环经济企业转转集团旗下首家二手多品类循环仓店“超级转转”正式开业。 转转集团创始人兼CEO黄炜、转转循环时尚发起人朱珠、转转集团COO兼红布林CEO胡伟琨、王府井集团副总裁祝捷等出席了开业剪彩仪式。 据「TMT星球」了解,“超级…...
spring:实例工厂方法获取bean
spring处理使用静态工厂方法获取bean实例,也可以通过实例工厂方法获取bean实例。 实例工厂方法步骤如下: 定义实例工厂类(Java代码),定义实例工厂(xml),定义调用实例工厂ÿ…...
Spring Boot+Neo4j知识图谱实战:3步搭建智能关系网络!
一、引言 在数据驱动的背景下,知识图谱凭借其高效的信息组织能力,正逐步成为各行业应用的关键技术。本文聚焦 Spring Boot与Neo4j图数据库的技术结合,探讨知识图谱开发的实现细节,帮助读者掌握该技术栈在实际项目中的落地方法。 …...
零基础设计模式——行为型模式 - 责任链模式
第四部分:行为型模式 - 责任链模式 (Chain of Responsibility Pattern) 欢迎来到行为型模式的学习!行为型模式关注对象之间的职责分配、算法封装和对象间的交互。我们将学习的第一个行为型模式是责任链模式。 核心思想:使多个对象都有机会处…...
SAP学习笔记 - 开发26 - 前端Fiori开发 OData V2 和 V4 的差异 (Deepseek整理)
上一章用到了V2 的概念,其实 Fiori当中还有 V4,咱们这一章来总结一下 V2 和 V4。 SAP学习笔记 - 开发25 - 前端Fiori开发 Remote OData Service(使用远端Odata服务),代理中间件(ui5-middleware-simpleproxy)-CSDN博客…...
Hive 存储格式深度解析:从 TextFile 到 ORC,如何选对数据存储方案?
在大数据处理领域,Hive 作为 Hadoop 生态中重要的数据仓库工具,其存储格式的选择直接影响数据存储成本、查询效率和计算资源消耗。面对 TextFile、SequenceFile、Parquet、RCFile、ORC 等多种存储格式,很多开发者常常陷入选择困境。本文将从底…...
MySQL JOIN 表过多的优化思路
当 MySQL 查询涉及大量表 JOIN 时,性能会显著下降。以下是优化思路和简易实现方法: 一、核心优化思路 减少 JOIN 数量 数据冗余:添加必要的冗余字段(如订单表直接存储用户名)合并表:将频繁关联的小表合并成…...
