From 086cd0c6cc977cc6537171515658d44df78ae590 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 11 Apr 2022 09:26:42 +0200 Subject: [PATCH 01/21] Work in progress Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 106 ++++++++++++ .../Reliable/IReconnectStrategy.cs | 11 ++ .../Reliable/PublishingIdStrategy.cs | 14 ++ RabbitMQ.Stream.Client/Reliable/RProducer.cs | 154 ++++++++++++++++++ RabbitMQ.Stream.Client/StreamSystem.cs | 44 ++++- Tests/ReliableTests.cs | 63 +++++++ 6 files changed, 383 insertions(+), 9 deletions(-) create mode 100644 RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs create mode 100644 RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs create mode 100644 RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs create mode 100644 RabbitMQ.Stream.Client/Reliable/RProducer.cs create mode 100644 Tests/ReliableTests.cs diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs new file mode 100644 index 00000000..ce70e269 --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -0,0 +1,106 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using System.Timers; +using Timer = System.Timers.Timer; + +namespace RabbitMQ.Stream.Client.Reliable; + +public enum ConfirmationStatus : ushort +{ + WaitForConfirmation = 0, + Confirmed = 1, + TimeoutError = 2, +} + +public class ConfirmationMessage +{ + public ulong PublishingId { get; internal init; } + public Message Message { get; internal set; } + public DateTime DateTime { get; init; } + public ConfirmationStatus ConfirmationStatus { get; internal set; } +} + +public class ConfirmationPipe +{ + private ActionBlock> _waitForConfirmationActionBlock; + private readonly Dictionary _waitForConfirmation = new(); + private readonly Timer _invalidateTimer = new(); + private Func ConfirmHandler { get; } + + + public ConfirmationPipe(Func confirmHandler) + { + ConfirmHandler = confirmHandler; + } + + + public void Start() + { + _waitForConfirmationActionBlock = new ActionBlock>( + async request => + { + // if (_waitForConfirmation.Count > 5000) + // Console.WriteLine($"_waitForConfirmation Count: {_waitForConfirmation.Count}"); + var (confirmationStatus, confirmation) = request; + switch (confirmationStatus) + { + case ConfirmationStatus.WaitForConfirmation: + _waitForConfirmation.TryAdd(confirmation.PublishingId, confirmation); + break; + case ConfirmationStatus.Confirmed: + case ConfirmationStatus.TimeoutError: + _waitForConfirmation.Remove(confirmation.PublishingId, out var message); + if (message != null) + { + message.ConfirmationStatus = confirmationStatus; + ConfirmHandler?.Invoke(message); + } + break; + } + }, new ExecutionDataflowBlockOptions + { + MaxDegreeOfParallelism = 1 + }); + + _invalidateTimer.Elapsed += OnTimedEvent; + _invalidateTimer.Interval = 1000; + _invalidateTimer.Enabled = false; + } + + public void Stop() + { + _invalidateTimer.Enabled = false; + _waitForConfirmationActionBlock.Complete(); + } + + private async void OnTimedEvent(object? sender, ElapsedEventArgs e) + { + { + foreach (var pair in _waitForConfirmation.Where(pair => (DateTime.Now - pair.Value.DateTime).Seconds > 1)) + { + await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError); + } + } + } + + public Task AddUnConfirmedMessage(ulong publishingId, Message message) + { + return _waitForConfirmationActionBlock.SendAsync(Tuple.Create(ConfirmationStatus.WaitForConfirmation, + new ConfirmationMessage() {Message = message, PublishingId = publishingId, DateTime = DateTime.Now})); + } + + public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) + { + return _waitForConfirmationActionBlock.SendAsync( + Tuple.Create(confirmationStatus, + new ConfirmationMessage() {PublishingId = publishingId})); + } +} diff --git a/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs new file mode 100644 index 00000000..5fe16097 --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/IReconnectStrategy.cs @@ -0,0 +1,11 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +namespace RabbitMQ.Stream.Client.Reliable; + +public interface IReconnectStrategy +{ + void WhenDisconnected(out bool reconnect); + void WhenConnected(); +} diff --git a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs new file mode 100644 index 00000000..0463d41a --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs @@ -0,0 +1,14 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +namespace RabbitMQ.Stream.Client.Reliable; +/// +/// Define PublishingId Strategy. +/// Can be automatic, so the RProducer will provide +/// the ID, or the use the way to +/// +public interface IPublishingIdStrategy +{ + ulong GetPublishingId(); +} diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs new file mode 100644 index 00000000..add23b5f --- /dev/null +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -0,0 +1,154 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Collections.Concurrent; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using System.Threading.Tasks.Dataflow; +using System.Timers; + +namespace RabbitMQ.Stream.Client.Reliable; + +internal class AutoPublishingId : IPublishingIdStrategy +{ + private ulong _lastPublishingId; + + public ulong GetPublishingId() + { + return ++_lastPublishingId; + } +} + +internal class BackOffReconnectStrategy : IReconnectStrategy +{ + private int Tentatives { get; set; } = 1; + + public void WhenDisconnected(out bool reconnect) + { + Tentatives <<= 1; + Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100)); + Console.WriteLine($"WhenDisconnected raised - Tent {Tentatives}"); + reconnect = true; + } + + public void WhenConnected() + { + Tentatives = 1; + } +} + +public record RProducerConfig +{ + public StreamSystem StreamSystem { get; set; } + public string Stream { get; set; } + public string Reference { get; set; } + public Func ConfirmationHandler { get; init; } + public string ClientProvidedName { get; set; } + public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); +} + +public class RProducer +{ + private Producer _producer; + private readonly AutoPublishingId _autoPublishingId; + private readonly RProducerConfig _rProducerConfig; + private readonly SemaphoreSlim _semProducer = new(1000); + private readonly ConfirmationPipe _confirmationPipe; + private bool _needReconnect = true; + + + private RProducer(RProducerConfig rProducerConfig) + { + _autoPublishingId = new AutoPublishingId(); + _rProducerConfig = rProducerConfig; + _confirmationPipe = new ConfirmationPipe(rProducerConfig.ConfirmationHandler); + _confirmationPipe.Start(); + } + + + public static async Task CreateRProducer(RProducerConfig rProducerConfig) + { + var rProducer = new RProducer(rProducerConfig); + await rProducer.Init(); + return rProducer; + } + + private async Task Init() + { + await _semProducer.WaitAsync(); + try + { + _producer = await _rProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() + { + Stream = _rProducerConfig.Stream, + ConnectionClosedHandler = async _ => + { + await TryToReconnect(); + }, + ConfirmHandler = confirmation => + { + // _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, + // ConfirmationStatus.Confirmed); + } + }); + _rProducerConfig.ReconnectStrategy.WhenConnected(); + } + catch (Exception e) + { + Console.WriteLine($"Init Error. e: {e.Message}"); + _semProducer.Release(); + await TryToReconnect(); + } + + _semProducer.Release(); + } + + private async Task TryToReconnect() + { + _rProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); + if (reconnect && _needReconnect) + { + await Init(); + Console.WriteLine($"End Reconnect {DateTime.Now}"); + } + } + + public async Task Close() + { + await _semProducer.WaitAsync(); + try + { + _needReconnect = false; + _confirmationPipe.Stop(); + await _producer.Close(); + } + finally + { + _semProducer.Release(); + } + } + + public async ValueTask Send(Message message) + { + var pid = _autoPublishingId.GetPublishingId(); + + await _confirmationPipe.AddUnConfirmedMessage(pid, message); + await _semProducer.WaitAsync(); + try + { + await _producer.Send(pid, message); + } + + catch (Exception e) + { + Console.WriteLine($"Send error {e.Message}"); + } + finally + { + _semProducer.Release(); + } + } +} diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 47e21c64..9dbaef3b 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -5,6 +5,7 @@ using System; using System.Collections.Generic; using System.Net; +using System.Threading; using System.Threading.Tasks; namespace RabbitMQ.Stream.Client @@ -20,7 +21,7 @@ public record StreamSystemConfig : INamedEntity /// public SslOption Ssl { get; set; } = new SslOption(); - public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; + public IList Endpoints { get; set; } = new List {new IPEndPoint(IPAddress.Loopback, 5552)}; public AddressResolver AddressResolver { get; set; } = null; public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; @@ -29,7 +30,7 @@ public record StreamSystemConfig : INamedEntity public class StreamSystem { private readonly ClientParameters clientParameters; - private readonly Client client; + private Client client; private StreamSystem(ClientParameters clientParameters, Client client) { @@ -55,7 +56,7 @@ public static async Task Create(StreamSystemConfig config) { try { - var client = await Client.Create(clientParams with { Endpoint = endPoint }); + var client = await Client.Create(clientParams with {Endpoint = endPoint}); if (!client.IsClosed) { return new StreamSystem(clientParams, client); @@ -80,16 +81,40 @@ public async Task Close() await client.Close("system close"); } + private readonly SemaphoreSlim _semLocatorConnection = new(1); + + private async Task MayBeReconnectLocator() + { + try + { + await _semLocatorConnection.WaitAsync(); + { + if (client.IsClosed) + { + client = await Client.Create(clientParameters with + { + ClientProvidedName = clientParameters.ClientProvidedName + }); + } + } + } + finally + { + _semLocatorConnection.Release(); + } + } + public async Task CreateProducer(ProducerConfig producerConfig) { - var meta = await client.QueryMetadata(new[] { producerConfig.Stream }); + await MayBeReconnectLocator(); + var meta = await client.QueryMetadata(new[] {producerConfig.Stream}); var metaStreamInfo = meta.StreamInfos[producerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Producer.Create(clientParameters with { ClientProvidedName = producerConfig.ClientProvidedName }, + return await Producer.Create(clientParameters with {ClientProvidedName = producerConfig.ClientProvidedName}, producerConfig, metaStreamInfo); } @@ -106,11 +131,12 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { - var streams = new[] { stream }; + var streams = new[] {stream}; var response = await client.QueryMetadata(streams); - return response.StreamInfos is { Count: >= 1 } && + return response.StreamInfos is {Count: >= 1} && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } + private static void MaybeThrowQueryException(string reference, string stream) { if (string.IsNullOrEmpty(reference) || string.IsNullOrEmpty(stream)) @@ -166,14 +192,14 @@ public async Task DeleteStream(string stream) public async Task CreateConsumer(ConsumerConfig consumerConfig) { - var meta = await client.QueryMetadata(new[] { consumerConfig.Stream }); + var meta = await client.QueryMetadata(new[] {consumerConfig.Stream}); var metaStreamInfo = meta.StreamInfos[consumerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Consumer.Create(clientParameters with { ClientProvidedName = consumerConfig.ClientProvidedName }, + return await Consumer.Create(clientParameters with {ClientProvidedName = consumerConfig.ClientProvidedName}, consumerConfig, metaStreamInfo); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs new file mode 100644 index 00000000..2d6063b3 --- /dev/null +++ b/Tests/ReliableTests.cs @@ -0,0 +1,63 @@ +// This source code is dual-licensed under the Apache License, version +// 2.0, and the Mozilla Public License, version 2.0. +// Copyright (c) 2007-2020 VMware, Inc. + +using System; +using System.Text; +using System.Threading.Tasks; +using RabbitMQ.Stream.Client.Reliable; +using Xunit; +using Xunit.Abstractions; +using Xunit.Sdk; + +namespace Tests; + +public class ReliableTests +{ + private readonly ITestOutputHelper _testOutputHelper; + + public ReliableTests(ITestOutputHelper testOutputHelper) + { + _testOutputHelper = testOutputHelper; + } + + [Fact] + public void MessageWithoutConfirmationRaiseTimeout() + { + var confirmationTask = new TaskCompletionSource(); + var confirmationPipe = new ConfirmationPipe(confirmation => + { + confirmationTask.SetResult(confirmation); + return Task.CompletedTask; + } + ); + confirmationPipe.Start(); + var message = new RabbitMQ.Stream.Client.Message(Encoding.UTF8.GetBytes($"hello")); + confirmationPipe.AddUnConfirmedMessage(1, message); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result.ConfirmationStatus); + confirmationPipe.Stop(); + } + + + [Fact] + public void MessageConfirmationShouldHaveTheSameMessages() + { + var confirmationTask = new TaskCompletionSource(); + var confirmationPipe = new ConfirmationPipe(confirmation => + { + confirmationTask.SetResult(confirmation); + return Task.CompletedTask; + } + ); + confirmationPipe.Start(); + var message = new RabbitMQ.Stream.Client.Message(Encoding.UTF8.GetBytes($"hello")); + confirmationPipe.AddUnConfirmedMessage(1, message); + confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); + new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result.ConfirmationStatus); + confirmationPipe.Stop(); + } + + +} From ec5d1931639e6946514cdf79a620308261a7c3cc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 11 Apr 2022 15:57:20 +0200 Subject: [PATCH 02/21] First implementation Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 38 ++++++++------- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 29 ++++++++++-- Tests/ReliableTests.cs | 47 +++++++++++++------ 3 files changed, 79 insertions(+), 35 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index ce70e269..948f25f7 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -23,7 +23,7 @@ public enum ConfirmationStatus : ushort public class ConfirmationMessage { public ulong PublishingId { get; internal init; } - public Message Message { get; internal set; } + public List Messages { get; internal set; } public DateTime DateTime { get; init; } public ConfirmationStatus ConfirmationStatus { get; internal set; } } @@ -31,7 +31,7 @@ public class ConfirmationMessage public class ConfirmationPipe { private ActionBlock> _waitForConfirmationActionBlock; - private readonly Dictionary _waitForConfirmation = new(); + private readonly ConcurrentDictionary _waitForConfirmation = new(); private readonly Timer _invalidateTimer = new(); private Func ConfirmHandler { get; } @@ -45,34 +45,27 @@ public ConfirmationPipe(Func confirmHandler) public void Start() { _waitForConfirmationActionBlock = new ActionBlock>( - async request => + request => { - // if (_waitForConfirmation.Count > 5000) - // Console.WriteLine($"_waitForConfirmation Count: {_waitForConfirmation.Count}"); var (confirmationStatus, confirmation) = request; switch (confirmationStatus) { - case ConfirmationStatus.WaitForConfirmation: - _waitForConfirmation.TryAdd(confirmation.PublishingId, confirmation); - break; case ConfirmationStatus.Confirmed: case ConfirmationStatus.TimeoutError: - _waitForConfirmation.Remove(confirmation.PublishingId, out var message); + _waitForConfirmation.Remove(confirmation.PublishingId, out var message); if (message != null) { message.ConfirmationStatus = confirmationStatus; ConfirmHandler?.Invoke(message); } - break; + + break; } - }, new ExecutionDataflowBlockOptions - { - MaxDegreeOfParallelism = 1 - }); + }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1, BoundedCapacity = 20_000}); _invalidateTimer.Elapsed += OnTimedEvent; _invalidateTimer.Interval = 1000; - _invalidateTimer.Enabled = false; + _invalidateTimer.Enabled = true; } public void Stop() @@ -91,10 +84,19 @@ private async void OnTimedEvent(object? sender, ElapsedEventArgs e) } } - public Task AddUnConfirmedMessage(ulong publishingId, Message message) + public void AddUnConfirmedMessage(ulong publishingId, Message message) + { + _waitForConfirmation.TryAdd(publishingId, + new ConfirmationMessage() + { + Messages = new List() {message}, PublishingId = publishingId, DateTime = DateTime.Now + }); + } + + public void AddUnConfirmedMessage(ulong publishingId, List messages) { - return _waitForConfirmationActionBlock.SendAsync(Tuple.Create(ConfirmationStatus.WaitForConfirmation, - new ConfirmationMessage() {Message = message, PublishingId = publishingId, DateTime = DateTime.Now})); + _waitForConfirmation.TryAdd(publishingId, + new ConfirmationMessage() {Messages = messages, PublishingId = publishingId, DateTime = DateTime.Now}); } public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index add23b5f..6a9bfd1f 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -4,6 +4,7 @@ using System; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Linq; using System.Threading; using System.Threading.Tasks; @@ -90,8 +91,8 @@ private async Task Init() }, ConfirmHandler = confirmation => { - // _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, - // ConfirmationStatus.Confirmed); + _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, + ConfirmationStatus.Confirmed); } }); _rProducerConfig.ReconnectStrategy.WhenConnected(); @@ -135,7 +136,7 @@ public async ValueTask Send(Message message) { var pid = _autoPublishingId.GetPublishingId(); - await _confirmationPipe.AddUnConfirmedMessage(pid, message); + _confirmationPipe.AddUnConfirmedMessage(pid, message); await _semProducer.WaitAsync(); try { @@ -151,4 +152,26 @@ public async ValueTask Send(Message message) _semProducer.Release(); } } + + + public async ValueTask Send(List messages, CompressionType compressionType) + { + var pid = _autoPublishingId.GetPublishingId(); + + _confirmationPipe.AddUnConfirmedMessage(pid, messages); + await _semProducer.WaitAsync(); + try + { + await _producer.Send(pid, messages, compressionType); + } + + catch (Exception e) + { + Console.WriteLine($"Send error {e.Message}"); + } + finally + { + _semProducer.Release(); + } + } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 2d6063b3..8c79a02a 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -3,8 +3,10 @@ // Copyright (c) 2007-2020 VMware, Inc. using System; +using System.Collections.Generic; using System.Text; using System.Threading.Tasks; +using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.Reliable; using Xunit; using Xunit.Abstractions; @@ -24,40 +26,57 @@ public ReliableTests(ITestOutputHelper testOutputHelper) [Fact] public void MessageWithoutConfirmationRaiseTimeout() { - var confirmationTask = new TaskCompletionSource(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { - confirmationTask.SetResult(confirmation); + l.Add(confirmation); + if (confirmation.PublishingId == 2) + { + confirmationTask.SetResult(l); + } + return Task.CompletedTask; } ); confirmationPipe.Start(); - var message = new RabbitMQ.Stream.Client.Message(Encoding.UTF8.GetBytes($"hello")); + var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); - Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result.ConfirmationStatus); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + // time out error is sent by the internal time that checks the status + // if the message doesn't receive the confirmation within X time, the timeout error is raised. + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[0].ConfirmationStatus); + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[1].ConfirmationStatus); confirmationPipe.Stop(); } - - + + [Fact] public void MessageConfirmationShouldHaveTheSameMessages() { - var confirmationTask = new TaskCompletionSource(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { - confirmationTask.SetResult(confirmation); + l.Add(confirmation); + if (confirmation.PublishingId == 2) + { + confirmationTask.SetResult(l); + } + return Task.CompletedTask; } ); confirmationPipe.Start(); - var message = new RabbitMQ.Stream.Client.Message(Encoding.UTF8.GetBytes($"hello")); + var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); - new Utils(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); - Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result.ConfirmationStatus); + confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].ConfirmationStatus); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].ConfirmationStatus); confirmationPipe.Stop(); } - - } From 1f8942b54bb6eec2897d4010b0089aebcbd9b795 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Mon, 11 Apr 2022 15:59:44 +0200 Subject: [PATCH 03/21] First implementation Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 12 ++++++------ RabbitMQ.Stream.Client/Reliable/RProducer.cs | 9 +-------- RabbitMQ.Stream.Client/StreamSystem.cs | 16 ++++++++-------- Tests/ReliableTests.cs | 7 ++----- 4 files changed, 17 insertions(+), 27 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 948f25f7..8fe061da 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -35,13 +35,11 @@ public class ConfirmationPipe private readonly Timer _invalidateTimer = new(); private Func ConfirmHandler { get; } - public ConfirmationPipe(Func confirmHandler) { ConfirmHandler = confirmHandler; } - public void Start() { _waitForConfirmationActionBlock = new ActionBlock>( @@ -61,7 +59,7 @@ public void Start() break; } - }, new ExecutionDataflowBlockOptions {MaxDegreeOfParallelism = 1, BoundedCapacity = 20_000}); + }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 20_000 }); _invalidateTimer.Elapsed += OnTimedEvent; _invalidateTimer.Interval = 1000; @@ -89,20 +87,22 @@ public void AddUnConfirmedMessage(ulong publishingId, Message message) _waitForConfirmation.TryAdd(publishingId, new ConfirmationMessage() { - Messages = new List() {message}, PublishingId = publishingId, DateTime = DateTime.Now + Messages = new List() { message }, + PublishingId = publishingId, + DateTime = DateTime.Now }); } public void AddUnConfirmedMessage(ulong publishingId, List messages) { _waitForConfirmation.TryAdd(publishingId, - new ConfirmationMessage() {Messages = messages, PublishingId = publishingId, DateTime = DateTime.Now}); + new ConfirmationMessage() { Messages = messages, PublishingId = publishingId, DateTime = DateTime.Now }); } public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) { return _waitForConfirmationActionBlock.SendAsync( Tuple.Create(confirmationStatus, - new ConfirmationMessage() {PublishingId = publishingId})); + new ConfirmationMessage() { PublishingId = publishingId })); } } diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 6a9bfd1f..34de23be 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -3,13 +3,9 @@ // Copyright (c) 2007-2020 VMware, Inc. using System; -using System.Collections.Concurrent; using System.Collections.Generic; -using System.Linq; using System.Threading; using System.Threading.Tasks; -using System.Threading.Tasks.Dataflow; -using System.Timers; namespace RabbitMQ.Stream.Client.Reliable; @@ -60,7 +56,6 @@ public class RProducer private readonly ConfirmationPipe _confirmationPipe; private bool _needReconnect = true; - private RProducer(RProducerConfig rProducerConfig) { _autoPublishingId = new AutoPublishingId(); @@ -69,7 +64,6 @@ private RProducer(RProducerConfig rProducerConfig) _confirmationPipe.Start(); } - public static async Task CreateRProducer(RProducerConfig rProducerConfig) { var rProducer = new RProducer(rProducerConfig); @@ -153,8 +147,7 @@ public async ValueTask Send(Message message) } } - - public async ValueTask Send(List messages, CompressionType compressionType) + public async ValueTask Send(List messages, CompressionType compressionType) { var pid = _autoPublishingId.GetPublishingId(); diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 9dbaef3b..405276de 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -21,7 +21,7 @@ public record StreamSystemConfig : INamedEntity /// public SslOption Ssl { get; set; } = new SslOption(); - public IList Endpoints { get; set; } = new List {new IPEndPoint(IPAddress.Loopback, 5552)}; + public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; public AddressResolver AddressResolver { get; set; } = null; public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; @@ -56,7 +56,7 @@ public static async Task Create(StreamSystemConfig config) { try { - var client = await Client.Create(clientParams with {Endpoint = endPoint}); + var client = await Client.Create(clientParams with { Endpoint = endPoint }); if (!client.IsClosed) { return new StreamSystem(clientParams, client); @@ -107,14 +107,14 @@ private async Task MayBeReconnectLocator() public async Task CreateProducer(ProducerConfig producerConfig) { await MayBeReconnectLocator(); - var meta = await client.QueryMetadata(new[] {producerConfig.Stream}); + var meta = await client.QueryMetadata(new[] { producerConfig.Stream }); var metaStreamInfo = meta.StreamInfos[producerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Producer.Create(clientParameters with {ClientProvidedName = producerConfig.ClientProvidedName}, + return await Producer.Create(clientParameters with { ClientProvidedName = producerConfig.ClientProvidedName }, producerConfig, metaStreamInfo); } @@ -131,9 +131,9 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { - var streams = new[] {stream}; + var streams = new[] { stream }; var response = await client.QueryMetadata(streams); - return response.StreamInfos is {Count: >= 1} && + return response.StreamInfos is { Count: >= 1 } && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } @@ -192,14 +192,14 @@ public async Task DeleteStream(string stream) public async Task CreateConsumer(ConsumerConfig consumerConfig) { - var meta = await client.QueryMetadata(new[] {consumerConfig.Stream}); + var meta = await client.QueryMetadata(new[] { consumerConfig.Stream }); var metaStreamInfo = meta.StreamInfos[consumerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Consumer.Create(clientParameters with {ClientProvidedName = consumerConfig.ClientProvidedName}, + return await Consumer.Create(clientParameters with { ClientProvidedName = consumerConfig.ClientProvidedName }, consumerConfig, metaStreamInfo); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 8c79a02a..23997e90 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -2,7 +2,6 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2020 VMware, Inc. -using System; using System.Collections.Generic; using System.Text; using System.Threading.Tasks; @@ -10,7 +9,6 @@ using RabbitMQ.Stream.Client.Reliable; using Xunit; using Xunit.Abstractions; -using Xunit.Sdk; namespace Tests; @@ -42,7 +40,7 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. @@ -51,7 +49,6 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Stop(); } - [Fact] public void MessageConfirmationShouldHaveTheSameMessages() { @@ -71,7 +68,7 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); From 032a196205798259252a1fe5c6152560ca742e9a Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 13 Apr 2022 16:03:58 +0200 Subject: [PATCH 04/21] Add tests Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 53 +++++--- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 14 +- RabbitMQ.Stream.Client/StreamSystem.cs | 17 +-- Tests/ReliableTests.cs | 120 ++++++++++++++++-- Tests/SystemTests.cs | 2 +- Tests/Utils.cs | 20 ++- 6 files changed, 173 insertions(+), 53 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 8fe061da..f2fd9212 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -12,37 +12,48 @@ using Timer = System.Timers.Timer; namespace RabbitMQ.Stream.Client.Reliable; - +/// +/// ConfirmationStatus can be: +/// public enum ConfirmationStatus : ushort { WaitForConfirmation = 0, Confirmed = 1, TimeoutError = 2, } - -public class ConfirmationMessage +/// +/// Confirmation is a wrapper around the message/s +/// This class is returned to the user to understand +/// the message status. +/// +public class Confirmation { public ulong PublishingId { get; internal init; } public List Messages { get; internal set; } - public DateTime DateTime { get; init; } - public ConfirmationStatus ConfirmationStatus { get; internal set; } + public DateTime InsertDateTime { get; init; } + public ConfirmationStatus Status { get; internal set; } } +/// +/// ConfirmationPipe maintains the status for the sent and received messages. +/// TPL Action block sends the confirmation to the user in async way +/// So the send/1 is not blocking. +/// public class ConfirmationPipe { - private ActionBlock> _waitForConfirmationActionBlock; - private readonly ConcurrentDictionary _waitForConfirmation = new(); + private ActionBlock> _waitForConfirmationActionBlock; + private readonly ConcurrentDictionary _waitForConfirmation = new(); private readonly Timer _invalidateTimer = new(); - private Func ConfirmHandler { get; } + private Func ConfirmHandler { get; } - public ConfirmationPipe(Func confirmHandler) + public ConfirmationPipe(Func confirmHandler) { ConfirmHandler = confirmHandler; } public void Start() { - _waitForConfirmationActionBlock = new ActionBlock>( + _waitForConfirmationActionBlock = new ActionBlock>( request => { var (confirmationStatus, confirmation) = request; @@ -50,19 +61,21 @@ public void Start() { case ConfirmationStatus.Confirmed: case ConfirmationStatus.TimeoutError: - _waitForConfirmation.Remove(confirmation.PublishingId, out var message); + _waitForConfirmation.TryRemove(confirmation.PublishingId, out var message); if (message != null) { - message.ConfirmationStatus = confirmationStatus; + message.Status = confirmationStatus; ConfirmHandler?.Invoke(message); } - break; } - }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, BoundedCapacity = 20_000 }); + }, new ExecutionDataflowBlockOptions { + MaxDegreeOfParallelism = 1, + // throttling + BoundedCapacity = 50_000 }); _invalidateTimer.Elapsed += OnTimedEvent; - _invalidateTimer.Interval = 1000; + _invalidateTimer.Interval = 2000; _invalidateTimer.Enabled = true; } @@ -75,7 +88,7 @@ public void Stop() private async void OnTimedEvent(object? sender, ElapsedEventArgs e) { { - foreach (var pair in _waitForConfirmation.Where(pair => (DateTime.Now - pair.Value.DateTime).Seconds > 1)) + foreach (var pair in _waitForConfirmation.Where(pair => (DateTime.Now - pair.Value.InsertDateTime).Seconds > 2)) { await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError); } @@ -85,24 +98,24 @@ private async void OnTimedEvent(object? sender, ElapsedEventArgs e) public void AddUnConfirmedMessage(ulong publishingId, Message message) { _waitForConfirmation.TryAdd(publishingId, - new ConfirmationMessage() + new Confirmation() { Messages = new List() { message }, PublishingId = publishingId, - DateTime = DateTime.Now + InsertDateTime = DateTime.Now }); } public void AddUnConfirmedMessage(ulong publishingId, List messages) { _waitForConfirmation.TryAdd(publishingId, - new ConfirmationMessage() { Messages = messages, PublishingId = publishingId, DateTime = DateTime.Now }); + new Confirmation() { Messages = messages, PublishingId = publishingId, InsertDateTime = DateTime.Now }); } public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) { return _waitForConfirmationActionBlock.SendAsync( Tuple.Create(confirmationStatus, - new ConfirmationMessage() { PublishingId = publishingId })); + new Confirmation() { PublishingId = publishingId })); } } diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 34de23be..052f710d 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -25,9 +25,9 @@ internal class BackOffReconnectStrategy : IReconnectStrategy public void WhenDisconnected(out bool reconnect) { + Console.WriteLine($"WhenDisconnected raised - Going to reconnect, tentative: {Tentatives}"); Tentatives <<= 1; Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100)); - Console.WriteLine($"WhenDisconnected raised - Tent {Tentatives}"); reconnect = true; } @@ -42,7 +42,7 @@ public record RProducerConfig public StreamSystem StreamSystem { get; set; } public string Stream { get; set; } public string Reference { get; set; } - public Func ConfirmationHandler { get; init; } + public Func ConfirmationHandler { get; init; } public string ClientProvidedName { get; set; } public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); } @@ -52,7 +52,7 @@ public class RProducer private Producer _producer; private readonly AutoPublishingId _autoPublishingId; private readonly RProducerConfig _rProducerConfig; - private readonly SemaphoreSlim _semProducer = new(1000); + private readonly SemaphoreSlim _semProducer = new(1); private readonly ConfirmationPipe _confirmationPipe; private bool _needReconnect = true; @@ -79,6 +79,7 @@ private async Task Init() _producer = await _rProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() { Stream = _rProducerConfig.Stream, + ClientProvidedName = _rProducerConfig.ClientProvidedName, ConnectionClosedHandler = async _ => { await TryToReconnect(); @@ -93,7 +94,7 @@ private async Task Init() } catch (Exception e) { - Console.WriteLine($"Init Error. e: {e.Message}"); + LogEventSource.Log.LogError($"Error during initialization: {e}."); _semProducer.Release(); await TryToReconnect(); } @@ -107,7 +108,6 @@ private async Task TryToReconnect() if (reconnect && _needReconnect) { await Init(); - Console.WriteLine($"End Reconnect {DateTime.Now}"); } } @@ -139,7 +139,7 @@ public async ValueTask Send(Message message) catch (Exception e) { - Console.WriteLine($"Send error {e.Message}"); + LogEventSource.Log.LogError($"Error sending message: {e}."); } finally { @@ -160,7 +160,7 @@ public async ValueTask Send(List messages, CompressionType compressionT catch (Exception e) { - Console.WriteLine($"Send error {e.Message}"); + LogEventSource.Log.LogError($"Error sending messages: {e}."); } finally { diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 405276de..045342af 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -21,7 +21,7 @@ public record StreamSystemConfig : INamedEntity /// public SslOption Ssl { get; set; } = new SslOption(); - public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; + public IList Endpoints { get; set; } = new List {new IPEndPoint(IPAddress.Loopback, 5552)}; public AddressResolver AddressResolver { get; set; } = null; public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; @@ -56,7 +56,7 @@ public static async Task Create(StreamSystemConfig config) { try { - var client = await Client.Create(clientParams with { Endpoint = endPoint }); + var client = await Client.Create(clientParams with {Endpoint = endPoint}); if (!client.IsClosed) { return new StreamSystem(clientParams, client); @@ -107,14 +107,14 @@ private async Task MayBeReconnectLocator() public async Task CreateProducer(ProducerConfig producerConfig) { await MayBeReconnectLocator(); - var meta = await client.QueryMetadata(new[] { producerConfig.Stream }); + var meta = await client.QueryMetadata(new[] {producerConfig.Stream}); var metaStreamInfo = meta.StreamInfos[producerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Producer.Create(clientParameters with { ClientProvidedName = producerConfig.ClientProvidedName }, + return await Producer.Create(clientParameters with {ClientProvidedName = producerConfig.ClientProvidedName}, producerConfig, metaStreamInfo); } @@ -131,9 +131,9 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { - var streams = new[] { stream }; + var streams = new[] {stream}; var response = await client.QueryMetadata(streams); - return response.StreamInfos is { Count: >= 1 } && + return response.StreamInfos is {Count: >= 1} && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } @@ -181,6 +181,7 @@ public async Task QuerySequence(string reference, string stream) public async Task DeleteStream(string stream) { + await MayBeReconnectLocator(); var response = await client.DeleteStream(stream); if (response.ResponseCode == ResponseCode.Ok) { @@ -192,14 +193,14 @@ public async Task DeleteStream(string stream) public async Task CreateConsumer(ConsumerConfig consumerConfig) { - var meta = await client.QueryMetadata(new[] { consumerConfig.Stream }); + var meta = await client.QueryMetadata(new[] {consumerConfig.Stream}); var metaStreamInfo = meta.StreamInfos[consumerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Consumer.Create(clientParameters with { ClientProvidedName = consumerConfig.ClientProvidedName }, + return await Consumer.Create(clientParameters with {ClientProvidedName = consumerConfig.ClientProvidedName}, consumerConfig, metaStreamInfo); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 23997e90..7f1c5f0f 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -2,13 +2,17 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2020 VMware, Inc. +using System; using System.Collections.Generic; +using System.Runtime.InteropServices; using System.Text; +using System.Threading; using System.Threading.Tasks; using RabbitMQ.Stream.Client; using RabbitMQ.Stream.Client.Reliable; using Xunit; using Xunit.Abstractions; +using Confirmation = RabbitMQ.Stream.Client.Reliable.Confirmation; namespace Tests; @@ -24,8 +28,8 @@ public ReliableTests(ITestOutputHelper testOutputHelper) [Fact] public void MessageWithoutConfirmationRaiseTimeout() { - var confirmationTask = new TaskCompletionSource>(); - var l = new List(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { l.Add(confirmation); @@ -40,20 +44,20 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); - new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. - Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[0].ConfirmationStatus); - Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[1].ConfirmationStatus); + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[0].Status); + Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[1].Status); confirmationPipe.Stop(); } [Fact] public void MessageConfirmationShouldHaveTheSameMessages() { - var confirmationTask = new TaskCompletionSource>(); - var l = new List(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { l.Add(confirmation); @@ -68,12 +72,104 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); - new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); - Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].ConfirmationStatus); - Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].ConfirmationStatus); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status); + Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status); confirmationPipe.Stop(); } + + + [Fact] + public async void ConfirmRProducerMessages() + { + var testPassed = new TaskCompletionSource(); + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var count = 0; + var rProducer = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + ConfirmationHandler = confirmation => + { + if (Interlocked.Increment(ref count) == 10) + { + testPassed.SetResult(true); + } + + return Task.CompletedTask; + } + } + ); + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + + for (var i = 0; i < 5; i++) + { + await rProducer.Send(messages, CompressionType.None); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + await rProducer.Close(); + await system.Close(); + } + + [Fact] + public async void SendMessageAfterKillConnectionShouldContinueToWork() + { + // Test the auto-reconnect client + // When the client connection is closed by the management UI + // see HttpKillConnections/1. + // The RProducer has to detect the disconnection and reconnect the client + // + var testPassed = new TaskCompletionSource(); + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var count = 0; + var rProducer = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = "producer_to_kill", + ConfirmationHandler = confirmation => + { + if (Interlocked.Increment(ref count) == 5) + { + testPassed.SetResult(true); + } + + return Task.CompletedTask; + } + } + ); + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + SystemUtils.Wait(TimeSpan.FromSeconds(6)); + Assert.Equal(1, SystemUtils.HttpKillConnections("producer_to_kill").Result); + Assert.Equal(1, SystemUtils.HttpKillConnections("dotnet-stream-locator").Result); + + + for (var i = 0; i < 5; i++) + { + List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + await rProducer.Send(messages, CompressionType.None); + } + + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + // here the locator connection is closed. + // the auto-reconnect has to connect the locator again + await system.DeleteStream(stream); + await rProducer.Close(); + await system.Close(); + } } diff --git a/Tests/SystemTests.cs b/Tests/SystemTests.cs index f998205a..3140d84a 100644 --- a/Tests/SystemTests.cs +++ b/Tests/SystemTests.cs @@ -230,7 +230,7 @@ public async void CloseProducerConsumerAfterForceCloseShouldNotRaiseError() // we kill _only_ producer and consumer connection // leave the locator up and running to delete the stream - Assert.Equal(2, SystemUtils.HttpKillConnections().Result); + Assert.Equal(2, SystemUtils.HttpKillConnections("to_kill").Result); Assert.Equal(ResponseCode.Ok, await producer.Close()); Assert.Equal(ResponseCode.Ok, await producer.Close()); // close two time it should not raise an exception diff --git a/Tests/Utils.cs b/Tests/Utils.cs index f63086fa..a13057e8 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -59,6 +59,16 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } + + public static void InitStreamSystemWithRandomStream(out StreamSystem system, out string stream) + { + stream = Guid.NewGuid().ToString(); + var config = new StreamSystemConfig(); + system = StreamSystem.Create(config).Result; + var x = system.CreateStream(new StreamSpec(stream)); + x.Wait(); + } + public static async Task PublishMessages(StreamSystem system, string stream, int numberOfMessages, ITestOutputHelper testOutputHelper) { @@ -103,19 +113,19 @@ public static async Task PublishMessages(StreamSystem system, string stream, int producer.Dispose(); } - private class Connecction + private class Connection { public string name { get; set; } public Dictionary client_properties { get; set; } } - public static async Task HttpKillConnections() + public static async Task HttpKillConnections(string connectionName) { - using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; + using var handler = new HttpClientHandler {Credentials = new NetworkCredential("guest", "guest"),}; using var client = new HttpClient(handler); var result = await client.GetAsync("http://localhost:15672/api/connections"); var json = await result.Content.ReadAsStringAsync(); - var connections = JsonSerializer.Deserialize>(json); + var connections = JsonSerializer.Deserialize>(json); if (connections == null) { return 0; @@ -123,7 +133,7 @@ public static async Task HttpKillConnections() // we kill _only_ producer and consumer connections // leave the locator up and running to delete the stream - var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains("to_kill")); + var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); foreach (var conn in iEnumerable) { await client.DeleteAsync($"http://localhost:15672/api/connections/{conn.name}"); From 121b66b2c0f66391d2e633cb69f39261d382a43b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 13 Apr 2022 16:07:01 +0200 Subject: [PATCH 05/21] Formatting Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 7 +++++-- RabbitMQ.Stream.Client/StreamSystem.cs | 16 ++++++++-------- Tests/ReliableTests.cs | 11 ++++------- Tests/Utils.cs | 3 +-- 4 files changed, 18 insertions(+), 19 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index f2fd9212..84084830 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -67,12 +67,15 @@ public void Start() message.Status = confirmationStatus; ConfirmHandler?.Invoke(message); } + break; } - }, new ExecutionDataflowBlockOptions { + }, new ExecutionDataflowBlockOptions + { MaxDegreeOfParallelism = 1, // throttling - BoundedCapacity = 50_000 }); + BoundedCapacity = 50_000 + }); _invalidateTimer.Elapsed += OnTimedEvent; _invalidateTimer.Interval = 2000; diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index 045342af..fdf2b65d 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -21,7 +21,7 @@ public record StreamSystemConfig : INamedEntity /// public SslOption Ssl { get; set; } = new SslOption(); - public IList Endpoints { get; set; } = new List {new IPEndPoint(IPAddress.Loopback, 5552)}; + public IList Endpoints { get; set; } = new List { new IPEndPoint(IPAddress.Loopback, 5552) }; public AddressResolver AddressResolver { get; set; } = null; public string ClientProvidedName { get; set; } = "dotnet-stream-locator"; @@ -56,7 +56,7 @@ public static async Task Create(StreamSystemConfig config) { try { - var client = await Client.Create(clientParams with {Endpoint = endPoint}); + var client = await Client.Create(clientParams with { Endpoint = endPoint }); if (!client.IsClosed) { return new StreamSystem(clientParams, client); @@ -107,14 +107,14 @@ private async Task MayBeReconnectLocator() public async Task CreateProducer(ProducerConfig producerConfig) { await MayBeReconnectLocator(); - var meta = await client.QueryMetadata(new[] {producerConfig.Stream}); + var meta = await client.QueryMetadata(new[] { producerConfig.Stream }); var metaStreamInfo = meta.StreamInfos[producerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateProducerException($"producer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Producer.Create(clientParameters with {ClientProvidedName = producerConfig.ClientProvidedName}, + return await Producer.Create(clientParameters with { ClientProvidedName = producerConfig.ClientProvidedName }, producerConfig, metaStreamInfo); } @@ -131,9 +131,9 @@ public async Task CreateStream(StreamSpec spec) public async Task StreamExists(string stream) { - var streams = new[] {stream}; + var streams = new[] { stream }; var response = await client.QueryMetadata(streams); - return response.StreamInfos is {Count: >= 1} && + return response.StreamInfos is { Count: >= 1 } && response.StreamInfos[stream].ResponseCode == ResponseCode.Ok; } @@ -193,14 +193,14 @@ public async Task DeleteStream(string stream) public async Task CreateConsumer(ConsumerConfig consumerConfig) { - var meta = await client.QueryMetadata(new[] {consumerConfig.Stream}); + var meta = await client.QueryMetadata(new[] { consumerConfig.Stream }); var metaStreamInfo = meta.StreamInfos[consumerConfig.Stream]; if (metaStreamInfo.ResponseCode != ResponseCode.Ok) { throw new CreateConsumerException($"consumer could not be created code: {metaStreamInfo.ResponseCode}"); } - return await Consumer.Create(clientParameters with {ClientProvidedName = consumerConfig.ClientProvidedName}, + return await Consumer.Create(clientParameters with { ClientProvidedName = consumerConfig.ClientProvidedName }, consumerConfig, metaStreamInfo); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 7f1c5f0f..a5d72b2e 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -4,7 +4,6 @@ using System; using System.Collections.Generic; -using System.Runtime.InteropServices; using System.Text; using System.Threading; using System.Threading.Tasks; @@ -44,7 +43,7 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. @@ -72,7 +71,7 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); @@ -81,7 +80,6 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Stop(); } - [Fact] public async void ConfirmRProducerMessages() { @@ -109,7 +107,7 @@ public async void ConfirmRProducerMessages() await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); } - List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; for (var i = 0; i < 5; i++) { @@ -158,10 +156,9 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() Assert.Equal(1, SystemUtils.HttpKillConnections("producer_to_kill").Result); Assert.Equal(1, SystemUtils.HttpKillConnections("dotnet-stream-locator").Result); - for (var i = 0; i < 5; i++) { - List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; await rProducer.Send(messages, CompressionType.None); } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index a13057e8..02dcf15d 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -59,7 +59,6 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } - public static void InitStreamSystemWithRandomStream(out StreamSystem system, out string stream) { stream = Guid.NewGuid().ToString(); @@ -121,7 +120,7 @@ private class Connection public static async Task HttpKillConnections(string connectionName) { - using var handler = new HttpClientHandler {Credentials = new NetworkCredential("guest", "guest"),}; + using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; using var client = new HttpClient(handler); var result = await client.GetAsync("http://localhost:15672/api/connections"); var json = await result.Content.ReadAsStringAsync(); From c1f6593c88c21f18e1387d4ead90b23bd02be07e Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 13 Apr 2022 16:18:49 +0200 Subject: [PATCH 06/21] Approve Signed-off-by: Gabriele Santomaggio --- Tests/ApiApproval.Approve.verified.txt | 62 ++++++++++++++++++++++++++ 1 file changed, 62 insertions(+) diff --git a/Tests/ApiApproval.Approve.verified.txt b/Tests/ApiApproval.Approve.verified.txt index cfef7836..2630059e 100644 --- a/Tests/ApiApproval.Approve.verified.txt +++ b/Tests/ApiApproval.Approve.verified.txt @@ -969,4 +969,66 @@ namespace RabbitMQ.Stream.Client { public VirtualHostAccessFailureException(string s) { } } +} +namespace RabbitMQ.Stream.Client.Reliable +{ + public class Confirmation + { + public Confirmation() { } + public System.DateTime InsertDateTime { get; set; } + public System.Collections.Generic.List Messages { get; } + public ulong PublishingId { get; } + public RabbitMQ.Stream.Client.Reliable.ConfirmationStatus Status { get; } + } + public class ConfirmationPipe + { + public ConfirmationPipe(System.Func confirmHandler) { } + public void AddUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Message message) { } + public void AddUnConfirmedMessage(ulong publishingId, System.Collections.Generic.List messages) { } + public System.Threading.Tasks.Task RemoveUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Reliable.ConfirmationStatus confirmationStatus) { } + public void Start() { } + public void Stop() { } + } + public enum ConfirmationStatus : ushort + { + WaitForConfirmation = 0, + Confirmed = 1, + TimeoutError = 2, + } + public interface IPublishingIdStrategy + { + ulong GetPublishingId(); + } + public interface IReconnectStrategy + { + void WhenConnected(); + void WhenDisconnected(out bool reconnect); + } + public class RProducer + { + public System.Threading.Tasks.Task Close() { } + public System.Threading.Tasks.ValueTask Send(RabbitMQ.Stream.Client.Message message) { } + public System.Threading.Tasks.ValueTask Send(System.Collections.Generic.List messages, RabbitMQ.Stream.Client.CompressionType compressionType) { } + public static System.Threading.Tasks.Task CreateRProducer(RabbitMQ.Stream.Client.Reliable.RProducerConfig rProducerConfig) { } + } + public class RProducerConfig : System.IEquatable + { + public RProducerConfig() { } + protected RProducerConfig(RabbitMQ.Stream.Client.Reliable.RProducerConfig original) { } + public string ClientProvidedName { get; set; } + public System.Func ConfirmationHandler { get; set; } + protected virtual System.Type EqualityContract { get; } + public RabbitMQ.Stream.Client.Reliable.IReconnectStrategy ReconnectStrategy { get; set; } + public string Reference { get; set; } + public string Stream { get; set; } + public RabbitMQ.Stream.Client.StreamSystem StreamSystem { get; set; } + public virtual RabbitMQ.Stream.Client.Reliable.RProducerConfig $() { } + public virtual bool Equals(RabbitMQ.Stream.Client.Reliable.RProducerConfig? other) { } + public override bool Equals(object? obj) { } + public override int GetHashCode() { } + protected virtual bool PrintMembers(System.Text.StringBuilder builder) { } + public override string ToString() { } + public static bool operator !=(RabbitMQ.Stream.Client.Reliable.RProducerConfig? left, RabbitMQ.Stream.Client.Reliable.RProducerConfig? right) { } + public static bool operator ==(RabbitMQ.Stream.Client.Reliable.RProducerConfig? left, RabbitMQ.Stream.Client.Reliable.RProducerConfig? right) { } + } } \ No newline at end of file From 9503e107585dafd612def25d7b5b0133b680ec64 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 14 Apr 2022 12:46:16 +0200 Subject: [PATCH 07/21] Logs Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 052f710d..76dea107 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -25,8 +25,9 @@ internal class BackOffReconnectStrategy : IReconnectStrategy public void WhenDisconnected(out bool reconnect) { - Console.WriteLine($"WhenDisconnected raised - Going to reconnect, tentative: {Tentatives}"); Tentatives <<= 1; + LogEventSource.Log.LogInformation( + $"Producer disconnected, reconnection in {Tentatives * 100} ms."); Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100)); reconnect = true; } @@ -43,7 +44,7 @@ public record RProducerConfig public string Stream { get; set; } public string Reference { get; set; } public Func ConfirmationHandler { get; init; } - public string ClientProvidedName { get; set; } + public string ClientProvidedName { get; set; } = "dotnet-stream-rproducer"; public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); } @@ -80,6 +81,7 @@ private async Task Init() { Stream = _rProducerConfig.Stream, ClientProvidedName = _rProducerConfig.ClientProvidedName, + Reference = _rProducerConfig.Reference, ConnectionClosedHandler = async _ => { await TryToReconnect(); @@ -129,7 +131,6 @@ public async Task Close() public async ValueTask Send(Message message) { var pid = _autoPublishingId.GetPublishingId(); - _confirmationPipe.AddUnConfirmedMessage(pid, message); await _semProducer.WaitAsync(); try From 9e017f348a6cdc16668330219a2efa2533877e5b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Apr 2022 17:25:04 +0200 Subject: [PATCH 08/21] Handle metadata update Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 2 +- RabbitMQ.Stream.Client/Producer.cs | 34 ++++++-- .../Reliable/ConfirmationPipecs.cs | 39 ++++++--- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 79 ++++++++++++++++++- Tests/ReliableTests.cs | 42 +++++++--- Tests/Utils.cs | 17 ++++ 6 files changed, 179 insertions(+), 34 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index 41a526f1..e8ceae37 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -254,10 +254,10 @@ public ValueTask Publish(T msg) where T : struct, ICommand public async Task DeletePublisher(byte publisherId) { + publishers.Remove(publisherId); var result = await Request(corr => new DeletePublisherRequest(corr, publisherId)); - publishers.Remove(publisherId); return result; } diff --git a/RabbitMQ.Stream.Client/Producer.cs b/RabbitMQ.Stream.Client/Producer.cs index 31e45102..74926e20 100644 --- a/RabbitMQ.Stream.Client/Producer.cs +++ b/RabbitMQ.Stream.Client/Producer.cs @@ -32,7 +32,7 @@ public record ProducerConfig : INamedEntity public class Producer : AbstractEntity, IDisposable { - private readonly bool _disposed; + private bool _disposed; private byte publisherId; private readonly ProducerConfig config; private readonly Channel messageBuffer; @@ -131,7 +131,7 @@ public async ValueTask Send(ulong publishingId, List subEntryMessages, private async Task SemaphoreWait() { - if (!semaphore.Wait(0)) + if (!semaphore.Wait(0) && !client.IsClosed) { // Nope, we have maxed our In-Flight messages, let's asynchronously wait for confirms if (!await semaphore.WaitAsync(1000).ConfigureAwait(false)) @@ -159,7 +159,7 @@ private async Task ProcessBuffer() { // TODO: make the batch size configurable. var messages = new List<(ulong, Message)>(100); - while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false)) + while (await messageBuffer.Reader.WaitToReadAsync().ConfigureAwait(false) && !client.IsClosed) { while (messageBuffer.Reader.TryRead(out var msg)) { @@ -188,18 +188,35 @@ async Task SendMessages(List<(ulong, Message)> messages) } } - public async Task Close() + public Task Close() { if (client.IsClosed) { - return ResponseCode.Ok; + return Task.FromResult(ResponseCode.Ok); + } + + var result = ResponseCode.Ok; + try + { + var deletePublisherResponseTask = client.DeletePublisher(publisherId); + // The default timeout is usually 10 seconds + // in this case we reduce the waiting time + // the producer could be removed because of stream deleted + // so it is not necessary to wait. + deletePublisherResponseTask.Wait(TimeSpan.FromSeconds(3)); + if (deletePublisherResponseTask.IsCompletedSuccessfully) + { + result = deletePublisherResponseTask.Result.ResponseCode; + } + } + catch (Exception e) + { + LogEventSource.Log.LogError($"Error removing the producer id: {publisherId} from the server. {e}"); } - var deletePublisherResponse = await client.DeletePublisher(publisherId); - var result = deletePublisherResponse.ResponseCode; var closed = client.MaybeClose($"client-close-publisher: {publisherId}"); ClientExceptions.MaybeThrowException(closed.ResponseCode, $"client-close-publisher: {publisherId}"); - return result; + return Task.FromResult(result); } public static async Task Create(ClientParameters clientParameters, @@ -228,6 +245,7 @@ private void Dispose(bool disposing) closeProducer.Wait(1000); ClientExceptions.MaybeThrowException(closeProducer.Result, $"Error during remove producer. Producer: {publisherId}"); + _disposed = true; } public void Dispose() diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 84084830..9427ac70 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -12,6 +12,7 @@ using Timer = System.Timers.Timer; namespace RabbitMQ.Stream.Client.Reliable; + /// /// ConfirmationStatus can be: /// @@ -20,16 +21,22 @@ public enum ConfirmationStatus : ushort WaitForConfirmation = 0, Confirmed = 1, TimeoutError = 2, + StreamNotAvailable = 6, + InternalError = 15, + AccessRefused = 16, + PreconditionFailed = 17, + PublisherDoesNotExist = 18, } + /// -/// Confirmation is a wrapper around the message/s +/// MessagesConfirmation is a wrapper around the message/s /// This class is returned to the user to understand /// the message status. /// -public class Confirmation +public class MessagesConfirmation { - public ulong PublishingId { get; internal init; } - public List Messages { get; internal set; } + public ulong PublishingId { get; internal set; } + public List Messages { get; internal init; } public DateTime InsertDateTime { get; init; } public ConfirmationStatus Status { get; internal set; } } @@ -41,19 +48,19 @@ public class Confirmation /// public class ConfirmationPipe { - private ActionBlock> _waitForConfirmationActionBlock; - private readonly ConcurrentDictionary _waitForConfirmation = new(); + private ActionBlock> _waitForConfirmationActionBlock; + private readonly ConcurrentDictionary _waitForConfirmation = new(); private readonly Timer _invalidateTimer = new(); - private Func ConfirmHandler { get; } + private Func ConfirmHandler { get; } - public ConfirmationPipe(Func confirmHandler) + public ConfirmationPipe(Func confirmHandler) { ConfirmHandler = confirmHandler; } public void Start() { - _waitForConfirmationActionBlock = new ActionBlock>( + _waitForConfirmationActionBlock = new ActionBlock>( request => { var (confirmationStatus, confirmation) = request; @@ -91,7 +98,8 @@ public void Stop() private async void OnTimedEvent(object? sender, ElapsedEventArgs e) { { - foreach (var pair in _waitForConfirmation.Where(pair => (DateTime.Now - pair.Value.InsertDateTime).Seconds > 2)) + foreach (var pair in _waitForConfirmation.Where(pair => + (DateTime.Now - pair.Value.InsertDateTime).Seconds > 2)) { await RemoveUnConfirmedMessage(pair.Value.PublishingId, ConfirmationStatus.TimeoutError); } @@ -101,7 +109,7 @@ private async void OnTimedEvent(object? sender, ElapsedEventArgs e) public void AddUnConfirmedMessage(ulong publishingId, Message message) { _waitForConfirmation.TryAdd(publishingId, - new Confirmation() + new MessagesConfirmation() { Messages = new List() { message }, PublishingId = publishingId, @@ -112,13 +120,18 @@ public void AddUnConfirmedMessage(ulong publishingId, Message message) public void AddUnConfirmedMessage(ulong publishingId, List messages) { _waitForConfirmation.TryAdd(publishingId, - new Confirmation() { Messages = messages, PublishingId = publishingId, InsertDateTime = DateTime.Now }); + new MessagesConfirmation() + { + Messages = messages, + PublishingId = publishingId, + InsertDateTime = DateTime.Now + }); } public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) { return _waitForConfirmationActionBlock.SendAsync( Tuple.Create(confirmationStatus, - new Confirmation() { PublishingId = publishingId })); + new MessagesConfirmation() { PublishingId = publishingId })); } } diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 76dea107..8537b5fa 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -27,7 +27,7 @@ public void WhenDisconnected(out bool reconnect) { Tentatives <<= 1; LogEventSource.Log.LogInformation( - $"Producer disconnected, reconnection in {Tentatives * 100} ms."); + $"Producer disconnected, check if reconnection needed in {Tentatives * 100} ms."); Thread.Sleep(TimeSpan.FromMilliseconds(Tentatives * 100)); reconnect = true; } @@ -43,7 +43,7 @@ public record RProducerConfig public StreamSystem StreamSystem { get; set; } public string Stream { get; set; } public string Reference { get; set; } - public Func ConfirmationHandler { get; init; } + public Func ConfirmationHandler { get; init; } public string ClientProvidedName { get; set; } = "dotnet-stream-rproducer"; public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); } @@ -82,18 +82,37 @@ private async Task Init() Stream = _rProducerConfig.Stream, ClientProvidedName = _rProducerConfig.ClientProvidedName, Reference = _rProducerConfig.Reference, + MetadataHandler = update => + { + HandleMetaDataMaybeReconnect(update.Stream).Wait(); + }, ConnectionClosedHandler = async _ => { await TryToReconnect(); }, ConfirmHandler = confirmation => { + var confirmationStatus = confirmation.Code switch + { + ResponseCode.PublisherDoesNotExist => ConfirmationStatus.PublisherDoesNotExist, + ResponseCode.AccessRefused => ConfirmationStatus.AccessRefused, + ResponseCode.InternalError => ConfirmationStatus.InternalError, + ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, + ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, + _ => ConfirmationStatus.Confirmed + }; + _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, - ConfirmationStatus.Confirmed); + confirmationStatus); } }); _rProducerConfig.ReconnectStrategy.WhenConnected(); } + + catch (CreateProducerException ce) + { + LogEventSource.Log.LogError($"{ce}. RProducer closed"); + } catch (Exception e) { LogEventSource.Log.LogError($"Error during initialization: {e}."); @@ -113,6 +132,60 @@ private async Task TryToReconnect() } } + /// + /// When the clients receives a meta data update, it doesn't know + /// the reason. + /// Metadata update can be raised when: + /// - stream is deleted + /// - change the stream topology (ex: add a follower) + /// + /// HandleMetaDataMaybeReconnect checks if the stream still exists + /// and try to reconnect. + /// (internal because it is needed for tests) + /// + internal async Task HandleMetaDataMaybeReconnect(string stream) + { + LogEventSource.Log.LogInformation( + $"Meta data update for the stream: {stream} " + + $"Producer {_rProducerConfig.Reference} closed."); + + // This sleep is needed. When a stream is deleted it takes sometime. + // The StreamExists/1 could return true even the stream doesn't exist anymore. + Thread.Sleep(500); + if (await _rProducerConfig.StreamSystem.StreamExists(stream)) + { + LogEventSource.Log.LogInformation( + $"Meta data update, the stream {stream} still exist. " + + $"Producer {_rProducerConfig.Reference} will try to reconnect."); + // Here we just close the producer connection + // the func TryToReconnect/0 will be called. + await CloseProducer(); + } + else + { + // In this case the stream doesn't exist anymore + // the RProducer is just closed. + await Close(); + } + } + + private async Task CloseProducer() + { + await _semProducer.WaitAsync(); + try + { + await _producer.Close(); + } + finally + { + _semProducer.Release(); + } + } + + public bool IsOpen() + { + return _needReconnect; + } public async Task Close() { await _semProducer.WaitAsync(); diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index a5d72b2e..bb412436 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -11,7 +11,6 @@ using RabbitMQ.Stream.Client.Reliable; using Xunit; using Xunit.Abstractions; -using Confirmation = RabbitMQ.Stream.Client.Reliable.Confirmation; namespace Tests; @@ -27,8 +26,8 @@ public ReliableTests(ITestOutputHelper testOutputHelper) [Fact] public void MessageWithoutConfirmationRaiseTimeout() { - var confirmationTask = new TaskCompletionSource>(); - var l = new List(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { l.Add(confirmation); @@ -44,7 +43,7 @@ public void MessageWithoutConfirmationRaiseTimeout() var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); - new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. Assert.Equal(ConfirmationStatus.TimeoutError, confirmationTask.Task.Result[0].Status); @@ -55,8 +54,8 @@ public void MessageWithoutConfirmationRaiseTimeout() [Fact] public void MessageConfirmationShouldHaveTheSameMessages() { - var confirmationTask = new TaskCompletionSource>(); - var l = new List(); + var confirmationTask = new TaskCompletionSource>(); + var l = new List(); var confirmationPipe = new ConfirmationPipe(confirmation => { l.Add(confirmation); @@ -74,7 +73,7 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); - new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); + new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[0].Status); Assert.Equal(ConfirmationStatus.Confirmed, confirmationTask.Task.Result[1].Status); confirmationPipe.Stop(); @@ -91,7 +90,7 @@ public async void ConfirmRProducerMessages() { Stream = stream, StreamSystem = system, - ConfirmationHandler = confirmation => + ConfirmationHandler = _ => { if (Interlocked.Increment(ref count) == 10) { @@ -136,7 +135,7 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() Stream = stream, StreamSystem = system, ClientProvidedName = "producer_to_kill", - ConfirmationHandler = confirmation => + ConfirmationHandler = _ => { if (Interlocked.Increment(ref count) == 5) { @@ -169,4 +168,29 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() await rProducer.Close(); await system.Close(); } + + [Fact] + public async void HandleDeleteStreamWithMetaDataUpdate() + { + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var clientProviderName = Guid.NewGuid().ToString(); + var rProducer = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + ConfirmationHandler = _ => Task.CompletedTask + } + ); + + Assert.True(rProducer.IsOpen()); + // When the stream is deleted the producer has to close the + // connection an become inactive. + await system.DeleteStream(stream); + + SystemUtils.Wait(); + Assert.False(rProducer.IsOpen()); + await system.Close(); + } } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index 02dcf15d..dbb8d570 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -141,6 +141,23 @@ public static async Task HttpKillConnections(string connectionName) return iEnumerable.Count(); } + public static async Task GetConnectionsNumber(string connectionName) + { + using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; + using var client = new HttpClient(handler); + var result = await client.GetAsync("http://localhost:15672/api/connections"); + var json = await result.Content.ReadAsStringAsync(); + var connections = JsonSerializer.Deserialize>(json); + if (connections == null) + { + return 0; + } + + var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); + + return iEnumerable.Count(); + } + public static void HttpPost(string jsonBody, string api) { var httpWebRequest = (HttpWebRequest)WebRequest.Create($"http://localhost:15672/api/{api}"); From 014a509b4be732dcbcab75463f0fde0833a0b032 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Apr 2022 17:34:40 +0200 Subject: [PATCH 09/21] Approve Signed-off-by: Gabriele Santomaggio --- Tests/ApiApproval.Approve.verified.txt | 26 ++++++++++++++++---------- Tests/ReliableTests.cs | 2 +- 2 files changed, 17 insertions(+), 11 deletions(-) diff --git a/Tests/ApiApproval.Approve.verified.txt b/Tests/ApiApproval.Approve.verified.txt index 2630059e..adb0fad6 100644 --- a/Tests/ApiApproval.Approve.verified.txt +++ b/Tests/ApiApproval.Approve.verified.txt @@ -972,17 +972,9 @@ namespace RabbitMQ.Stream.Client } namespace RabbitMQ.Stream.Client.Reliable { - public class Confirmation - { - public Confirmation() { } - public System.DateTime InsertDateTime { get; set; } - public System.Collections.Generic.List Messages { get; } - public ulong PublishingId { get; } - public RabbitMQ.Stream.Client.Reliable.ConfirmationStatus Status { get; } - } public class ConfirmationPipe { - public ConfirmationPipe(System.Func confirmHandler) { } + public ConfirmationPipe(System.Func confirmHandler) { } public void AddUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Message message) { } public void AddUnConfirmedMessage(ulong publishingId, System.Collections.Generic.List messages) { } public System.Threading.Tasks.Task RemoveUnConfirmedMessage(ulong publishingId, RabbitMQ.Stream.Client.Reliable.ConfirmationStatus confirmationStatus) { } @@ -994,6 +986,11 @@ namespace RabbitMQ.Stream.Client.Reliable WaitForConfirmation = 0, Confirmed = 1, TimeoutError = 2, + StreamNotAvailable = 6, + InternalError = 15, + AccessRefused = 16, + PreconditionFailed = 17, + PublisherDoesNotExist = 18, } public interface IPublishingIdStrategy { @@ -1004,9 +1001,18 @@ namespace RabbitMQ.Stream.Client.Reliable void WhenConnected(); void WhenDisconnected(out bool reconnect); } + public class MessagesConfirmation + { + public MessagesConfirmation() { } + public System.DateTime InsertDateTime { get; set; } + public System.Collections.Generic.List Messages { get; } + public ulong PublishingId { get; } + public RabbitMQ.Stream.Client.Reliable.ConfirmationStatus Status { get; } + } public class RProducer { public System.Threading.Tasks.Task Close() { } + public bool IsOpen() { } public System.Threading.Tasks.ValueTask Send(RabbitMQ.Stream.Client.Message message) { } public System.Threading.Tasks.ValueTask Send(System.Collections.Generic.List messages, RabbitMQ.Stream.Client.CompressionType compressionType) { } public static System.Threading.Tasks.Task CreateRProducer(RabbitMQ.Stream.Client.Reliable.RProducerConfig rProducerConfig) { } @@ -1016,7 +1022,7 @@ namespace RabbitMQ.Stream.Client.Reliable public RProducerConfig() { } protected RProducerConfig(RabbitMQ.Stream.Client.Reliable.RProducerConfig original) { } public string ClientProvidedName { get; set; } - public System.Func ConfirmationHandler { get; set; } + public System.Func ConfirmationHandler { get; set; } protected virtual System.Type EqualityContract { get; } public RabbitMQ.Stream.Client.Reliable.IReconnectStrategy ReconnectStrategy { get; set; } public string Reference { get; set; } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index bb412436..b7aace72 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -189,7 +189,7 @@ public async void HandleDeleteStreamWithMetaDataUpdate() // connection an become inactive. await system.DeleteStream(stream); - SystemUtils.Wait(); + SystemUtils.Wait(TimeSpan.FromSeconds(5)); Assert.False(rProducer.IsOpen()); await system.Close(); } From 35b082e538dffe8fc34980f69484b603e02f1685 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Apr 2022 17:42:35 +0200 Subject: [PATCH 10/21] Be sure that the publisher is removed even if there is an error Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 19 +++++++++++++------ 1 file changed, 13 insertions(+), 6 deletions(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index e8ceae37..f7a08e7e 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -254,11 +254,18 @@ public ValueTask Publish(T msg) where T : struct, ICommand public async Task DeletePublisher(byte publisherId) { - publishers.Remove(publisherId); - var result = - await Request(corr => - new DeletePublisherRequest(corr, publisherId)); - return result; + try + { + var result = + await Request(corr => + new DeletePublisherRequest(corr, publisherId)); + + return result; + } + finally + { + publishers.Remove(publisherId); + } } public async Task<(byte, SubscribeResponse)> Subscribe(string stream, IOffsetType offsetType, @@ -538,7 +545,7 @@ public static ManualResetValueTaskSource Rent() } else { - return new ManualResetValueTaskSource() { RunContinuationsAsynchronously = true }; + return new ManualResetValueTaskSource() {RunContinuationsAsynchronously = true}; } } From 9853fce27cea4260868b3268af5db9ecd043d630 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Tue, 19 Apr 2022 17:44:35 +0200 Subject: [PATCH 11/21] Be sure that the publisher is removed even if there is an error Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Client.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/RabbitMQ.Stream.Client/Client.cs b/RabbitMQ.Stream.Client/Client.cs index f7a08e7e..c8bb69f7 100644 --- a/RabbitMQ.Stream.Client/Client.cs +++ b/RabbitMQ.Stream.Client/Client.cs @@ -545,7 +545,7 @@ public static ManualResetValueTaskSource Rent() } else { - return new ManualResetValueTaskSource() {RunContinuationsAsynchronously = true}; + return new ManualResetValueTaskSource() { RunContinuationsAsynchronously = true }; } } From 66eef943e4520a9ff20da9cecb20ed6fe030f327 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Apr 2022 14:32:29 +0200 Subject: [PATCH 12/21] Add the last sequence RProducer will get the last SequenceID by the reference name. If it can't get it Default value is 0 Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 1 + .../Reliable/PublishingIdStrategy.cs | 4 +- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 39 +++++- Tests/ReliableTests.cs | 117 +++++++++++++++++- Tests/Utils.cs | 32 ++--- 5 files changed, 170 insertions(+), 23 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 9427ac70..4b4ae3a2 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -26,6 +26,7 @@ public enum ConfirmationStatus : ushort AccessRefused = 16, PreconditionFailed = 17, PublisherDoesNotExist = 18, + UndefinedError = 200, } /// diff --git a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs index 0463d41a..4d60aee0 100644 --- a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs @@ -3,12 +3,14 @@ // Copyright (c) 2007-2020 VMware, Inc. namespace RabbitMQ.Stream.Client.Reliable; + /// /// Define PublishingId Strategy. /// Can be automatic, so the RProducer will provide -/// the ID, or the use the way to +/// the PublishingId /// public interface IPublishingIdStrategy { ulong GetPublishingId(); + void InitPublishingId(); } diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 8537b5fa..11675ca4 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -12,11 +12,31 @@ namespace RabbitMQ.Stream.Client.Reliable; internal class AutoPublishingId : IPublishingIdStrategy { private ulong _lastPublishingId; + private readonly RProducerConfig _rProducerConfig; public ulong GetPublishingId() { return ++_lastPublishingId; } + + public AutoPublishingId(RProducerConfig rProducerConfig) + { + _rProducerConfig = rProducerConfig; + } + + public void InitPublishingId() + { + try + { + var queryTask = + _rProducerConfig.StreamSystem.QuerySequence(_rProducerConfig.Reference, _rProducerConfig.Stream); + _lastPublishingId = queryTask.Result; + } + catch (Exception e) + { + _lastPublishingId = 0; + } + } } internal class BackOffReconnectStrategy : IReconnectStrategy @@ -48,6 +68,17 @@ public record RProducerConfig public IReconnectStrategy ReconnectStrategy { get; set; } = new BackOffReconnectStrategy(); } +/// +/// RProducer is a wrapper around the standard Producer. +/// Main features are: +/// - Auto-reconnection if the connection is dropped +/// - Trace sent and received messages. The event RProducerConfig:ConfirmationHandler/2 +/// receives back messages sent with the status. +/// - Handle the Metadata Update. In case the stream is deleted RProducer closes Producer/Connection. +/// Reconnect the Producer if the stream still exists. +/// - Set automatically the next PublisherID +/// - Automatically retrieves the last sequence. By default is AutoPublishingId see IPublishingIdStrategy. +/// public class RProducer { private Producer _producer; @@ -59,8 +90,8 @@ public class RProducer private RProducer(RProducerConfig rProducerConfig) { - _autoPublishingId = new AutoPublishingId(); _rProducerConfig = rProducerConfig; + _autoPublishingId = new AutoPublishingId(_rProducerConfig); _confirmationPipe = new ConfirmationPipe(rProducerConfig.ConfirmationHandler); _confirmationPipe.Start(); } @@ -77,6 +108,8 @@ private async Task Init() await _semProducer.WaitAsync(); try { + _autoPublishingId.InitPublishingId(); + _producer = await _rProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() { Stream = _rProducerConfig.Stream, @@ -99,7 +132,8 @@ private async Task Init() ResponseCode.InternalError => ConfirmationStatus.InternalError, ResponseCode.PreconditionFailed => ConfirmationStatus.PreconditionFailed, ResponseCode.StreamNotAvailable => ConfirmationStatus.StreamNotAvailable, - _ => ConfirmationStatus.Confirmed + ResponseCode.Ok => ConfirmationStatus.Confirmed, + _ => ConfirmationStatus.UndefinedError }; _confirmationPipe.RemoveUnConfirmedMessage(confirmation.PublishingId, @@ -186,6 +220,7 @@ public bool IsOpen() { return _needReconnect; } + public async Task Close() { await _semProducer.WaitAsync(); diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index b7aace72..37baaf1a 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -42,7 +42,7 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. @@ -70,7 +70,7 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); + confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); @@ -106,7 +106,7 @@ public async void ConfirmRProducerMessages() await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); } - List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; + List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; for (var i = 0; i < 5; i++) { @@ -157,7 +157,7 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() for (var i = 0; i < 5; i++) { - List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; + List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; await rProducer.Send(messages, CompressionType.None); } @@ -193,4 +193,113 @@ public async void HandleDeleteStreamWithMetaDataUpdate() Assert.False(rProducer.IsOpen()); await system.Close(); } + + [Fact] + public async void HandleChangeStreamConfigurationWithMetaDataUpdate() + { + // When stream topology changes the MetadataUpdate is raised. + // in this test we simulate it using await rProducer:HandleMetaDataMaybeReconnect/1; + // Producer must reconnect + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var clientProviderName = Guid.NewGuid().ToString(); + var rProducer = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + ConfirmationHandler = _ => Task.CompletedTask + } + ); + + Assert.True(rProducer.IsOpen()); + await rProducer.HandleMetaDataMaybeReconnect(stream); + SystemUtils.Wait(); + Assert.True(rProducer.IsOpen()); + // await system.DeleteStream(stream); + await system.Close(); + } + + + [Fact] + public async void AutoPublishIdDefaultShouldStartFromTheLast() + { + // RProducer automatically retrieves the last producer offset. + // see IPublishingIdStrategy implementation + // This tests if the the last id stored + // A new RProducer should restart from the last offset. + + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + var testPassed = new TaskCompletionSource(); + var clientProviderName = Guid.NewGuid().ToString(); + var reference = Guid.NewGuid().ToString(); + var count = 0; + var rProducer = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + ClientProvidedName = clientProviderName, + Reference = reference, + ConfirmationHandler = confirm => + { + if (Interlocked.Increment(ref count) != 5) + { + return Task.CompletedTask; + } + + Assert.Equal(ConfirmationStatus.Confirmed, confirm.Status); + + if (confirm.Status == ConfirmationStatus.Confirmed) + { + testPassed.SetResult(confirm.PublishingId); + } + + + return Task.CompletedTask; + } + } + ); + + Assert.True(rProducer.IsOpen()); + + for (var i = 0; i < 5; i++) + { + await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); + } + + + // We check if the publishing id is actually 5 + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); + Assert.Equal((ulong)5, testPassed.Task.Result); + + await rProducer.Close(); + var testPassedSecond = new TaskCompletionSource(); + var rProducerSecond = await RProducer.CreateRProducer( + new RProducerConfig() + { + Stream = stream, + StreamSystem = system, + Reference = reference, + ClientProvidedName = clientProviderName, + ConfirmationHandler = confirm => + { + testPassedSecond.SetResult(confirm.PublishingId); + return Task.CompletedTask; + } + } + ); + + // given the same reference, the publishingId should restart from the last + // in this cas5 is 5 + await rProducerSecond.Send(new Message(Encoding.UTF8.GetBytes($"hello"))); + // +1 here, so 6 + new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassedSecond); + Assert.Equal((ulong)6, testPassedSecond.Task.Result); + + + await rProducerSecond.Close(); + await system.DeleteStream(stream); + await system.Close(); + } } diff --git a/Tests/Utils.cs b/Tests/Utils.cs index dbb8d570..e2c491fb 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -141,22 +141,22 @@ public static async Task HttpKillConnections(string connectionName) return iEnumerable.Count(); } - public static async Task GetConnectionsNumber(string connectionName) - { - using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; - using var client = new HttpClient(handler); - var result = await client.GetAsync("http://localhost:15672/api/connections"); - var json = await result.Content.ReadAsStringAsync(); - var connections = JsonSerializer.Deserialize>(json); - if (connections == null) - { - return 0; - } - - var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); - - return iEnumerable.Count(); - } + // public static async Task GetConnectionsNumber(string connectionName) + // { + // using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; + // using var client = new HttpClient(handler); + // var result = await client.GetAsync("http://localhost:15672/api/connections"); + // var json = await result.Content.ReadAsStringAsync(); + // var connections = JsonSerializer.Deserialize>(json); + // if (connections == null) + // { + // return 0; + // } + // + // var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); + // + // return iEnumerable.Count(); + // } public static void HttpPost(string jsonBody, string api) { From 3582bb44a556f3f8edff3fd63c003cbab7f7a8a0 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Apr 2022 14:35:25 +0200 Subject: [PATCH 13/21] Formatting Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 2 +- Tests/ReliableTests.cs | 26 +++++++++----------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 11675ca4..71f4551d 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -32,7 +32,7 @@ public void InitPublishingId() _rProducerConfig.StreamSystem.QuerySequence(_rProducerConfig.Reference, _rProducerConfig.Stream); _lastPublishingId = queryTask.Result; } - catch (Exception e) + catch (Exception) { _lastPublishingId = 0; } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 37baaf1a..fcc32c88 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -42,7 +42,7 @@ public void MessageWithoutConfirmationRaiseTimeout() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); // time out error is sent by the internal time that checks the status // if the message doesn't receive the confirmation within X time, the timeout error is raised. @@ -70,7 +70,7 @@ public void MessageConfirmationShouldHaveTheSameMessages() confirmationPipe.Start(); var message = new Message(Encoding.UTF8.GetBytes($"hello")); confirmationPipe.AddUnConfirmedMessage(1, message); - confirmationPipe.AddUnConfirmedMessage(2, new List() {message}); + confirmationPipe.AddUnConfirmedMessage(2, new List() { message }); confirmationPipe.RemoveUnConfirmedMessage(1, ConfirmationStatus.Confirmed); confirmationPipe.RemoveUnConfirmedMessage(2, ConfirmationStatus.Confirmed); new Utils>(_testOutputHelper).WaitUntilTaskCompletes(confirmationTask); @@ -106,7 +106,7 @@ public async void ConfirmRProducerMessages() await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); } - List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; for (var i = 0; i < 5; i++) { @@ -157,7 +157,7 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() for (var i = 0; i < 5; i++) { - List messages = new() {new Message(Encoding.UTF8.GetBytes($"hello list"))}; + List messages = new() { new Message(Encoding.UTF8.GetBytes($"hello list")) }; await rProducer.Send(messages, CompressionType.None); } @@ -193,13 +193,13 @@ public async void HandleDeleteStreamWithMetaDataUpdate() Assert.False(rProducer.IsOpen()); await system.Close(); } - + [Fact] public async void HandleChangeStreamConfigurationWithMetaDataUpdate() { - // When stream topology changes the MetadataUpdate is raised. - // in this test we simulate it using await rProducer:HandleMetaDataMaybeReconnect/1; - // Producer must reconnect + // When stream topology changes the MetadataUpdate is raised. + // in this test we simulate it using await rProducer:HandleMetaDataMaybeReconnect/1; + // Producer must reconnect SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var clientProviderName = Guid.NewGuid().ToString(); var rProducer = await RProducer.CreateRProducer( @@ -220,7 +220,6 @@ public async void HandleChangeStreamConfigurationWithMetaDataUpdate() await system.Close(); } - [Fact] public async void AutoPublishIdDefaultShouldStartFromTheLast() { @@ -228,7 +227,7 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() // see IPublishingIdStrategy implementation // This tests if the the last id stored // A new RProducer should restart from the last offset. - + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var testPassed = new TaskCompletionSource(); var clientProviderName = Guid.NewGuid().ToString(); @@ -255,7 +254,6 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() testPassed.SetResult(confirm.PublishingId); } - return Task.CompletedTask; } } @@ -268,11 +266,10 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() await rProducer.Send(new Message(Encoding.UTF8.GetBytes($"hello {i}"))); } - // We check if the publishing id is actually 5 new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassed); Assert.Equal((ulong)5, testPassed.Task.Result); - + await rProducer.Close(); var testPassedSecond = new TaskCompletionSource(); var rProducerSecond = await RProducer.CreateRProducer( @@ -289,7 +286,7 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() } } ); - + // given the same reference, the publishingId should restart from the last // in this cas5 is 5 await rProducerSecond.Send(new Message(Encoding.UTF8.GetBytes($"hello"))); @@ -297,7 +294,6 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() new Utils(_testOutputHelper).WaitUntilTaskCompletes(testPassedSecond); Assert.Equal((ulong)6, testPassedSecond.Task.Result); - await rProducerSecond.Close(); await system.DeleteStream(stream); await system.Close(); From 8112a0c881d31c34bbfb45304cd25d08240d3e2b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Apr 2022 14:44:01 +0200 Subject: [PATCH 14/21] Verified Signed-off-by: Gabriele Santomaggio --- Tests/ApiApproval.Approve.verified.txt | 2 ++ 1 file changed, 2 insertions(+) diff --git a/Tests/ApiApproval.Approve.verified.txt b/Tests/ApiApproval.Approve.verified.txt index adb0fad6..f1740682 100644 --- a/Tests/ApiApproval.Approve.verified.txt +++ b/Tests/ApiApproval.Approve.verified.txt @@ -991,10 +991,12 @@ namespace RabbitMQ.Stream.Client.Reliable AccessRefused = 16, PreconditionFailed = 17, PublisherDoesNotExist = 18, + UndefinedError = 200, } public interface IPublishingIdStrategy { ulong GetPublishingId(); + void InitPublishingId(); } public interface IReconnectStrategy { From bfade22daede8cb04f58a3f7fe19a0fcb8fd903b Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 20 Apr 2022 19:18:39 +0200 Subject: [PATCH 15/21] Resolve dead lock during the reconnection change the test. Add the isInReconnection to avoid some race codition Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Connection.cs | 3 -- .../Reliable/ConfirmationPipecs.cs | 17 +++----- RabbitMQ.Stream.Client/Reliable/RProducer.cs | 41 ++++++++++++++----- RabbitMQ.Stream.Client/StreamSystem.cs | 1 + Tests/ReliableTests.cs | 13 +++--- Tests/Utils.cs | 5 ++- 6 files changed, 48 insertions(+), 32 deletions(-) diff --git a/RabbitMQ.Stream.Client/Connection.cs b/RabbitMQ.Stream.Client/Connection.cs index 22545756..d0062ca6 100644 --- a/RabbitMQ.Stream.Client/Connection.cs +++ b/RabbitMQ.Stream.Client/Connection.cs @@ -127,9 +127,6 @@ private async Task ProcessIncomingFrames() { // Let's rent some memory to copy the frame from the network stream. This memory will be reclaimed once the frame has been handled. - // Console.WriteLine( - // $"B TryReadFrame {buffer.Length} {result.IsCompleted} {result.Buffer.IsEmpty} {frame.Length}"); - var memory = ArrayPool.Shared.Rent((int)frame.Length).AsMemory(0, (int)frame.Length); frame.CopyTo(memory.Span); diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 4b4ae3a2..28bab4b0 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -65,19 +65,14 @@ public void Start() request => { var (confirmationStatus, confirmation) = request; - switch (confirmationStatus) + _waitForConfirmation.TryRemove(confirmation.PublishingId, out var message); + if (message == null) { - case ConfirmationStatus.Confirmed: - case ConfirmationStatus.TimeoutError: - _waitForConfirmation.TryRemove(confirmation.PublishingId, out var message); - if (message != null) - { - message.Status = confirmationStatus; - ConfirmHandler?.Invoke(message); - } - - break; + return; } + + message.Status = confirmationStatus; + ConfirmHandler?.Invoke(message); }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1, diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/RProducer.cs index 71f4551d..812e03b5 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/RProducer.cs @@ -11,7 +11,7 @@ namespace RabbitMQ.Stream.Client.Reliable; internal class AutoPublishingId : IPublishingIdStrategy { - private ulong _lastPublishingId; + private ulong _lastPublishingId = 0; private readonly RProducerConfig _rProducerConfig; public ulong GetPublishingId() @@ -87,12 +87,14 @@ public class RProducer private readonly SemaphoreSlim _semProducer = new(1); private readonly ConfirmationPipe _confirmationPipe; private bool _needReconnect = true; + private bool _inReconnection = false; private RProducer(RProducerConfig rProducerConfig) { _rProducerConfig = rProducerConfig; _autoPublishingId = new AutoPublishingId(_rProducerConfig); _confirmationPipe = new ConfirmationPipe(rProducerConfig.ConfirmationHandler); + _autoPublishingId.InitPublishingId(); _confirmationPipe.Start(); } @@ -106,10 +108,9 @@ public static async Task CreateRProducer(RProducerConfig rProducerCon private async Task Init() { await _semProducer.WaitAsync(); + try { - _autoPublishingId.InitPublishingId(); - _producer = await _rProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() { Stream = _rProducerConfig.Stream, @@ -159,10 +160,18 @@ private async Task Init() private async Task TryToReconnect() { - _rProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); - if (reconnect && _needReconnect) + _inReconnection = true; + try + { + _rProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); + if (reconnect && _needReconnect) + { + await Init(); + } + } + finally { - await Init(); + _inReconnection = false; } } @@ -205,7 +214,7 @@ internal async Task HandleMetaDataMaybeReconnect(string stream) private async Task CloseProducer() { - await _semProducer.WaitAsync(); + await _semProducer.WaitAsync(10); try { await _producer.Close(); @@ -223,7 +232,7 @@ public bool IsOpen() public async Task Close() { - await _semProducer.WaitAsync(); + await _semProducer.WaitAsync(10); try { _needReconnect = false; @@ -243,7 +252,15 @@ public async ValueTask Send(Message message) await _semProducer.WaitAsync(); try { - await _producer.Send(pid, message); + // This flags avoid some race condition, + // since the reconnection can arrive from different threads + // so in this case it skips the publish until + // the producer is connected. Messages are safe since are stored + // on the _waitForConfirmation list. The user will get Timeout Error + if (!(_inReconnection)) + { + await _producer.Send(pid, message); + } } catch (Exception e) @@ -259,12 +276,14 @@ public async ValueTask Send(Message message) public async ValueTask Send(List messages, CompressionType compressionType) { var pid = _autoPublishingId.GetPublishingId(); - _confirmationPipe.AddUnConfirmedMessage(pid, messages); await _semProducer.WaitAsync(); try { - await _producer.Send(pid, messages, compressionType); + if (!(_inReconnection)) + { + await _producer.Send(pid, messages, compressionType); + } } catch (Exception e) diff --git a/RabbitMQ.Stream.Client/StreamSystem.cs b/RabbitMQ.Stream.Client/StreamSystem.cs index fdf2b65d..45e93f6a 100644 --- a/RabbitMQ.Stream.Client/StreamSystem.cs +++ b/RabbitMQ.Stream.Client/StreamSystem.cs @@ -171,6 +171,7 @@ public async Task QueryOffset(string reference, string stream) /// public async Task QuerySequence(string reference, string stream) { + await MayBeReconnectLocator(); MaybeThrowQueryException(reference, stream); var response = await client.QueryPublisherSequence(reference, stream); diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index fcc32c88..923714ab 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -127,17 +127,20 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() // The RProducer has to detect the disconnection and reconnect the client // var testPassed = new TaskCompletionSource(); - SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); + + var clientProvidedNameLocator = Guid.NewGuid().ToString(); + SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream, clientProvidedNameLocator); var count = 0; + var clientProvidedName = Guid.NewGuid().ToString(); var rProducer = await RProducer.CreateRProducer( new RProducerConfig() { Stream = stream, StreamSystem = system, - ClientProvidedName = "producer_to_kill", + ClientProvidedName = clientProvidedName, ConfirmationHandler = _ => { - if (Interlocked.Increment(ref count) == 5) + if (Interlocked.Increment(ref count) == 10) { testPassed.SetResult(true); } @@ -152,8 +155,8 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() } SystemUtils.Wait(TimeSpan.FromSeconds(6)); - Assert.Equal(1, SystemUtils.HttpKillConnections("producer_to_kill").Result); - Assert.Equal(1, SystemUtils.HttpKillConnections("dotnet-stream-locator").Result); + Assert.Equal(1, SystemUtils.HttpKillConnections(clientProvidedName).Result); + await SystemUtils.HttpKillConnections(clientProvidedNameLocator); for (var i = 0; i < 5; i++) { diff --git a/Tests/Utils.cs b/Tests/Utils.cs index e2c491fb..fd03a60a 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -59,10 +59,11 @@ public static void Wait(TimeSpan wait) Thread.Sleep(wait); } - public static void InitStreamSystemWithRandomStream(out StreamSystem system, out string stream) + public static void InitStreamSystemWithRandomStream(out StreamSystem system, out string stream, + string clientProviderNameLocator = "stream-locator") { stream = Guid.NewGuid().ToString(); - var config = new StreamSystemConfig(); + var config = new StreamSystemConfig { ClientProvidedName = clientProviderNameLocator }; system = StreamSystem.Create(config).Result; var x = system.CreateStream(new StreamSpec(stream)); x.Wait(); From f3c961ec9a92d4f76ac9e55aa780ac249c33bb06 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 21 Apr 2022 10:50:27 +0200 Subject: [PATCH 16/21] Rename to ReliableProducer Signed-off-by: Gabriele Santomaggio --- .../Reliable/PublishingIdStrategy.cs | 2 +- .../{RProducer.cs => ReliableProducer.cs} | 54 +++++++++---------- Tests/ReliableTests.cs | 26 ++++----- 3 files changed, 41 insertions(+), 41 deletions(-) rename RabbitMQ.Stream.Client/Reliable/{RProducer.cs => ReliableProducer.cs} (79%) diff --git a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs index 4d60aee0..8ac739f4 100644 --- a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs @@ -6,7 +6,7 @@ namespace RabbitMQ.Stream.Client.Reliable; /// /// Define PublishingId Strategy. -/// Can be automatic, so the RProducer will provide +/// Can be automatic, so the ReliableProducer will provide /// the PublishingId /// public interface IPublishingIdStrategy diff --git a/RabbitMQ.Stream.Client/Reliable/RProducer.cs b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs similarity index 79% rename from RabbitMQ.Stream.Client/Reliable/RProducer.cs rename to RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs index 812e03b5..a2143cca 100644 --- a/RabbitMQ.Stream.Client/Reliable/RProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs @@ -12,16 +12,16 @@ namespace RabbitMQ.Stream.Client.Reliable; internal class AutoPublishingId : IPublishingIdStrategy { private ulong _lastPublishingId = 0; - private readonly RProducerConfig _rProducerConfig; + private readonly ReliableProducerConfig _reliableProducerConfig; public ulong GetPublishingId() { return ++_lastPublishingId; } - public AutoPublishingId(RProducerConfig rProducerConfig) + public AutoPublishingId(ReliableProducerConfig reliableProducerConfig) { - _rProducerConfig = rProducerConfig; + _reliableProducerConfig = reliableProducerConfig; } public void InitPublishingId() @@ -29,7 +29,7 @@ public void InitPublishingId() try { var queryTask = - _rProducerConfig.StreamSystem.QuerySequence(_rProducerConfig.Reference, _rProducerConfig.Stream); + _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference, _reliableProducerConfig.Stream); _lastPublishingId = queryTask.Result; } catch (Exception) @@ -58,7 +58,7 @@ public void WhenConnected() } } -public record RProducerConfig +public record ReliableProducerConfig { public StreamSystem StreamSystem { get; set; } public string Stream { get; set; } @@ -69,38 +69,38 @@ public record RProducerConfig } /// -/// RProducer is a wrapper around the standard Producer. +/// ReliableProducer is a wrapper around the standard Producer. /// Main features are: /// - Auto-reconnection if the connection is dropped -/// - Trace sent and received messages. The event RProducerConfig:ConfirmationHandler/2 +/// - Trace sent and received messages. The event ReliableProducer:ConfirmationHandler/2 /// receives back messages sent with the status. -/// - Handle the Metadata Update. In case the stream is deleted RProducer closes Producer/Connection. +/// - Handle the Metadata Update. In case the stream is deleted ReliableProducer closes Producer/Connection. /// Reconnect the Producer if the stream still exists. /// - Set automatically the next PublisherID /// - Automatically retrieves the last sequence. By default is AutoPublishingId see IPublishingIdStrategy. /// -public class RProducer +public class ReliableProducer { private Producer _producer; private readonly AutoPublishingId _autoPublishingId; - private readonly RProducerConfig _rProducerConfig; + private readonly ReliableProducerConfig _reliableProducerConfig; private readonly SemaphoreSlim _semProducer = new(1); private readonly ConfirmationPipe _confirmationPipe; private bool _needReconnect = true; private bool _inReconnection = false; - private RProducer(RProducerConfig rProducerConfig) + private ReliableProducer(ReliableProducerConfig reliableProducerConfig) { - _rProducerConfig = rProducerConfig; - _autoPublishingId = new AutoPublishingId(_rProducerConfig); - _confirmationPipe = new ConfirmationPipe(rProducerConfig.ConfirmationHandler); + _reliableProducerConfig = reliableProducerConfig; + _autoPublishingId = new AutoPublishingId(_reliableProducerConfig); + _confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler); _autoPublishingId.InitPublishingId(); _confirmationPipe.Start(); } - public static async Task CreateRProducer(RProducerConfig rProducerConfig) + public static async Task CreateReliableProducer(ReliableProducerConfig reliableProducerConfig) { - var rProducer = new RProducer(rProducerConfig); + var rProducer = new ReliableProducer(reliableProducerConfig); await rProducer.Init(); return rProducer; } @@ -111,11 +111,11 @@ private async Task Init() try { - _producer = await _rProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() + _producer = await _reliableProducerConfig.StreamSystem.CreateProducer(new ProducerConfig() { - Stream = _rProducerConfig.Stream, - ClientProvidedName = _rProducerConfig.ClientProvidedName, - Reference = _rProducerConfig.Reference, + Stream = _reliableProducerConfig.Stream, + ClientProvidedName = _reliableProducerConfig.ClientProvidedName, + Reference = _reliableProducerConfig.Reference, MetadataHandler = update => { HandleMetaDataMaybeReconnect(update.Stream).Wait(); @@ -141,12 +141,12 @@ private async Task Init() confirmationStatus); } }); - _rProducerConfig.ReconnectStrategy.WhenConnected(); + _reliableProducerConfig.ReconnectStrategy.WhenConnected(); } catch (CreateProducerException ce) { - LogEventSource.Log.LogError($"{ce}. RProducer closed"); + LogEventSource.Log.LogError($"{ce}. ReliableProducer closed"); } catch (Exception e) { @@ -163,7 +163,7 @@ private async Task TryToReconnect() _inReconnection = true; try { - _rProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); + _reliableProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); if (reconnect && _needReconnect) { await Init(); @@ -190,16 +190,16 @@ internal async Task HandleMetaDataMaybeReconnect(string stream) { LogEventSource.Log.LogInformation( $"Meta data update for the stream: {stream} " + - $"Producer {_rProducerConfig.Reference} closed."); + $"Producer {_reliableProducerConfig.Reference} closed."); // This sleep is needed. When a stream is deleted it takes sometime. // The StreamExists/1 could return true even the stream doesn't exist anymore. Thread.Sleep(500); - if (await _rProducerConfig.StreamSystem.StreamExists(stream)) + if (await _reliableProducerConfig.StreamSystem.StreamExists(stream)) { LogEventSource.Log.LogInformation( $"Meta data update, the stream {stream} still exist. " + - $"Producer {_rProducerConfig.Reference} will try to reconnect."); + $"Producer {_reliableProducerConfig.Reference} will try to reconnect."); // Here we just close the producer connection // the func TryToReconnect/0 will be called. await CloseProducer(); @@ -207,7 +207,7 @@ internal async Task HandleMetaDataMaybeReconnect(string stream) else { // In this case the stream doesn't exist anymore - // the RProducer is just closed. + // the ReliableProducer is just closed. await Close(); } } diff --git a/Tests/ReliableTests.cs b/Tests/ReliableTests.cs index 923714ab..637c1649 100644 --- a/Tests/ReliableTests.cs +++ b/Tests/ReliableTests.cs @@ -85,8 +85,8 @@ public async void ConfirmRProducerMessages() var testPassed = new TaskCompletionSource(); SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var count = 0; - var rProducer = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, @@ -132,8 +132,8 @@ public async void SendMessageAfterKillConnectionShouldContinueToWork() SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream, clientProvidedNameLocator); var count = 0; var clientProvidedName = Guid.NewGuid().ToString(); - var rProducer = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, @@ -177,8 +177,8 @@ public async void HandleDeleteStreamWithMetaDataUpdate() { SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var clientProviderName = Guid.NewGuid().ToString(); - var rProducer = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, @@ -201,12 +201,12 @@ public async void HandleDeleteStreamWithMetaDataUpdate() public async void HandleChangeStreamConfigurationWithMetaDataUpdate() { // When stream topology changes the MetadataUpdate is raised. - // in this test we simulate it using await rProducer:HandleMetaDataMaybeReconnect/1; + // in this test we simulate it using await ReliableProducer:HandleMetaDataMaybeReconnect/1; // Producer must reconnect SystemUtils.InitStreamSystemWithRandomStream(out var system, out var stream); var clientProviderName = Guid.NewGuid().ToString(); - var rProducer = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, @@ -236,8 +236,8 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() var clientProviderName = Guid.NewGuid().ToString(); var reference = Guid.NewGuid().ToString(); var count = 0; - var rProducer = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducer = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, @@ -275,8 +275,8 @@ public async void AutoPublishIdDefaultShouldStartFromTheLast() await rProducer.Close(); var testPassedSecond = new TaskCompletionSource(); - var rProducerSecond = await RProducer.CreateRProducer( - new RProducerConfig() + var rProducerSecond = await ReliableProducer.CreateReliableProducer( + new ReliableProducerConfig() { Stream = stream, StreamSystem = system, From ec874434f46286a8a314ac62685368fef0a73564 Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Thu, 21 Apr 2022 11:02:15 +0200 Subject: [PATCH 17/21] Change InitPublishingId Signed-off-by: Gabriele Santomaggio --- RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs | 4 +++- RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs | 8 ++++---- 2 files changed, 7 insertions(+), 5 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs index 8ac739f4..2ae1638f 100644 --- a/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs +++ b/RabbitMQ.Stream.Client/Reliable/PublishingIdStrategy.cs @@ -2,6 +2,8 @@ // 2.0, and the Mozilla Public License, version 2.0. // Copyright (c) 2007-2020 VMware, Inc. +using System.Threading.Tasks; + namespace RabbitMQ.Stream.Client.Reliable; /// @@ -12,5 +14,5 @@ namespace RabbitMQ.Stream.Client.Reliable; public interface IPublishingIdStrategy { ulong GetPublishingId(); - void InitPublishingId(); + Task InitPublishingId(); } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs index a2143cca..9d602262 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs @@ -24,13 +24,13 @@ public AutoPublishingId(ReliableProducerConfig reliableProducerConfig) _reliableProducerConfig = reliableProducerConfig; } - public void InitPublishingId() + public async Task InitPublishingId() { try { - var queryTask = - _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference, _reliableProducerConfig.Stream); - _lastPublishingId = queryTask.Result; + _lastPublishingId = + await _reliableProducerConfig.StreamSystem.QuerySequence(_reliableProducerConfig.Reference, + _reliableProducerConfig.Stream); } catch (Exception) { From 82b51cdd2fa03e13b124d88a6701235829c3a5bf Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 21 Apr 2022 07:09:40 -0700 Subject: [PATCH 18/21] Rename file --- .../Reliable/{ConfirmationPipecs.cs => ConfirmationPipe.cs} | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename RabbitMQ.Stream.Client/Reliable/{ConfirmationPipecs.cs => ConfirmationPipe.cs} (100%) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs similarity index 100% rename from RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs rename to RabbitMQ.Stream.Client/Reliable/ConfirmationPipe.cs From a18af21aeb08c06a54023b9af262450add13c080 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 21 Apr 2022 08:05:04 -0700 Subject: [PATCH 19/21] Update APIApproval --- Tests/ApiApproval.Approve.verified.txt | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/Tests/ApiApproval.Approve.verified.txt b/Tests/ApiApproval.Approve.verified.txt index f1740682..4a206245 100644 --- a/Tests/ApiApproval.Approve.verified.txt +++ b/Tests/ApiApproval.Approve.verified.txt @@ -996,7 +996,7 @@ namespace RabbitMQ.Stream.Client.Reliable public interface IPublishingIdStrategy { ulong GetPublishingId(); - void InitPublishingId(); + System.Threading.Tasks.Task InitPublishingId(); } public interface IReconnectStrategy { @@ -1011,18 +1011,18 @@ namespace RabbitMQ.Stream.Client.Reliable public ulong PublishingId { get; } public RabbitMQ.Stream.Client.Reliable.ConfirmationStatus Status { get; } } - public class RProducer + public class ReliableProducer { public System.Threading.Tasks.Task Close() { } public bool IsOpen() { } public System.Threading.Tasks.ValueTask Send(RabbitMQ.Stream.Client.Message message) { } public System.Threading.Tasks.ValueTask Send(System.Collections.Generic.List messages, RabbitMQ.Stream.Client.CompressionType compressionType) { } - public static System.Threading.Tasks.Task CreateRProducer(RabbitMQ.Stream.Client.Reliable.RProducerConfig rProducerConfig) { } + public static System.Threading.Tasks.Task CreateReliableProducer(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig reliableProducerConfig) { } } - public class RProducerConfig : System.IEquatable + public class ReliableProducerConfig : System.IEquatable { - public RProducerConfig() { } - protected RProducerConfig(RabbitMQ.Stream.Client.Reliable.RProducerConfig original) { } + public ReliableProducerConfig() { } + protected ReliableProducerConfig(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig original) { } public string ClientProvidedName { get; set; } public System.Func ConfirmationHandler { get; set; } protected virtual System.Type EqualityContract { get; } @@ -1030,13 +1030,13 @@ namespace RabbitMQ.Stream.Client.Reliable public string Reference { get; set; } public string Stream { get; set; } public RabbitMQ.Stream.Client.StreamSystem StreamSystem { get; set; } - public virtual RabbitMQ.Stream.Client.Reliable.RProducerConfig $() { } - public virtual bool Equals(RabbitMQ.Stream.Client.Reliable.RProducerConfig? other) { } + public virtual RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig $() { } + public virtual bool Equals(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? other) { } public override bool Equals(object? obj) { } public override int GetHashCode() { } protected virtual bool PrintMembers(System.Text.StringBuilder builder) { } public override string ToString() { } - public static bool operator !=(RabbitMQ.Stream.Client.Reliable.RProducerConfig? left, RabbitMQ.Stream.Client.Reliable.RProducerConfig? right) { } - public static bool operator ==(RabbitMQ.Stream.Client.Reliable.RProducerConfig? left, RabbitMQ.Stream.Client.Reliable.RProducerConfig? right) { } + public static bool operator !=(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? left, RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? right) { } + public static bool operator ==(RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? left, RabbitMQ.Stream.Client.Reliable.ReliableProducerConfig? right) { } } } \ No newline at end of file From 906bc1cec1f6d77ac3260152e7eb5fcef9145fd3 Mon Sep 17 00:00:00 2001 From: Luke Bakken Date: Thu, 21 Apr 2022 09:04:06 -0700 Subject: [PATCH 20/21] Remove commented-out code --- Tests/Utils.cs | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/Tests/Utils.cs b/Tests/Utils.cs index fd03a60a..a1e587c2 100644 --- a/Tests/Utils.cs +++ b/Tests/Utils.cs @@ -142,23 +142,6 @@ public static async Task HttpKillConnections(string connectionName) return iEnumerable.Count(); } - // public static async Task GetConnectionsNumber(string connectionName) - // { - // using var handler = new HttpClientHandler { Credentials = new NetworkCredential("guest", "guest"), }; - // using var client = new HttpClient(handler); - // var result = await client.GetAsync("http://localhost:15672/api/connections"); - // var json = await result.Content.ReadAsStringAsync(); - // var connections = JsonSerializer.Deserialize>(json); - // if (connections == null) - // { - // return 0; - // } - // - // var iEnumerable = connections.Where(x => x.client_properties["connection_name"].Contains(connectionName)); - // - // return iEnumerable.Count(); - // } - public static void HttpPost(string jsonBody, string api) { var httpWebRequest = (HttpWebRequest)WebRequest.Create($"http://localhost:15672/api/{api}"); From 54674f4fb94720dbcd3e06127b33fcae3ca14afc Mon Sep 17 00:00:00 2001 From: Gabriele Santomaggio Date: Wed, 27 Apr 2022 15:17:05 +0200 Subject: [PATCH 21/21] Remove the class and use onlu ulong for TPL. Signed-off-by: Gabriele Santomaggio --- .../Reliable/ConfirmationPipecs.cs | 20 +++++++------------ .../Reliable/ReliableProducer.cs | 17 ++++++++++------ 2 files changed, 18 insertions(+), 19 deletions(-) diff --git a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs index 28bab4b0..cb2d214e 100644 --- a/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs +++ b/RabbitMQ.Stream.Client/Reliable/ConfirmationPipecs.cs @@ -49,7 +49,7 @@ public class MessagesConfirmation /// public class ConfirmationPipe { - private ActionBlock> _waitForConfirmationActionBlock; + private ActionBlock> _waitForConfirmationActionBlock; private readonly ConcurrentDictionary _waitForConfirmation = new(); private readonly Timer _invalidateTimer = new(); private Func ConfirmHandler { get; } @@ -61,11 +61,12 @@ public ConfirmationPipe(Func confirmHandler) public void Start() { - _waitForConfirmationActionBlock = new ActionBlock>( + _waitForConfirmationActionBlock = new ActionBlock>( request => { - var (confirmationStatus, confirmation) = request; - _waitForConfirmation.TryRemove(confirmation.PublishingId, out var message); + var (confirmationStatus, publishingId) = request; + + _waitForConfirmation.TryRemove(publishingId, out var message); if (message == null) { return; @@ -104,13 +105,7 @@ private async void OnTimedEvent(object? sender, ElapsedEventArgs e) public void AddUnConfirmedMessage(ulong publishingId, Message message) { - _waitForConfirmation.TryAdd(publishingId, - new MessagesConfirmation() - { - Messages = new List() { message }, - PublishingId = publishingId, - InsertDateTime = DateTime.Now - }); + AddUnConfirmedMessage(publishingId, new List() { message }); } public void AddUnConfirmedMessage(ulong publishingId, List messages) @@ -127,7 +122,6 @@ public void AddUnConfirmedMessage(ulong publishingId, List messages) public Task RemoveUnConfirmedMessage(ulong publishingId, ConfirmationStatus confirmationStatus) { return _waitForConfirmationActionBlock.SendAsync( - Tuple.Create(confirmationStatus, - new MessagesConfirmation() { PublishingId = publishingId })); + Tuple.Create(confirmationStatus, publishingId)); } } diff --git a/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs index 9d602262..c68755c1 100644 --- a/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs +++ b/RabbitMQ.Stream.Client/Reliable/ReliableProducer.cs @@ -87,14 +87,13 @@ public class ReliableProducer private readonly SemaphoreSlim _semProducer = new(1); private readonly ConfirmationPipe _confirmationPipe; private bool _needReconnect = true; - private bool _inReconnection = false; + private bool _inReconnection; private ReliableProducer(ReliableProducerConfig reliableProducerConfig) { _reliableProducerConfig = reliableProducerConfig; _autoPublishingId = new AutoPublishingId(_reliableProducerConfig); _confirmationPipe = new ConfirmationPipe(reliableProducerConfig.ConfirmationHandler); - _autoPublishingId.InitPublishingId(); _confirmationPipe.Start(); } @@ -106,6 +105,12 @@ public static async Task CreateReliableProducer(ReliableProduc } private async Task Init() + { + await _autoPublishingId.InitPublishingId(); + await GetNewProducer(); + } + + private async Task GetNewProducer() { await _semProducer.WaitAsync(); @@ -166,7 +171,7 @@ private async Task TryToReconnect() _reliableProducerConfig.ReconnectStrategy.WhenDisconnected(out var reconnect); if (reconnect && _needReconnect) { - await Init(); + await GetNewProducer(); } } finally @@ -253,8 +258,8 @@ public async ValueTask Send(Message message) try { // This flags avoid some race condition, - // since the reconnection can arrive from different threads - // so in this case it skips the publish until + // since the reconnection can arrive from different threads. + // In this case it skips the publish until // the producer is connected. Messages are safe since are stored // on the _waitForConfirmation list. The user will get Timeout Error if (!(_inReconnection)) @@ -280,7 +285,7 @@ public async ValueTask Send(List messages, CompressionType compressionT await _semProducer.WaitAsync(); try { - if (!(_inReconnection)) + if (!_inReconnection) { await _producer.Send(pid, messages, compressionType); }