diff --git a/.ci/ubuntu/cluster/gha-setup.sh b/.ci/ubuntu/cluster/gha-setup.sh
index d47e69b..d4647be 100755
--- a/.ci/ubuntu/cluster/gha-setup.sh
+++ b/.ci/ubuntu/cluster/gha-setup.sh
@@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}
-readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
if [[ ! -v GITHUB_ACTIONS ]]
then
diff --git a/.ci/ubuntu/cluster/rmq/Dockerfile b/.ci/ubuntu/cluster/rmq/Dockerfile
index a52e7f6..ad1189a 100644
--- a/.ci/ubuntu/cluster/rmq/Dockerfile
+++ b/.ci/ubuntu/cluster/rmq/Dockerfile
@@ -1,4 +1,4 @@
-ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.1.0-beta.4-management-alpine
+ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.2-management-alpine
FROM ${RABBITMQ_DOCKER_TAG}
diff --git a/.ci/ubuntu/one-node/gha-setup.sh b/.ci/ubuntu/one-node/gha-setup.sh
index 92bd04a..6f969bf 100755
--- a/.ci/ubuntu/one-node/gha-setup.sh
+++ b/.ci/ubuntu/one-node/gha-setup.sh
@@ -9,7 +9,7 @@ readonly script_dir
echo "[INFO] script_dir: '$script_dir'"
-readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
+readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"
diff --git a/.ci/windows/versions.json b/.ci/windows/versions.json
index 058bd93..8b94270 100644
--- a/.ci/windows/versions.json
+++ b/.ci/windows/versions.json
@@ -1,4 +1,4 @@
{
"erlang": "27.2",
- "rabbitmq": "4.1.0"
+ "rabbitmq": "4.2.0"
}
diff --git a/RabbitMQ.AMQP.Client/FeatureFlags.cs b/RabbitMQ.AMQP.Client/FeatureFlags.cs
index 7ba4312..3d83238 100644
--- a/RabbitMQ.AMQP.Client/FeatureFlags.cs
+++ b/RabbitMQ.AMQP.Client/FeatureFlags.cs
@@ -24,6 +24,13 @@ public class FeatureFlags
///
public bool IsBrokerCompatible { get; internal set; } = false;
+ ///
+ /// Check if Direct Reply-To is supported.
+ /// Direct Reply-To is available in RabbitMQ 4.2 and later.
+ /// see: https://www.rabbitmq.com/docs/direct-reply-to
+ ///
+ public bool IsDirectReplyToSupported { get; internal set; } = false;
+
public void Validate()
{
if (!IsBrokerCompatible)
diff --git a/RabbitMQ.AMQP.Client/IConnection.cs b/RabbitMQ.AMQP.Client/IConnection.cs
index 2837c60..76aceaa 100644
--- a/RabbitMQ.AMQP.Client/IConnection.cs
+++ b/RabbitMQ.AMQP.Client/IConnection.cs
@@ -28,16 +28,16 @@ public interface IConnection : ILifeCycle
IConsumerBuilder ConsumerBuilder();
///
- /// Create an instance for this connection.
+ /// Create an instance for this connection.
///
- /// instance for this connection.
- IRpcServerBuilder RpcServerBuilder();
+ /// instance for this connection.
+ IResponderBuilder ResponderBuilder();
///
- /// Create an instance for this connection.
+ /// Create an instance for this connection.
///
- /// instance for this connection.
- IRpcClientBuilder RpcClientBuilder();
+ /// instance for this connection.
+ IRequesterBuilder RequesterBuilder();
///
/// Get the properties for this connection.
diff --git a/RabbitMQ.AMQP.Client/IConsumer.cs b/RabbitMQ.AMQP.Client/IConsumer.cs
index 88290d5..0afb5ad 100644
--- a/RabbitMQ.AMQP.Client/IConsumer.cs
+++ b/RabbitMQ.AMQP.Client/IConsumer.cs
@@ -37,6 +37,13 @@ public interface IConsumer : ILifeCycle
/// Returns the number of unsettled messages.
///
long UnsettledMessageCount { get; }
+
+ ///
+ /// Returns queue name the consumer is consuming from.
+ /// The Queue name is usually configured by the user via the ,
+ /// but can also be generated by the client the special direct-reply-to queue.
+ ///
+ string Queue { get; }
}
public interface IContext
diff --git a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
index 78bb1ac..a28c962 100644
--- a/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/IConsumerBuilder.cs
@@ -19,7 +19,17 @@ public enum StreamOffsetSpecification
public interface IConsumerBuilder
{
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
- IConsumerBuilder Queue(string queueName);
+ IConsumerBuilder Queue(string? queueName);
+
+ ///
+ /// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
+ /// The server must also support direct reply-to.
+ /// This feature allows the server to send the reply directly to the client without going through a reply queue.
+ /// This can improve performance and reduce latency.
+ /// Default is false.
+ /// https://www.rabbitmq.com/docs/direct-reply-to
+ ///
+ IConsumerBuilder DirectReplyTo(bool directReplyTo);
IConsumerBuilder MessageHandler(MessageHandler handler);
diff --git a/RabbitMQ.AMQP.Client/IRpcClient.cs b/RabbitMQ.AMQP.Client/IRequester.cs
similarity index 74%
rename from RabbitMQ.AMQP.Client/IRpcClient.cs
rename to RabbitMQ.AMQP.Client/IRequester.cs
index ea88f37..2f1dc61 100644
--- a/RabbitMQ.AMQP.Client/IRpcClient.cs
+++ b/RabbitMQ.AMQP.Client/IRequester.cs
@@ -8,24 +8,23 @@
namespace RabbitMQ.AMQP.Client
{
-
- public interface IRpcClientAddressBuilder : IAddressBuilder
+ public interface IRequesterAddressBuilder : IAddressBuilder
{
- IRpcClientBuilder RpcClient();
+ IRequesterBuilder Requester();
}
///
/// IRpcClientBuilder is the interface for creating an RPC client.
- /// See also and
+ /// See also and
///
- public interface IRpcClientBuilder
+ public interface IRequesterBuilder
{
///
/// Request address where the client sends requests.
/// The server consumes requests from this address.
///
///
- IRpcClientAddressBuilder RequestAddress();
+ IRequesterAddressBuilder RequestAddress();
///
/// The queue from which requests are consumed.
@@ -33,9 +32,9 @@ public interface IRpcClientBuilder
///
/// The queue name
///
- IRpcClientBuilder ReplyToQueue(string replyToQueueName);
+ IRequesterBuilder ReplyToQueue(string replyToQueueName);
- IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
+ IRequesterBuilder ReplyToQueue(IQueueSpecification replyToQueue);
///
/// Extracts the correlation id from the request message.
@@ -45,7 +44,7 @@ public interface IRpcClientBuilder
///
///
///
- IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor);
+ IRequesterBuilder CorrelationIdExtractor(Func? correlationIdExtractor);
///
/// Post processes the reply message before sending it to the server.
@@ -56,7 +55,7 @@ public interface IRpcClientBuilder
///
///
///
- IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor);
+ IRequesterBuilder RequestPostProcessor(Func? requestPostProcessor);
///
/// Client and Server must agree on the correlation id.
@@ -66,27 +65,27 @@ public interface IRpcClientBuilder
///
///
///
-
- IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier);
+ IRequesterBuilder CorrelationIdSupplier(Func? correlationIdSupplier);
///
/// The time to wait for a reply from the server.
///
///
///
- IRpcClientBuilder Timeout(TimeSpan timeout);
+ IRequesterBuilder Timeout(TimeSpan timeout);
+
///
/// Build and return the RPC client.
///
///
- Task BuildAsync();
+ Task BuildAsync();
}
///
/// IRpcClient is the interface for an RPC client.
- /// See also and
+ /// See also and
///
- public interface IRpcClient : ILifeCycle
+ public interface IRequester : ILifeCycle
{
///
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
@@ -98,5 +97,14 @@ public interface IRpcClient : ILifeCycle
/// Cancellation token
///
Task PublishAsync(IMessage message, CancellationToken cancellationToken = default);
+
+ ///
+ /// The ReplyTo queue address can be created by:
+ /// - the client providing a specific queue name
+ /// - the client creating a temporary queue
+ /// - The server uses this address to send the reply message. with direct-reply-to
+ ///
+ ///
+ public string GetReplyToQueue();
}
}
diff --git a/RabbitMQ.AMQP.Client/IRpcServer.cs b/RabbitMQ.AMQP.Client/IResponder.cs
similarity index 80%
rename from RabbitMQ.AMQP.Client/IRpcServer.cs
rename to RabbitMQ.AMQP.Client/IResponder.cs
index a6bfc9c..9b76920 100644
--- a/RabbitMQ.AMQP.Client/IRpcServer.cs
+++ b/RabbitMQ.AMQP.Client/IResponder.cs
@@ -7,13 +7,12 @@
namespace RabbitMQ.AMQP.Client
{
-
///
/// IRpcServerBuilder is the interface for creating an RPC server.
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
- /// See also and
+ /// See also and
///
- public interface IRpcServerBuilder
+ public interface IResponderBuilder
{
///
/// The queue from which requests are consumed.
@@ -21,8 +20,9 @@ public interface IRpcServerBuilder
///
///
///
- IRpcServerBuilder RequestQueue(string requestQueue);
- IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
+ IResponderBuilder RequestQueue(string requestQueue);
+
+ IResponderBuilder RequestQueue(IQueueSpecification requestQueue);
///
/// Extracts the correlation id from the request message.
@@ -32,8 +32,7 @@ public interface IRpcServerBuilder
///
///
///
-
- IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor);
+ IResponderBuilder CorrelationIdExtractor(Func? correlationIdExtractor);
///
/// Post processes the reply message before sending it to the client.
@@ -44,38 +43,37 @@ public interface IRpcServerBuilder
///
///
///
- IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor);
+ IResponderBuilder ReplyPostProcessor(Func? replyPostProcessor);
///
/// Handle the request message and return the reply message.
///
///
///
- IRpcServerBuilder Handler(RpcHandler handler);
+ IResponderBuilder Handler(RpcHandler handler);
///
/// Build and return the RPC server.
///
///
- Task BuildAsync();
+ Task BuildAsync();
}
///
/// Event handler for handling RPC requests.
///
// TODO cancellation token
- public delegate Task RpcHandler(IRpcServer.IContext context, IMessage request);
+ public delegate Task RpcHandler(IResponder.IContext context, IMessage request);
///
- /// IRpcServer interface for creating an RPC server.
+ /// IResponder interface for creating an RPC server.
/// The RPC is simulated by sending a request message and receiving a reply message.
/// Where the client sends the queue where wants to receive the reply.
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
- /// See also
+ /// See also
///
- public interface IRpcServer : ILifeCycle
+ public interface IResponder : ILifeCycle
{
-
public interface IContext
{
IMessage Message(byte[] body);
diff --git a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs
index 21b7420..5b68bdd 100644
--- a/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs
@@ -21,12 +21,7 @@ public T Exchange(IExchangeSpecification exchangeSpec)
public T Exchange(string? exchangeName)
{
_exchange = exchangeName;
- if (_owner == null)
- {
- throw new InvalidOperationException("Owner is null");
- }
-
- return _owner;
+ return _owner ?? throw new InvalidOperationException("Owner is null");
}
public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName);
@@ -34,23 +29,13 @@ public T Exchange(string? exchangeName)
public T Queue(string? queueName)
{
_queue = queueName;
- if (_owner == null)
- {
- throw new InvalidOperationException("Owner is null");
- }
-
- return _owner;
+ return _owner ?? throw new InvalidOperationException("Owner is null");
}
public T Key(string? key)
{
_key = key;
- if (_owner == null)
- {
- throw new InvalidOperationException("Owner is null");
- }
-
- return _owner;
+ return _owner ?? throw new InvalidOperationException("Owner is null");
}
public string Address()
@@ -85,12 +70,21 @@ public string Address()
return "";
}
- if (string.IsNullOrEmpty(_queue))
+ return string.IsNullOrEmpty(_queue)
+ ? throw new InvalidAddressException("Queue must be set")
+ : $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
+ }
+
+ public string DecodeQueuePathSegment(string path)
+ {
+ string? v = Utils.DecodePathSegment(path);
+ if (v == null)
{
- throw new InvalidAddressException("Queue must be set");
+ throw new InvalidAddressException("Invalid path segment");
}
-
- return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
+ string prefix = $"/{Consts.Queues}/";
+ // Only remove the prefix if present; otherwise, return as-is
+ return v.StartsWith(prefix) ? v.Substring(prefix.Length) : v;
}
}
@@ -124,16 +118,17 @@ public IMessage Build()
}
}
- public class RpcClientAddressBuilder : DefaultAddressBuilder, IRpcClientAddressBuilder
+ public class RequesterAddressBuilder : DefaultAddressBuilder, IRequesterAddressBuilder
{
- readonly AmqpRpcClientBuilder _builder;
- public RpcClientAddressBuilder(AmqpRpcClientBuilder builder)
+ readonly AmqpRequesterBuilder _builder;
+
+ public RequesterAddressBuilder(AmqpRequesterBuilder builder)
{
_builder = builder;
_owner = this;
}
- public IRpcClientBuilder RpcClient()
+ public IRequesterBuilder Requester()
{
return _builder;
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
index 0222020..0f1c38b 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConnection.cs
@@ -39,7 +39,7 @@ public class AmqpConnection : AbstractLifeCycle, IConnection
private readonly IMetricsReporter? _metricsReporter;
private readonly Dictionary _connectionProperties = new();
- internal readonly FeatureFlags _featureFlags = new FeatureFlags();
+ internal readonly FeatureFlags _featureFlags = new();
///
/// _publishersDict contains all the publishers created by the connection.
@@ -124,21 +124,21 @@ public IConsumerBuilder ConsumerBuilder()
}
///
- /// Create an instance for this connection.
+ /// Create an instance for this connection.
///
- /// instance for this connection.
- public IRpcServerBuilder RpcServerBuilder()
+ /// instance for this connection.
+ public IResponderBuilder ResponderBuilder()
{
- return new AmqpRpcServerBuilder(this);
+ return new AmqpResponderBuilder(this);
}
///
- /// Create an instance for this connection.
+ /// Create an instance for this connection.
///
- /// instance for this connection.
- public IRpcClientBuilder RpcClientBuilder()
+ /// instance for this connection.
+ public IRequesterBuilder RequesterBuilder()
{
- return new AmqpRpcClientBuilder(this);
+ return new AmqpRequesterBuilder(this);
}
///
@@ -671,6 +671,8 @@ private void HandleProperties(Fields properties)
// this is a feature that was introduced in RabbitMQ 4.2.0
_featureFlags.IsSqlFeatureEnabled = Utils.Is4_2_OrMore(brokerVersion);
+ _featureFlags.IsDirectReplyToSupported = Utils.Is4_2_OrMore(brokerVersion);
+
_featureFlags.IsFilterFeatureEnabled = Utils.SupportsFilterExpressions(brokerVersion);
}
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
index 39ebcaa..0762cc7 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumer.cs
@@ -14,6 +14,14 @@
namespace RabbitMQ.AMQP.Client.Impl
{
+ static class ConsumerDefaults
+ {
+ public const int AttachTimeoutSeconds = 5;
+ public const int MessageReceiveTimeoutSeconds = 60;
+ public const int CloseTimeoutSeconds = 5;
+ public const int AttachDelayMilliseconds = 10;
+ }
+
///
/// Implementation of .
///
@@ -30,13 +38,15 @@ private enum PauseStatus
private readonly Guid _id = Guid.NewGuid();
private ReceiverLink? _receiverLink;
+ private Attach? _attach;
private PauseStatus _pauseStatus = PauseStatus.UNPAUSED;
private readonly UnsettledMessageCounter _unsettledMessageCounter = new();
private readonly ConsumerConfiguration _configuration;
private readonly IMetricsReporter? _metricsReporter;
- internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration, IMetricsReporter? metricsReporter)
+ internal AmqpConsumer(AmqpConnection amqpConnection, ConsumerConfiguration configuration,
+ IMetricsReporter? metricsReporter)
{
_amqpConnection = amqpConnection;
_configuration = configuration;
@@ -48,8 +58,8 @@ public override async Task OpenAsync()
{
try
{
- TaskCompletionSource attachCompletedTcs =
- Utils.CreateTaskCompletionSource();
+ TaskCompletionSource<(ReceiverLink, Attach)> attachCompletedTcs =
+ Utils.CreateTaskCompletionSource<(ReceiverLink, Attach)>();
// this is an event to get the filters to the listener context
// it _must_ be here because in case of reconnect the original filters could be not valid anymore
@@ -63,14 +73,24 @@ public override async Task OpenAsync()
_configuration.ListenerContext(listenerContext);
}
- Attach attach = Utils.CreateAttach(_configuration.Address, DeliveryMode.AtLeastOnce, _id,
- _configuration.Filters);
+ Attach attach;
+
+ if (_configuration.DirectReplyTo)
+ {
+ attach = Utils.CreateDirectReplyToAttach(_id, _configuration.Filters);
+ }
+ else
+ {
+ string address = AddressBuilderHelper.AddressBuilder().Queue(_configuration.Queue).Address();
+ attach = Utils.CreateAttach(address, DeliveryMode.AtLeastOnce, _id,
+ _configuration.Filters);
+ }
void OnAttached(ILink argLink, Attach argAttach)
{
if (argLink is ReceiverLink link)
{
- attachCompletedTcs.SetResult(link);
+ attachCompletedTcs.SetResult((link, argAttach));
}
else
{
@@ -87,42 +107,25 @@ void OnAttached(ILink argLink, Attach argAttach)
var tmpReceiverLink = new ReceiverLink(session, _id.ToString(), attach, OnAttached);
// TODO configurable timeout
- var waitSpan = TimeSpan.FromSeconds(5);
+ var waitSpan = TimeSpan.FromSeconds(ConsumerDefaults.AttachTimeoutSeconds);
// TODO
// Even 10ms is enough to allow the links to establish,
// which tells me it allows the .NET runtime to process
- await Task.Delay(10).ConfigureAwait(false);
+ await Task.Delay(ConsumerDefaults.AttachDelayMilliseconds).ConfigureAwait(false);
- _receiverLink = await attachCompletedTcs.Task.WaitAsync(waitSpan)
+ (_receiverLink, _attach) = await attachCompletedTcs.Task.WaitAsync(waitSpan)
.ConfigureAwait(false);
+ ValidateReceiverLink();
- if (false == Object.ReferenceEquals(_receiverLink, tmpReceiverLink))
- {
- // TODO log this case?
- }
-
- if (_receiverLink is null)
- {
- throw new ConsumerException($"{ToString()} Failed to create receiver link (null was returned)");
- }
- else if (_receiverLink.LinkState != LinkState.Attached)
- {
- throw new ConsumerException(
- $"{ToString()} Failed to create receiver link. Link state is not attached, error: " +
- _receiverLink.Error?.ToString() ?? "Unknown error");
- }
- else
- {
- _receiverLink.SetCredit(_configuration.InitialCredits);
+ _receiverLink.SetCredit(_configuration.InitialCredits);
- // TODO save / cancel task
- _ = Task.Run(ProcessMessages);
+ // TODO save / cancel task
+ _ = Task.Run(ProcessMessages);
- // TODO cancellation token
- await base.OpenAsync()
- .ConfigureAwait(false);
- }
+ // TODO cancellation token
+ await base.OpenAsync()
+ .ConfigureAwait(false);
}
catch (Exception e)
{
@@ -130,16 +133,24 @@ await base.OpenAsync()
}
}
+ private void ValidateReceiverLink()
+ {
+ if (_receiverLink is null)
+ {
+ throw new ConsumerException($"{ToString()} Receiver link creation failed (null was returned)");
+ }
+
+ if (_receiverLink.LinkState != LinkState.Attached)
+ {
+ string errorMessage = _receiverLink.Error?.ToString() ?? "Unknown error";
+ throw new ConsumerException($"{ToString()} Receiver link not attached. Error: {errorMessage}");
+ }
+ }
+
private async Task ProcessMessages()
{
try
{
- if (_receiverLink is null)
- {
- // TODO is this a serious error?
- return;
- }
-
Stopwatch? stopwatch = null;
if (_metricsReporter is not null)
{
@@ -151,7 +162,7 @@ private async Task ProcessMessages()
stopwatch?.Restart();
// TODO the timeout waiting for messages should be configurable
- TimeSpan timeout = TimeSpan.FromSeconds(60);
+ TimeSpan timeout = TimeSpan.FromSeconds(ConsumerDefaults.MessageReceiveTimeoutSeconds);
Message? nativeMessage = await _receiverLink.ReceiveAsync(timeout)
.ConfigureAwait(false);
@@ -184,7 +195,6 @@ await _configuration.Handler(context, amqpMessage)
stopwatch.Stop();
_metricsReporter.Consumed(stopwatch.Elapsed);
}
-
}
}
catch (Exception e)
@@ -241,6 +251,15 @@ ref Unsafe.As(ref _pauseStatus),
///
public long UnsettledMessageCount => _unsettledMessageCounter.Get();
+ public string Queue
+ {
+ get
+ {
+ string? sourceAddress = _attach?.Source is not Source source ? null : source.Address;
+ return sourceAddress is null ? "" : AddressBuilderHelper.AddressBuilder().DecodeQueuePathSegment(sourceAddress);
+ }
+ }
+
///
/// Request to receive messages again.
///
@@ -278,7 +297,7 @@ public override async Task CloseAsync()
try
{
// TODO global timeout for closing, other async actions?
- await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
+ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(ConsumerDefaults.CloseTimeoutSeconds))
.ConfigureAwait(false);
}
catch (Exception ex)
@@ -294,7 +313,7 @@ await _receiverLink.CloseAsync(TimeSpan.FromSeconds(5))
public override string ToString()
{
- return $"Consumer{{Address='{_configuration.Address}', " +
+ return $"Consumer{{Address='{Queue}', " +
$"id={_id}, " +
$"Connection='{_amqpConnection}', " +
$"State='{State}'}}";
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
index 1be732e..92631cb 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpConsumerBuilder.cs
@@ -16,7 +16,8 @@ namespace RabbitMQ.AMQP.Client.Impl
///
internal sealed class ConsumerConfiguration
{
- public string Address { get; set; } = "";
+
+ public string? Queue { get; set; } = null;
public int InitialCredits { get; set; } = 100; // TODO use constant, check with Java lib
public Map Filters { get; set; } = new();
@@ -24,6 +25,16 @@ internal sealed class ConsumerConfiguration
// TODO is a MessageHandler *really* optional???
public MessageHandler? Handler { get; set; }
+ ///
+ /// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
+ /// The server must also support direct reply-to.
+ /// This feature allows the server to send the reply directly to the client without going through a reply queue.
+ /// This can improve performance and reduce latency.
+ /// Default is false.
+ /// https://www.rabbitmq.com/docs/direct-reply-to
+ ///
+ public bool DirectReplyTo { get; set; }
+
// TODO re-name to ListenerContextAction? Callback?
public Action? ListenerContext = null;
}
@@ -49,10 +60,9 @@ public IConsumerBuilder Queue(IQueueSpecification queueSpec)
return Queue(queueSpec.QueueName);
}
- public IConsumerBuilder Queue(string queueName)
+ public IConsumerBuilder Queue(string? queueName)
{
- string address = AddressBuilderHelper.AddressBuilder().Queue(queueName).Address();
- _configuration.Address = address;
+ _configuration.Queue = queueName;
return this;
}
@@ -62,6 +72,12 @@ public IConsumerBuilder MessageHandler(MessageHandler handler)
return this;
}
+ public IConsumerBuilder DirectReplyTo(bool directReplyTo)
+ {
+ _configuration.DirectReplyTo = directReplyTo;
+ return this;
+ }
+
public IConsumerBuilder InitialCredits(int initialCredits)
{
_configuration.InitialCredits = initialCredits;
@@ -87,10 +103,10 @@ public async Task BuildAndStartAsync(CancellationToken cancellationTo
}
if (_configuration.Filters[Consts.s_sqlFilterSymbol] is not null &&
- _amqpConnection._featureFlags.IsSqlFeatureEnabled == false)
+ !_amqpConnection._featureFlags.IsSqlFeatureEnabled)
{
- throw new ConsumerException("SQL filter is not supported by the connection." +
- "RabbitMQ 4.2.0 or later is required.");
+ throw new NotSupportedException("SQL filter is not supported by the connection. " +
+ "RabbitMQ 4.2.0 or later is required.");
}
AmqpConsumer consumer = new(_amqpConnection, _configuration, _metricsReporter);
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
index e8c47f0..3664e5c 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpPublisher.cs
@@ -138,9 +138,9 @@ public Task PublishAsync(IMessage message, CancellationToken canc
void OutcomeCallback(ILink sender, Message inMessage, Outcome outcome, object state)
{
// Note: sometimes `inMessage` is null 🤔
- Debug.Assert(Object.ReferenceEquals(this, state));
+ Debug.Assert(ReferenceEquals(this, state));
- if (false == Object.ReferenceEquals(_senderLink, sender))
+ if (!ReferenceEquals(_senderLink, sender))
{
// TODO log this case?
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs b/RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs
similarity index 66%
rename from RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
rename to RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs
index 99f101d..e420d38 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcClient.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpRequester.cs
@@ -9,11 +9,11 @@
namespace RabbitMQ.AMQP.Client.Impl
{
- public class RpcClientConfiguration
+ public class RequesterConfiguration
{
public AmqpConnection Connection { get; set; } = null!;
public string ReplyToQueue { get; set; } = "";
- public string RequestAddress { get; set; } = "";
+
public TimeSpan Timeout { get; set; } = TimeSpan.FromSeconds(10);
public Func? CorrelationIdSupplier { get; set; } = null;
@@ -21,84 +21,85 @@ public class RpcClientConfiguration
public Func? CorrelationIdExtractor { get; set; }
public Func? RequestPostProcessor { get; set; }
+ public string RequestAddress { get; set; } = "";
}
- public class AmqpRpcClientBuilder : IRpcClientBuilder
+ public class AmqpRequesterBuilder : IRequesterBuilder
{
- private readonly RpcClientAddressBuilder _addressBuilder;
+ private readonly RequesterAddressBuilder _addressBuilder;
private readonly AmqpConnection _connection;
- private readonly RpcClientConfiguration _configuration = new();
+ private readonly RequesterConfiguration _configuration = new();
- public AmqpRpcClientBuilder(AmqpConnection connection)
+ public AmqpRequesterBuilder(AmqpConnection connection)
{
_connection = connection;
- _addressBuilder = new RpcClientAddressBuilder(this);
+ _addressBuilder = new RequesterAddressBuilder(this);
}
- public IRpcClientAddressBuilder RequestAddress()
+ public IRequesterAddressBuilder RequestAddress()
{
return _addressBuilder;
}
- public IRpcClientBuilder ReplyToQueue(string replyToQueueName)
+ public IRequesterBuilder ReplyToQueue(string replyToQueueName)
{
_configuration.ReplyToQueue = replyToQueueName;
return this;
}
- public IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue)
+ public IRequesterBuilder ReplyToQueue(IQueueSpecification replyToQueue)
{
_configuration.ReplyToQueue = replyToQueue.QueueName;
return this;
}
- public IRpcClientBuilder CorrelationIdExtractor(Func? correlationIdExtractor)
+ public IRequesterBuilder CorrelationIdExtractor(Func? correlationIdExtractor)
{
_configuration.CorrelationIdExtractor = correlationIdExtractor;
return this;
}
- public IRpcClientBuilder RequestPostProcessor(Func? requestPostProcessor)
+ public IRequesterBuilder RequestPostProcessor(Func? requestPostProcessor)
{
_configuration.RequestPostProcessor = requestPostProcessor;
return this;
}
- public IRpcClientBuilder CorrelationIdSupplier(Func? correlationIdSupplier)
+ public IRequesterBuilder CorrelationIdSupplier(Func? correlationIdSupplier)
{
_configuration.CorrelationIdSupplier = correlationIdSupplier;
return this;
}
- public IRpcClientBuilder Timeout(TimeSpan timeout)
+ public IRequesterBuilder Timeout(TimeSpan timeout)
{
_configuration.Timeout = timeout;
return this;
}
- public async Task BuildAsync()
+ public async Task BuildAsync()
{
_configuration.RequestAddress = _addressBuilder.Address();
_configuration.Connection = _connection;
- var rpcClient = new AmqpRpcClient(_configuration);
+ var rpcClient = new AmqpRequester(_configuration);
await rpcClient.OpenAsync().ConfigureAwait(false);
return rpcClient;
}
}
///
- /// AmqpRpcClient is an implementation of .
+ /// AmqpRpcClient is an implementation of .
/// It is a wrapper around and to create an RPC client over AMQP 1.0.
/// even the PublishAsync is async the RPClient blocks the thread until the response is received.
/// within the timeout.
///
/// The PublishAsync is thread-safe and can be called from multiple threads.
///
- /// See also the server side .
+ /// See also the server side .
///
- public class AmqpRpcClient : AbstractLifeCycle, IRpcClient
+ public class AmqpRequester : AbstractLifeCycle, IRequester
{
- private readonly RpcClientConfiguration _configuration;
+ private readonly RequesterConfiguration _configuration;
private IConsumer? _consumer = null;
private IPublisher? _publisher = null;
private readonly ConcurrentDictionary> _pendingRequests = new();
@@ -134,28 +135,53 @@ private IMessage RequestPostProcess(IMessage request, object correlationId)
return _configuration.RequestPostProcessor(request, correlationId);
}
- return request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(_configuration.ReplyToQueue).Address())
+ string s = GetReplyToQueue();
+ return request.ReplyTo(AddressBuilderHelper.AddressBuilder().Queue(s).Address())
.MessageId(correlationId);
}
- public AmqpRpcClient(RpcClientConfiguration configuration)
+ public AmqpRequester(RequesterConfiguration configuration)
{
_configuration = configuration;
}
+ ///
+ /// OpenAsync initializes the Requester by creating the necessary publisher and consumer.
+ /// The DirectReplyTo feature is applied if supported by the server and no explicit reply-to queue is set.
+ /// The AmqpRequester is an opinionated wrapper to simulate RPC over AMQP 1.0.
+ /// Even when the DirectReplyTo is supported the wrapper can decide to don't use it when
+ /// the user has explicitly set a reply-to queue.
+ ///
+ ///
public override async Task OpenAsync()
{
- if (string.IsNullOrEmpty(_configuration.ReplyToQueue))
+ bool isDirectReplyToSupported = _configuration.Connection._featureFlags.IsDirectReplyToSupported;
+
+ string queueReplyTo = _configuration.ReplyToQueue;
+ // if the queue is not set it means we need to create a temporary queue as reply-to
+ // only if direct-reply-to is not supported. In case of isDirectReplyToSupported the server
+ // will create the server side temporary queue.
+ if (string.IsNullOrEmpty(_configuration.ReplyToQueue) && !isDirectReplyToSupported)
{
IQueueInfo queueInfo = await _configuration.Connection.Management().Queue().AutoDelete(true)
.Exclusive(true).DeclareAsync()
.ConfigureAwait(false);
- _configuration.ReplyToQueue = queueInfo.Name();
+ queueReplyTo = queueInfo.Name();
}
+ // we can apply DirectReplyTo only if the _configuration.ReplyToQueue is not set
+ // and the server support DirectReplyTo feature
+ // AmqpRequester is a wrapper to simulate RPC over AMQP 1.0
+ // canApplyDirectReplyTo is an opinionated optimization to avoid creating temporary queues
+ // unless _configuration.ReplyToQueue is explicitly set by the user.
+ // The user is always free to create custom Requester and Responder
+ bool canApplyDirectReplyTo = isDirectReplyToSupported &&
+ string.IsNullOrEmpty(_configuration.ReplyToQueue);
+
_publisher = await _configuration.Connection.PublisherBuilder().BuildAsync().ConfigureAwait(false);
_consumer = await _configuration.Connection.ConsumerBuilder()
- .Queue(_configuration.ReplyToQueue)
+ .Queue(queueReplyTo)
+ .DirectReplyTo(canApplyDirectReplyTo)
.MessageHandler((context, message) =>
{
// TODO MessageHandler funcs should catch all exceptions
@@ -165,6 +191,7 @@ public override async Task OpenAsync()
{
request.SetResult(message);
}
+
return Task.CompletedTask;
}).BuildAndStartAsync().ConfigureAwait(false);
@@ -203,7 +230,8 @@ public async Task PublishAsync(IMessage message, CancellationToken can
if (_publisher != null)
{
PublishResult pr = await _publisher.PublishAsync(
- message.To(_configuration.RequestAddress), cancellationToken).ConfigureAwait(false);
+ message.To(_configuration.RequestAddress),
+ cancellationToken).ConfigureAwait(false);
if (pr.Outcome.State != OutcomeState.Accepted)
{
@@ -222,5 +250,16 @@ await _pendingRequests[correlationId].Task.WaitAsync(_configuration.Timeout)
_semaphore.Release();
}
}
+
+ public string GetReplyToQueue()
+ {
+ if (_consumer == null)
+ {
+ throw new InvalidOperationException("Requester is not opened");
+ }
+
+ return _consumer.Queue ??
+ throw new InvalidOperationException("ReplyToQueueAddress is not available");
+ }
}
}
diff --git a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs b/RabbitMQ.AMQP.Client/Impl/AmqpResponder.cs
similarity index 84%
rename from RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
rename to RabbitMQ.AMQP.Client/Impl/AmqpResponder.cs
index 896c934..7f4cec1 100644
--- a/RabbitMQ.AMQP.Client/Impl/AmqpRpcServer.cs
+++ b/RabbitMQ.AMQP.Client/Impl/AmqpResponder.cs
@@ -8,7 +8,7 @@
namespace RabbitMQ.AMQP.Client.Impl
{
- public class RpcConfiguration
+ public class ResponderConfiguration
{
public AmqpConnection Connection { get; set; } = null!;
public RpcHandler? Handler { get; set; }
@@ -20,60 +20,60 @@ public class RpcConfiguration
///
/// AmqpRpcServerBuilder is a builder for creating an AMQP RPC server.
///
- public class AmqpRpcServerBuilder : IRpcServerBuilder
+ public class AmqpResponderBuilder : IResponderBuilder
{
- readonly RpcConfiguration _configuration = new();
+ readonly ResponderConfiguration _configuration = new();
- public AmqpRpcServerBuilder(AmqpConnection connection)
+ public AmqpResponderBuilder(AmqpConnection connection)
{
_configuration.Connection = connection;
}
- public IRpcServerBuilder RequestQueue(string requestQueue)
+ public IResponderBuilder RequestQueue(string requestQueue)
{
_configuration.RequestQueue = requestQueue;
return this;
}
- public IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue)
+ public IResponderBuilder RequestQueue(IQueueSpecification requestQueue)
{
_configuration.RequestQueue = requestQueue.QueueName;
return this;
}
- public IRpcServerBuilder CorrelationIdExtractor(Func? correlationIdExtractor)
+ public IResponderBuilder CorrelationIdExtractor(Func? correlationIdExtractor)
{
_configuration.CorrelationIdExtractor = correlationIdExtractor;
return this;
}
- public IRpcServerBuilder ReplyPostProcessor(Func? replyPostProcessor)
+ public IResponderBuilder ReplyPostProcessor(Func? replyPostProcessor)
{
_configuration.ReplyPostProcessor = replyPostProcessor;
return this;
}
- public IRpcServerBuilder Handler(RpcHandler handler)
+ public IResponderBuilder Handler(RpcHandler handler)
{
_configuration.Handler = handler;
return this;
}
- public async Task BuildAsync()
+ public async Task BuildAsync()
{
- AmqpRpcServer amqpRpcServer = new(_configuration);
- await amqpRpcServer.OpenAsync().ConfigureAwait(false);
- return amqpRpcServer;
+ AmqpResponder amqpResponder = new(_configuration);
+ await amqpResponder.OpenAsync().ConfigureAwait(false);
+ return amqpResponder;
}
}
///
- /// AmqpRpcServer implements the interface.
+ /// AmqpRpcServer implements the interface.
/// With the RpcClient you can create an RPC communication over AMQP 1.0.
///
- public class AmqpRpcServer : AbstractLifeCycle, IRpcServer
+ public class AmqpResponder : AbstractLifeCycle, IResponder
{
- private readonly RpcConfiguration _configuration;
+ private readonly ResponderConfiguration _configuration;
private IPublisher? _publisher = null;
private IConsumer? _consumer = null;
@@ -107,7 +107,7 @@ private IMessage ReplyPostProcessor(IMessage reply, object correlationId)
: reply.CorrelationId(correlationId);
}
- public AmqpRpcServer(RpcConfiguration configuration)
+ public AmqpResponder(ResponderConfiguration configuration)
{
_configuration = configuration;
}
@@ -164,7 +164,7 @@ await Utils.WaitWithBackOffUntilFuncAsync(async () =>
await base.OpenAsync().ConfigureAwait(false);
}
- private class RpcServerContext : IRpcServer.IContext
+ private class RpcServerContext : IResponder.IContext
{
public IMessage Message(byte[] body) => new AmqpMessage(body);
public IMessage Message(string body) => new AmqpMessage(body);
diff --git a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
index 113932d..f438d43 100644
--- a/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
+++ b/RabbitMQ.AMQP.Client/PublicAPI.Unshipped.txt
@@ -31,10 +31,10 @@ override RabbitMQ.AMQP.Client.Impl.AmqpManagement.ToString() -> string!
override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.CloseAsync() -> System.Threading.Tasks.Task!
override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.OpenAsync() -> System.Threading.Tasks.Task!
override RabbitMQ.AMQP.Client.Impl.AmqpPublisher.ToString() -> string!
-override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.CloseAsync() -> System.Threading.Tasks.Task!
-override RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.OpenAsync() -> System.Threading.Tasks.Task!
-override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.CloseAsync() -> System.Threading.Tasks.Task!
-override RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.OpenAsync() -> System.Threading.Tasks.Task!
+override RabbitMQ.AMQP.Client.Impl.AmqpRequester.CloseAsync() -> System.Threading.Tasks.Task!
+override RabbitMQ.AMQP.Client.Impl.AmqpRequester.OpenAsync() -> System.Threading.Tasks.Task!
+override RabbitMQ.AMQP.Client.Impl.AmqpResponder.CloseAsync() -> System.Threading.Tasks.Task!
+override RabbitMQ.AMQP.Client.Impl.AmqpResponder.OpenAsync() -> System.Threading.Tasks.Task!
override RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
override RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.Builder() -> RabbitMQ.AMQP.Client.IConsumerBuilder!
override RabbitMQ.AMQP.Client.RecoveryConfiguration.ToString() -> string!
@@ -116,6 +116,7 @@ RabbitMQ.AMQP.Client.ExchangeType.TOPIC = 2 -> RabbitMQ.AMQP.Client.ExchangeType
RabbitMQ.AMQP.Client.FeatureFlags
RabbitMQ.AMQP.Client.FeatureFlags.FeatureFlags() -> void
RabbitMQ.AMQP.Client.FeatureFlags.IsBrokerCompatible.get -> bool
+RabbitMQ.AMQP.Client.FeatureFlags.IsDirectReplyToSupported.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.IsFilterFeatureEnabled.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.IsSqlFeatureEnabled.get -> bool
RabbitMQ.AMQP.Client.FeatureFlags.Validate() -> void
@@ -166,14 +167,16 @@ RabbitMQ.AMQP.Client.IConnection.Properties.get -> System.Collections.Generic.IR
RabbitMQ.AMQP.Client.IConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
RabbitMQ.AMQP.Client.IConnection.Publishers.get -> System.Collections.Generic.IEnumerable!
RabbitMQ.AMQP.Client.IConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.IConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
+RabbitMQ.AMQP.Client.IConnection.RequesterBuilder() -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
RabbitMQ.AMQP.Client.IConsumer
RabbitMQ.AMQP.Client.IConsumer.Pause() -> void
+RabbitMQ.AMQP.Client.IConsumer.Queue.get -> string!
RabbitMQ.AMQP.Client.IConsumer.Unpause() -> void
RabbitMQ.AMQP.Client.IConsumer.UnsettledMessageCount.get -> long
RabbitMQ.AMQP.Client.IConsumerBuilder
RabbitMQ.AMQP.Client.IConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.IConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions
RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -207,7 +210,7 @@ RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.ListenerContext(RabbitMQ.A
RabbitMQ.AMQP.Client.IConsumerBuilder.ListenerContext.StreamOptions.get -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpecification) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
-RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
+RabbitMQ.AMQP.Client.IConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.IConsumerBuilder.SubscriptionListener(System.Action! listenerContext) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.IContext
@@ -359,19 +362,21 @@ RabbitMQ.AMQP.Client.Impl.AmqpConnection.Properties.get -> System.Collections.Ge
RabbitMQ.AMQP.Client.Impl.AmqpConnection.PublisherBuilder() -> RabbitMQ.AMQP.Client.IPublisherBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.Publishers.get -> System.Collections.Generic.IEnumerable!
RabbitMQ.AMQP.Client.Impl.AmqpConnection.RefreshTokenAsync(string! token) -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcClientBuilder() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpConnection.RpcServerBuilder() -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpConnection.RequesterBuilder() -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpConnection.ResponderBuilder() -> RabbitMQ.AMQP.Client.IResponderBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumer
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Pause() -> void
+RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Queue.get -> string!
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.Unpause() -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumer.UnsettledMessageCount.get -> long
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.AmqpConsumerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection, RabbitMQ.AMQP.Client.IMetricsReporter? metricsReporter) -> void
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.BuildAndStartAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.DirectReplyTo(bool directReplyTo) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.InitialCredits(int initialCredits) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.MessageHandler(RabbitMQ.AMQP.Client.MessageHandler! handler) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(RabbitMQ.AMQP.Client.IQueueSpecification! queueSpec) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string! queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Queue(string? queueName) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.Stream() -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamOptions!
RabbitMQ.AMQP.Client.Impl.AmqpConsumerBuilder.SubscriptionListener(System.Action! context) -> RabbitMQ.AMQP.Client.IConsumerBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpEnvironment
@@ -491,29 +496,30 @@ RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.DeliveryLimit(int limit) -> Ra
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.Queue() -> RabbitMQ.AMQP.Client.IQueueSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumInitialGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
RabbitMQ.AMQP.Client.Impl.AmqpQuorumSpecification.QuorumTargetGroupSize(int size) -> RabbitMQ.AMQP.Client.IQuorumQueueSpecification!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClient
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.AmqpRpcClient(RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration! configuration) -> void
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.AmqpRpcClientBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServer
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServer.AmqpRpcServer(RabbitMQ.AMQP.Client.Impl.RpcConfiguration! configuration) -> void
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.AmqpRpcServerBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.Impl.AmqpRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequester
+RabbitMQ.AMQP.Client.Impl.AmqpRequester.AmqpRequester(RabbitMQ.AMQP.Client.Impl.RequesterConfiguration! configuration) -> void
+RabbitMQ.AMQP.Client.Impl.AmqpRequester.GetReplyToQueue() -> string!
+RabbitMQ.AMQP.Client.Impl.AmqpRequester.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.AmqpRequesterBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.BuildAsync() -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRequesterAddressBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpResponder
+RabbitMQ.AMQP.Client.Impl.AmqpResponder.AmqpResponder(RabbitMQ.AMQP.Client.Impl.ResponderConfiguration! configuration) -> void
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.AmqpResponderBuilder(RabbitMQ.AMQP.Client.Impl.AmqpConnection! connection) -> void
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.BuildAsync() -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.Impl.AmqpResponderBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IResponderBuilder!
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.AmqpStreamSpecification(RabbitMQ.AMQP.Client.Impl.AmqpQueueSpecification! parent) -> void
RabbitMQ.AMQP.Client.Impl.AmqpStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
@@ -543,6 +549,7 @@ RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions
RabbitMQ.AMQP.Client.Impl.ConsumerBuilderStreamOptions.ConsumerBuilderStreamOptions(RabbitMQ.AMQP.Client.IConsumerBuilder! consumerBuilder, Amqp.Types.Map! filters) -> void
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Address() -> string!
+RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DecodeQueuePathSegment(string! path) -> string!
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.DefaultAddressBuilder() -> void
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(RabbitMQ.AMQP.Client.IExchangeSpecification! exchangeSpec) -> T
RabbitMQ.AMQP.Client.Impl.DefaultAddressBuilder.Exchange(string? exchangeName) -> T
@@ -568,37 +575,37 @@ RabbitMQ.AMQP.Client.Impl.ListenerStreamOptions.ListenerStreamOptions(Amqp.Types
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.Build() -> RabbitMQ.AMQP.Client.IMessage!
RabbitMQ.AMQP.Client.Impl.MessageAddressBuilder.MessageAddressBuilder(RabbitMQ.AMQP.Client.IMessage! message) -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder
-RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.Impl.RpcClientAddressBuilder.RpcClientAddressBuilder(RabbitMQ.AMQP.Client.Impl.AmqpRpcClientBuilder! builder) -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Connection.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.get -> System.Func?
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdExtractor.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.get -> System.Func?
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.CorrelationIdSupplier.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.get -> string!
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.ReplyToQueue.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.get -> string!
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestAddress.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.get -> System.Func?
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RequestPostProcessor.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.RpcClientConfiguration() -> void
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.get -> System.TimeSpan
-RabbitMQ.AMQP.Client.Impl.RpcClientConfiguration.Timeout.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Connection.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.get -> System.Func?
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.CorrelationIdExtractor.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler?
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.Handler.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.get -> System.Func?
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.ReplyPostProcessor.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.get -> string!
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RequestQueue.set -> void
-RabbitMQ.AMQP.Client.Impl.RpcConfiguration.RpcConfiguration() -> void
+RabbitMQ.AMQP.Client.Impl.RequesterAddressBuilder
+RabbitMQ.AMQP.Client.Impl.RequesterAddressBuilder.Requester() -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.Impl.RequesterAddressBuilder.RequesterAddressBuilder(RabbitMQ.AMQP.Client.Impl.AmqpRequesterBuilder! builder) -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.Connection.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdExtractor.get -> System.Func?
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdExtractor.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdSupplier.get -> System.Func?
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.CorrelationIdSupplier.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.ReplyToQueue.get -> string!
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.ReplyToQueue.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestAddress.get -> string!
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestAddress.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequesterConfiguration() -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestPostProcessor.get -> System.Func?
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.RequestPostProcessor.set -> void
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.Timeout.get -> System.TimeSpan
+RabbitMQ.AMQP.Client.Impl.RequesterConfiguration.Timeout.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.Connection.get -> RabbitMQ.AMQP.Client.Impl.AmqpConnection!
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.Connection.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.CorrelationIdExtractor.get -> System.Func?
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.CorrelationIdExtractor.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.Handler.get -> RabbitMQ.AMQP.Client.RpcHandler?
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.Handler.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.ReplyPostProcessor.get -> System.Func?
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.ReplyPostProcessor.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.RequestQueue.get -> string!
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.RequestQueue.set -> void
+RabbitMQ.AMQP.Client.Impl.ResponderConfiguration.ResponderConfiguration() -> void
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.AbsoluteExpiryTime(System.DateTime absoluteExpiryTime) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
RabbitMQ.AMQP.Client.Impl.StreamFilterOptions.ContentEncoding(string! contentEncoding) -> RabbitMQ.AMQP.Client.IConsumerBuilder.IStreamFilterOptions!
@@ -687,30 +694,31 @@ RabbitMQ.AMQP.Client.IRecoveryConfiguration.GetBackOffDelayPolicy() -> RabbitMQ.
RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsActivated() -> bool
RabbitMQ.AMQP.Client.IRecoveryConfiguration.IsTopologyActive() -> bool
RabbitMQ.AMQP.Client.IRecoveryConfiguration.Topology(bool activated) -> RabbitMQ.AMQP.Client.IRecoveryConfiguration!
-RabbitMQ.AMQP.Client.IRpcClient
-RabbitMQ.AMQP.Client.IRpcClient.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.IRpcClientAddressBuilder
-RabbitMQ.AMQP.Client.IRpcClientAddressBuilder.RpcClient() -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder
-RabbitMQ.AMQP.Client.IRpcClientBuilder.BuildAsync() -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRpcClientAddressBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcClientBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRpcClientBuilder!
-RabbitMQ.AMQP.Client.IRpcServer
-RabbitMQ.AMQP.Client.IRpcServer.IContext
-RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage!
-RabbitMQ.AMQP.Client.IRpcServer.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage!
-RabbitMQ.AMQP.Client.IRpcServerBuilder
-RabbitMQ.AMQP.Client.IRpcServerBuilder.BuildAsync() -> System.Threading.Tasks.Task!
-RabbitMQ.AMQP.Client.IRpcServerBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.IRpcServerBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.IRpcServerBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
-RabbitMQ.AMQP.Client.IRpcServerBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IRpcServerBuilder!
+RabbitMQ.AMQP.Client.IRequester
+RabbitMQ.AMQP.Client.IRequester.GetReplyToQueue() -> string!
+RabbitMQ.AMQP.Client.IRequester.PublishAsync(RabbitMQ.AMQP.Client.IMessage! message, System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.IRequesterAddressBuilder
+RabbitMQ.AMQP.Client.IRequesterAddressBuilder.Requester() -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder
+RabbitMQ.AMQP.Client.IRequesterBuilder.BuildAsync() -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.IRequesterBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.CorrelationIdSupplier(System.Func? correlationIdSupplier) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.ReplyToQueue(RabbitMQ.AMQP.Client.IQueueSpecification! replyToQueue) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.ReplyToQueue(string! replyToQueueName) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.RequestAddress() -> RabbitMQ.AMQP.Client.IRequesterAddressBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.RequestPostProcessor(System.Func? requestPostProcessor) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IRequesterBuilder.Timeout(System.TimeSpan timeout) -> RabbitMQ.AMQP.Client.IRequesterBuilder!
+RabbitMQ.AMQP.Client.IResponder
+RabbitMQ.AMQP.Client.IResponder.IContext
+RabbitMQ.AMQP.Client.IResponder.IContext.Message(byte[]! body) -> RabbitMQ.AMQP.Client.IMessage!
+RabbitMQ.AMQP.Client.IResponder.IContext.Message(string! body) -> RabbitMQ.AMQP.Client.IMessage!
+RabbitMQ.AMQP.Client.IResponderBuilder
+RabbitMQ.AMQP.Client.IResponderBuilder.BuildAsync() -> System.Threading.Tasks.Task!
+RabbitMQ.AMQP.Client.IResponderBuilder.CorrelationIdExtractor(System.Func? correlationIdExtractor) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.IResponderBuilder.Handler(RabbitMQ.AMQP.Client.RpcHandler! handler) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.IResponderBuilder.ReplyPostProcessor(System.Func? replyPostProcessor) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.IResponderBuilder.RequestQueue(RabbitMQ.AMQP.Client.IQueueSpecification! requestQueue) -> RabbitMQ.AMQP.Client.IResponderBuilder!
+RabbitMQ.AMQP.Client.IResponderBuilder.RequestQueue(string! requestQueue) -> RabbitMQ.AMQP.Client.IResponderBuilder!
RabbitMQ.AMQP.Client.IStreamSpecification
RabbitMQ.AMQP.Client.IStreamSpecification.FileSizePerChunk(RabbitMQ.AMQP.Client.ByteCapacity! fileSizePerChunk) -> RabbitMQ.AMQP.Client.IStreamSpecification!
RabbitMQ.AMQP.Client.IStreamSpecification.InitialClusterSize(int initialClusterSize) -> RabbitMQ.AMQP.Client.IStreamSpecification!
diff --git a/RabbitMQ.AMQP.Client/Utils.cs b/RabbitMQ.AMQP.Client/Utils.cs
index 2eb5d1e..dc4127b 100644
--- a/RabbitMQ.AMQP.Client/Utils.cs
+++ b/RabbitMQ.AMQP.Client/Utils.cs
@@ -115,6 +115,47 @@ internal static void ValidatePositive(string label, long value)
}
}
+ internal static Attach CreateDirectReplyToAttach(Guid linkId, Map? sourceFilter = null)
+ {
+ // Attach{name='receiver-ID:f99ef5a7-7b91-48cb-a52e-d86e6f14f33b:3:2:1:1',
+ // handle=null, role=RECEIVER, sndSettleMode=SETTLED, rcvSettleMode=FIRST,
+ // source=Source{address='null', durable=NONE, expiryPolicy=LINK_DETACH, timeout=0,
+ // dynamic=true, dynamicNodeProperties=null, distributionMode=null, filter=null,
+ // defaultOutcome=Modified{deliveryFailed=true, undeliverableHere=false, messageAnnotations=null},
+ // outcomes=[amqp:accepted:list, amqp:rejected:list, amqp:released:list, amqp:modified:list],
+ // capabilities=[rabbitmq:volatile-queue]}, target=Target{address='null', durable=NONE,
+ // expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null},
+ // unsettled=null, incompleteUnsettled=null, initialDeliveryCount=null, maxMessageSize=null, offeredCapabilities=null,
+ // desiredCapabilities=null, properties=null}
+
+ var a = new Attach()
+ {
+ SndSettleMode = SenderSettleMode.Settled,
+ // LinkName = $"receiver-ID:{linkId.ToString()}",
+ RcvSettleMode = ReceiverSettleMode.First,
+ Source = new Source()
+ {
+ Capabilities = new Symbol[] { new("rabbitmq:volatile-queue") },
+ ExpiryPolicy = new Symbol("link-detach"),
+ Dynamic = true,
+ Timeout = 0,
+ FilterSet = sourceFilter,
+ DefaultOutcome = new Modified()
+ {
+ DeliveryFailed = true,
+ UndeliverableHere = false,
+ }
+ },
+ // Target = new Target()
+ // {
+ // ExpiryPolicy = new Symbol("SESSION_END"),
+ // Dynamic = false,
+ // Durable = 0,
+ // },
+ };
+ return a;
+ }
+
internal static Attach CreateAttach(string? address,
DeliveryMode deliveryMode, Guid linkId, Map? sourceFilter = null)
{
@@ -151,6 +192,11 @@ internal static Attach CreateAttach(string? address,
return PercentCodec.EncodePathSegment(url);
}
+ internal static string? DecodePathSegment(string url)
+ {
+ return PercentCodec.DecodePathSegment(url);
+ }
+
internal static string EncodeHttpParameter(string url)
{
return HttpUtility.UrlEncode(url);
@@ -334,6 +380,62 @@ static PercentCodec()
s_unreserved['~'] = true;
}
+ private static int HexValue(char ch)
+ {
+ return ch switch
+ {
+ >= '0' and <= '9' => ch - '0',
+ >= 'A' and <= 'F' => ch - 'A' + 10,
+ >= 'a' and <= 'f' => ch - 'a' + 10,
+ _ => -1
+ };
+ }
+
+ internal static string? DecodePathSegment(string? segment)
+ {
+ if (segment == null)
+ {
+ return null;
+ }
+
+ var bytes = new List(segment.Length);
+ for (int i = 0; i < segment.Length; i++)
+ {
+ char c = segment[i];
+ if (c == '%')
+ {
+ if (i + 2 >= segment.Length)
+ {
+ throw new FormatException("Invalid percent-encoding: incomplete escape sequence.");
+ }
+
+ int hi = HexValue(segment[i + 1]);
+ int lo = HexValue(segment[i + 2]);
+ if (hi < 0 || lo < 0)
+ {
+ throw new FormatException($"Invalid percent-encoding: '{segment.Substring(i, 3)}'.");
+ }
+
+ bytes.Add((byte)((hi << 4) | lo));
+ i += 2;
+ }
+ else
+ {
+ // Append UTF-8 encoding of the character (handles non-ASCII chars)
+ if (c <= 0x7F)
+ {
+ bytes.Add((byte)c);
+ }
+ else
+ {
+ bytes.AddRange(Encoding.UTF8.GetBytes(new[] { c }));
+ }
+ }
+ }
+
+ return Encoding.UTF8.GetString(bytes.ToArray());
+ }
+
internal static string? EncodePathSegment(string? segment)
{
if (segment == null)
diff --git a/Tests/AddressBuilderTests.cs b/Tests/AddressBuilderTests.cs
new file mode 100644
index 0000000..98804c4
--- /dev/null
+++ b/Tests/AddressBuilderTests.cs
@@ -0,0 +1,30 @@
+// This source code is dual-licensed under the Apache License, version 2.0,
+// and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using RabbitMQ.AMQP.Client.Impl;
+using Xunit;
+
+namespace Tests
+{
+ public class AddressBuilderTests
+ {
+ [Theory]
+ [InlineData("myQueue", "/queues/myQueue")]
+ [InlineData("queue/with/slash", "/queues/queue%2Fwith%2Fslash")]
+ [InlineData("queue with spaces", "/queues/queue%20with%20spaces")]
+ [InlineData("queue+with+plus", "/queues/queue%2Bwith%2Bplus")]
+ [InlineData("queue?with?question", "/queues/queue%3Fwith%3Fquestion")]
+ [InlineData("特殊å—符", "/queues/%E7%89%B9%E6%AE%8A%E5%AD%97%E7%AC%A6")]
+ [InlineData("emoji😊queue", "/queues/emoji%F0%9F%98%8Aqueue")]
+ [InlineData("!@#$%^&*()", "/queues/%21%40%23%24%25%5E%26%2A%28%29")]
+ public void AddressBuilder_EncodeAndDecode(string queue, string queuePath)
+ {
+ AddressBuilder addressBuilder = new();
+ string fullAddress = addressBuilder.Queue(queue).Address();
+ Assert.Equal(queuePath, fullAddress);
+ string decodedQueue = addressBuilder.DecodeQueuePathSegment(fullAddress);
+ Assert.Equal(queue, decodedQueue);
+ }
+ }
+}
diff --git a/Tests/Consumer/BasicConsumerTests.cs b/Tests/Consumer/BasicConsumerTests.cs
index 968f698..1926dde 100644
--- a/Tests/Consumer/BasicConsumerTests.cs
+++ b/Tests/Consumer/BasicConsumerTests.cs
@@ -371,17 +371,13 @@ public async Task ConsumerShouldThrowWhenQueueDoesNotExist()
IConsumerBuilder consumerBuilder = _connection.ConsumerBuilder()
.Queue(doesNotExist)
- .MessageHandler((context, message) =>
- {
- return Task.CompletedTask;
- }
- );
+ .MessageHandler((context, message) => Task.CompletedTask);
// TODO these are timeout exceptions under the hood, compare
// with the Java client
ConsumerException ex = await Assert.ThrowsAsync(
() => consumerBuilder.BuildAndStartAsync());
- Assert.Contains(doesNotExist, ex.Message);
+ Assert.Contains("amqp:not-found", ex.Message);
}
[Fact]
diff --git a/Tests/DirectReply/DirectReplyTests.cs b/Tests/DirectReply/DirectReplyTests.cs
new file mode 100644
index 0000000..cb3c83a
--- /dev/null
+++ b/Tests/DirectReply/DirectReplyTests.cs
@@ -0,0 +1,55 @@
+// This source code is dual-licensed under the Apache License, version 2.0,
+// and the Mozilla Public License, version 2.0.
+// Copyright (c) 2017-2024 Broadcom. All Rights Reserved. The term "Broadcom" refers to Broadcom Inc. and/or its subsidiaries.
+
+using System.Threading.Tasks;
+using RabbitMQ.AMQP.Client;
+using RabbitMQ.AMQP.Client.Impl;
+using Xunit;
+using Xunit.Abstractions;
+
+namespace Tests.DirectReply
+{
+ public class DirectReplyTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
+ {
+ [SkippableFact]
+ public async Task ValidateDirectReplyQName()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+
+ var amqpConnection = (_connection as AmqpConnection);
+ Skip.IfNot(amqpConnection is { _featureFlags.IsDirectReplyToSupported: true },
+ "DirectReply is not supported by the connection.");
+
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .DirectReplyTo(true)
+ .MessageHandler((IContext _, IMessage _) => Task.CompletedTask)
+ .BuildAndStartAsync();
+
+ Assert.Contains("amq.rabbitmq.reply-to", consumer.Queue);
+ }
+
+ [Fact]
+ public async Task UseDirectReplyToReceiveAMessage()
+ {
+ Assert.NotNull(_connection);
+ Assert.NotNull(_management);
+ TaskCompletionSource tcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
+
+ var amqpConnection = (_connection as AmqpConnection);
+ Skip.IfNot(amqpConnection is { _featureFlags.IsDirectReplyToSupported: true },
+ "DirectReply is not supported by the connection.");
+
+ IConsumer consumer = await _connection.ConsumerBuilder()
+ .DirectReplyTo(true)
+ .MessageHandler((IContext _, IMessage msg) =>
+ {
+ tcs.SetResult(msg);
+ return Task.CompletedTask;
+ }).BuildAndStartAsync();
+
+ Assert.Contains("amq.rabbitmq.reply-to", consumer.Queue);
+ }
+ }
+}
diff --git a/Tests/Rpc/RecoveryRPCTests.cs b/Tests/RequesterResponser/RecoveryRPCTests.cs
similarity index 64%
rename from Tests/Rpc/RecoveryRPCTests.cs
rename to Tests/RequesterResponser/RecoveryRPCTests.cs
index 8eae484..79d3e81 100644
--- a/Tests/Rpc/RecoveryRPCTests.cs
+++ b/Tests/RequesterResponser/RecoveryRPCTests.cs
@@ -15,8 +15,16 @@ namespace Tests.Rpc
public class RecoveryRpcTests(ITestOutputHelper testOutputHelper)
: IntegrationTest(testOutputHelper, setupConnectionAndManagement: false)
{
- [Fact]
- public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
+ ///
+ /// Test the automatic recovery of a requester and responder after the connection is killed.
+ ///
+ ///
+ /// with True the test provides reply-queue externally.
+ /// With False the test uses DirectReply feature
+ [Theory]
+ [InlineData(true)]
+ [InlineData(false)]
+ public async Task ResponderAndClientShouldRecoverAfterKillConnection(bool useReplyToQueue)
{
Assert.Null(_connection);
Assert.Null(_management);
@@ -38,7 +46,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
await requestQueue.DeclareAsync();
int messagesReceived = 0;
- IRpcServer rpcServer = await connection.RpcServerBuilder()
+ IResponder responder = await connection.ResponderBuilder()
.RequestQueue(rpcRequestQueueName)
.Handler((context, message) =>
{
@@ -48,16 +56,19 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
})
.BuildAsync();
- string replyQueueName = $"rpc-server-client-recovery-reply-queue-{DateTime.Now}";
-
- IQueueSpecification clientReplyQueue = management.Queue(replyQueueName)
- .Type(QueueType.CLASSIC).AutoDelete(true).Exclusive(true);
-
- await clientReplyQueue.DeclareAsync();
+ string replyQueueName = "";
+ IQueueSpecification? clientReplyQueue = null;
+ if (useReplyToQueue)
+ {
+ replyQueueName = $"rpc-server-client-recovery-reply-queue-{DateTime.Now}";
+ clientReplyQueue = management.Queue(replyQueueName)
+ .Type(QueueType.CLASSIC).AutoDelete(true).Exclusive(true);
+ await clientReplyQueue.DeclareAsync();
+ }
- IRpcClient rpcClient = await
- connection.RpcClientBuilder().RequestAddress().Queue(requestQueue).RpcClient()
- .ReplyToQueue(clientReplyQueue).BuildAsync();
+ IRequester requester = await
+ connection.RequesterBuilder().RequestAddress().Queue(requestQueue).Requester()
+ .ReplyToQueue(replyQueueName).BuildAsync();
int messagesConfirmed = 0;
for (int i = 0; i < 50; i++)
@@ -65,7 +76,7 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
IMessage request = new AmqpMessage("ping");
try
{
- IMessage response = await rpcClient.PublishAsync(request);
+ IMessage response = await requester.PublishAsync(request);
messagesConfirmed++;
Assert.Equal("pong", response.BodyAsString());
}
@@ -81,18 +92,21 @@ public async Task RpcServerAndClientShouldRecoverAfterKillConnection()
if (i % 25 == 0)
{
- await WaitUntilConnectionIsKilled(containerId);
+ await WaitUntilConnectionIsKilledAndOpen(containerId);
await Task.Delay(500);
- await WaitUntilQueueExistsAsync(clientReplyQueue.QueueName);
}
}
Assert.True(messagesConfirmed > 25);
Assert.True(messagesReceived > 25);
await requestQueue.DeleteAsync();
- await clientReplyQueue.DeleteAsync();
- await rpcClient.CloseAsync();
- await rpcServer.CloseAsync();
+ if (clientReplyQueue != null)
+ {
+ await clientReplyQueue.DeleteAsync();
+ }
+
+ await requester.CloseAsync();
+ await responder.CloseAsync();
}
}
}
diff --git a/Tests/Rpc/RpcServerTests.cs b/Tests/RequesterResponser/ResponderTests.cs
similarity index 77%
rename from Tests/Rpc/RpcServerTests.cs
rename to Tests/RequesterResponser/ResponderTests.cs
index 74df7ba..605c8c5 100644
--- a/Tests/Rpc/RpcServerTests.cs
+++ b/Tests/RequesterResponser/ResponderTests.cs
@@ -13,7 +13,7 @@
namespace Tests.Rpc
{
- public class RpcServerTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
+ public class ResponderTests(ITestOutputHelper testOutputHelper) : IntegrationTest(testOutputHelper)
{
private string _requestQueueName = string.Empty;
private readonly string _replyToName = $"queueReplyTo-{Now}-{Guid.NewGuid()}";
@@ -34,20 +34,13 @@ public override async Task InitializeAsync()
}
[Fact]
- public async Task MockRpcServerPingPong()
+ public async Task MockResponderPingPong()
{
Assert.NotNull(_connection);
TaskCompletionSource tcs = CreateTaskCompletionSource();
- Task RpcHandler(IRpcServer.IContext context, IMessage request)
- {
- IMessage reply = context.Message("pong");
- tcs.SetResult(reply);
- return Task.FromResult(reply);
- }
-
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
- .Handler(RpcHandler)
+ IResponder responder = await _connection.ResponderBuilder()
+ .Handler(Handler)
.RequestQueue(_requestQueueName)
.BuildAsync();
@@ -58,29 +51,31 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request)
await p.PublishAsync(new AmqpMessage("test"));
IMessage m = await WhenTcsCompletes(tcs);
Assert.Equal("pong", m.BodyAsString());
- await rpcServer.CloseAsync();
+ await responder.CloseAsync();
+ return;
+
+ Task Handler(IResponder.IContext context, IMessage request)
+ {
+ IMessage reply = context.Message("pong");
+ tcs.SetResult(reply);
+ return Task.FromResult(reply);
+ }
}
[Fact]
- public async Task RpcServerValidateStateChange()
+ public async Task ResponderValidateStateChange()
{
Assert.NotNull(_connection);
List<(State, State)> states = [];
TaskCompletionSource tcs = CreateTaskCompletionSource();
- static Task RpcHandler(IRpcServer.IContext context, IMessage request)
- {
- IMessage m = context.Message(request.Body());
- return Task.FromResult(m);
- }
-
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
- .Handler(RpcHandler)
+ IResponder responder = await _connection.ResponderBuilder()
+ .Handler(Handler)
.RequestQueue(_requestQueueName)
.BuildAsync();
- rpcServer.ChangeState += (sender, fromState, toState, e) =>
+ responder.ChangeState += (sender, fromState, toState, e) =>
{
states.Add((fromState, toState));
if (states.Count == 2)
@@ -89,7 +84,7 @@ static Task RpcHandler(IRpcServer.IContext context, IMessage request)
}
};
- await rpcServer.CloseAsync();
+ await responder.CloseAsync();
int count = await WhenTcsCompletes(tcs);
Assert.Equal(2, count);
@@ -97,6 +92,13 @@ static Task RpcHandler(IRpcServer.IContext context, IMessage request)
Assert.Equal(State.Closing, states[0].Item2);
Assert.Equal(State.Closing, states[1].Item1);
Assert.Equal(State.Closed, states[1].Item2);
+ return;
+
+ static Task Handler(IResponder.IContext context, IMessage request)
+ {
+ IMessage m = context.Message(request.Body());
+ return Task.FromResult(m);
+ }
}
///
@@ -108,7 +110,7 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
Assert.NotNull(_connection);
Assert.NotNull(_management);
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();
@@ -120,13 +122,6 @@ public async Task SimulateRpcCommunicationWithAPublisherShouldSuccess()
TaskCompletionSource tcs = CreateTaskCompletionSource();
- Task MessageHandler(IContext context, IMessage message)
- {
- context.Accept();
- tcs.SetResult(message);
- return Task.CompletedTask;
- }
-
IConsumer consumer = await _connection.ConsumerBuilder()
.Queue(replyQueueSpec)
.MessageHandler(MessageHandler)
@@ -145,9 +140,17 @@ Task MessageHandler(IContext context, IMessage message)
IMessage m = await WhenTcsCompletes(tcs);
Assert.Equal("pong", m.BodyAsString());
- await rpcServer.CloseAsync();
+ await responder.CloseAsync();
await consumer.CloseAsync();
await publisher.CloseAsync();
+ return;
+
+ Task MessageHandler(IContext context, IMessage msg)
+ {
+ context.Accept();
+ tcs.SetResult(msg);
+ return Task.CompletedTask;
+ }
}
///
@@ -155,39 +158,45 @@ Task MessageHandler(IContext context, IMessage message)
/// with the ReplyToQueue method
///
[Fact]
- public async Task RpcServerClientPingPongWithDefault()
+ public async Task ResponderRequesterPingPongWithDefault()
{
Assert.NotNull(_connection);
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();
- IRpcClient rpcClient = await _connection.RpcClientBuilder()
+ IRequester requester = await _connection.RequesterBuilder()
.RequestAddress()
.Queue(_requestQueueName)
- .RpcClient()
+ .Requester()
.BuildAsync();
IMessage message = new AmqpMessage("ping");
- IMessage response = await rpcClient.PublishAsync(message);
+ IMessage response = await requester.PublishAsync(message);
Assert.Equal("pong", response.BodyAsString());
- await rpcClient.CloseAsync();
- await rpcServer.CloseAsync();
+
+ Assert.Contains(
+ _connection is AmqpConnection { _featureFlags.IsDirectReplyToSupported: true }
+ ? "amq.rabbitmq.reply-to"
+ : "client.gen-", requester.GetReplyToQueue());
+
+ await requester.CloseAsync();
+ await responder.CloseAsync();
}
///
- /// In this test the client has to use the ReplyToQueue provided by the user
+ /// In this test the Requester has to use the ReplyToQueue provided by the user
///
[Fact]
- public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdSupplier()
+ public async Task ResponderRequesterPingPongWithCustomReplyToQueueAndCorrelationIdSupplier()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();
@@ -197,10 +206,10 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
.AutoDelete(true)
.DeclareAsync();
- IRpcClient rpcClient = await _connection.RpcClientBuilder()
+ IRequester requester = await _connection.RequesterBuilder()
.RequestAddress()
.Queue(_requestQueueName)
- .RpcClient()
+ .Requester()
.CorrelationIdSupplier(() => _correlationId)
.CorrelationIdExtractor(message => message.CorrelationId())
.ReplyToQueue(replyTo.Name())
@@ -208,11 +217,13 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
IMessage message = new AmqpMessage("ping");
- IMessage response = await rpcClient.PublishAsync(message);
+ IMessage response = await requester.PublishAsync(message);
Assert.Equal("pong", response.BodyAsString());
Assert.Equal(_correlationId, response.CorrelationId());
- await rpcClient.CloseAsync();
- await rpcServer.CloseAsync();
+ Assert.Contains(replyTo.Name(), requester.GetReplyToQueue());
+
+ await requester.CloseAsync();
+ await responder.CloseAsync();
}
///
@@ -223,12 +234,12 @@ public async Task RpcServerClientPingPongWithCustomReplyToQueueAndCorrelationIdS
///
///
[Fact]
- public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
+ public async Task ResponderRequesterOverridingTheRequestAndResponsePostProcessor()
{
Assert.NotNull(_connection);
Assert.NotNull(_management);
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(PongRpcHandler)
.RequestQueue(_requestQueueName)
.CorrelationIdExtractor(message => message.Property("correlationId"))
@@ -243,10 +254,10 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
int correlationIdCounter = 0;
- IRpcClient rpcClient = await _connection.RpcClientBuilder()
+ IRequester requester = await _connection.RequesterBuilder()
.RequestAddress()
.Queue(_requestQueueName)
- .RpcClient()
+ .Requester()
.ReplyToQueue(replyTo.Name())
// replace the correlation id creation with a custom function
.CorrelationIdSupplier(() => $"{_correlationId}_{Interlocked.Increment(ref correlationIdCounter)}")
@@ -264,7 +275,7 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
int i = 1;
while (i < 30)
{
- IMessage response = await rpcClient.PublishAsync(message);
+ IMessage response = await requester.PublishAsync(message);
Assert.Equal("pong", response.BodyAsString());
// the server replies with the correlation id in the application properties
Assert.Equal($"{_correlationId}_{i}", response.Property("correlationId"));
@@ -273,12 +284,12 @@ public async Task RpcServerClientOverridingTheRequestAndResponsePostProcessor()
i++;
}
- await rpcClient.CloseAsync();
- await rpcServer.CloseAsync();
+ await requester.CloseAsync();
+ await responder.CloseAsync();
}
[Fact]
- public async Task RpcClientMultiThreadShouldBeSafe()
+ public async Task RequesterMultiThreadShouldBeSafe()
{
Assert.NotNull(_connection);
const int messagesToSend = 99;
@@ -286,7 +297,7 @@ public async Task RpcClientMultiThreadShouldBeSafe()
TaskCompletionSource tcs = CreateTaskCompletionSource();
List messagesReceived = [];
- Task RpcHandler(IRpcServer.IContext context, IMessage request)
+ Task RpcHandler(IResponder.IContext context, IMessage request)
{
try
{
@@ -303,14 +314,14 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request)
}
}
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();
- IRpcClient rpcClient = await _connection.RpcClientBuilder().RequestAddress()
+ IRequester requester = await _connection.RequesterBuilder().RequestAddress()
.Queue(_requestQueueName)
- .RpcClient()
+ .Requester()
.BuildAsync();
List tasks = [];
@@ -323,7 +334,7 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request)
tasks.Add(Task.Run(async () =>
{
IMessage message = new AmqpMessage("ping").Property("id", i1);
- IMessage response = await rpcClient.PublishAsync(message);
+ IMessage response = await requester.PublishAsync(message);
Assert.Equal("pong", response.BodyAsString());
}));
}
@@ -342,19 +353,19 @@ Task RpcHandler(IRpcServer.IContext context, IMessage request)
Assert.Contains(messagesReceived, m => m.Property("id").Equals(i));
}
- await rpcServer.CloseAsync();
- await rpcClient.CloseAsync();
+ await responder.CloseAsync();
+ await requester.CloseAsync();
}
///
/// The RPC client `PublishAsync` should raise a timeout exception if the server does not reply within the timeout
///
[Fact]
- public async Task RpcClientShouldRaiseTimeoutError()
+ public async Task RequesterShouldRaiseTimeoutError()
{
Assert.NotNull(_connection);
- static async Task RpcHandler(IRpcServer.IContext context, IMessage request)
+ static async Task RpcHandler(IResponder.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
object millisecondsToWait = request.Property("wait");
@@ -362,30 +373,30 @@ static async Task RpcHandler(IRpcServer.IContext context, IMessage req
return reply;
}
- IRpcServer rpcServer = await _connection.RpcServerBuilder()
+ IResponder responder = await _connection.ResponderBuilder()
.Handler(RpcHandler)
.RequestQueue(_requestQueueName)
.BuildAsync();
- IRpcClient rpcClient = await _connection.RpcClientBuilder()
+ IRequester requester = await _connection.RequesterBuilder()
.RequestAddress()
.Queue(_requestQueueName)
- .RpcClient()
+ .Requester()
.Timeout(TimeSpan.FromMilliseconds(300))
.BuildAsync();
IMessage msg = new AmqpMessage("ping").Property("wait", 1);
- IMessage reply = await rpcClient.PublishAsync(msg);
+ IMessage reply = await requester.PublishAsync(msg);
Assert.Equal("pong", reply.BodyAsString());
- await Assert.ThrowsAsync(() => rpcClient.PublishAsync(
+ await Assert.ThrowsAsync(() => requester.PublishAsync(
new AmqpMessage("ping").Property("wait", 700)));
- await rpcClient.CloseAsync();
- await rpcServer.CloseAsync();
+ await requester.CloseAsync();
+ await responder.CloseAsync();
}
- private static Task PongRpcHandler(IRpcServer.IContext context, IMessage request)
+ private static Task PongRpcHandler(IResponder.IContext context, IMessage request)
{
IMessage reply = context.Message("pong");
return Task.FromResult(reply);
diff --git a/docs/Examples/Rpc/Program.cs b/docs/Examples/Rpc/Program.cs
index 4144445..81a4195 100644
--- a/docs/Examples/Rpc/Program.cs
+++ b/docs/Examples/Rpc/Program.cs
@@ -40,7 +40,8 @@
const int messagesToSend = 10_000_000;
TaskCompletionSource tcs = new();
int messagesReceived = 0;
-IRpcServer rpcServer = await connection.RpcServerBuilder().RequestQueue(rpcQueue).Handler(
+IResponder responder = await connection.ResponderBuilder().
+ RequestQueue(rpcQueue).Handler(
(context, message) =>
{
try
@@ -59,14 +60,14 @@
}
).BuildAsync();
-IRpcClient rpcClient = await connection.RpcClientBuilder().RequestAddress().Queue(rpcQueue).RpcClient().BuildAsync()
- ;
+IRequester requester = await connection.RequesterBuilder().RequestAddress().
+ Queue(rpcQueue).Requester().BuildAsync();
for (int i = 0; i < messagesToSend; i++)
{
try
{
- IMessage reply = await rpcClient.PublishAsync(
+ IMessage reply = await requester.PublishAsync(
new AmqpMessage($"ping_{DateTime.Now}"));
Trace.WriteLine(TraceLevel.Information, $"[Client] Reply received: {reply.BodyAsString()}");
}
@@ -82,8 +83,8 @@
await tcs.Task.WaitAsync(TimeSpan.FromSeconds(10));
-await rpcClient.CloseAsync();
-await rpcServer.CloseAsync();
+await requester.CloseAsync();
+await responder.CloseAsync();
await queueSpec.DeleteAsync();
await environment.CloseAsync();
Trace.WriteLine(TraceLevel.Information, "Bye!");