当前位置: 首页 > news >正文

C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

相关文章:

C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述 生产者-消费者模式是一种经典的设计模式&#xff0c;它将数据的生成&#xff08;生产者&#xff09;和处理&#xff08;消费者&#xff09;分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区&#xff0c;生产者将数据放入缓冲区&#x…...

git: hint:use --reapply-cherry-picks to include skipped commits

问&#xff1a; 当我在feture分支写完功能&#xff0c;切换到dev更新了远端dev代码&#xff0c;切回feture分支&#xff0c;git rebase dev分支后出现报错&#xff1a; warning skipped previously applied commit 709xxxx hint:use --reapply-cherry-picks to include skippe…...

AI:对比ChatGPT这类聊天机器人,人形机器人对人类有哪些不一样的影响?

人形机器人与像ChatGPT这样的聊天机器人相比&#xff0c;虽然都属于人工智能技术的应用&#xff0c;但由于其具备的物理形态和与环境的互动能力&#xff0c;它们对人类的影响会有很大的不同。下面从多个角度进行对比&#xff0c;阐述它们各自对人类的不同影响&#xff1a; 1. …...

vue3 +ts 学习记录

1 父子传参 父传子 父组件 <TestFuzichuancan :title"title"/> const title 父组件标题子组件 import { defineProps } from vue; interface Props {title?: string,arr: number[]; } const props withDefaults(defineProps<Props>(), {title: 默认…...

微服务的配置共享

1.什么是微服务的配置共享 微服务架构中&#xff0c;配置共享是一个重要环节&#xff0c;它有助于提升服务间的协同效率和数据一致性。以下是对微服务配置共享的详细阐述&#xff1a; 1.1.配置共享的概念 配置共享是指在微服务架构中&#xff0c;将某些通用或全局的配置信息…...

Scala分布式语言二(基础功能搭建、面向对象基础、面向对象高级、异常、集合)

章节3基础功能搭建 46.函数作为值三 package cn . itbaizhan . chapter03 // 函数作为值&#xff0c;函数也是个对象 object FunctionToTypeValue { def main ( args : Array [ String ]): Unit { //Student stu new Student() /*val a ()>{"GTJin"…...

Chromium 132 编译指南 Windows 篇 - 配置核心环境变量 (三)

1. 引言 在之前的 Chromium 编译指南系列文章中&#xff0c;我们已经完成了编译前的准备工作以及 depot_tools 工具的安装与配置。本篇我们将聚焦于 Chromium 编译过程中至关重要的环境变量设置&#xff0c;这些配置是您顺利进行 Chromium 构建的基石。 2. 启用本地编译&…...

开源文件存储分享平台Seafile部署与应用

Seafile 是一款开源的企业云盘,注重可靠性和性能,支持全平台客户端。Seafile 内置协同文档 SeaDoc ,让协作撰写、管理和发布文档更便捷。适用于团队协作、文件存储和同步的开源解决方案,它提供了可靠、安全和易用的云存储服务。主要有以下特点: 文件存储和同步:Seafile 允…...

MYSQL-创建数据库 CREATE DATABASE (十一)

13.1.11 CREATE DATABASE 语句 -- 创建 数据库的 CREATE 权限 CREATE {DATABASE | SCHEMA} [IF NOT EXISTS] db_name[create_option] ...create_option: [DEFAULT] {CHARACTER SET [] charset_name| COLLATE [] collation_name } -- 删除 数据库具有 DROP 权限 DROP {DATABASE…...

Java高频面试之SE-11

hello啊&#xff0c;各位观众姥爷们&#xff01;&#xff01;&#xff01;本牛马baby今天又来了&#xff01;哈哈哈哈哈嗝&#x1f436; Java中是引用传递还是值传递&#xff1f; 在 Java 中&#xff0c;方法参数传递是通过 值传递 的方式实现的&#xff0c;但这可能会引起一…...

C#结构体,枚举,泛型,事件,委托--10

目录 一.结构体 二.特殊的结构体(ref struct): 三.枚举 四.泛型 泛型的使用: 1.泛型类:定义一个泛型类,使用类型参数T 2.泛型方法:在方法定义中使用类型参数 3.泛型接口 五.委托及泛型委托 委托 泛型委托 六.事件 事件: 泛型事件:使用泛型委托&#xff08;如Event…...

MapReduce完整工作流程

1、mapreduce工作流程(终极版) 0. 任务提交 1. 拆-split逻辑切片--任务切分。 FileInputFormat--split切片计算工具 FileSplit--单个计算任务的数据范围。 2. 获得split信息和个数。 MapTask阶段 1. 读取split范围内的数据。k(偏移量)-v(行数据) 关键API&#xff1a;TextI…...

网络编程(1)

网络编程概述 Java是 Internet 上的语言&#xff0c;它从语言级上提供了对网络应用程序的支持&#xff0c;程序员能够很容易开发常见的网络应用程序。 Java提供的网络类库&#xff0c;可以实现无痛的网络连接&#xff0c;联网的底层细节被隐藏在 Java 的本机安装系统里&#…...

mysql中创建计算字段

目录 1、计算字段 2、拼接字段 3、去除空格和使用别名 &#xff08;1&#xff09;去除空格 &#xff08;2&#xff09;使用别名&#xff1a;AS 4、执行算术计算 5、小结 博主用的是mysql8 DBMS&#xff0c;附上示例资料&#xff1a; 百度网盘链接: https://pan.baidu.co…...

【算法】判断一个链表是否为回文结构

问&#xff1a; 给定一个单链表的头节点head&#xff0c;请判断该链表是否为回文结构 例&#xff1a; 1 -> 2 -> 1返回true&#xff1b;1 -> 2 -> 2 -> 1返回true&#xff1b;15 -> 6 -> 15返回true 答&#xff1a; 笔试&#xff1a;初始化一个栈用来…...

计算机网络之---ICMP协议与Ping命令

ICMP 协议 ICMP (Internet Control Message Protocol) 是一种网络层协议&#xff0c;主要用于在 IP 网络中传递控制消息。ICMP 主要用于网络设备之间的故障报告和诊断&#xff0c;帮助设备检测网络连接问题。它是 IP 协议的核心部分之一&#xff0c;用于发送错误消息和操作信息…...

【硬件介绍】Type-C接口详解

一、Type-C接口概述 Type-C接口特点&#xff1a;以其独特的扁头设计和无需区分正反两面的便捷性而广受欢迎。这种设计大大提高了用户的使用体验&#xff0c;避免了传统USB接口需要多次尝试才能正确插入的问题。Type-C接口内部结构&#xff1a;内部上下两排引脚的设计虽然可能不…...

【Pandas】pandas Series rtruediv

Pandas2.2 Series Binary operator functions 方法描述Series.add()用于对两个 Series 进行逐元素加法运算Series.sub()用于对两个 Series 进行逐元素减法运算Series.mul()用于对两个 Series 进行逐元素乘法运算Series.div()用于对两个 Series 进行逐元素除法运算Series.true…...

项目开发版本控制Git流程规范

个人&测试&预发布&生产分支命名 1&#xff09;个人分支&#xff1a; 从sit或者master进行切出&#xff0c;姓名切出分支命名&#xff0c;或者日期切出分支命名 示例&#xff1a;liuys_sit、20250110_sit2&#xff09;测试分支&#xff1a; sit3&#xff09;用户验…...

STM32 : 波特率发生器

波特率发生器 1. 发送器和接收器的波特率 波特率寄存器 (BRR): 在串行通信中&#xff0c;发送器和接收器的波特率是由波特率寄存器&#xff08;BRR&#xff09;中的一个值 DIV 来确定的。 2. 计算公式 计算公式: 详细解释 1. 波特率寄存器 (BRR) BRR: 波特率寄存器是一…...

STM32 USB组合设备 MSC CDC

STM32 USB组合设备 MSC CDC实现 教程 教程请看大佬niu_88 手把手教你使用USB的CDCMSC复合设备&#xff08;基于stm32f407&#xff09; 大佬的教程很好&#xff0c;很详细&#xff0c;我调出来了&#xff0c;代码请见我绑定的资源 注意事项 值得注意的是&#xff1a; 1、 cu…...

继续以“实用”指导Pythonic编码(re通配表达式)(2024年终总结2)

弃现成工具手剥任务&#x1f9d0;&#xff0c;我哈哈滴就像笨笨的傻大个儿&#x1f60b;。 (笔记模板由python脚本于2025年01月12日 23:29:33创建&#xff0c;本篇笔记适合熟悉正则表达式的coder翻阅) 【学习的细节是欢悦的历程】 Python官网&#xff1a;https://www.python.or…...

Flutter使用BorderRadiusTween实现由矩形变成圆形的动画

BorderRadiusTween 是插值动画中&#xff0c;用于组件边框半径的类&#xff0c;专门作用于组件边框和半径动化过度。 这个类继承自Tween&#xff0c;用法相似。 下面是示例写法 class BorderRadiusTweenPage extends StatefulWidget {overrideState<StatefulWidget> c…...

VSCode 中的 launch.json 配置使用

VSCode 中的 launch.json 配置使用 在 VSCode 中&#xff0c;launch.json 文件用于配置调试设置&#xff0c;特别是用来定义如何启动和调试你的应用。它允许你配置不同的调试模式、运行参数和调试选项。 基本结构 launch.json 文件位于 .vscode 文件夹内&#xff0c;可以通过…...

深度学习张量的秩、轴和形状

深度学习张量的秩、轴和形状 秩、轴和形状是在深度学习中我们最关心的张量属性。 秩轴形状 秩、轴和形状是在深度学习中开始使用张量时我们最关心的三个属性。这些概念相互建立&#xff0c;从秩开始&#xff0c;然后是轴&#xff0c;最后构建到形状&#xff0c;所以请注意这…...

Redis有哪些常用应用场景?

大家好&#xff0c;我是锋哥。今天分享关于【Redis有哪些常用应用场景&#xff1f;】面试题。希望对大家有帮助&#xff1b; Redis有哪些常用应用场景&#xff1f; 1000道 互联网大厂Java工程师 精选面试题-Java资源分享网 Redis 是一个高性能的开源键值对&#xff08;Key-Va…...

vue3+ts+element-plus 输入框el-input设置背景颜色

普通情况&#xff1a; 组件内容&#xff1a; <el-input v-model"applyBasicInfo.outerApplyId"/> 样式设置&#xff1a; ::v-deep .el-input__wrapper {background-color: pink; }// 也可以这样设置 ::v-deep(.el-input__wrapper) {background-color: pink…...

Ubuntu 磁盘修复

Ubuntu 磁盘修复 在 ubuntu 文件系统变成只读模式&#xff0c;该处理呢&#xff1f; 文件系统内部的错误&#xff0c;如索引错误、元数据损坏等&#xff0c;也可能导致系统进入只读状态。磁盘坏道或硬件故障也可能引发文件系统只读的问题。/etc/fstab配置错误&#xff0c;可能…...

使用RSyslog将Nginx Access Log写入Kafka

个人博客地址&#xff1a;使用RSyslog将Nginx Access Log写入Kafka | 一张假钞的真实世界 环境说明 CentOS Linux release 7.3.1611kafka_2.12-0.10.2.2nginx/1.12.2rsyslog-8.24.0-34.el7.x86_64.rpm 创建测试Topic $ ./kafka-topics.sh --zookeeper 192.168.72.25:2181/k…...

通过Apache、Nginx限制直接访问public下的静态文件

一、Apache 在public目录下的.htaccess文件中添加如下规则&#xff0c;来拒绝除了指定文件类型之外的所有请求 <FilesMatch "\.(?!(jpg|jpeg|png|gif|css|js|ico)$)[^.]$">Order Allow,DenyDeny from all </FilesMatch> 上述配置表示仅允许访问.jpg …...

做视频发哪个网站赚钱/百度推广app怎么收费

介绍Java7的工具类Objects 本文介绍Java7引入的工具里Objects&#xff0c;使用其API可以让代码更简洁。 1. 概述 在jdk7添加了一个objects工具类&#xff0c;它提供了一些方法来操作对象&#xff0c;它由一些静态的实用方法组成&#xff0c;这些方法是null-safe &#xff08…...

seo整站优化公司/seo网络优化师招聘

2019独角兽企业重金招聘Python工程师标准>>> 参考资料 1、CentOS/Linux 开放80、8080端口或者开放某个端口 注&#xff1a; 修改/etc/sysconfig/iptables配置&#xff0c;可以参考22端口开放特例 -A INPUT -m state --state NEW -m tcp -p tcp --dport 22 -j ACCEPT…...

南昌专业做网站公司哪家好/网站监测

两个排序的数组A和B分别含有m和n个数&#xff0c;找到两个排序数组的中位数&#xff0c;要求时间复杂度应为O(log (mn))。在线评测地址&#xff1a;https://www.lintcode.com/problem/median-of-two-sorted-arrays/?utm_sourcesc-zhihuzl-lm说明中位数的定义&#xff1a;这里的…...

网站开发建设/行业关键词查询

这几天我的上篇长篇大论引来一堆回复&#xff0c;大家基本上都表扬了我。 其实我公开发表之前还是有些犹豫&#xff0c;因为我话说得比较冲&#xff0c;火药味比较重。严格的说点发表前我甚至删了不少火箭只剩下温柔的子弹&#xff0c;使得文章温和一些。不知道何时开始&#x…...

有没有必要为B2B网站做外链/百度竞价推广技巧

备考第6周总结这个星期最大的收获就是看了何凯文老师的长难句基础课&#xff0c;一共有8节&#xff0c;这个星期学了4节。我感觉他讲的非常好&#xff0c;思路非常清晰&#xff0c;讲课时也不会忘记和大家开开玩笑&#xff0c;课堂气氛很好。我听了以后&#xff0c;受益匪浅&am…...

旅行社网站建设方案论文/怎么建立个人网站

罗德与施瓦茨 (Rohde & Schwarz, R&S) 公司成立于1933年&#xff0c;总部位于德国慕尼黑&#xff0c;是一家技术公司&#xff0c;为企业和政府机构开发、生产和销售广泛的电子产品&#xff0c;业务核心在于提供各类解决方案以打造一个更加安全的互联世界。 罗德与施瓦…...