Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 67 additions & 0 deletions projects/Test/Integration/TestAsyncConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,73 @@ public async Task TestDeclarationOfManyAutoDeleteQueuesWithTransientConsumer()
AssertRecordedQueues((RabbitMQ.Client.Framing.Impl.AutorecoveringConnection)_conn, 0);
}

[Fact]
public async Task TestCreateChannelWithinAsyncConsumerCallback_GH650()
{
string exchangeName = GenerateExchangeName();
string queue1Name = GenerateQueueName();
string queue2Name = GenerateQueueName();

var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
using var cts = new CancellationTokenSource(WaitSpan);
using CancellationTokenRegistration ctr = cts.Token.Register(() =>
{
tcs.SetCanceled();
});

_conn.ConnectionShutdown += (o, ea) =>
{
HandleConnectionShutdown(_conn, ea, (args) =>
{
MaybeSetException(ea, tcs);
});
};

_channel.ChannelShutdown += (o, ea) =>
{
HandleChannelShutdown(_channel, ea, (args) =>
{
MaybeSetException(ea, tcs);
});
};

// queue1 -> produce click to queue2
// click -> exchange
// queue2 -> consume click from queue1
await _channel.ExchangeDeclareAsync(exchangeName, ExchangeType.Direct, autoDelete: true);
await _channel.QueueDeclareAsync(queue1Name);
await _channel.QueueBindAsync(queue1Name, exchangeName, queue1Name);
await _channel.QueueDeclareAsync(queue2Name);
await _channel.QueueBindAsync(queue2Name, exchangeName, queue2Name);

var consumer1 = new AsyncEventingBasicConsumer(_channel);
consumer1.Received += async (sender, args) =>
{
using (IChannel innerChannel = await _conn.CreateChannelAsync())
{
await innerChannel.ConfirmSelectAsync();
await innerChannel.BasicPublishAsync(exchangeName, queue2Name, mandatory: true);
await innerChannel.WaitForConfirmsOrDieAsync();
await innerChannel.CloseAsync();
}
};
await _channel.BasicConsumeAsync(queue1Name, autoAck: true, consumer1);

var consumer2 = new AsyncEventingBasicConsumer(_channel);
consumer2.Received += async (sender, args) =>
{
tcs.TrySetResult(true);
await Task.Yield();
};
await _channel.BasicConsumeAsync(queue2Name, autoAck: true, consumer2);

await _channel.ConfirmSelectAsync();
await _channel.BasicPublishAsync(exchangeName, queue1Name, body: GetRandomBody(1024));
await _channel.WaitForConfirmsOrDieAsync();

Assert.True(await tcs.Task);
}

private static void SetException(Exception ex, params TaskCompletionSource<bool>[] tcsAry)
{
foreach (TaskCompletionSource<bool> tcs in tcsAry)
Expand Down