我们需要实现以下需求,实现多条流水线,每个流水线上都有一个长耗时任务,任务到来时总是先到达没有任务或所有任务已完成的流水线,如果所有流水线都已有任务,下一个任务来时则阻塞等待。

以下是使用Channels的高性能简单实现:

using System.Threading.Channels;
 
namespace Demo.Jobs
{
    public class Job : BackgroundService
    {
        private static Channel<string>[] _channels = new Channel<string>[Environment.ProcessorCount];
        private static SemaphoreSlim _semaphore = new SemaphoreSlim(_channels.Length, _channels.Length);
        private static int _freechannel = 0;
 
        public Job()
        {
            for (int i = 0; i < _channels.Length; i++)
            {
                _channels[i] = Channel.CreateBounded<string>(1);
            }
        }
 
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            for (int i = 0; i < _channels.Length; i++)
            {
                _ = ReadAsync(_channels[i],i);
            }
        }
 
        private async ValueTask LongTimeTask(string item)
        {
            Console.WriteLine(item + " Job start");
            await Task.Delay(TimeSpan.FromSeconds(10));
            Console.WriteLine(item + " Job finish");
        }
 
        public static async Task WriteAsync(string message, CancellationToken cancellationToken)
        {
            while (true)
            {
                await _semaphore.WaitAsync(cancellationToken);
                var writer = _channels[_freechannel].Writer;
                if (writer.TryWrite(message)) break;
                _freechannel = (_freechannel + 1) % _channels.Length;
                _semaphore.Release();
            }
        }
 
        public async ValueTask ReadAsync(Channel<string> _channel,int channelNum)
        {
            var reader = _channel.Reader;
            while (await reader.WaitToReadAsync())
            {
                while (reader.TryPeek(out var item))
                {
                    await LongTimeTask(item);
                    _ = await reader.ReadAsync();
                    _freechannel = channelNum;
                    _semaphore.Release();
                }
            }
        }
    }
 
}
 

关键点是Channels的read只能从头到尾一条一条读,这样容量只是单条流水线的实现,并行化必须引入多条Channels。待完善:超时、失败处理、执行与结束流程分离等。