江苏网站建设效果,网站建设目标与期望,网络营销公司全网推广公司,建设厅焊工证什么样子一.生产者-消费者模式概述
生产者-消费者模式是一种经典的设计模式#xff0c;它将数据的生成#xff08;生产者#xff09;和处理#xff08;消费者#xff09;分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区#xff0c;生产者将数据放入缓冲区#x…一.生产者-消费者模式概述
生产者-消费者模式是一种经典的设计模式它将数据的生成生产者和处理消费者分离到不同的模块或线程中。这种模式的核心在于一个共享的缓冲区生产者将数据放入缓冲区而消费者从缓冲区中取出数据进行处理。这种模式有助于提高系统的响应性和吞吐量因为它允许生产者和消费者并行工作互不干扰。
二.Channels 概念
Channels提供了一种通信机制允许生产者和消费者之间安全、可靠地交换信息即使它们在不同的执行线程上运行。自.NET Core 3.0引入以来System.Threading.Channels命名空间为我们处理生产者-消费者模式等复杂场景提供了强大的支持。Channels已经完全集成到.NET的异步模型中支持async/await关键字提供了一种异步的消息传递机制。通道本质上是一个线程安全的队列支持在生产者和消费者之间安全、可靠地传递数据。通道有两种类型有限容量的bound Channel和无限容量的unbound Channel。有限容量的通道在达到容量上限时会根据指定的策略处理新消息而无限容量的通道则没有容量限制。
三.Channels 生产者-消费者模式实现
创建通道来作为生产者和消费者之间的共享缓冲区
无界通道
无界容量的通道即没有明确限制可以存储的项目数量的通道,使用 Channel.CreateUnboundedT() 方法如
// 创建一个无界通道
var unboundedChannel Channel.CreateUnboundedstring();有界通道
创建有界通道则需要指定通道的容量上限对于有限容量的通道当通道满时生产者可能需要等待或丢弃新数据。同样当通道空时消费者可能需要等待新数据的到来。通道提供了多种策略BoundedChannelFullMode 枚举处理方式Wait当通道已满时写操作会等待直到队列中有空间来写入新的数据。这种情况下如果 TryWrite 操作会返回 false。DropOldest如果通道已满会删除最旧的数据也就是最早进入通道但还未被读取的数据以便给新的数据腾出空间。DropNewest与 DropOldest 相反会删除最新写入但还未被读取的数据来让新数据容纳进来。DropWrite直接删除当前正在尝试写入的数据。 使用 Channel.CreateBoundedT(int capacity) 方法。例如
// 创建一个 有界通道
var boundedChannel Channel.CreateBoundedstring(100);实现生产者
生产者负责生成数据并将其写入通道。通常使用循环在该循环中生产者生成数据并使用WriteAsync方法将其写入通道。
async Task ProducerAsync(ChannelWriterstring writer)
{for (int i 0; i 100; i){await writer.WriteAsync(i.ToString());await Task.Delay(100); // 模拟数据生成的时间间隔}writer.Complete(); // 标记通道为完成写入不再接受新数据
}实现消费者
消费者负责从通道中读取数据并进行处理。通常使用循环在该循环中消费者使用ReadAsync或ReadAllAsync方法从通道中读取数据并对其进行处理。 async Task ConsumerAsync(ChannelReaderstring reader)
{while (await reader.WaitToReadAsync()){if (reader.TryRead(out var msgstring)){Console.WriteLine($Consumed: {msgstring});// 在这里处理数据}}
}下面展示一个完整的生产者和消费者示例
启动 Program 类
// See https://aka.ms/new-console-template for more informationusing System.Threading.Channels;
using System.Threading.Tasks;
using TestChannels;Console.WriteLine(选择运行的模式?例如1);
Console.WriteLine(1. 单生产单消费);
Console.WriteLine(2. 多生产单消费);
Console.WriteLine(3. 单生产多消费);
Console.WriteLine(4. 多生产多消费);
Console.WriteLine(请输入编号);
var key Console.ReadKey();switch (key.KeyChar)
{case 1:await SingleProducerSingleConsumer();break;case 2:await MultiProducerSingleConsumer();break;case 3:await SingleProduceMultipleConsumers();break;case 4:await MultiProducerMultipleConsumers();break;default:Console.WriteLine(请先选择运行模式!);break;
}// 单生产单消费
static async Task SingleProducerSingleConsumer()
{var channel Channel.CreateUnboundedstring();var producer1 new Producer(channel.Writer, 1, 2000);var consumer1 new Consumer(channel.Reader, 1, 1500);Task consumerTask1 consumer1.ConsumerAsync(); // 开始消费Task producerTask1 producer1.ProducerAsync(); // 开始生产await producerTask1.ContinueWith(_ channel.Writer.Complete());await consumerTask1;
}// 多生产单消费
static async Task MultiProducerSingleConsumer()
{var channel Channel.CreateUnboundedstring();ListTask producerTasks new ListTask();for (int i 1; i 3; i){producerTasks.Add(Task.Run(async () {var producer new Producer(channel.Writer, i, 2000);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒启动另外一个生产}var consumer1 new Consumer(channel.Reader, 1, 250);Task consumerTask1 consumer1.ConsumerAsync(); // 开始消费await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ channel.Writer.Complete());await consumerTask1;
}// 单生产多消费
static async Task SingleProduceMultipleConsumers()
{var channel Channel.CreateUnboundedstring();var producer1 new Producer(channel.Writer, 1, 100);ListTask consumerTasks new ListTask();for (int i 1; i 3; i){consumerTasks.Add(Task.Run(async () {var consumer new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}Task producerTask1 producer1.ProducerAsync();await producerTask1.ContinueWith(_ channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}// 多生产多消费
static async Task MultiProducerMultipleConsumers()
{var channel Channel.CreateUnboundedstring();ListTask producerTasks new ListTask();for (int i 1; i 3; i){Console.WriteLine(线程i.ToString());producerTasks.Add(Task.Run(async () {var producer new Producer(channel.Writer, i, 100);await producer.ProducerAsync();}));await Task.Delay(500); // 暂停500毫秒启动另外一个生产}ListTask consumerTasks new ListTask();for (int i 1; i 3; i){consumerTasks.Add(Task.Run(async () {var consumer new Consumer(channel.Reader, 1, 1500);await consumer.ConsumerAsync();}));}await Task.WhenAll(producerTasks.ToArray()).ContinueWith(_ channel.Writer.Complete());await Task.WhenAll(consumerTasks.ToArray());
}
生产者Producer 类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{internal class Producer{private readonly ChannelWriterstring _writer;private readonly int _identifier;private readonly int _delay;public Producer(ChannelWriterstring writer, int identifier, int delay){_writer writer;_identifier identifier;_delay delay;}public async Task ProducerAsync(){Console.WriteLine($开始 ({_identifier}): 发布消息);for (var i 0; i 10; i){await Task.Delay(_delay); // 停顿一下方便观察数据var msg $P{_identifier} - {DateTime.Now:G}-{i};Console.WriteLine($发布 ({_identifier}): 消息成功 {msg});await _writer.WriteAsync(msg);}Console.WriteLine($发布 ({_identifier}): 完成);}}
}
消费者Consumer 类
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;namespace TestChannels
{/// summary/// 消费/// /summaryinternal class Consumer{private readonly ChannelReaderstring _reader;private readonly int _identifier;private readonly int _delay;public Consumer(ChannelReaderstring reader, int identifier, int delay){_reader reader;_identifier identifier;_delay delay;}public async Task ConsumerAsync(){Console.WriteLine($ 开始({_identifier}):消费 );while (await _reader.WaitToReadAsync()){if (_reader.TryRead(out var timeString)){await Task.Delay(_delay); // 停顿一下方便观察数据Console.WriteLine($消费 ({_identifier}): 成功 {timeString});}}Console.WriteLine($消费 ({_identifier}): 完成);}}
} [ 参考] : https://learn.microsoft.com/en-us/dotnet/api/system.threading.channels?viewnetcore-3.0