当前位置: 首页 > news >正文

C# 如何实现一个事件总线

EventBus(事件总线)是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。

它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中,我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件的发布和订阅。

首先,我们有两个基本的约束接口:IEventIAsyncEventHandler<TEvent>

IEvent是一个空接口,用于约束事件的类型。IAsyncEventHandler<TEvent>是一个泛型接口,用于约束事件处理程序的类型。它定义了处理事件的异步方法HandleAsync和处理异常的方法HandleException。接下来,我们有一个IEventBus接口,它定义了一些操作方法用于发布和订阅事件。

其中,Publish<TEvent>PublishAsync<TEvent>方法用于发布事件,而OnSubscribe<TEvent>方法用于订阅事件。然后,我们看到一个实现了本地事件总线的类LocalEventBusManager<TEvent>。它实现了ILocalEventBusManager<TEvent>接口,用于在单一管道内处理本地事件。它使用了一个Channel<TEvent>来存储事件,并提供了发布事件的方法PublishPublishAsync。此外,它还提供了一个自动处理事件的方法AutoHandle

总的来说Event Bus提供了一种方便的方式来实现组件之间的松耦合通信。

通过发布和订阅事件,组件可以独立地进行操作,而不需要直接依赖于彼此的实现细节。

这种机制可以提高代码的可维护性和可扩展性。

Github仓库地址:https://github.com/DonPangPang/soda-event-bus

实现一些基本约束

先实现一些约束,实现IEvent约束事件,实现IAsyncEvnetHandler<TEvent> where TEvent:IEvent来约束事件的处理程序。

public interface IEvent
{}public interface IAsyncEventHandler<in TEvent> where TEvent : IEvent
{Task HandleAsync(IEvent @event);void HandleException(IEvent @event, Exception ex);
}

接下来规定一下咱们的IEventBus,会有哪些操作方法。基本就是发布和订阅。

public interface IEventBus
{void Publish<TEvent>(TEvent @event) where TEvent : IEvent;Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent;void OnSubscribe<TEvent>() where TEvent : IEvent;
}

实现一个本地事件总线

本地事件处理

本地事件的处理我打算采用两种方式实现,一种是LocalEventBusManager即本地事件管理,第二种是LocalEventBusPool池化本地事件。

LocalEvnetBusManager

LocalEventBusManager主要在单一管道内进行处理,集中进行消费。

public interface ILocalEventBusManager<in TEvent>where TEvent : IEvent
{void Publish(TEvent @event);Task PublishAsync(TEvent @event) ;void AutoHandle();
}public class LocalEventBusManager<TEvent>(IServiceProvider serviceProvider):ILocalEventBusManager<TEvent>where TEvent: IEvent
{readonly IServiceProvider _servicesProvider = serviceProvider;private readonly Channel<TEvent> _eventChannel = Channel.CreateUnbounded<TEvent>();public void Publish(TEvent @event){Debug.Assert(_eventChannel != null, nameof(_eventChannel) + " != null");_eventChannel.Writer.WriteAsync(@event);}private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();}public async Task PublishAsync(TEvent @event){await _eventChannel.Writer.WriteAsync(@event);}public void AutoHandle(){// 确保只启动一次if (!Cts.IsCancellationRequested) return;Task.Run(async () =>{while (!Cts.IsCancellationRequested){var reader = await _eventChannel.Reader.ReadAsync();await HandleAsync(reader);}}, Cts.Token);}async Task HandleAsync(TEvent @event){var handler = _servicesProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler is null){throw new NullReferenceException($"No handler for event {@event.GetType().Name}");}try{await handler.HandleAsync(@event);}catch (Exception ex){handler.HandleException( @event, ex);}}
}
LocalEventBusPool

LocalEventBusPool即所有的Event都会有一个单独的管道处理,单独消费处理,并行能力更好一些。

public sealed class LocalEventBusPool(IServiceProvider serviceProvider)
{private readonly IServiceProvider _serviceProvider = serviceProvider;private class ChannelKey{public required string Key { get; init; }public int Subscribers { get; set; }public override bool Equals(object? obj){if (obj is ChannelKey key){return string.Equals(key.Key, Key, StringComparison.OrdinalIgnoreCase);}return false;}public override int GetHashCode(){return 0;}}private Channel<IEvent> Rent(string channel){_channels.TryGetValue(new ChannelKey() { Key = channel }, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(new ChannelKey() { Key = channel }, value);return value;}private Channel<IEvent> Rent(ChannelKey channelKey){_channels.TryGetValue(channelKey, out var value);if (value != null) return value;value = Channel.CreateUnbounded<IEvent>();_channels.TryAdd(channelKey, value);return value;}private readonly ConcurrentDictionary<ChannelKey, Channel<IEvent>> _channels = new();private CancellationTokenSource Cts { get; } = new();public void Cancel(){Cts.Cancel();_channels.Clear();Cts.TryReset();}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{await Rent(typeof(TEvent).Name).Writer.WriteAsync(@event);}public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{Rent(typeof(TEvent).Name).Writer.TryWrite(@event);}public void OnSubscribe<TEvent>() where TEvent : IEvent{var channelKey = _channels.FirstOrDefault(x => x.Key.Key == typeof(TEvent).Name).Key ??new ChannelKey() { Key = typeof(TEvent).Name };channelKey.Subscribers++;Task.Run(async () =>{try{while (!Cts.IsCancellationRequested){var @event = await ReadAsync(channelKey);var handler = _serviceProvider.GetService<IAsyncEventHandler<TEvent>>();if (handler == null) throw new NullReferenceException($"No handler for Event {typeof(TEvent).Name}");try{await handler.HandleAsync((TEvent)@event);}catch (Exception ex){handler.HandleException((TEvent)@event, ex);}}}catch (Exception e){throw new InvalidOperationException("Error on onSubscribe handler", e);}}, Cts.Token);}private async Task<IEvent> ReadAsync(string channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}private async Task<IEvent> ReadAsync(ChannelKey channel){return await Rent(channel).Reader.ReadAsync(Cts.Token);}
}
LocalEventBus

实现LocalEventBus继承自IEventBus即可,如果有需要扩展的方法自行添加,池化和管理器的情况单独处理。

public interface ILocalEventBus: IEventBus
{}
public class LocalEventBus(IServiceProvider serviceProvider, LocalEventBusOptions options) : ILocalEventBus
{private  LocalEventBusPool? EventBusPool => serviceProvider.GetService<LocalEventBusPool>();public void Publish<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.Publish(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.Publish(@event);}}public async Task PublishAsync<TEvent>(TEvent @event) where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");await EventBusPool.PublishAsync(@event);}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");await manager.PublishAsync(@event);}}public void OnSubscribe<TEvent>() where TEvent : IEvent{if (options.Pool){Debug.Assert(EventBusPool != null, nameof(EventBusPool) + " != null");EventBusPool.OnSubscribe<TEvent>();}else{var manager = serviceProvider.GetService<LocalEventBusManager<TEvent>>();if (manager is null) throw new NullReferenceException($"No manager for event {typeof(TEvent).Name}, please add singleton service it.");manager.AutoHandle();}}
}

分布式事件总线

根据需要扩展即可,基本逻辑相同,但可能需要增加确认机制等。

相关文章:

C# 如何实现一个事件总线

EventBus&#xff08;事件总线&#xff09;是一种用于在应用程序内部或跨应用程序组件之间进行事件通信的机制。 它允许不同的组件通过发布和订阅事件来进行解耦和通信。在给定的代码片段中&#xff0c;我们可以看到一个使用C#实现的Event Bus。它定义了一些接口和类来实现事件…...

Python学习路线图

防止忘记&#xff0c;温故知新 进阶路线...

作业2.14

chgrp: 只能修改文件的所属组 chgrp 新的组 文件名 要求&#xff1a;修改的目标组已经存在 chown: chown 新的用户名 文件名 sudo chown root &#xff1a;1 将文件1的所属组用户和所属组用户都改为root sudo chown root&#xff1a;ubuntu 1 将文件1的所属用户…...

基于python+django+mysql的小区物业管理系统

该系统是基于pythondjango开发的小区物业管理系统。适用场景&#xff1a;大学生、课程作业、毕业设计。学习过程中&#xff0c;如遇问题可以在github给作者留言。主要功能有&#xff1a;业主管理、报修管理、停车管理、资产管理、小区管理、用户管理、日志管理、系统信息。 演示…...

控制与状态机算法

控制与状态机算法是计算机科学、电子工程和自动化领域中常用的一种设计工具,它用来描述一个系统的行为,该系统在不同时间点可以处于不同的状态,并且其行为取决于当前状态以及输入的信号或事件。状态机算法的核心概念包括: 状态(State):系统的任何可能配置。每个状态代表…...

sql常用语句小结

创建表&#xff1a; create table 表名&#xff08; 字段1 字段类型 【约束】【comment 字段1注释】&#xff0c; //【】里面的东西可以不用加上去 字段2 字段类型 【约束】【comment 字段2注释】 &#xff09;【comment 表注释】 约束&#xff1a;作用于表中字段上的规则…...

云计算基础-虚拟机迁移原理

什么是虚拟机迁移 虚拟机迁移是指将正在运行的虚拟机实例从一个物理服务器&#xff08;或主机&#xff09;迁移到另一个物理服务器&#xff08;或主机&#xff09;的过程&#xff0c;而不会中断虚拟机的运行。 虚拟机拟机迁移分类虚 热迁移&#xff1a;开机状态下迁移 冷迁…...

云计算基础-云计算概念

云计算定义 云计算是一种基于互联网的计算方式&#xff0c;通过这种计算方式&#xff0c;共享的软硬件资源和信息可以按需提供给计算机和其他设备。云计算依赖资源共享以达成规模经济&#xff0c;类似基础设置(如电力网)。 云计算最基本的概念就是云加端&#xff0c;我们有一个…...

如何将阿里云服务器迁移

&#x1f4d1;前言 本文主要是如何将阿里云服务器迁移实现数据转移的文章&#xff0c;如果有什么需要改进的地方还请大佬指出⛺️** &#x1f3ac;作者简介&#xff1a;大家好&#xff0c;我是青衿&#x1f947; ☁️博客首页&#xff1a;CSDN主页放风讲故事 &#x1f304;每日…...

如何将本地的python项目部署到linux服务器中

大家好&#xff0c;我是雄雄&#xff0c;欢迎关注微信公众号&#xff1a;雄雄的小课堂 。 前言 本地写好的python项目&#xff0c;如何部署在服务器上运行呢&#xff1f;今天&#xff0c;我们就来抽一点点时间来看看。&#xff08;网上找的资料&#xff0c;大部分都囫囵吞枣的…...

每日五道java面试题之java基础篇(五)

目录&#xff1a; 第一题. final、finally、finalize 的区别&#xff1f;第二题. 和 equals 的区别&#xff1f;第三题.hashCode 与 equals?第四题. Java 是值传递&#xff0c;还是引⽤传递&#xff1f;第五题 深拷贝和浅拷贝&#xff1f; 第一题. final、finally、finalize 的…...

HiveSQL——用户行为路径分析

注&#xff1a;参考文档&#xff1a; SQL之用户行为路径分析--HQL面试题46【拼多多面试题】_路径分析 sql-CSDN博客文章浏览阅读2k次&#xff0c;点赞6次&#xff0c;收藏19次。目录0 问题描述1 数据分析2 小结0 问题描述已知用户行为表 tracking_log&#xff0c; 大概字段有&…...

专利的申请

申请发明或者实用新型专利的&#xff0c;应当提交请求书、说明书及其摘要和权利要求书等文件。 请求书应当写明发明或者实用新型的名称&#xff0c;发明人或者设计人的姓名&#xff0c;申请人姓名或者名称、地址&#xff0c;以及其他事项。 说明书应当对发明或者实用新型作出清…...

嵌入式学习 C++ Day5、6

嵌入式学习 C Day5、6 一、思维导图 二、作业 1.以下是一个简单的比喻&#xff0c;将多态概念与生活中的实际情况相联系&#xff1a; 比喻&#xff1a;动物园的讲解员和动物表演 想象一下你去了一家动物园&#xff0c;看到了许多不同种类的动物&#xff0c;如狮子、大象、猴…...

阿里云香港服务器cn2速度测试和租用价格表

阿里云香港服务器中国香港数据中心网络线路类型BGP多线精品&#xff0c;中国电信CN2高速网络高质量、大规格BGP带宽&#xff0c;运营商精品公网直连中国内地&#xff0c;时延更低&#xff0c;优化海外回中国内地流量的公网线路&#xff0c;可以提高国际业务访问质量。阿里云服务…...

《学成在线》微服务实战项目实操笔记系列(P92~P120)【下】

史上最详细《学成在线》项目实操笔记系列【下】&#xff0c;跟视频的每一P对应&#xff0c;全系列18万字&#xff0c;涵盖详细步骤与问题的解决方案。如果你操作到某一步卡壳&#xff0c;参考这篇&#xff0c;相信会带给你极大启发。 四、课程发布模块 4.1 (课程发布)模块需求…...

php数据类型以及运算符、判断条件

php数据类型以及运算符 1. php数据类型2. 使用举例3. 运算符4. 判断条件if else elseif 1. php数据类型 包括 String(字符串)、Integer(整型)、Float(浮点型)、Boolean(布尔型)、Array(数组)、Object(对象)、NULL(空值) 2. 使用举例 1.字符串 2.整型 3.浮点型 4.布尔型 5.数组…...

大数据01-导论

零、文章目录 大数据01-导论 1、数据与数据分析 **数据&#xff1a;是事实或观察的结果&#xff0c;是对客观事物的逻辑归纳&#xff0c;是用于表示客观事物的未经加工的原始素材。**数据可以是连续的值&#xff0c;比如声音、图像&#xff0c;称为模拟数据&#xff1b;也可…...

智能网卡(SmartNIC):增强网络性能

在当今的数字时代&#xff0c;网络性能和数据安全是各行各业面临的关键挑战。智能网卡是一项颠覆性的技术创新&#xff0c;对增强网络性能和加强数据安全性具有关键推动作用。本文旨在探讨智能网卡的工作原理及其在不同应用场景中的重要作用。 什么是智能网卡&#xff1f; 智…...

算法刷题day14

目录 引言一、平均二、三国游戏三、松散子序列 引言 今天做了三道新题&#xff0c;类型是贪心、枚举、DP&#xff0c;不是特别难&#xff0c;但是努力一下刚好能够够得上&#xff0c;还是不错的&#xff0c;只要能够一直坚持下去&#xff0c;不断刷题不断总结&#xff0c;就是…...

论文解读:交大港大上海AI Lab开源论文 | 宇树机器人多姿态起立控制强化学习框架(二)

HoST框架核心实现方法详解 - 论文深度解读(第二部分) 《Learning Humanoid Standing-up Control across Diverse Postures》 系列文章: 论文深度解读 + 算法与代码分析(二) 作者机构: 上海AI Lab, 上海交通大学, 香港大学, 浙江大学, 香港中文大学 论文主题: 人形机器人…...

MFC内存泄露

1、泄露代码示例 void X::SetApplicationBtn() {CMFCRibbonApplicationButton* pBtn GetApplicationButton();// 获取 Ribbon Bar 指针// 创建自定义按钮CCustomRibbonAppButton* pCustomButton new CCustomRibbonAppButton();pCustomButton->SetImage(IDB_BITMAP_Jdp26)…...

c++ 面试题(1)-----深度优先搜索(DFS)实现

操作系统&#xff1a;ubuntu22.04 IDE:Visual Studio Code 编程语言&#xff1a;C11 题目描述 地上有一个 m 行 n 列的方格&#xff0c;从坐标 [0,0] 起始。一个机器人可以从某一格移动到上下左右四个格子&#xff0c;但不能进入行坐标和列坐标的数位之和大于 k 的格子。 例…...

使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装

以下是基于 vant-ui&#xff08;适配 Vue2 版本 &#xff09;实现截图中照片上传预览、删除功能&#xff0c;并封装成可复用组件的完整代码&#xff0c;包含样式和逻辑实现&#xff0c;可直接在 Vue2 项目中使用&#xff1a; 1. 封装的图片上传组件 ImageUploader.vue <te…...

数据链路层的主要功能是什么

数据链路层&#xff08;OSI模型第2层&#xff09;的核心功能是在相邻网络节点&#xff08;如交换机、主机&#xff09;间提供可靠的数据帧传输服务&#xff0c;主要职责包括&#xff1a; &#x1f511; 核心功能详解&#xff1a; 帧封装与解封装 封装&#xff1a; 将网络层下发…...

在WSL2的Ubuntu镜像中安装Docker

Docker官网链接: https://docs.docker.com/engine/install/ubuntu/ 1、运行以下命令卸载所有冲突的软件包&#xff1a; for pkg in docker.io docker-doc docker-compose docker-compose-v2 podman-docker containerd runc; do sudo apt-get remove $pkg; done2、设置Docker…...

【C++特殊工具与技术】优化内存分配(一):C++中的内存分配

目录 一、C 内存的基本概念​ 1.1 内存的物理与逻辑结构​ 1.2 C 程序的内存区域划分​ 二、栈内存分配​ 2.1 栈内存的特点​ 2.2 栈内存分配示例​ 三、堆内存分配​ 3.1 new和delete操作符​ 4.2 内存泄漏与悬空指针问题​ 4.3 new和delete的重载​ 四、智能指针…...

Python训练营-Day26-函数专题1:函数定义与参数

题目1&#xff1a;计算圆的面积 任务&#xff1a; 编写一个名为 calculate_circle_area 的函数&#xff0c;该函数接收圆的半径 radius 作为参数&#xff0c;并返回圆的面积。圆的面积 π * radius (可以使用 math.pi 作为 π 的值)要求&#xff1a;函数接收一个位置参数 radi…...

深入理解 React 样式方案

React 的样式方案较多,在应用开发初期,开发者需要根据项目业务具体情况选择对应样式方案。React 样式方案主要有: 1. 内联样式 2. module css 3. css in js 4. tailwind css 这些方案中,均有各自的优势和缺点。 1. 方案优劣势 1. 内联样式: 简单直观,适合动态样式和…...

深入解析 ReentrantLock:原理、公平锁与非公平锁的较量

ReentrantLock 是 Java 中 java.util.concurrent.locks 包下的一个重要类,用于实现线程同步,支持可重入性,并且可以选择公平锁或非公平锁的实现方式。下面将详细介绍 ReentrantLock 的实现原理以及公平锁和非公平锁的区别。 ReentrantLock 实现原理 基本架构 ReentrantLo…...