我们需要实现以下需求,实现多条流水线,每个流水线上都有一个长耗时任务,任务到来时总是先到达没有任务或所有任务已完成的流水线,如果所有流水线都已有任务,下一个任务来时则阻塞等待。
以下是使用Channels的高性能简单实现:
using System.Threading.Channels;
namespace Demo.Jobs
{
public class Job : BackgroundService
{
private static Channel<string>[] _channels = new Channel<string>[Environment.ProcessorCount];
private static SemaphoreSlim _semaphore = new SemaphoreSlim(_channels.Length, _channels.Length);
private static int _freechannel = 0;
public Job()
{
for (int i = 0; i < _channels.Length; i++)
{
_channels[i] = Channel.CreateBounded<string>(1);
}
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
for (int i = 0; i < _channels.Length; i++)
{
_ = ReadAsync(_channels[i],i);
}
}
private async ValueTask LongTimeTask(string item)
{
Console.WriteLine(item + " Job start");
await Task.Delay(TimeSpan.FromSeconds(10));
Console.WriteLine(item + " Job finish");
}
public static async Task WriteAsync(string message, CancellationToken cancellationToken)
{
while (true)
{
await _semaphore.WaitAsync(cancellationToken);
var writer = _channels[_freechannel].Writer;
if (writer.TryWrite(message)) break;
_freechannel = (_freechannel + 1) % _channels.Length;
_semaphore.Release();
}
}
public async ValueTask ReadAsync(Channel<string> _channel,int channelNum)
{
var reader = _channel.Reader;
while (await reader.WaitToReadAsync())
{
while (reader.TryPeek(out var item))
{
await LongTimeTask(item);
_ = await reader.ReadAsync();
_freechannel = channelNum;
_semaphore.Release();
}
}
}
}
}
关键点是Channels的read只能从头到尾一条一条读,这样容量只是单条流水线的实现,并行化必须引入多条Channels。待完善:超时、失败处理、执行与结束流程分离等。