diff --git a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs index dfe1f45f28..06ef7433f1 100644 --- a/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/AsyncConsumerWorkService.cs @@ -137,7 +137,7 @@ static async Task HandleConcurrent(Work work, ModelBase model, SemaphoreSlim lim } catch (Exception) { - + // ignored } finally { diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs b/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs index d8240a51d1..a219a8081d 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerWorkService.cs @@ -1,6 +1,7 @@ using System; using System.Collections.Concurrent; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; namespace RabbitMQ.Client.Impl @@ -61,20 +62,16 @@ internal Task StopWorkAsync(IModel model) class WorkPool { - readonly ConcurrentQueue _actions; - readonly CancellationTokenSource _tokenSource; - readonly CancellationTokenRegistration _tokenRegistration; - volatile TaskCompletionSource _syncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); + private readonly Channel _channel; private readonly int _concurrency; private Task _worker; + CancellationTokenSource _tokenSource; private SemaphoreSlim _limiter; public WorkPool(int concurrency) { _concurrency = concurrency; - _actions = new ConcurrentQueue(); - _tokenSource = new CancellationTokenSource(); - _tokenRegistration = _tokenSource.Token.Register(() => _syncSource.TrySetCanceled()); + _channel = Channel.CreateUnbounded(new UnboundedChannelOptions { SingleReader = true, SingleWriter = false, AllowSynchronousContinuations = false }); } public void Start() @@ -86,37 +83,27 @@ public void Start() else { _limiter = new SemaphoreSlim(_concurrency); + _tokenSource = new CancellationTokenSource(); _worker = Task.Run(() => LoopWithConcurrency(_tokenSource.Token), CancellationToken.None); } } public void Enqueue(Action action) { - _actions.Enqueue(action); - _syncSource.TrySetResult(true); + _channel.Writer.TryWrite(action); } async Task Loop() { - while (_tokenSource.IsCancellationRequested == false) + while (await _channel.Reader.WaitToReadAsync().ConfigureAwait(false)) { - try - { - await _syncSource.Task.ConfigureAwait(false); - _syncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - catch (TaskCanceledException) - { - // Swallowing the task cancellation exception for the semaphore in case we are stopping. - } - - while (_actions.TryDequeue(out Action action)) + while (_channel.Reader.TryRead(out Action work)) { try { - action(); + work(); } - catch (Exception) + catch(Exception) { // ignored } @@ -126,36 +113,37 @@ async Task Loop() async Task LoopWithConcurrency(CancellationToken cancellationToken) { - while (_tokenSource.IsCancellationRequested == false) + try { - try - { - await _syncSource.Task.ConfigureAwait(false); - _syncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - } - catch (TaskCanceledException) + while (await _channel.Reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false)) { - // Swallowing the task cancellation exception for the semaphore in case we are stopping. - } - - while (_actions.TryDequeue(out Action action)) - { - // Do a quick synchronous check before we resort to async/await with the state-machine overhead. - if(!_limiter.Wait(0)) + while (_channel.Reader.TryRead(out Action action)) { - await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false); - } + // Do a quick synchronous check before we resort to async/await with the state-machine overhead. + if(!_limiter.Wait(0)) + { + await _limiter.WaitAsync(cancellationToken).ConfigureAwait(false); + } - _ = OffloadToWorkerThreadPool(action, _limiter); + _ = OffloadToWorkerThreadPool(action, _limiter); + } } } + catch (OperationCanceledException) + { + // ignored + } } static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter) { try { - await Task.Run(() => action()); + // like Task.Run but doesn't closure allocate + await Task.Factory.StartNew(state => + { + ((Action)state)(); + }, action, CancellationToken.None, TaskCreationOptions.DenyChildAttach, TaskScheduler.Default); } catch (Exception) { @@ -169,8 +157,8 @@ static async Task OffloadToWorkerThreadPool(Action action, SemaphoreSlim limiter public Task Stop() { - _tokenSource.Cancel(); - _tokenRegistration.Dispose(); + _channel.Writer.Complete(); + _tokenSource?.Cancel(); _limiter?.Dispose(); return _worker; }