diff --git a/.github/workflows/build-test.yaml b/.github/workflows/build-test.yaml index 9713c86e75..1994517455 100644 --- a/.github/workflows/build-test.yaml +++ b/.github/workflows/build-test.yaml @@ -73,8 +73,8 @@ jobs: Receive-Job -Job $tx; ` & "${{ github.workspace }}\.ci\windows\toxiproxy\toxiproxy-cli.exe" list; ` dotnet test ` - --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` --environment 'RABBITMQ_TOXIPROXY_TESTS=true' ` --environment 'PASSWORD=grapefruit' ` --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` @@ -114,7 +114,12 @@ jobs: id: install-start-rabbitmq run: ${{ github.workspace }}\.ci\windows\gha-setup.ps1 - name: Sequential Integration Tests - run: dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' + run: dotnet test ` + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' ` + --environment "RABBITMQ_RABBITMQCTL_PATH=${{ steps.install-start-rabbitmq.outputs.path }}" ` + --environment 'PASSWORD=grapefruit' ` + --environment SSL_CERTS_DIR="${{ github.workspace }}\.ci\certs" ` + "${{ github.workspace }}\projects\Test\SequentialIntegration\SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() uses: actions/upload-artifact@v4 @@ -182,8 +187,8 @@ jobs: - name: Integration Tests run: | dotnet test \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ @@ -222,7 +227,10 @@ jobs: - name: Sequential Integration Tests run: | dotnet test \ + --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:${{ steps.start-rabbitmq.outputs.id }}" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="${{ github.workspace }}/.ci/certs" \ "${{ github.workspace }}/projects/Test/SequentialIntegration/SequentialIntegration.csproj" --no-restore --no-build --logger 'console;verbosity=detailed' - name: Maybe upload RabbitMQ logs if: failure() diff --git a/Makefile b/Makefile index e8f3089543..42de4527d8 100644 --- a/Makefile +++ b/Makefile @@ -21,13 +21,17 @@ build: test: dotnet test $(CURDIR)/projects/Test/Unit/Unit.csproj --logger 'console;verbosity=detailed' dotnet test --environment 'GITHUB_ACTIONS=true' \ - --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ --environment 'RABBITMQ_TOXIPROXY_TESTS=true' \ --environment 'PASSWORD=grapefruit' \ --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ "$(CURDIR)/projects/Test/Integration/Integration.csproj" --logger 'console;verbosity=detailed' - dotnet test --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' + dotnet test --environment 'RABBITMQ_LONG_RUNNING_TESTS=true' \ + --environment "RABBITMQ_RABBITMQCTL_PATH=DOCKER:$$(docker inspect --format='{{.Id}}' $(RABBITMQ_DOCKER_NAME))" \ + --environment 'PASSWORD=grapefruit' \ + --environment SSL_CERTS_DIR="$(CURDIR)/.ci/certs" \ + $(CURDIR)/projects/Test/SequentialIntegration/SequentialIntegration.csproj --logger 'console;verbosity=detailed' # Note: # You must have the expected OAuth2 environment set up for this target diff --git a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt index 779fcbf2d5..bb5daaca76 100644 --- a/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt +++ b/projects/RabbitMQ.Client/PublicAPI.Unshipped.txt @@ -505,7 +505,7 @@ RabbitMQ.Client.IConnection.ClientProvidedName.get -> string RabbitMQ.Client.IConnection.CloseReason.get -> RabbitMQ.Client.ShutdownEventArgs RabbitMQ.Client.IConnection.ConnectionBlocked -> System.EventHandler RabbitMQ.Client.IConnection.ConnectionRecoveryError -> System.EventHandler -RabbitMQ.Client.IConnection.ConnectionShutdown -> System.EventHandler +RabbitMQ.Client.IConnection.ConnectionShutdownAsync -> RabbitMQ.Client.Events.AsyncEventHandler RabbitMQ.Client.IConnection.ConnectionUnblocked -> System.EventHandler RabbitMQ.Client.IConnection.ConsumerTagChangeAfterRecovery -> System.EventHandler RabbitMQ.Client.IConnection.Endpoint.get -> RabbitMQ.Client.AmqpTcpEndpoint diff --git a/projects/RabbitMQ.Client/client/api/IConnection.cs b/projects/RabbitMQ.Client/client/api/IConnection.cs index e47cc57f17..4b9457c151 100644 --- a/projects/RabbitMQ.Client/client/api/IConnection.cs +++ b/projects/RabbitMQ.Client/client/api/IConnection.cs @@ -155,7 +155,7 @@ public interface IConnection : INetworkConnection, IDisposable /// event handler is added to this event, the event handler /// will be fired immediately. /// - event EventHandler ConnectionShutdown; + event AsyncEventHandler ConnectionShutdownAsync; /// /// Raised when the connection completes recovery. diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs index 67310eb71d..025f2a2b61 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.Recovery.cs @@ -46,7 +46,7 @@ internal sealed partial class AutorecoveringConnection private Task? _recoveryTask; private readonly CancellationTokenSource _recoveryCancellationTokenSource = new CancellationTokenSource(); - private void HandleConnectionShutdown(object _, ShutdownEventArgs args) + private Task HandleConnectionShutdownAsync(object _, ShutdownEventArgs args) { if (ShouldTriggerConnectionRecovery(args)) { @@ -80,6 +80,8 @@ static bool ShouldTriggerConnectionRecovery(ShutdownEventArgs args) return false; } + + return Task.CompletedTask; } private async Task RecoverConnectionAsync() diff --git a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs index 40bc9a1960..6c79b8ce0a 100644 --- a/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs +++ b/projects/RabbitMQ.Client/client/impl/AutorecoveringConnection.cs @@ -88,7 +88,7 @@ void onException(Exception exception, string context) => _consumerTagChangeAfterRecoveryWrapper = new EventingWrapper("OnConsumerRecovery", onException); _queueNameChangedAfterRecoveryWrapper = new EventingWrapper("OnQueueRecovery", onException); - ConnectionShutdown += HandleConnectionShutdown; + ConnectionShutdownAsync += HandleConnectionShutdownAsync; } public event EventHandler RecoverySucceeded @@ -117,10 +117,10 @@ public event EventHandler ConnectionBlocked remove => InnerConnection.ConnectionBlocked -= value; } - public event EventHandler ConnectionShutdown + public event AsyncEventHandler ConnectionShutdownAsync { - add => InnerConnection.ConnectionShutdown += value; - remove => InnerConnection.ConnectionShutdown -= value; + add => InnerConnection.ConnectionShutdownAsync += value; + remove => InnerConnection.ConnectionShutdownAsync -= value; } public event EventHandler ConnectionUnblocked diff --git a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs index 9ab700bd68..082a3465ea 100644 --- a/projects/RabbitMQ.Client/client/impl/ChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ChannelBase.cs @@ -84,6 +84,7 @@ protected ChannelBase(ConnectionConfig config, ISession session) Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + _basicAcksWrapper = new EventingWrapper("OnBasicAck", onException); _basicNacksWrapper = new EventingWrapper("OnBasicNack", onException); _basicReturnWrapper = new EventingWrapper("OnBasicReturn", onException); @@ -93,7 +94,7 @@ protected ChannelBase(ConnectionConfig config, ISession session) _channelShutdownWrapper = new EventingWrapper("OnChannelShutdown", onException); _recoveryWrapper = new EventingWrapper("OnChannelRecovery", onException); session.CommandReceived = HandleCommandAsync; - session.SessionShutdown += OnSessionShutdown; + session.SessionShutdownAsync += OnSessionShutdownAsync; Session = session; } @@ -407,12 +408,13 @@ await ModelSendAsync(method, k.CancellationToken) } } - internal void FinishClose() + internal async Task FinishCloseAsync() { ShutdownEventArgs reason = CloseReason; if (reason != null) { - Session.Close(reason); + await Session.CloseAsync(reason) + .ConfigureAwait(false); } m_connectionStartCell?.TrySetResult(null); @@ -488,7 +490,7 @@ private void OnChannelShutdown(ShutdownEventArgs reason) if (_confirmsTaskCompletionSources?.Count > 0) { var exception = new AlreadyClosedException(reason); - foreach (var confirmsTaskCompletionSource in _confirmsTaskCompletionSources) + foreach (TaskCompletionSource confirmsTaskCompletionSource in _confirmsTaskCompletionSources) { confirmsTaskCompletionSource.TrySetException(exception); } @@ -505,14 +507,12 @@ private void OnChannelShutdown(ShutdownEventArgs reason) _flowControlBlock.Set(); } - // TODO async - private void OnSessionShutdown(object sender, ShutdownEventArgs reason) + private Task OnSessionShutdownAsync(object sender, ShutdownEventArgs reason) { ConsumerDispatcher.Quiesce(); SetCloseReason(reason); OnChannelShutdown(reason); - // TODO async - ConsumerDispatcher.Shutdown(reason); + return ConsumerDispatcher.ShutdownAsync(reason); } internal bool SetCloseReason(ShutdownEventArgs reason) @@ -723,7 +723,8 @@ protected async Task HandleChannelCloseAsync(IncomingCommand cmd, Cancella channelClose._classId, channelClose._methodId)); - Session.Close(CloseReason, false); + await Session.CloseAsync(CloseReason, false) + .ConfigureAwait(false); var method = new ChannelCloseOk(); await ModelSendAsync(method, cancellationToken) @@ -734,7 +735,8 @@ await ModelSendAsync(method, cancellationToken) finally { cmd.ReturnBuffers(); - Session.Notify(); + await Session.NotifyAsync() + .ConfigureAwait(false); } } @@ -746,7 +748,8 @@ protected async Task HandleChannelCloseOkAsync(IncomingCommand cmd, Cancel * Note: * This call _must_ come before completing the async continuation */ - FinishClose(); + await FinishCloseAsync() + .ConfigureAwait(false); if (_continuationQueue.TryPeek(out var k)) { @@ -815,7 +818,8 @@ protected async Task HandleConnectionCloseAsync(IncomingCommand cmd, Cance var reason = new ShutdownEventArgs(ShutdownInitiator.Peer, method._replyCode, method._replyText, method._classId, method._methodId); try { - Session.Connection.ClosedViaPeer(reason); + await Session.Connection.ClosedViaPeerAsync(reason) + .ConfigureAwait(false); var replyMethod = new ConnectionCloseOk(); await ModelSendAsync(replyMethod, cancellationToken) diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs index 97b3b900d8..882abd31a4 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Heartbeat.cs @@ -109,7 +109,8 @@ private async void HeartbeatReadTimerCallback(object? state) { var eose = new EndOfStreamException($"Heartbeat missing with heartbeat == {_heartbeat} seconds"); LogCloseError(eose.Message, eose); - HandleMainLoopException(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)); + await HandleMainLoopExceptionAsync(new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", eose)) + .ConfigureAwait(false); shouldTerminate = true; } } @@ -158,6 +159,13 @@ await WriteAsync(Client.Impl.Framing.Heartbeat.GetHeartbeatFrame(), _mainLoopCts _heartbeatWriteTimer?.Change((int)_heartbeatWriteTimeSpan.TotalMilliseconds, Timeout.Infinite); } } + catch (OperationCanceledException ocex) + { + if (ocex.CancellationToken != _mainLoopCts.Token) + { + throw; + } + } catch (ObjectDisposedException) { // timer is already disposed, diff --git a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs index 4929b5f25b..af1da5275e 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.Receive.cs @@ -59,7 +59,8 @@ await ReceiveLoopAsync(mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, 0, "End of stream", exception: eose); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } catch (HardProtocolException hpe) { @@ -75,7 +76,20 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) var ea = new ShutdownEventArgs(ShutdownInitiator.Library, Constants.InternalError, fileLoadException.Message, exception: fileLoadException); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); + } + catch (OperationCanceledException ocex) + { + if (ocex.CancellationToken != mainLoopToken) + { + var ea = new ShutdownEventArgs(ShutdownInitiator.Library, + Constants.InternalError, + $"Unexpected Exception: {ocex.Message}", + exception: ocex); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); + } } catch (Exception ex) { @@ -83,7 +97,8 @@ await HardProtocolExceptionHandlerAsync(hpe, mainLoopToken) Constants.InternalError, $"Unexpected Exception: {ex.Message}", exception: ex); - HandleMainLoopException(ea); + await HandleMainLoopExceptionAsync(ea) + .ConfigureAwait(false); } using var cts = new CancellationTokenSource(InternalConstants.DefaultConnectionCloseTimeout); @@ -175,26 +190,29 @@ private void MaybeTerminateMainloopAndStopHeartbeatTimers(bool cancelMainLoop = MaybeStopHeartbeatTimers(); } - private void HandleMainLoopException(ShutdownEventArgs reason) + private Task HandleMainLoopExceptionAsync(ShutdownEventArgs reason) { string message = reason.GetLogMessage(); if (false == SetCloseReason(reason)) { LogCloseError($"Unexpected Main Loop Exception while closing: {message}", reason.Exception); - return; + return Task.CompletedTask; } _channel0.MaybeSetConnectionStartException(reason.Exception); - OnShutdown(reason); LogCloseError($"Unexpected connection closure: {message}", reason.Exception); + + return OnShutdownAsync(reason); } private async Task HardProtocolExceptionHandlerAsync(HardProtocolException hpe, CancellationToken cancellationToken) { if (SetCloseReason(hpe.ShutdownReason)) { - OnShutdown(hpe.ShutdownReason); + await OnShutdownAsync(hpe.ShutdownReason) + .ConfigureAwait(false); + await _session0.SetSessionClosingAsync(false) .ConfigureAwait(false); try diff --git a/projects/RabbitMQ.Client/client/impl/Connection.cs b/projects/RabbitMQ.Client/client/impl/Connection.cs index 62a1dba4a5..0f6305cc67 100644 --- a/projects/RabbitMQ.Client/client/impl/Connection.cs +++ b/projects/RabbitMQ.Client/client/impl/Connection.cs @@ -64,11 +64,13 @@ internal Connection(ConnectionConfig config, IFrameHandler frameHandler) _config = config; _frameHandler = frameHandler; - Action onException = (exception, context) => OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + Action onException = (exception, context) => + OnCallbackException(CallbackExceptionEventArgs.Build(exception, context)); + _callbackExceptionWrapper = new EventingWrapper(string.Empty, (exception, context) => { }); _connectionBlockedWrapper = new EventingWrapper("OnConnectionBlocked", onException); _connectionUnblockedWrapper = new EventingWrapper("OnConnectionUnblocked", onException); - _connectionShutdownWrapper = new EventingWrapper("OnShutdown", onException); + _connectionShutdownWrapperAsync = new AsyncEventingWrapper("OnConnectionShutdown", onException); _sessionManager = new SessionManager(this, 0); _session0 = new MainSession(this); @@ -146,7 +148,7 @@ public event EventHandler RecoveringConsumer } private EventingWrapper _consumerAboutToBeRecovered; - public event EventHandler ConnectionShutdown + public event AsyncEventHandler ConnectionShutdownAsync { add { @@ -154,7 +156,7 @@ public event EventHandler ConnectionShutdown ShutdownEventArgs? reason = CloseReason; if (reason is null) { - _connectionShutdownWrapper.AddHandler(value); + _connectionShutdownWrapperAsync.AddHandler(value); } else { @@ -164,10 +166,10 @@ public event EventHandler ConnectionShutdown remove { ThrowIfDisposed(); - _connectionShutdownWrapper.RemoveHandler(value); + _connectionShutdownWrapperAsync.RemoveHandler(value); } } - private EventingWrapper _connectionShutdownWrapper; + private AsyncEventingWrapper _connectionShutdownWrapperAsync; /// /// This event is never fired by non-recovering connections but it is a part of the interface. @@ -210,7 +212,7 @@ internal void TakeOver(Connection other) _callbackExceptionWrapper.Takeover(other._callbackExceptionWrapper); _connectionBlockedWrapper.Takeover(other._connectionBlockedWrapper); _connectionUnblockedWrapper.Takeover(other._connectionUnblockedWrapper); - _connectionShutdownWrapper.Takeover(other._connectionShutdownWrapper); + _connectionShutdownWrapperAsync.Takeover(other._connectionShutdownWrapperAsync); } internal async ValueTask OpenAsync(CancellationToken cancellationToken) @@ -321,7 +323,9 @@ internal async Task CloseAsync(ShutdownEventArgs reason, bool abort, TimeSpan ti { cancellationToken.ThrowIfCancellationRequested(); - OnShutdown(reason); + await OnShutdownAsync(reason) + .ConfigureAwait(false); + await _session0.SetSessionClosingAsync(false) .ConfigureAwait(false); @@ -351,7 +355,7 @@ await _session0.TransmitAsync(method, cancellationToken) { if (_channel0.CloseReason is null) { - if (!abort) + if (false == abort) { throw; } @@ -361,13 +365,31 @@ await _session0.TransmitAsync(method, cancellationToken) } } } + catch (Exception ex) + { + if (false == abort) + { + throw; + } + else + { + LogCloseError("Couldn't close connection cleanly.", ex); + } + } finally { /* * Note: * NotifyReceivedCloseOk will cancel the main loop */ - MaybeTerminateMainloopAndStopHeartbeatTimers(); + try + { + MaybeTerminateMainloopAndStopHeartbeatTimers(); + } + catch (Exception ex) + { + LogCloseError("Couldn't close connection cleanly (main loop termination)", ex); + } } } @@ -394,7 +416,7 @@ await _frameHandler.CloseAsync(cancellationToken) } } - internal void ClosedViaPeer(ShutdownEventArgs reason) + internal async Task ClosedViaPeerAsync(ShutdownEventArgs reason) { if (false == SetCloseReason(reason)) { @@ -405,8 +427,11 @@ internal void ClosedViaPeer(ShutdownEventArgs reason) // We are quiescing, but still allow for server-close } - OnShutdown(reason); + await OnShutdownAsync(reason) + .ConfigureAwait(false); + _session0.SetSessionClosing(true); + MaybeTerminateMainloopAndStopHeartbeatTimers(cancelMainLoop: true); } @@ -414,21 +439,27 @@ internal void ClosedViaPeer(ShutdownEventArgs reason) private async Task FinishCloseAsync(CancellationToken cancellationToken) { _mainLoopCts.Cancel(); + _closed = true; + MaybeStopHeartbeatTimers(); await _frameHandler.CloseAsync(cancellationToken) .ConfigureAwait(false); + _channel0.SetCloseReason(CloseReason); - _channel0.FinishClose(); + + await _channel0.FinishCloseAsync() + .ConfigureAwait(false); + RabbitMqClientEventSource.Log.ConnectionClosed(); } ///Broadcasts notification of the final shutdown of the connection. - private void OnShutdown(ShutdownEventArgs reason) + private Task OnShutdownAsync(ShutdownEventArgs reason) { ThrowIfDisposed(); - _connectionShutdownWrapper.Invoke(this, reason); + return _connectionShutdownWrapperAsync.InvokeAsync(this, reason); } private bool SetCloseReason(ShutdownEventArgs reason) diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs index 761e6d23ea..3b19ba5d12 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/AsyncConsumerDispatcher.cs @@ -16,39 +16,49 @@ internal AsyncConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out WorkStruct work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out WorkStruct work)) { - try + using (work) { - Task task = work.WorkType switch + try { - WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( - work.ConsumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), + Task task = work.WorkType switch + { + WorkType.Deliver => work.AsyncConsumer.HandleBasicDeliver( + work.ConsumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory), - WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), + WorkType.Cancel => work.AsyncConsumer.HandleBasicCancel(work.ConsumerTag), - WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), + WorkType.CancelOk => work.AsyncConsumer.HandleBasicCancelOk(work.ConsumerTag), - WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), + WorkType.ConsumeOk => work.AsyncConsumer.HandleBasicConsumeOk(work.ConsumerTag), - WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), + WorkType.Shutdown => work.AsyncConsumer.HandleChannelShutdown(_channel, work.Reason), - _ => Task.CompletedTask - }; - await task.ConfigureAwait(false); - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + _ => Task.CompletedTask + }; + await task.ConfigureAwait(false); + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); + } } } } } + catch (OperationCanceledException ocex) + { + if (ocex.CancellationToken != token) + { + throw; + } + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs index 632b7cf72b..905acb1241 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcher.cs @@ -16,45 +16,55 @@ internal ConsumerDispatcher(ChannelBase channel, int concurrency) protected override async Task ProcessChannelAsync(CancellationToken token) { - while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) + try { - while (_reader.TryRead(out var work)) + while (await _reader.WaitToReadAsync(token).ConfigureAwait(false)) { - using (work) + while (_reader.TryRead(out var work)) { - try + using (work) { - IBasicConsumer consumer = work.Consumer; - string? consumerTag = work.ConsumerTag; - switch (work.WorkType) + try { - case WorkType.Deliver: - await consumer.HandleBasicDeliverAsync( - consumerTag, work.DeliveryTag, work.Redelivered, - work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) - .ConfigureAwait(false); - break; - case WorkType.Cancel: - consumer.HandleBasicCancel(consumerTag); - break; - case WorkType.CancelOk: - consumer.HandleBasicCancelOk(consumerTag); - break; - case WorkType.ConsumeOk: - consumer.HandleBasicConsumeOk(consumerTag); - break; - case WorkType.Shutdown: - consumer.HandleChannelShutdown(_channel, work.Reason); - break; + IBasicConsumer consumer = work.Consumer; + string? consumerTag = work.ConsumerTag; + switch (work.WorkType) + { + case WorkType.Deliver: + await consumer.HandleBasicDeliverAsync( + consumerTag, work.DeliveryTag, work.Redelivered, + work.Exchange, work.RoutingKey, work.BasicProperties, work.Body.Memory) + .ConfigureAwait(false); + break; + case WorkType.Cancel: + consumer.HandleBasicCancel(consumerTag); + break; + case WorkType.CancelOk: + consumer.HandleBasicCancelOk(consumerTag); + break; + case WorkType.ConsumeOk: + consumer.HandleBasicConsumeOk(consumerTag); + break; + case WorkType.Shutdown: + consumer.HandleChannelShutdown(_channel, work.Reason); + break; + } + } + catch (Exception e) + { + _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } - } - catch (Exception e) - { - _channel.OnCallbackException(CallbackExceptionEventArgs.Build(e, work.WorkType.ToString(), work.Consumer)); } } } } + catch (OperationCanceledException ocex) + { + if (ocex.CancellationToken != token) + { + throw; + } + } } } } diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs index eb85a04e08..65554168b8 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherBase.cs @@ -40,12 +40,6 @@ public IBasicConsumer GetAndRemoveConsumer(string tag) } } - public void Shutdown(ShutdownEventArgs reason) - { - DoShutdownConsumers(reason); - InternalShutdown(); - } - public Task ShutdownAsync(ShutdownEventArgs reason) { DoShutdownConsumers(reason); @@ -66,8 +60,6 @@ private void DoShutdownConsumers(ShutdownEventArgs reason) protected abstract void ShutdownConsumer(IBasicConsumer consumer, ShutdownEventArgs reason); - protected abstract void InternalShutdown(); - protected abstract Task InternalShutdownAsync(); // Do not inline as it's not the default case on a hot path diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs index 5d3b3cc610..6661a3470c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/ConsumerDispatcherChannelBase.cs @@ -131,56 +131,6 @@ private bool IsCancellationRequested } } - public void WaitForShutdown() - { - if (_disposed) - { - return; - } - - if (_quiesce) - { - if (IsCancellationRequested) - { - try - { - if (false == _reader.Completion.Wait(TimeSpan.FromSeconds(2))) - { - ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)"); - } - if (false == _worker.Wait(TimeSpan.FromSeconds(2))) - { - ESLog.Warn("consumer dispatcher did not shut down in a timely fashion (sync)"); - } - } - catch (AggregateException aex) - { - AggregateException aexf = aex.Flatten(); - bool foundUnexpectedException = false; - foreach (Exception innerAexf in aexf.InnerExceptions) - { - if (false == (innerAexf is OperationCanceledException)) - { - foundUnexpectedException = true; - break; - } - } - if (foundUnexpectedException) - { - ESLog.Warn("consumer dispatcher task had unexpected exceptions"); - } - } - catch (OperationCanceledException) - { - } - } - } - else - { - throw new InvalidOperationException("WaitForShutdown called but _quiesce is false"); - } - } - public async Task WaitForShutdownAsync() { if (_disposed) @@ -229,12 +179,6 @@ protected sealed override void ShutdownConsumer(IBasicConsumer consumer, Shutdow _writer.TryWrite(new WorkStruct(consumer, reason)); } - protected override void InternalShutdown() - { - _writer.Complete(); - CancelConsumerDispatcherCts(); - } - protected override Task InternalShutdownAsync() { _writer.Complete(); diff --git a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs index 3c1646af46..e7cb6c960c 100644 --- a/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs +++ b/projects/RabbitMQ.Client/client/impl/ConsumerDispatching/IConsumerDispatcher.cs @@ -61,9 +61,6 @@ ValueTask HandleBasicDeliverAsync(string consumerTag, void Quiesce(); - void Shutdown(ShutdownEventArgs reason); - void WaitForShutdown(); - Task ShutdownAsync(ShutdownEventArgs reason); Task WaitForShutdownAsync(); } diff --git a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs index b716af5ff2..7317272519 100644 --- a/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs +++ b/projects/RabbitMQ.Client/client/impl/EventingWrapper.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading.Tasks; using RabbitMQ.Client.Events; @@ -42,7 +43,7 @@ public void ClearHandlers() public void Invoke(object sender, T parameter) { - var handlers = _handlers; + Delegate[]? handlers = _handlers; if (handlers is null) { handlers = _event?.GetInvocationList(); @@ -53,7 +54,8 @@ public void Invoke(object sender, T parameter) _handlers = handlers; } - foreach (EventHandler action in handlers) + + foreach (EventHandler action in handlers.Cast>()) { try { @@ -61,7 +63,7 @@ public void Invoke(object sender, T parameter) } catch (Exception exception) { - var onException = _onExceptionAction; + Action? onException = _onExceptionAction; if (onException != null) { onException(exception, _context!); @@ -87,8 +89,18 @@ internal struct AsyncEventingWrapper { private event AsyncEventHandler? _event; private Delegate[]? _handlers; + private string? _context; + private Action? _onExceptionAction; - public bool IsEmpty => _event is null; + public readonly bool IsEmpty => _event is null; + + public AsyncEventingWrapper(string context, Action onExceptionAction) + { + _event = null; + _handlers = null; + _context = context; + _onExceptionAction = onExceptionAction; + } public void AddHandler(AsyncEventHandler? handler) { @@ -105,7 +117,7 @@ public void RemoveHandler(AsyncEventHandler? handler) // Do not make this function async! (This type is a struct that gets copied at the start of an async method => empty _handlers is copied) public Task InvokeAsync(object sender, T parameter) { - var handlers = _handlers; + Delegate[]? handlers = _handlers; if (handlers is null) { handlers = _event?.GetInvocationList(); @@ -117,19 +129,30 @@ public Task InvokeAsync(object sender, T parameter) _handlers = handlers; } - if (handlers.Length == 1) - { - return ((AsyncEventHandler)handlers[0])(sender, parameter); - } return InternalInvoke(handlers, sender, parameter); } - private static async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) + private async Task InternalInvoke(Delegate[] handlers, object sender, T parameter) { - foreach (AsyncEventHandler action in handlers) + foreach (AsyncEventHandler action in handlers.Cast>()) { - await action(sender, parameter) - .ConfigureAwait(false); + try + { + await action(sender, parameter) + .ConfigureAwait(false); + } + catch (Exception exception) + { + Action? onException = _onExceptionAction; + if (onException != null) + { + onException(exception, _context!); + } + else + { + throw; + } + } } } @@ -137,6 +160,8 @@ public void Takeover(in AsyncEventingWrapper other) { _event = other._event; _handlers = other._handlers; + _context = other._context; + _onExceptionAction = other._onExceptionAction; } } } diff --git a/projects/RabbitMQ.Client/client/impl/ISession.cs b/projects/RabbitMQ.Client/client/impl/ISession.cs index 500dfbcfa0..1883c4e202 100644 --- a/projects/RabbitMQ.Client/client/impl/ISession.cs +++ b/projects/RabbitMQ.Client/client/impl/ISession.cs @@ -32,6 +32,7 @@ using System; using System.Threading; using System.Threading.Tasks; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Framing.Impl; namespace RabbitMQ.Client.Impl @@ -68,15 +69,15 @@ internal interface ISession /// /// Multicast session shutdown event. /// - event EventHandler SessionShutdown; + event AsyncEventHandler SessionShutdownAsync; - void Close(ShutdownEventArgs reason); + Task CloseAsync(ShutdownEventArgs reason); - void Close(ShutdownEventArgs reason, bool notify); + Task CloseAsync(ShutdownEventArgs reason, bool notify); Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); - void Notify(); + Task NotifyAsync(); ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod; diff --git a/projects/RabbitMQ.Client/client/impl/SessionBase.cs b/projects/RabbitMQ.Client/client/impl/SessionBase.cs index 962dc81e34..f41b279130 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionBase.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionBase.cs @@ -35,6 +35,7 @@ using System.Threading; using System.Threading.Tasks; using RabbitMQ.Client.client.framing; +using RabbitMQ.Client.Events; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Client.Logging; @@ -52,18 +53,18 @@ protected SessionBase(Connection connection, ushort channelNumber) ChannelNumber = channelNumber; if (channelNumber != 0) { - connection.ConnectionShutdown += OnConnectionShutdown; + connection.ConnectionShutdownAsync += OnConnectionShutdownAsync; } RabbitMqClientEventSource.Log.ChannelOpened(); } - public event EventHandler SessionShutdown + public event AsyncEventHandler SessionShutdownAsync { add { if (CloseReason is null) { - _sessionShutdownWrapper.AddHandler(value); + _sessionShutdownWrapperAsync.AddHandler(value); } else { @@ -72,10 +73,10 @@ public event EventHandler SessionShutdown } remove { - _sessionShutdownWrapper.RemoveHandler(value); + _sessionShutdownWrapperAsync.RemoveHandler(value); } } - private EventingWrapper _sessionShutdownWrapper; + private AsyncEventingWrapper _sessionShutdownWrapperAsync; public ushort ChannelNumber { get; } @@ -84,15 +85,15 @@ public event EventHandler SessionShutdown public bool IsOpen => CloseReason is null; - public virtual void OnConnectionShutdown(object conn, ShutdownEventArgs reason) + public virtual Task OnConnectionShutdownAsync(object conn, ShutdownEventArgs reason) { - Close(reason); + return CloseAsync(reason); } - public virtual void OnSessionShutdown(ShutdownEventArgs reason) + public virtual Task OnSessionShutdownAsync(ShutdownEventArgs reason) { - Connection.ConnectionShutdown -= OnConnectionShutdown; - _sessionShutdownWrapper.Invoke(this, reason); + Connection.ConnectionShutdownAsync -= OnConnectionShutdownAsync; + return _sessionShutdownWrapperAsync.InvokeAsync(this, reason); } public override string ToString() @@ -100,12 +101,12 @@ public override string ToString() return $"{GetType().Name}#{ChannelNumber}:{Connection}"; } - public void Close(ShutdownEventArgs reason) + public Task CloseAsync(ShutdownEventArgs reason) { - Close(reason, true); + return CloseAsync(reason, true); } - public void Close(ShutdownEventArgs reason, bool notify) + public Task CloseAsync(ShutdownEventArgs reason, bool notify) { if (Interlocked.CompareExchange(ref _closeReason, reason, null) is null) { @@ -114,13 +115,17 @@ public void Close(ShutdownEventArgs reason, bool notify) if (notify) { - OnSessionShutdown(CloseReason); + return OnSessionShutdownAsync(CloseReason); + } + else + { + return Task.CompletedTask; } } public abstract Task HandleFrameAsync(InboundFrame frame, CancellationToken cancellationToken); - public void Notify() + public Task NotifyAsync() { // Ensure that we notify only when session is already closed // If not, throw exception, since this is a serious bug in the library @@ -130,7 +135,7 @@ public void Notify() throw new InvalidOperationException("Internal Error in SessionBase.Notify"); } - OnSessionShutdown(reason); + return OnSessionShutdownAsync(reason); } public virtual ValueTask TransmitAsync(in T cmd, CancellationToken cancellationToken) where T : struct, IOutgoingAmqpMethod diff --git a/projects/RabbitMQ.Client/client/impl/SessionManager.cs b/projects/RabbitMQ.Client/client/impl/SessionManager.cs index a3071e472a..7d4bdb79f5 100644 --- a/projects/RabbitMQ.Client/client/impl/SessionManager.cs +++ b/projects/RabbitMQ.Client/client/impl/SessionManager.cs @@ -30,6 +30,7 @@ //--------------------------------------------------------------------------- using System.Collections.Generic; +using System.Threading.Tasks; using RabbitMQ.Client.Exceptions; using RabbitMQ.Client.Framing.Impl; using RabbitMQ.Util; @@ -72,13 +73,13 @@ public ISession Create() } ISession session = new Session(_connection, (ushort)channelNumber); - session.SessionShutdown += HandleSessionShutdown; + session.SessionShutdownAsync += HandleSessionShutdownAsync; _sessionMap[channelNumber] = session; return session; } } - private void HandleSessionShutdown(object sender, ShutdownEventArgs reason) + private Task HandleSessionShutdownAsync(object sender, ShutdownEventArgs reason) { lock (_sessionMap) { @@ -86,6 +87,8 @@ private void HandleSessionShutdown(object sender, ShutdownEventArgs reason) _sessionMap.Remove(session.ChannelNumber); _ints.Free(session.ChannelNumber); } + + return Task.CompletedTask; } public ISession Lookup(int number) diff --git a/projects/RabbitMQ.Client/util/RmqDebug.cs b/projects/RabbitMQ.Client/util/RmqDebug.cs new file mode 100644 index 0000000000..70f826a582 --- /dev/null +++ b/projects/RabbitMQ.Client/util/RmqDebug.cs @@ -0,0 +1,30 @@ +using System; + +namespace RabbitMQ.Client +{ + internal class RmqDebug + { + internal static bool s_isVerbose = false; + + internal static void Verbose(bool isVerbose) + { + s_isVerbose = isVerbose; + } + + internal static void WriteLine(string value) + { + if (s_isVerbose) + { + Console.WriteLine(value); + } + } + + internal static void WriteLine(string format, object arg0) + { + if (s_isVerbose) + { + Console.WriteLine(format, arg0); + } + } + } +} diff --git a/projects/Test/Applications/MassPublish/Program.cs b/projects/Test/Applications/MassPublish/Program.cs index a3938b69a8..75878a2cff 100644 --- a/projects/Test/Applications/MassPublish/Program.cs +++ b/projects/Test/Applications/MassPublish/Program.cs @@ -69,7 +69,7 @@ static Program() static async Task Main() { using IConnection consumeConnection = await s_consumeConnectionFactory.CreateConnectionAsync(); - consumeConnection.ConnectionShutdown += Connection_ConnectionShutdown; + consumeConnection.ConnectionShutdownAsync += Connection_ConnectionShutdownAsync; using IChannel consumeChannel = await consumeConnection.CreateChannelAsync(); consumeChannel.ChannelShutdown += Channel_ChannelShutdown; @@ -93,7 +93,7 @@ await consumeChannel.BasicConsumeAsync(queue: QueueName, autoAck: true, consumer for (int i = 0; i < ConnectionCount; i++) { IConnection publishConnection = await s_publishConnectionFactory.CreateConnectionAsync($"{AppId}-PUBLISH-{i}"); - publishConnection.ConnectionShutdown += Connection_ConnectionShutdown; + publishConnection.ConnectionShutdownAsync += Connection_ConnectionShutdownAsync; publishConnections.Add(publishConnection); } @@ -163,13 +163,15 @@ private static void PublishChannel_BasicNacks(object sender, BasicNackEventArgs Console.Error.WriteLine("[ERROR] unexpected nack on publish: {0}", e); } - private static void Connection_ConnectionShutdown(object sender, ShutdownEventArgs e) + private static Task Connection_ConnectionShutdownAsync(object sender, ShutdownEventArgs e) { if (e.Initiator != ShutdownInitiator.Application) { Console.Error.WriteLine("[ERROR] unexpected connection shutdown: {0}", e); s_consumeDoneEvent.TrySetResult(false); } + + return Task.CompletedTask; } private static void Channel_ChannelShutdown(object sender, ShutdownEventArgs e) diff --git a/projects/Test/Common/IntegrationFixture.cs b/projects/Test/Common/IntegrationFixture.cs index e1a0e5db6e..28a1c2e635 100644 --- a/projects/Test/Common/IntegrationFixture.cs +++ b/projects/Test/Common/IntegrationFixture.cs @@ -176,12 +176,13 @@ public virtual async Task DisposeAsync() { try { - if (_channel != null) + if (_conn != null && _conn.IsOpen && + _channel != null && _channel.IsOpen) { await _channel.CloseAsync(); } - if (_conn != null) + if (_conn != null && _conn.IsOpen) { await _conn.CloseAsync(); } @@ -279,9 +280,9 @@ protected void AddCallbackShutdownHandlers() { if (_conn != null) { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { try { @@ -532,22 +533,27 @@ protected ConnectionFactory CreateConnectionFactory() }; } - protected void HandleConnectionShutdown(object sender, ShutdownEventArgs args) + protected Task HandleConnectionShutdownAsync(object sender, ShutdownEventArgs args) { if (args.Initiator == ShutdownInitiator.Peer) { IConnection conn = (IConnection)sender; _output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}"); } + + return Task.CompletedTask; } - protected void HandleConnectionShutdown(IConnection conn, ShutdownEventArgs args, Action a) + protected Task HandleConnectionShutdownAsync(IConnection conn, ShutdownEventArgs args, Action a) { if (args.Initiator == ShutdownInitiator.Peer) { _output.WriteLine($"{_testDisplayName} connection {conn.ClientProvidedName} shut down: {args}"); } + a(args); + + return Task.CompletedTask; } protected void HandleChannelShutdown(object sender, ShutdownEventArgs args) diff --git a/projects/Test/Common/TestConnectionRecoveryBase.cs b/projects/Test/Common/TestConnectionRecoveryBase.cs index 0718d8c285..c368b65457 100644 --- a/projects/Test/Common/TestConnectionRecoveryBase.cs +++ b/projects/Test/Common/TestConnectionRecoveryBase.cs @@ -202,6 +202,8 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) { using (IChannel publishingChannel = await publishingConn.CreateChannelAsync()) { + await publishingChannel.ConfirmSelectAsync(); + for (ushort i = 0; i < TotalMessageCount; i++) { if (i == CloseAtCount) @@ -210,6 +212,7 @@ protected async Task PublishMessagesWhileClosingConnAsync(string queueName) } await publishingChannel.BasicPublishAsync(string.Empty, queueName, _messageBody); + await publishingChannel.WaitForConfirmsOrDieAsync(); } await publishingChannel.CloseAsync(); @@ -222,7 +225,11 @@ protected static TaskCompletionSource PrepareForShutdown(IConnection conn) var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); AutorecoveringConnection aconn = conn as AutorecoveringConnection; - aconn.ConnectionShutdown += (c, args) => tcs.TrySetResult(true); + aconn.ConnectionShutdownAsync += (c, args) => + { + tcs.TrySetResult(true); + return Task.CompletedTask; + }; return tcs; } diff --git a/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs b/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs index 0055750b9d..c66a7691ad 100644 --- a/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs +++ b/projects/Test/Integration/ConnectionRecovery/TestEventHandlerRecovery.cs @@ -126,7 +126,11 @@ public async Task TestRecoveringConsumerHandlerOnConnection_EventArgumentsArePas public async Task TestShutdownEventHandlersRecoveryOnConnection() { int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + _conn.ConnectionShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; Assert.True(_conn.IsOpen); await CloseAndWaitForRecoveryAsync(); diff --git a/projects/Test/Integration/TestAsyncConsumer.cs b/projects/Test/Integration/TestAsyncConsumer.cs index 990f29f10d..9478c9bb87 100644 --- a/projects/Test/Integration/TestAsyncConsumer.cs +++ b/projects/Test/Integration/TestAsyncConsumer.cs @@ -64,9 +64,8 @@ public async Task TestBasicRoundtripConcurrent() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(10); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { publish1SyncSource.TrySetResult(false); @@ -75,9 +74,9 @@ public async Task TestBasicRoundtripConcurrent() try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { @@ -99,19 +98,24 @@ public async Task TestBasicRoundtripConcurrent() }); }; - consumer.Received += async (o, a) => + consumer.Received += (o, a) => { string decoded = _encoding.GetString(a.Body.ToArray()); if (decoded == publish1) { publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; } else if (decoded == publish2) { publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + publish1SyncSource.TrySetException(ex); + publish2SyncSource.TrySetException(ex); + } + return Task.CompletedTask; }; await _channel.BasicConsumeAsync(q.QueueName, true, string.Empty, false, false, null, consumer); @@ -120,10 +124,10 @@ public async Task TestBasicRoundtripConcurrent() await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result1, $"1 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result2, $"2 - Non concurrent dispatch lead to deadlock after {WaitSpan}"); } finally { @@ -136,7 +140,7 @@ public async Task TestBasicRoundtripConcurrent() public async Task TestBasicRoundtripConcurrentManyMessages() { const int publish_total = 4096; - string queueName = $"{nameof(TestBasicRoundtripConcurrentManyMessages)}-{Guid.NewGuid()}"; + string queueName = GenerateQueueName(); string publish1 = GetUniqueString(32768); byte[] body1 = _encoding.GetBytes(publish1); @@ -145,8 +149,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() var publish1SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); var publish2SyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - var maximumWaitTime = TimeSpan.FromSeconds(30); - var tokenSource = new CancellationTokenSource(maximumWaitTime); + var tokenSource = new CancellationTokenSource(WaitSpan); CancellationTokenRegistration ctsr = tokenSource.Token.Register(() => { publish1SyncSource.TrySetResult(false); @@ -155,14 +158,14 @@ public async Task TestBasicRoundtripConcurrentManyMessages() try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetException(ea.Exception); + publish2SyncSource.TrySetException(ea.Exception); } }); }; @@ -173,21 +176,20 @@ public async Task TestBasicRoundtripConcurrentManyMessages() { if (args.Initiator == ShutdownInitiator.Peer) { - publish1SyncSource.TrySetResult(false); - publish2SyncSource.TrySetResult(false); + publish1SyncSource.TrySetException(ea.Exception); + publish2SyncSource.TrySetException(ea.Exception); } }); }; - QueueDeclareOk q = await _channel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); + QueueDeclareOk q = await _channel.QueueDeclareAsync( + queue: queueName, exclusive: true, durable: false); Assert.Equal(q, queueName); Task publishTask = Task.Run(async () => { using (IChannel publishChannel = await _conn.CreateChannelAsync()) { - QueueDeclareOk pubQ = await publishChannel.QueueDeclareAsync(queue: queueName, exclusive: false, durable: true); - Assert.Equal(queueName, pubQ.QueueName); for (int i = 0; i < publish_total; i++) { await publishChannel.BasicPublishAsync(string.Empty, queueName, body1); @@ -207,7 +209,7 @@ public async Task TestBasicRoundtripConcurrentManyMessages() int publish1_count = 0; int publish2_count = 0; - consumer.Received += async (o, a) => + consumer.Received += (o, a) => { string decoded = _encoding.GetString(a.Body.ToArray()); if (decoded == publish1) @@ -215,7 +217,6 @@ public async Task TestBasicRoundtripConcurrentManyMessages() if (Interlocked.Increment(ref publish1_count) >= publish_total) { publish1SyncSource.TrySetResult(true); - await publish2SyncSource.Task; } } else if (decoded == publish2) @@ -223,9 +224,15 @@ public async Task TestBasicRoundtripConcurrentManyMessages() if (Interlocked.Increment(ref publish2_count) >= publish_total) { publish2SyncSource.TrySetResult(true); - await publish1SyncSource.Task; } } + else + { + var ex = new InvalidOperationException("incorrect message - should never happen!"); + publish1SyncSource.TrySetException(ex); + publish2SyncSource.TrySetException(ex); + } + return Task.CompletedTask; }; await consumeChannel.BasicConsumeAsync(queueName, true, string.Empty, false, false, null, consumer); @@ -234,10 +241,10 @@ public async Task TestBasicRoundtripConcurrentManyMessages() await AssertRanToCompletion(publish1SyncSource.Task, publish2SyncSource.Task); bool result1 = await publish1SyncSource.Task; - Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result1, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); bool result2 = await publish2SyncSource.Task; - Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {maximumWaitTime}"); + Assert.True(result2, $"Non concurrent dispatch lead to deadlock after {WaitSpan}"); await consumeChannel.CloseAsync(); } @@ -264,9 +271,9 @@ public async Task TestBasicRejectAsync() try { - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { @@ -362,9 +369,9 @@ public async Task TestBasicAckAsync() var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { @@ -427,9 +434,9 @@ public async Task TestBasicNackAsync() { var publishSyncSource = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { diff --git a/projects/Test/Integration/TestBasicPublish.cs b/projects/Test/Integration/TestBasicPublish.cs index 7bab6cfb41..2f18f6a89a 100644 --- a/projects/Test/Integration/TestBasicPublish.cs +++ b/projects/Test/Integration/TestBasicPublish.cs @@ -195,9 +195,10 @@ public async Task TestMaxMessageSize() using (IConnection c = await cf.CreateConnectionAsync()) { - c.ConnectionShutdown += (o, a) => + c.ConnectionShutdownAsync += (o, a) => { sawConnectionShutdown = true; + return Task.CompletedTask; }; Assert.Equal(maxMsgSize, cf.MaxMessageSize); diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs index 8d2edccc0f..5b3ff5729a 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnection.cs @@ -70,12 +70,14 @@ private async Task TestConcurrentChannelOperationsAsync(Func var tasks = new List(); for (int i = 0; i < _processorCount; i++) { - tasks.Add(Task.Run(async () => + tasks.Add(Task.Run(() => { + var subTasks = new List(); for (int j = 0; j < iterations; j++) { - await action(_conn); + subTasks.Add(action(_conn)); } + return Task.WhenAll(subTasks); })); } diff --git a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs index de11787dbf..2ac5d09081 100644 --- a/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs +++ b/projects/Test/Integration/TestConcurrentAccessWithSharedConnectionAsync.cs @@ -52,7 +52,7 @@ public TestConcurrentAccessWithSharedConnectionAsync(ITestOutputHelper output) public override async Task InitializeAsync() { await base.InitializeAsync(); - _conn.ConnectionShutdown += HandleConnectionShutdown; + _conn.ConnectionShutdownAsync += HandleConnectionShutdownAsync; } [Fact] diff --git a/projects/Test/Integration/TestConnectionShutdown.cs b/projects/Test/Integration/TestConnectionShutdown.cs index 2f3dce5f51..a6a51fb5c2 100644 --- a/projects/Test/Integration/TestConnectionShutdown.cs +++ b/projects/Test/Integration/TestConnectionShutdown.cs @@ -96,17 +96,24 @@ public async Task TestDisposedWithSocketClosedOutOfBand() _channel.ChannelShutdown += (channel, args) => { + _output.WriteLine("_channel.ChannelShutdown called"); tcs.SetResult(true); }; var c = (AutorecoveringConnection)_conn; - await c.CloseFrameHandlerAsync(); + Task frameHandlerCloseTask = c.CloseFrameHandlerAsync(); - _conn.Dispose(); - _conn = null; - - TimeSpan waitSpan = TimeSpan.FromSeconds(10); - await WaitAsync(tcs, waitSpan, "channel shutdown"); + try + { + _conn.Dispose(); + await WaitAsync(tcs, WaitSpan, "channel shutdown"); + await frameHandlerCloseTask.WaitAsync(WaitSpan); + } + finally + { + _conn = null; + _channel = null; + } } [Fact] diff --git a/projects/Test/Integration/TestFloodPublishing.cs b/projects/Test/Integration/TestFloodPublishing.cs index 11c0bec638..590ed26b92 100644 --- a/projects/Test/Integration/TestFloodPublishing.cs +++ b/projects/Test/Integration/TestFloodPublishing.cs @@ -66,9 +66,9 @@ public async Task TestUnthrottledFloodPublishing() Assert.IsNotType(_conn); _channel = await _conn.CreateChannelAsync(); - _conn.ConnectionShutdown += (_, ea) => + _conn.ConnectionShutdownAsync += (_, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { @@ -132,9 +132,9 @@ public async Task TestMultithreadFloodPublishing() var tcs = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously); - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (args.Initiator == ShutdownInitiator.Peer) { diff --git a/projects/Test/Integration/TestPublishSharedChannelAsync.cs b/projects/Test/Integration/TestPublishSharedChannelAsync.cs index d132d51339..46c642e19b 100644 --- a/projects/Test/Integration/TestPublishSharedChannelAsync.cs +++ b/projects/Test/Integration/TestPublishSharedChannelAsync.cs @@ -78,7 +78,7 @@ public async Task MultiThreadPublishOnSharedChannel() try { Assert.IsNotType(conn); - conn.ConnectionShutdown += HandleConnectionShutdown; + conn.ConnectionShutdownAsync += HandleConnectionShutdownAsync; using (IChannel channel = await conn.CreateChannelAsync()) { diff --git a/projects/Test/Integration/TestQueueDeclare.cs b/projects/Test/Integration/TestQueueDeclare.cs index f3a6b3e25d..3159975315 100644 --- a/projects/Test/Integration/TestQueueDeclare.cs +++ b/projects/Test/Integration/TestQueueDeclare.cs @@ -62,9 +62,9 @@ public async void TestConcurrentQueueDeclareAndBindAsync() { bool sawShutdown = false; - _conn.ConnectionShutdown += (o, ea) => + _conn.ConnectionShutdownAsync += (o, ea) => { - HandleConnectionShutdown(_conn, ea, (args) => + return HandleConnectionShutdownAsync(_conn, ea, (args) => { if (ea.Initiator == ShutdownInitiator.Peer) { diff --git a/projects/Test/Integration/TestToxiproxy.cs b/projects/Test/Integration/TestToxiproxy.cs index 72b5597e70..032fe98b30 100644 --- a/projects/Test/Integration/TestToxiproxy.cs +++ b/projects/Test/Integration/TestToxiproxy.cs @@ -97,7 +97,7 @@ public async Task TestCloseConnection() recoverySucceededTcs.SetResult(false); }; - conn.ConnectionShutdown += (s, ea) => + conn.ConnectionShutdownAsync += (s, ea) => { if (IsVerbose) { @@ -109,6 +109,8 @@ public async Task TestCloseConnection() * test exits, and connectionShutdownTcs will have already been set */ connectionShutdownTcs.TrySetResult(true); + + return Task.CompletedTask; }; conn.RecoverySucceeded += (s, ea) => @@ -265,9 +267,10 @@ public async Task TestTcpReset_GH1464() { using (IConnection conn = await cf.CreateConnectionAsync()) { - conn.ConnectionShutdown += (o, ea) => + conn.ConnectionShutdownAsync += (o, ea) => { - connectionShutdownTcs.SetResult(true); + connectionShutdownTcs.TrySetResult(true); + return Task.CompletedTask; }; using (IChannel ch = await conn.CreateChannelAsync()) diff --git a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs index 2eb2f73800..acb704a885 100644 --- a/projects/Test/SequentialIntegration/TestConnectionRecovery.cs +++ b/projects/Test/SequentialIntegration/TestConnectionRecovery.cs @@ -248,7 +248,11 @@ public async Task TestServerNamedTransientAutoDeleteQueueAndBindingRecovery() public async Task TestShutdownEventHandlersRecoveryOnConnectionAfterDelayedServerRestart() { int counter = 0; - _conn.ConnectionShutdown += (c, args) => Interlocked.Increment(ref counter); + _conn.ConnectionShutdownAsync += (c, args) => + { + Interlocked.Increment(ref counter); + return Task.CompletedTask; + }; TaskCompletionSource shutdownLatch = PrepareForShutdown(_conn); TaskCompletionSource recoveryLatch = PrepareForRecovery((AutorecoveringConnection)_conn); diff --git a/projects/Test/SequentialIntegration/TestHeartbeats.cs b/projects/Test/SequentialIntegration/TestHeartbeats.cs index ea9e8f0299..a83dc0aa65 100644 --- a/projects/Test/SequentialIntegration/TestHeartbeats.cs +++ b/projects/Test/SequentialIntegration/TestHeartbeats.cs @@ -114,10 +114,11 @@ public async Task TestHundredsOfConnectionsWithRandomHeartbeatInterval() IConnection conn = await cf.CreateConnectionAsync($"{_testDisplayName}:{i}"); conns.Add(conn); IChannel ch = await conn.CreateChannelAsync(); - conn.ConnectionShutdown += (sender, evt) => - { - CheckInitiator(evt); - }; + conn.ConnectionShutdownAsync += (sender, evt) => + { + CheckInitiator(evt); + return Task.CompletedTask; + }; } await SleepFor(60); @@ -139,7 +140,7 @@ private async Task RunSingleConnectionTestAsync(ConnectionFactory cf) { bool wasShutdown = false; - conn.ConnectionShutdown += (sender, evt) => + conn.ConnectionShutdownAsync += (sender, evt) => { lock (conn) { @@ -149,6 +150,8 @@ private async Task RunSingleConnectionTestAsync(ConnectionFactory cf) wasShutdown = true; } } + + return Task.CompletedTask; }; await SleepFor(30);