C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式
概述
生产者-消费者模式
是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。
二.Channels
概念
Channels
提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0
引入以来,System.Threading.Channels
命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels
已经完全集成到.NET的异步模型中,支持async/await
关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道
有两种类型:有限容量的bound Channel
和无限容量的unbound Channel
。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。
三.Channels
生产者-消费者模式实现
创建通道
来作为生产者和消费者之间的共享缓冲区
- 无界通道
- 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用
Channel.CreateUnbounded<T>()
方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
- 有界通道
- 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略
BoundedChannelFullMode
枚举处理方式:Wait
:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest
:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest
:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite
:直接删除当前正在尝试写入的数据。
使用Channel.CreateBounded<T>(int capacity)
方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
- 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用
WriteAsync
方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
- 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。
async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}
下面展示一个完整的生产者和消费者
示例
- 启动
Program
类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
- 生产者
Producer
类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
- 消费者
Consumer
类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}
- [ 参考] :
https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0
相关文章:
C#中通道(Channels)的应用之(生产者-消费者模式)
一.生产者-消费者模式概述 生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区&#x…...
git: hint:use --reapply-cherry-picks to include skipped commits
问: 当我在feture分支写完功能,切换到dev更新了远端dev代码,切回feture分支,git rebase dev分支后出现报错: warning skipped previously applied commit 709xxxx hint:use --reapply-cherry-picks to include skippe…...
AI:对比ChatGPT这类聊天机器人,人形机器人对人类有哪些不一样的影响?
人形机器人与像ChatGPT这样的聊天机器人相比,虽然都属于人工智能技术的应用,但由于其具备的物理形态和与环境的互动能力,它们对人类的影响会有很大的不同。下面从多个角度进行对比,阐述它们各自对人类的不同影响: 1. …...
vue3 +ts 学习记录
1 父子传参 父传子 父组件 <TestFuzichuancan :title"title"/> const title 父组件标题子组件 import { defineProps } from vue; interface Props {title?: string,arr: number[]; } const props withDefaults(defineProps<Props>(), {title: 默认…...
微服务的配置共享
1.什么是微服务的配置共享 微服务架构中,配置共享是一个重要环节,它有助于提升服务间的协同效率和数据一致性。以下是对微服务配置共享的详细阐述: 1.1.配置共享的概念 配置共享是指在微服务架构中,将某些通用或全局的配置信息…...
Scala分布式语言二(基础功能搭建、面向对象基础、面向对象高级、异常、集合)
章节3基础功能搭建 46.函数作为值三 package cn . itbaizhan . chapter03 // 函数作为值,函数也是个对象 object FunctionToTypeValue { def main ( args : Array [ String ]): Unit { //Student stu new Student() /*val a ()>{"GTJin"…...
Chromium 132 编译指南 Windows 篇 - 配置核心环境变量 (三)
1. 引言 在之前的 Chromium 编译指南系列文章中,我们已经完成了编译前的准备工作以及 depot_tools 工具的安装与配置。本篇我们将聚焦于 Chromium 编译过程中至关重要的环境变量设置,这些配置是您顺利进行 Chromium 构建的基石。 2. 启用本地编译&…...
开源文件存储分享平台Seafile部署与应用
Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...
MYSQL-创建数据库 CREATE DATABASE (十一)
13.1.11 CREATE DATABASE 语句 -- 创建 数据库的 CREATE 权限 CREATE {DATABASE | SCHEMA} [IF NOT EXISTS] db_name[create_option] ...create_option: [DEFAULT] {CHARACTER SET [] charset_name| COLLATE [] collation_name } -- 删除 数据库具有 DROP 权限 DROP {DATABASE…...
Java高频面试之SE-11
hello啊,各位观众姥爷们!!!本牛马baby今天又来了!哈哈哈哈哈嗝🐶 Java中是引用传递还是值传递? 在 Java 中,方法参数传递是通过 值传递 的方式实现的,但这可能会引起一…...
C#结构体,枚举,泛型,事件,委托--10
目录 一.结构体 二.特殊的结构体(ref struct): 三.枚举 四.泛型 泛型的使用: 1.泛型类:定义一个泛型类,使用类型参数T 2.泛型方法:在方法定义中使用类型参数 3.泛型接口 五.委托及泛型委托 委托 泛型委托 六.事件 事件: 泛型事件:使用泛型委托(如Event…...
MapReduce完整工作流程
1、mapreduce工作流程(终极版) 0. 任务提交 1. 拆-split逻辑切片--任务切分。 FileInputFormat--split切片计算工具 FileSplit--单个计算任务的数据范围。 2. 获得split信息和个数。 MapTask阶段 1. 读取split范围内的数据。k(偏移量)-v(行数据) 关键API:TextI…...
网络编程(1)
网络编程概述 Java是 Internet 上的语言,它从语言级上提供了对网络应用程序的支持,程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库,可以实现无痛的网络连接,联网的底层细节被隐藏在 Java 的本机安装系统里&#…...
mysql中创建计算字段
目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 (1)去除空格 (2)使用别名:AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS,附上示例资料: 百度网盘链接: https://pan.baidu.co…...
【算法】判断一个链表是否为回文结构
问: 给定一个单链表的头节点head,请判断该链表是否为回文结构 例: 1 -> 2 -> 1返回true;1 -> 2 -> 2 -> 1返回true;15 -> 6 -> 15返回true 答: 笔试:初始化一个栈用来…...
计算机网络之---ICMP协议与Ping命令
ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议,主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断,帮助设备检测网络连接问题。它是 IP 协议的核心部分之一,用于发送错误消息和操作信息…...
【硬件介绍】Type-C接口详解
一、Type-C接口概述 Type-C接口特点:以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验,避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构:内部上下两排引脚的设计虽然可能不…...
【Pandas】pandas Series rtruediv
Pandas2.2 Series Binary operator functions 方法描述Series.add()用于对两个 Series 进行逐元素加法运算Series.sub()用于对两个 Series 进行逐元素减法运算Series.mul()用于对两个 Series 进行逐元素乘法运算Series.div()用于对两个 Series 进行逐元素除法运算Series.true…...
项目开发版本控制Git流程规范
个人&测试&预发布&生产分支命名 1)个人分支: 从sit或者master进行切出,姓名切出分支命名,或者日期切出分支命名 示例:liuys_sit、20250110_sit2)测试分支: sit3)用户验…...
STM32 : 波特率发生器
波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中,发送器和接收器的波特率是由波特率寄存器(BRR)中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…...
STM32 USB组合设备 MSC CDC
STM32 USB组合设备 MSC CDC实现 教程 教程请看大佬niu_88 手把手教你使用USB的CDCMSC复合设备(基于stm32f407) 大佬的教程很好,很详细,我调出来了,代码请见我绑定的资源 注意事项 值得注意的是: 1、 cu…...
继续以“实用”指导Pythonic编码(re通配表达式)(2024年终总结2)
弃现成工具手剥任务🧐,我哈哈滴就像笨笨的傻大个儿😋。 (笔记模板由python脚本于2025年01月12日 23:29:33创建,本篇笔记适合熟悉正则表达式的coder翻阅) 【学习的细节是欢悦的历程】 Python官网:https://www.python.or…...
Flutter使用BorderRadiusTween实现由矩形变成圆形的动画
BorderRadiusTween 是插值动画中,用于组件边框半径的类,专门作用于组件边框和半径动化过度。 这个类继承自Tween,用法相似。 下面是示例写法 class BorderRadiusTweenPage extends StatefulWidget {overrideState<StatefulWidget> c…...
VSCode 中的 launch.json 配置使用
VSCode 中的 launch.json 配置使用 在 VSCode 中,launch.json 文件用于配置调试设置,特别是用来定义如何启动和调试你的应用。它允许你配置不同的调试模式、运行参数和调试选项。 基本结构 launch.json 文件位于 .vscode 文件夹内,可以通过…...
深度学习张量的秩、轴和形状
深度学习张量的秩、轴和形状 秩、轴和形状是在深度学习中我们最关心的张量属性。 秩轴形状 秩、轴和形状是在深度学习中开始使用张量时我们最关心的三个属性。这些概念相互建立,从秩开始,然后是轴,最后构建到形状,所以请注意这…...
Redis有哪些常用应用场景?
大家好,我是锋哥。今天分享关于【Redis有哪些常用应用场景?】面试题。希望对大家有帮助; Redis有哪些常用应用场景? 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Redis 是一个高性能的开源键值对(Key-Va…...
vue3+ts+element-plus 输入框el-input设置背景颜色
普通情况: 组件内容: <el-input v-model"applyBasicInfo.outerApplyId"/> 样式设置: ::v-deep .el-input__wrapper {background-color: pink; }// 也可以这样设置 ::v-deep(.el-input__wrapper) {background-color: pink…...
Ubuntu 磁盘修复
Ubuntu 磁盘修复 在 ubuntu 文件系统变成只读模式,该处理呢? 文件系统内部的错误,如索引错误、元数据损坏等,也可能导致系统进入只读状态。磁盘坏道或硬件故障也可能引发文件系统只读的问题。/etc/fstab配置错误,可能…...
使用RSyslog将Nginx Access Log写入Kafka
个人博客地址:使用RSyslog将Nginx Access Log写入Kafka | 一张假钞的真实世界 环境说明 CentOS Linux release 7.3.1611kafka_2.12-0.10.2.2nginx/1.12.2rsyslog-8.24.0-34.el7.x86_64.rpm 创建测试Topic $ ./kafka-topics.sh --zookeeper 192.168.72.25:2181/k…...
通过Apache、Nginx限制直接访问public下的静态文件
一、Apache 在public目录下的.htaccess文件中添加如下规则,来拒绝除了指定文件类型之外的所有请求 <FilesMatch "\.(?!(jpg|jpeg|png|gif|css|js|ico)$)[^.]$">Order Allow,DenyDeny from all </FilesMatch> 上述配置表示仅允许访问.jpg …...
做视频发哪个网站赚钱/百度推广app怎么收费
介绍Java7的工具类Objects 本文介绍Java7引入的工具里Objects,使用其API可以让代码更简洁。 1. 概述 在jdk7添加了一个objects工具类,它提供了一些方法来操作对象,它由一些静态的实用方法组成,这些方法是null-safe (…...
seo整站优化公司/seo网络优化师招聘
2019独角兽企业重金招聘Python工程师标准>>> 参考资料 1、CentOS/Linux 开放80、8080端口或者开放某个端口 注: 修改/etc/sysconfig/iptables配置,可以参考22端口开放特例 -A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT…...
南昌专业做网站公司哪家好/网站监测
两个排序的数组A和B分别含有m和n个数,找到两个排序数组的中位数,要求时间复杂度应为O(log (mn))。在线评测地址:https://www.lintcode.com/problem/median-of-two-sorted-arrays/?utm_sourcesc-zhihuzl-lm说明中位数的定义:这里的…...
网站开发建设/行业关键词查询
这几天我的上篇长篇大论引来一堆回复,大家基本上都表扬了我。 其实我公开发表之前还是有些犹豫,因为我话说得比较冲,火药味比较重。严格的说点发表前我甚至删了不少火箭只剩下温柔的子弹,使得文章温和一些。不知道何时开始&#x…...
有没有必要为B2B网站做外链/百度竞价推广技巧
备考第6周总结这个星期最大的收获就是看了何凯文老师的长难句基础课,一共有8节,这个星期学了4节。我感觉他讲的非常好,思路非常清晰,讲课时也不会忘记和大家开开玩笑,课堂气氛很好。我听了以后,受益匪浅&am…...
旅行社网站建设方案论文/怎么建立个人网站
罗德与施瓦茨 (Rohde & Schwarz, R&S) 公司成立于1933年,总部位于德国慕尼黑,是一家技术公司,为企业和政府机构开发、生产和销售广泛的电子产品,业务核心在于提供各类解决方案以打造一个更加安全的互联世界。 罗德与施瓦…...