C#中通道(Channels)的应用之(生产者-消费者模式)

一.生产者-消费者模式概述

生产者-消费者模式是一种经典的设计模式,它将数据的生成(生产者)和处理(消费者)分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区,生产者将数据放入缓冲区,而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量,因为它允许生产者和消费者并行工作,互不干扰。

二.Channels 概念

Channels提供了一种通信机制,允许生产者和消费者之间安全、可靠地交换信息,即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来,System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中,支持async/await关键字,提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列,支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型:有限容量的bound Channel无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息,而无限容量的通道则没有容量限制。

三.Channels 生产者-消费者模式实现

创建通道来作为生产者和消费者之间的共享缓冲区
  1. 无界通道
  • 无界容量的通道(即没有明确限制可以存储的项目数量的通道),使用 Channel.CreateUnbounded<T>() 方法,如:
// 创建一个无界通道
var unboundedChannel = Channel.CreateUnbounded<string>();
  1. 有界通道
  • 创建有界通道则需要指定通道的容量上限,对于有限容量的通道,当通道满时,生产者可能需要等待或丢弃新数据。同样,当通道空时,消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式:Wait:当通道已满时,写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest:如果通道已满,会删除最旧的数据(也就是最早进入通道但还未被读取的数据),以便给新的数据腾出空间。DropNewest:与 DropOldest 相反,会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite:直接删除当前正在尝试写入的数据。
    使用 Channel.CreateBounded<T>(int capacity) 方法。例如:
// 创建一个 有界通道
var boundedChannel = Channel.CreateBounded<string>(100);
实现生产者
  • 生产者负责生成数据并将其写入通道。通常使用循环,在该循环中,生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriter<string> writer)
{for (int i = 0; i < 100; i++){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入,不再接受新数据
}
实现消费者
  • 消费者负责从通道中读取数据并进行处理。通常使用循环,在该循环中,消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据,并对其进行处理。

async Task ConsumerAsync(ChannelReader<string> reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($"Consumed: {msgstring}");// 在这里处理数据}}
}

下面展示一个完整的生产者和消费者示例

  1. 启动 Program
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine("选择运行的模式?例如:1");
Console.WriteLine("1. 单生产单消费");
Console.WriteLine("2. 多生产单消费");
Console.WriteLine("3. 单生产多消费");
Console.WriteLine("4. 多生产多消费");
Console.WriteLine("请输入编号:");
var key = Console.ReadKey();switch (key.KeyChar)
{case '1':await SingleProducerSingleConsumer();break;case '2':await MultiProducerSingleConsumer();break;case '3':await SingleProduceMultipleConsumers();break;case '4':await MultiProducerMultipleConsumers();break;default:Console.WriteLine("请先选择运行模式!");break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 2000);var consumer1 = new Consumer(channel.Reader, 1, 1500);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费Task producerTask1 = producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <= 3; i++){producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}var consumer1 = new Consumer(channel.Reader, 1, 250);Task consumerTask1 = consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();var producer1 = new Producer(channel.Writer, 1, 100);List<Task> consumerTasks = new List<Task>();for (int i = 1; i <= 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 = producer1.ProducerAsync();await producerTask1.ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel = Channel.CreateUnbounded<string>();List<Task> producerTasks = new List<Task>();for (int i = 1; i <=3; i++){Console.WriteLine("线程"+i.ToString());producerTasks.Add(Task.Run(async () => {var producer = new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒,启动另外一个生产}List<Task> consumerTasks = new List<Task>();for (int i = 1; i < 3; i++){consumerTasks.Add(Task.Run(async () => {var consumer = new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ => channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
  1. 生产者Producer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriter<string> _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriter<string> writer, int identifier, int delay){_writer = writer;_identifier = identifier;_delay = delay;}public async Task ProducerAsync(){Console.WriteLine($"开始 ({_identifier}): 发布消息");for (var i = 0; i < 10; i++){await Task.Delay(_delay); // 停顿一下,方便观察数据var msg = $"P{_identifier} - {DateTime.Now:G}-{i}";Console.WriteLine($"发布 ({_identifier}): 消息成功 {msg}");await _writer.WriteAsync(msg);}Console.WriteLine($"发布 ({_identifier}): 完成");}}
}
  1. 消费者Consumer
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// <summary>/// 消费/// </summary>internal class Consumer{private readonly ChannelReader<string> _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReader<string> reader, int identifier, int delay){_reader = reader;_identifier = identifier;_delay = delay;}public async Task ConsumerAsync(){Console.WriteLine($" 开始({_identifier}):消费 ");while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下,方便观察数据Console.WriteLine($"消费 ({_identifier}): 成功 {timeString}");}}Console.WriteLine($"消费 ({_identifier}): 完成");}}
}

运行

  • [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.rhkb.cn/news/2048.html

如若内容造成侵权/违法违规/事实不符,请联系长河编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【STM32】HAL库USB实现软件升级DFU的功能操作及配置

【STM32】HAL库USB实现软件升级DFU的功能操作及配置 文章目录 DFUHAL库的DFU配置修改代码添加条件判断和跳转代码段DFU烧录附录&#xff1a;Cortex-M架构的SysTick系统定时器精准延时和MCU位带操作SysTick系统定时器精准延时延时函数阻塞延时非阻塞延时 位带操作位带代码位带宏…

kotlin的dagger hilt依赖注入

依赖注入&#xff08;dependency injection, di&#xff09;是设计模式的一种&#xff0c;它的实际作用是给对象赋予实例变量。 基础认识 class MainActivity : ComponentActivity() {override fun onCreate(savedInstanceState: Bundle?) {super.onCreate(savedInstanceSta…

Uniapp判断设备是安卓还是 iOS,并调用不同的方法

在 UniApp 中&#xff0c;可以通过 uni.getSystemInfoSync() 方法来获取设备信息&#xff0c;然后根据系统类型判断当前设备是安卓还是 iOS&#xff0c;并调用不同的方法。 示例代码 export default {onLoad() {this.checkPlatform();},methods: {checkPlatform() {// 获取系…

【MySQL】MVCC详解, 图文并茂简单易懂

欢迎来到啊妮莫的学习小屋 祝读本文的朋友都天天开心呀 目录 MVCC简介快照读与当前读快照读当前读 隔离级别隐藏字段和Undo Log版本链✨MVCC原理--ReadView✨ReadView简介设计思路适用隔离级别重要内容 ReadView规则MVCC整体流程 不同隔离级别下的MVCC读已提交可重复读 总结 M…

VSCode Live Server 插件安装和使用

VSCode Live Server是一个由Ritwick Dey开发的Visual Studio Code扩展插件&#xff0c;它提供了一个带有实时重载功能的本地开发服务器。在VSCode中安装和使用Live Server插件进行实时预览和调试Web应用程序。这将大大提高前端开发效率&#xff0c;使网页设计和开发变得更为流畅…

MC1.12.2 macOS高清修复OptiFine运行崩溃

最近在玩RLCraft&#xff0c;在windows中运行正常的&#xff0c;移植到macOS中发现如果加载OptiFine模组就会崩溃 报错日志 报错日志如下&#xff0c;其中已经包含了各种版本信息&#xff0c;我就不单独说明了。这里说一下&#xff0c;报错的时候用的是oracle jdk x64的&…

医学图像分割半监督学习记录

半监督学习中&#xff0c;一部分数据带标签&#xff0c;一部分不带标签&#xff0c;在模型训练过程中&#xff0c;带标签的数据我们注重分类&#xff0c;无标签的数据我们注重分布。 半监督坚持一致性正则&#xff08;consistency regularization&#xff09;来进行半监督学习&…

12 USART串口通讯

1 串口物理层 两个设备的“DB9接口”之间通过串口信号建立连接&#xff0c;串口信号线中使用“RS232标准”传输数据信号。由于RS232电平标准的信号不能直接被控制器直接识别&#xff0c;所以这些信号会经过“电平转换芯片”转换成控制器能识别的“TTL校准”的电平信号&#xff…

工程水印相机结合图纸,真实现场时间地点,如何使用水印相机,超简单方法只教一次!

在工程管理领域&#xff0c;精准记录现场信息至关重要。水印相机拍照功能&#xff0c;为工程人员提供了强大的现场信息记录工具&#xff0c;助力工程管理和统计工程量&#xff0c;更可以将图片分享到电脑、分享给同事&#xff0c;协同工作。 一、打开图纸 打开手机版CAD快速看图…

abap安装cl_json类

文章来自 SAP根据源码导入/ui2/cl_json类 - pikeduo - 博客园 新建一个se38程序&#xff0c;把源码放到里&#xff0c;源码如下 *----------------------------------------------------------------------* * CLASS zcl_json DEFINITION *----------------------------…

day09_kafka高级

文章目录 kafka高级今日课程内容核心概念整理Kafka的数据位移offset**为什么 Kafka 的 offset 就像是“书签”&#xff1f;****实际意义** Kafka的基准/压力测试测试生产的效率测试消费的效率 Kafka的分片与副本机制kafka如何保证数据不丢失生产者端Broker端消费者端相关参数 K…

vue2制作长方形容器,正方形网格散点图,并且等比缩放拖动

需求&#xff1a;有个长方形的容器&#xff0c;但是需要正方形的网格线&#xff0c;网格线是等比缩放的并且可以无线拖动的&#xff0c;并且添加自适应缩放和动态切换&#xff0c;工具是plotly.js,已完成功能如下 1.正方形网格 2.散点分组 3.自定义悬浮框的数据 4.根据窗口大小…

Spring Boot 2 学习指南与资料分享

Spring Boot 2 学习资料 Spring Boot 2 学习资料 Spring Boot 2 学习资料 在当今竞争激烈的 Java 后端开发领域&#xff0c;Spring Boot 2 凭借其卓越的特性&#xff0c;为开发者们开辟了一条高效、便捷的开发之路。如果你渴望深入学习 Spring Boot 2&#xff0c;以下这份精心…

【PyQt】如何在mainwindow中添加菜单栏

[toc]如何在mainwindow中添加菜单栏 如何在mainwindow中添加菜单栏 主要有两种方法&#xff1a; 1.直接创建mainwindow进行添加 2.使用ui文件加载添加 第二种方法更为常见&#xff0c;可以应用到实际 1.直接创建mainwindow进行添加 import sysfrom PyQt5.QtWidgets import …

Vue如何构建项目

目录 1.安装Node.js 2.换源(建议) 3.选择一个目录 4.创建一个vue项目 5.验证是否成功 1.安装Node.js 安装18.3或更⾼版本的 Nodejs 点击下载->Node.Js中文网 node -v npm -v 安装好后在windows的cmd窗口下运行 如果能运行出结果就说明安装好了。 2.换源(建议) //…

uniapp 小程序 textarea 层级穿透,聚焦光标位置错误怎么办?

前言 在开发微信小程序时&#xff0c;使用 textarea 组件可能会遇到一些棘手的问题。最近我在使用 uniapp 开发微信小程序时&#xff0c;就遇到了两个非常令人头疼的问题&#xff1a; 层级穿透&#xff1a;由于 textarea 是原生组件&#xff0c;任何元素都无法遮盖住它。当其…

[c语言日寄]精英怪:三子棋(tic-tac-toe)3命慢通[附免费源码]

哈喽盆友们&#xff0c;今天带来《c语言》游戏中[三子棋boss]速通教程&#xff01;我们的目标是一边编写博文&#xff0c;一边快速用c语言实现三子棋游戏。准备好瓜子&#xff0c;我们计时开始&#xff01; 前期规划 在速通中&#xff0c;我们必须要有清晰的前期规划&#xf…

Chatper 4: Implementing a GPT model from Scratch To Generate Text

文章目录 4 Implementing a GPT model from Scratch To Generate Text4.1 Coding an LLM architecture4.2 Normalizing activations with layer normalization4.3 Implementing a feed forward network with GELU activations4.4 Adding shortcut connections4.5 Connecting at…

【Vim Masterclass 笔记13】第 7 章:Vim 核心操作之——文本对象与宏操作 + S07L28:Vim 文本对象

文章目录 Section 7&#xff1a;Text Objects and MacrosS07L28 Text Objects1 文本对象的含义2 操作文本对象的基本语法3 操作光标所在的整个单词4 删除光标所在的整个句子5 操作光标所在的整个段落6 删除光标所在的中括号内的文本7 删除光标所在的小括号内的文本8 操作尖括号…

Termora跨平台 SSH/SFTP/Terminal 客户端工具

前言 Termora一款强大的终端模拟与SSH客户端工具&#xff0c;集SFTP传输、跨平台兼容、Zmodem协议、SSH端口转发、配置同步、宏录制、关键词高亮、密钥管理、多会话命令发送及数据加密于一体&#xff0c;专为追求高效远程工作的您设计。无论是开发、管理还是日常任务&#xff…