diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 41a526f1..c8bb69f7 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -254,11 +254,18 @@ public ValueTask Publish(T msg) where T : struct, ICommand public async Task DeletePublisher(byte publisherId) { - var result = - await Request(corr => - new DeletePublisherRequest(corr, publisherId)); - publishers.Remove(publisherId); - return result; + try + { + var result = + await Request(corr => + new DeletePublisherRequest(corr, publisherId)); + + return result; + } + finally + { + publishers.Remove(publisherId); + } } public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType, diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index 22545756..d0062ca6 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -127,9 +127,6 @@ private async Task ProcessIncomingFrames() { // Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled. - // Console.WriteLine( - // $"B TryReadFrame {buffer.Length} {result.IsCompleted} {result.Buffer.IsEmpty} {frame.Length}"); - var memory = ArrayPool.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length); frame.CopyTo(memory.Span); diff --git a/RabbitMQ.Stream.Client/Producer.cs b/RabbitMQ.Stream.Client/Producer.cs index 31e45102..74926e20 100644 --- a/RabbitMQ.Stream.Client/Producer.cs +++ b/RabbitMQ.Stream.Client/Producer.cs @@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity public class Producer : AbstractEntity, IDisposable { - private readonly bool _disposed; + private bool _disposed; private byte publisherId; private readonly ProducerConfig config; private readonly Channel messageBuffer; @@ -131,7 +131,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, private async Task SemaphoreWait() { - if (!semaphore.Wait(0)) + if (!semaphore.Wait(0) && !client.IsClosed) { // Nope, we have maxed our In-Flight messages, let's asynchronously wait for confirms if (!await semaphore.WaitAsync(1000).ConfigureAwait(false)) @@ -159,7 +159,7 @@ private async Task ProcessBuffer() { // TODO: make the batch size configurable. var messages = new List<(ulong, Message)>(100); - while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false)) + while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false) && !client.IsClosed) { while (messageBuffer.Reader.TryRead(out var msg)) { @@ -188,18 +188,35 @@ async Task SendMessages(List<(ulong, Message)> messages) } } - public async Task Close() + public Task Close() { if (client.IsClosed) { - return ResponseCode.Ok; + return Task.FromResult(ResponseCode.Ok); + } + + var result = ResponseCode.Ok; + try + { + var deletePublisherResponseTask = client.DeletePublisher(publisherId); + // The default timeout is usually 10 seconds + // in this case we reduce the waiting time + // the producer could be removed because of stream deleted + // so it is not necessary to wait. + deletePublisherResponseTask.Wait(TimeSpan.FromSeconds(3)); + if (deletePublisherResponseTask.IsCompletedSuccessfully) + { + result = deletePublisherResponseTask.Result.ResponseCode; + } + } + catch (Exception e) + { + LogEventSource.Log.LogError($"Error removing the producer id: {publisherId} from the server. {e}"); } - var deletePublisherResponse = await client.DeletePublisher(publisherId); - var result = deletePublisherResponse.ResponseCode; var closed = client.MaybeClose($"client-close-publisher: {publisherId}"); ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}"); - return result; + return Task.FromResult(result); } public static async Task Create(ClientParameters clientParameters, @@ -228,6 +245,7 @@ private void Dispose(bool disposing) closeProducer.Wait(1000); ClientExceptions.MaybeThrowException(closeProducer.Result, $"Error during remove producer. Producer: {publisherId}"); + _disposed = true; } public void Dispose() diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs new file mode 100644 index 00000000..cb2d214e --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs @@ -0,0 +1,127 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using System.Timers; +using Timer = System.Timers.Timer; + +namespace RabbitMQ.Stream.Client.Reliable; + +/// +/// ConfirmationStatus can be: +/// +public enum ConfirmationStatus : ushort +{ + WaitForConfirmation = 0, + Confirmed = 1, + TimeoutError = 2, + StreamNotAvailable = 6, + InternalError = 15, + AccessRefused = 16, + PreconditionFailed = 17, + PublisherDoesNotExist = 18, + UndefinedError = 200, +} + +/// +/// MessagesConfirmation is a wrapper around the message/s +/// This class is returned to the user to understand +/// the message status. +/// +public class MessagesConfirmation +{ + public ulong PublishingId { get; internal set; } + public List Messages { get; internal init; } + public DateTime InsertDateTime { get; init; } + public ConfirmationStatus Status { get; internal set; } +} + +/// +/// ConfirmationPipe maintains the status for the sent and received messages. +/// TPL Action block sends the confirmation to the user in async way +/// So the send/1 is not blocking. +/// +public class ConfirmationPipe +{ + private ActionBlock> _waitForConfirmationActionBlock; + private readonly ConcurrentDictionary _waitForConfirmation = new(); + private readonly Timer _invalidateTimer = new(); + private Func ConfirmHandler { get; } + + public ConfirmationPipe(Func confirmHandler) + { + ConfirmHandler = confirmHandler; + } + + public void Start() + { + _waitForConfirmationActionBlock = new ActionBlock>( + request => + { + var (confirmationStatus, publishingId) = request; + + _waitForConfirmation.TryRemove(publishingId, out var message); + if (message == null) + { + return; + } + + message.Status = confirmationStatus; + ConfirmHandler?.Invoke(message); + }, new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1, + // throttling + BoundedCapacity = 50_000 + }); + + _invalidateTimer.Elapsed += OnTimedEvent; + _invalidateTimer.Interval = 2000; + _invalidateTimer.Enabled = true; + } + + public void Stop() + { + _invalidateTimer.Enabled = false; + _waitForConfirmationActionBlock.Complete(); + } + + private async void OnTimedEvent(object? sender, ElapsedEventArgs e) + { + { + foreach (var pair in _waitForConfirmation.Where(pair => + (DateTime.Now - pair.Value.InsertDateTime).Seconds > 2)) + { + await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError); + } + } + } + + public void AddUnConfirmedMessage(ulong publishingId, Message message) + { + AddUnConfirmedMessage(publishingId, new List() { message }); + } + + public void AddUnConfirmedMessage(ulong publishingId, List messages) + { + _waitForConfirmation.TryAdd(publishingId, + new MessagesConfirmation() + { + Messages = messages, + PublishingId = publishingId, + InsertDateTime = DateTime.Now + }); + } + + public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) + { + return _waitForConfirmationActionBlock.SendAsync( + Tuple.Create(confirmationStatus, publishingId)); + } +} diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs new file mode 100644 index 00000000..5fe16097 --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -0,0 +1,11 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +namespace RabbitMQ.Stream.Client.Reliable; + +public interface IReconnectStrategy +{ + void WhenDisconnected(out bool reconnect); + void WhenConnected(); +} diff --git a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs new file mode 100644 index 00000000..2ae1638f --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs @@ -0,0 +1,18 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System.Threading.Tasks; + +namespace RabbitMQ.Stream.Client.Reliable; + +/// +/// Define PublishingId Strategy. +/// Can be automatic, so the ReliableProducer will provide +/// the PublishingId +/// +public interface IPublishingIdStrategy +{ + ulong GetPublishingId(); + Task InitPublishingId(); +} diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs new file mode 100644 index 00000000..c68755c1 --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs @@ -0,0 +1,303 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Collections.Generic; +using System.Threading; +using System.Threading.Tasks; + +namespace RabbitMQ.Stream.Client.Reliable; + +internal class AutoPublishingId : IPublishingIdStrategy +{ + private ulong _lastPublishingId = 0; + private readonly ReliableProducerConfig _reliableProducerConfig; + + public ulong GetPublishingId() + { + return ++_lastPublishingId; + } + + public AutoPublishingId(ReliableProducerConfig reliableProducerConfig) + { + _reliableProducerConfig = reliableProducerConfig; + } + + public async Task InitPublishingId() + { + try + { + _lastPublishingId = + await _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference, + _reliableProducerConfig.Stream); + } + catch (Exception) + { + _lastPublishingId = 0; + } + } +} + +internal class BackOffReconnectStrategy : IReconnectStrategy +{ + private int Tentatives { get; set; } = 1; + + public void WhenDisconnected(out bool reconnect) + { + Tentatives <<= 1; + LogEventSource.Log.LogInformation( + $"Producer disconnected, check if reconnection needed in {Tentatives * 100} ms."); + Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100)); + reconnect = true; + } + + public void WhenConnected() + { + Tentatives = 1; + } +} + +public record ReliableProducerConfig +{ + public StreamSystem StreamSystem { get; set; } + public string Stream { get; set; } + public string Reference { get; set; } + public Func ConfirmationHandler { get; init; } + public string ClientProvidedName { get; set; } = "dotnet-stream-rproducer"; + public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); +} + +/// +/// ReliableProducer is a wrapper around the standard Producer. +/// Main features are: +/// - Auto-reconnection if the connection is dropped +/// - Trace sent and received messages. The event ReliableProducer:ConfirmationHandler/2 +/// receives back messages sent with the status. +/// - Handle the Metadata Update. In case the stream is deleted ReliableProducer closes Producer/Connection. +/// Reconnect the Producer if the stream still exists. +/// - Set automatically the next PublisherID +/// - Automatically retrieves the last sequence. By default is AutoPublishingId see IPublishingIdStrategy. +/// +public class ReliableProducer +{ + private Producer _producer; + private readonly AutoPublishingId _autoPublishingId; + private readonly ReliableProducerConfig _reliableProducerConfig; + private readonly SemaphoreSlim _semProducer = new(1); + private readonly ConfirmationPipe _confirmationPipe; + private bool _needReconnect = true; + private bool _inReconnection; + + private ReliableProducer(ReliableProducerConfig reliableProducerConfig) + { + _reliableProducerConfig = reliableProducerConfig; + _autoPublishingId = new AutoPublishingId(_reliableProducerConfig); + _confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler); + _confirmationPipe.Start(); + } + + public static async Task CreateReliableProducer(ReliableProducerConfig reliableProducerConfig) + { + var rProducer = new ReliableProducer(reliableProducerConfig); + await rProducer.Init(); + return rProducer; + } + + private async Task Init() + { + await _autoPublishingId.InitPublishingId(); + await GetNewProducer(); + } + + private async Task GetNewProducer() + { + await _semProducer.WaitAsync(); + + try + { + _producer = await _reliableProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() + { + Stream = _reliableProducerConfig.Stream, + ClientProvidedName = _reliableProducerConfig.ClientProvidedName, + Reference = _reliableProducerConfig.Reference, + MetadataHandler = update => + { + HandleMetaDataMaybeReconnect(update.Stream).Wait(); + }, + ConnectionClosedHandler = async _ => + { + await TryToReconnect(); + }, + ConfirmHandler = confirmation => + { + var confirmationStatus = confirmation.Code switch + { + ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, + ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, + ResponseCode.InternalError => ConfirmationStatus.InternalError, + ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, + ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, + ResponseCode.Ok => ConfirmationStatus.Confirmed, + _ => ConfirmationStatus.UndefinedError + }; + + _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, + confirmationStatus); + } + }); + _reliableProducerConfig.ReconnectStrategy.WhenConnected(); + } + + catch (CreateProducerException ce) + { + LogEventSource.Log.LogError($"{ce}. ReliableProducer closed"); + } + catch (Exception e) + { + LogEventSource.Log.LogError($"Error during initialization: {e}."); + _semProducer.Release(); + await TryToReconnect(); + } + + _semProducer.Release(); + } + + private async Task TryToReconnect() + { + _inReconnection = true; + try + { + _reliableProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); + if (reconnect && _needReconnect) + { + await GetNewProducer(); + } + } + finally + { + _inReconnection = false; + } + } + + /// + /// When the clients receives a meta data update, it doesn't know + /// the reason. + /// Metadata update can be raised when: + /// - stream is deleted + /// - change the stream topology (ex: add a follower) + /// + /// HandleMetaDataMaybeReconnect checks if the stream still exists + /// and try to reconnect. + /// (internal because it is needed for tests) + /// + internal async Task HandleMetaDataMaybeReconnect(string stream) + { + LogEventSource.Log.LogInformation( + $"Meta data update for the stream: {stream} " + + $"Producer {_reliableProducerConfig.Reference} closed."); + + // This sleep is needed. When a stream is deleted it takes sometime. + // The StreamExists/1 could return true even the stream doesn't exist anymore. + Thread.Sleep(500); + if (await _reliableProducerConfig.StreamSystem.StreamExists(stream)) + { + LogEventSource.Log.LogInformation( + $"Meta data update, the stream {stream} still exist. " + + $"Producer {_reliableProducerConfig.Reference} will try to reconnect."); + // Here we just close the producer connection + // the func TryToReconnect/0 will be called. + await CloseProducer(); + } + else + { + // In this case the stream doesn't exist anymore + // the ReliableProducer is just closed. + await Close(); + } + } + + private async Task CloseProducer() + { + await _semProducer.WaitAsync(10); + try + { + await _producer.Close(); + } + finally + { + _semProducer.Release(); + } + } + + public bool IsOpen() + { + return _needReconnect; + } + + public async Task Close() + { + await _semProducer.WaitAsync(10); + try + { + _needReconnect = false; + _confirmationPipe.Stop(); + await _producer.Close(); + } + finally + { + _semProducer.Release(); + } + } + + public async ValueTask Send(Message message) + { + var pid = _autoPublishingId.GetPublishingId(); + _confirmationPipe.AddUnConfirmedMessage(pid, message); + await _semProducer.WaitAsync(); + try + { + // This flags avoid some race condition, + // since the reconnection can arrive from different threads. + // In this case it skips the publish until + // the producer is connected. Messages are safe since are stored + // on the _waitForConfirmation list. The user will get Timeout Error + if (!(_inReconnection)) + { + await _producer.Send(pid, message); + } + } + + catch (Exception e) + { + LogEventSource.Log.LogError($"Error sending message: {e}."); + } + finally + { + _semProducer.Release(); + } + } + + public async ValueTask Send(List messages, CompressionType compressionType) + { + var pid = _autoPublishingId.GetPublishingId(); + _confirmationPipe.AddUnConfirmedMessage(pid, messages); + await _semProducer.WaitAsync(); + try + { + if (!_inReconnection) + { + await _producer.Send(pid, messages, compressionType); + } + } + + catch (Exception e) + { + LogEventSource.Log.LogError($"Error sending messages: {e}."); + } + finally + { + _semProducer.Release(); + } + } +} diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 47e21c64..45e93f6a 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Stream.Client @@ -29,7 +30,7 @@ public record StreamSystemConfig : INamedEntity public class StreamSystem { private readonly ClientParameters clientParameters; - private readonly Client client; + private Client client; private StreamSystem(ClientParameters clientParameters, Client client) { @@ -80,8 +81,32 @@ public async Task Close() await client.Close("system close"); } + private readonly SemaphoreSlim _semLocatorConnection = new(1); + + private async Task MayBeReconnectLocator() + { + try + { + await _semLocatorConnection.WaitAsync(); + { + if (client.IsClosed) + { + client = await Client.Create(clientParameters with + { + ClientProvidedName = clientParameters.ClientProvidedName + }); + } + } + } + finally + { + _semLocatorConnection.Release(); + } + } + public async Task CreateProducer(ProducerConfig producerConfig) { + await MayBeReconnectLocator(); var meta = await client.QueryMetadata(new[] { producerConfig.Stream }); var metaStreamInfo = meta.StreamInfos[producerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) @@ -111,6 +136,7 @@ public async Task StreamExists(string stream) return response.StreamInfos is { Count: >= 1 } && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } + private static void MaybeThrowQueryException(string reference, string stream) { if (string.IsNullOrEmpty(reference) || string.IsNullOrEmpty(stream)) @@ -145,6 +171,7 @@ public async Task QueryOffset(string reference, string stream) /// public async Task QuerySequence(string reference, string stream) { + await MayBeReconnectLocator(); MaybeThrowQueryException(reference, stream); var response = await client.QueryPublisherSequence(reference, stream); @@ -155,6 +182,7 @@ public async Task QuerySequence(string reference, string stream) public async Task DeleteStream(string stream) { + await MayBeReconnectLocator(); var response = await client.DeleteStream(stream); if (response.ResponseCode == ResponseCode.Ok) { diff --git a/Tests/ApiApproval.Approve.verified.txt b/Tests/ApiApproval.Approve.verified.txt index cfef7836..4a206245 100644 --- a/Tests/ApiApproval.Approve.verified.txt +++ b/Tests/ApiApproval.Approve.verified.txt @@ -969,4 +969,74 @@ namespace RabbitMQ.Stream.Client { public VirtualHostAccessFailureException(string s) { } } +} +namespace RabbitMQ.Stream.Client.Reliable +{ + public class ConfirmationPipe + { + public ConfirmationPipe(System.Func confirmHandler) { } + public void AddUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Message message) { } + public void AddUnConfirmedMessage(ulong publishingId, System.Collections.Generic.List messages) { } + public System.Threading.Tasks.Task RemoveUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Reliable.ConfirmationStatus confirmationStatus) { } + public void Start() { } + public void Stop() { } + } + public enum ConfirmationStatus : ushort + { + WaitForConfirmation = 0, + Confirmed = 1, + TimeoutError = 2, + StreamNotAvailable = 6, + InternalError = 15, + AccessRefused = 16, + PreconditionFailed = 17, + PublisherDoesNotExist = 18, + UndefinedError = 200, + } + public interface IPublishingIdStrategy + { + ulong GetPublishingId(); + System.Threading.Tasks.Task InitPublishingId(); + } + public interface IReconnectStrategy + { + void WhenConnected(); + void WhenDisconnected(out bool reconnect); + } + public class MessagesConfirmation + { + public MessagesConfirmation() { } + public System.DateTime InsertDateTime { get; set; } + public System.Collections.Generic.List Messages { get; } + public ulong PublishingId { get; } + public RabbitMQ.Stream.Client.Reliable.ConfirmationStatus Status { get; } + } + public class ReliableProducer + { + public System.Threading.Tasks.Task Close() { } + public bool IsOpen() { } + public System.Threading.Tasks.ValueTask Send(RabbitMQ.Stream.Client.Message message) { } + public System.Threading.Tasks.ValueTask Send(System.Collections.Generic.List messages, RabbitMQ.Stream.Client.CompressionType compressionType) { } + public static System.Threading.Tasks.Task CreateReliableProducer(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig reliableProducerConfig) { } + } + public class ReliableProducerConfig : System.IEquatable + { + public ReliableProducerConfig() { } + protected ReliableProducerConfig(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig original) { } + public string ClientProvidedName { get; set; } + public System.Func ConfirmationHandler { get; set; } + protected virtual System.Type EqualityContract { get; } + public RabbitMQ.Stream.Client.Reliable.IReconnectStrategy ReconnectStrategy { get; set; } + public string Reference { get; set; } + public string Stream { get; set; } + public RabbitMQ.Stream.Client.StreamSystem StreamSystem { get; set; } + public virtual RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig $() { } + public virtual bool Equals(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? other) { } + public override bool Equals(object? obj) { } + public override int GetHashCode() { } + protected virtual bool PrintMembers(System.Text.StringBuilder builder) { } + public override string ToString() { } + public static bool operator !=(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? left, RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? right) { } + public static bool operator ==(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? left, RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? right) { } + } } \ No newline at end of file diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs new file mode 100644 index 00000000..637c1649 --- /dev/null +++ b/Tests/ReliableTests.cs @@ -0,0 +1,304 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Collections.Generic; +using System.Text; +using System.Threading; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client; +using RabbitMQ.Stream.Client.Reliable; +using Xunit; +using Xunit.Abstractions; + +namespace Tests; + +public class ReliableTests +{ + private readonly ITestOutputHelper _testOutputHelper; + + public ReliableTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + [Fact] + public void MessageWithoutConfirmationRaiseTimeout() + { + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); + var confirmationPipe = new ConfirmationPipe(confirmation => + { + l.Add(confirmation); + if (confirmation.PublishingId == 2) + { + confirmationTask.SetResult(l); + } + + return Task.CompletedTask; + } + ); + confirmationPipe.Start(); + var message = new Message(Encoding.UTF8.GetBytes($"hello")); + confirmationPipe.AddUnConfirmedMessage(1, message); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + // time out error is sent by the internal time that checks the status + // if the message doesn't receive the confirmation within X time, the timeout error is raised. + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[0].Status); + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[1].Status); + confirmationPipe.Stop(); + } + + [Fact] + public void MessageConfirmationShouldHaveTheSameMessages() + { + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); + var confirmationPipe = new ConfirmationPipe(confirmation => + { + l.Add(confirmation); + if (confirmation.PublishingId == 2) + { + confirmationTask.SetResult(l); + } + + return Task.CompletedTask; + } + ); + confirmationPipe.Start(); + var message = new Message(Encoding.UTF8.GetBytes($"hello")); + confirmationPipe.AddUnConfirmedMessage(1, message); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); + confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); + confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status); + confirmationPipe.Stop(); + } + + [Fact] + public async void ConfirmRProducerMessages() + { + var testPassed = new TaskCompletionSource(); + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var count = 0; + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + ConfirmationHandler = _ => + { + if (Interlocked.Increment(ref count) == 10) + { + testPassed.SetResult(true); + } + + return Task.CompletedTask; + } + } + ); + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; + + for (var i = 0; i < 5; i++) + { + await rProducer.Send(messages, CompressionType.None); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + await rProducer.Close(); + await system.Close(); + } + + [Fact] + public async void SendMessageAfterKillConnectionShouldContinueToWork() + { + // Test the auto-reconnect client + // When the client connection is closed by the management UI + // see HttpKillConnections/1. + // The RProducer has to detect the disconnection and reconnect the client + // + var testPassed = new TaskCompletionSource(); + + var clientProvidedNameLocator = Guid.NewGuid().ToString(); + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream, clientProvidedNameLocator); + var count = 0; + var clientProvidedName = Guid.NewGuid().ToString(); + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProvidedName, + ConfirmationHandler = _ => + { + if (Interlocked.Increment(ref count) == 10) + { + testPassed.SetResult(true); + } + + return Task.CompletedTask; + } + } + ); + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + SystemUtils.Wait(TimeSpan.FromSeconds(6)); + Assert.Equal(1, SystemUtils.HttpKillConnections(clientProvidedName).Result); + await SystemUtils.HttpKillConnections(clientProvidedNameLocator); + + for (var i = 0; i < 5; i++) + { + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; + await rProducer.Send(messages, CompressionType.None); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // here the locator connection is closed. + // the auto-reconnect has to connect the locator again + await system.DeleteStream(stream); + await rProducer.Close(); + await system.Close(); + } + + [Fact] + public async void HandleDeleteStreamWithMetaDataUpdate() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var clientProviderName = Guid.NewGuid().ToString(); + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + ConfirmationHandler = _ => Task.CompletedTask + } + ); + + Assert.True(rProducer.IsOpen()); + // When the stream is deleted the producer has to close the + // connection an become inactive. + await system.DeleteStream(stream); + + SystemUtils.Wait(TimeSpan.FromSeconds(5)); + Assert.False(rProducer.IsOpen()); + await system.Close(); + } + + [Fact] + public async void HandleChangeStreamConfigurationWithMetaDataUpdate() + { + // When stream topology changes the MetadataUpdate is raised. + // in this test we simulate it using await ReliableProducer:HandleMetaDataMaybeReconnect/1; + // Producer must reconnect + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var clientProviderName = Guid.NewGuid().ToString(); + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + ConfirmationHandler = _ => Task.CompletedTask + } + ); + + Assert.True(rProducer.IsOpen()); + await rProducer.HandleMetaDataMaybeReconnect(stream); + SystemUtils.Wait(); + Assert.True(rProducer.IsOpen()); + // await system.DeleteStream(stream); + await system.Close(); + } + + [Fact] + public async void AutoPublishIdDefaultShouldStartFromTheLast() + { + // RProducer automatically retrieves the last producer offset. + // see IPublishingIdStrategy implementation + // This tests if the the last id stored + // A new RProducer should restart from the last offset. + + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + var clientProviderName = Guid.NewGuid().ToString(); + var reference = Guid.NewGuid().ToString(); + var count = 0; + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + Reference = reference, + ConfirmationHandler = confirm => + { + if (Interlocked.Increment(ref count) != 5) + { + return Task.CompletedTask; + } + + Assert.Equal(ConfirmationStatus.Confirmed, confirm.Status); + + if (confirm.Status == ConfirmationStatus.Confirmed) + { + testPassed.SetResult(confirm.PublishingId); + } + + return Task.CompletedTask; + } + } + ); + + Assert.True(rProducer.IsOpen()); + + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + // We check if the publishing id is actually 5 + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + Assert.Equal((ulong)5, testPassed.Task.Result); + + await rProducer.Close(); + var testPassedSecond = new TaskCompletionSource(); + var rProducerSecond = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() + { + Stream = stream, + StreamSystem = system, + Reference = reference, + ClientProvidedName = clientProviderName, + ConfirmationHandler = confirm => + { + testPassedSecond.SetResult(confirm.PublishingId); + return Task.CompletedTask; + } + } + ); + + // given the same reference, the publishingId should restart from the last + // in this cas5 is 5 + await rProducerSecond.Send(new Message(Encoding.UTF8.GetBytes($"hello"))); + // +1 here, so 6 + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassedSecond); + Assert.Equal((ulong)6, testPassedSecond.Task.Result); + + await rProducerSecond.Close(); + await system.DeleteStream(stream); + await system.Close(); + } +} diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index f998205a..3140d84a 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -230,7 +230,7 @@ public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError() // we kill _only_ producer and consumer connection // leave the locator up and running to delete the stream - Assert.Equal(2, SystemUtils.HttpKillConnections().Result); + Assert.Equal(2, SystemUtils.HttpKillConnections("to_kill").Result); Assert.Equal(ResponseCode.Ok, await producer.Close()); Assert.Equal(ResponseCode.Ok, await producer.Close()); // close two time it should not raise an exception diff --git a/Tests/Utils.cs b/Tests/Utils.cs index f63086fa..a1e587c2 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -59,6 +59,16 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } + public static void InitStreamSystemWithRandomStream(out StreamSystem system, out string stream, + string clientProviderNameLocator = "stream-locator") + { + stream = Guid.NewGuid().ToString(); + var config = new StreamSystemConfig { ClientProvidedName = clientProviderNameLocator }; + system = StreamSystem.Create(config).Result; + var x = system.CreateStream(new StreamSpec(stream)); + x.Wait(); + } + public static async Task PublishMessages(StreamSystem system, string stream, int numberOfMessages, ITestOutputHelper testOutputHelper) { @@ -103,19 +113,19 @@ public static async Task PublishMessages(StreamSystem system, string stream, int producer.Dispose(); } - private class Connecction + private class Connection { public string name { get; set; } public Dictionary client_properties { get; set; } } - public static async Task HttpKillConnections() + public static async Task HttpKillConnections(string connectionName) { using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; using var client = new HttpClient(handler); var result = await client.GetAsync("http://localhost:15672/api/connections"); var json = await result.Content.ReadAsStringAsync(); - var connections = JsonSerializer.Deserialize>(json); + var connections = JsonSerializer.Deserialize>(json); if (connections == null) { return 0; @@ -123,7 +133,7 @@ public static async Task HttpKillConnections() // we kill _only_ producer and consumer connections // leave the locator up and running to delete the stream - var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains("to_kill")); + var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); foreach (var conn in iEnumerable) { await client.DeleteAsync($"http://localhost:15672/api/connections/{conn.name}");