Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .ci/ubuntu/cluster/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ function run_docker_compose
docker compose --file "$script_dir/docker-compose.yml" $@
}

readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"

if [[ ! -v GITHUB_ACTIONS ]]
then
Expand Down
2 changes: 1 addition & 1 deletion .ci/ubuntu/cluster/rmq/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.1.0-beta.4-management-alpine
ARG RABBITMQ_DOCKER_TAG=rabbitmq:4.2-management-alpine

FROM ${RABBITMQ_DOCKER_TAG}

Expand Down
2 changes: 1 addition & 1 deletion .ci/ubuntu/one-node/gha-setup.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ readonly script_dir
echo "[INFO] script_dir: '$script_dir'"


readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-rc-management}"
readonly rabbitmq_image="${RABBITMQ_IMAGE:-rabbitmq:4.2-management}"



Expand Down
2 changes: 1 addition & 1 deletion .ci/windows/versions.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"erlang": "27.2",
"rabbitmq": "4.1.0"
"rabbitmq": "4.2.0"
}
7 changes: 7 additions & 0 deletions RabbitMQ.AMQP.Client/FeatureFlags.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ public class FeatureFlags
/// </summary>
public bool IsBrokerCompatible { get; internal set; } = false;

/// <summary>
/// Check if Direct Reply-To is supported.
/// Direct Reply-To is available in RabbitMQ 4.2 and later.
/// see: https://www.rabbitmq.com/docs/direct-reply-to
/// </summary>
public bool IsDirectReplyToSupported { get; internal set; } = false;

public void Validate()
{
if (!IsBrokerCompatible)
Expand Down
12 changes: 6 additions & 6 deletions RabbitMQ.AMQP.Client/IConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,16 @@ public interface IConnection : ILifeCycle
IConsumerBuilder ConsumerBuilder();

/// <summary>
/// Create an <see cref="IRpcServerBuilder"/> instance for this connection.
/// Create an <see cref="IResponderBuilder"/> instance for this connection.
/// </summary>
/// <returns><see cref="IRpcServerBuilder"/> instance for this connection.</returns>
IRpcServerBuilder RpcServerBuilder();
/// <returns><see cref="IResponderBuilder"/> instance for this connection.</returns>
IResponderBuilder ResponderBuilder();

/// <summary>
/// Create an <see cref="IRpcClientBuilder"/> instance for this connection.
/// Create an <see cref="IRequesterBuilder"/> instance for this connection.
/// </summary>
/// <returns><see cref="IRpcClientBuilder"/> instance for this connection.</returns>
IRpcClientBuilder RpcClientBuilder();
/// <returns><see cref="IRequesterBuilder"/> instance for this connection.</returns>
IRequesterBuilder RequesterBuilder();

/// <summary>
/// Get the properties for this connection.
Expand Down
7 changes: 7 additions & 0 deletions RabbitMQ.AMQP.Client/IConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,13 @@ public interface IConsumer : ILifeCycle
/// Returns the number of unsettled messages.
/// </summary>
long UnsettledMessageCount { get; }

/// <summary>
/// Returns queue name the consumer is consuming from.
/// The Queue name is usually configured by the user via the <see cref="IConsumerBuilder"/>,
/// but can also be generated by the client the special direct-reply-to queue.
Copy link

Copilot AI Nov 6, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Grammar issue: "by the client the special" should be "by the client for the special". The sentence is missing the preposition "for".

Suggested change
/// but can also be generated by the client the special direct-reply-to queue.
/// but can also be generated by the client for the special direct-reply-to queue.

Copilot uses AI. Check for mistakes.
/// </summary>
string Queue { get; }
}

public interface IContext
Expand Down
12 changes: 11 additions & 1 deletion RabbitMQ.AMQP.Client/IConsumerBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,17 @@ public enum StreamOffsetSpecification
public interface IConsumerBuilder
{
IConsumerBuilder Queue(IQueueSpecification queueSpecification);
IConsumerBuilder Queue(string queueName);
IConsumerBuilder Queue(string? queueName);

/// <summary>
/// If direct reply-to is enabled, the client will use the direct reply-to feature of AMQP 1.0.
/// The server must also support direct reply-to.
/// This feature allows the server to send the reply directly to the client without going through a reply queue.
/// This can improve performance and reduce latency.
/// Default is false.
/// https://www.rabbitmq.com/docs/direct-reply-to
/// </summary>
IConsumerBuilder DirectReplyTo(bool directReplyTo);

IConsumerBuilder MessageHandler(MessageHandler handler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,33 @@

namespace RabbitMQ.AMQP.Client
{

public interface IRpcClientAddressBuilder : IAddressBuilder<IRpcClientAddressBuilder>
public interface IRequesterAddressBuilder : IAddressBuilder<IRequesterAddressBuilder>
{
IRpcClientBuilder RpcClient();
IRequesterBuilder Requester();
}

/// <summary>
/// IRpcClientBuilder is the interface for creating an RPC client.
/// See also <seealso cref="IRpcClient"/> and <seealso cref="IRpcServerBuilder"/>
/// See also <seealso cref="IRequester"/> and <seealso cref="IResponderBuilder"/>
/// </summary>
public interface IRpcClientBuilder
public interface IRequesterBuilder
{
/// <summary>
/// Request address where the client sends requests.
/// The server consumes requests from this address.
/// </summary>
/// <returns></returns>
IRpcClientAddressBuilder RequestAddress();
IRequesterAddressBuilder RequestAddress();

/// <summary>
/// The queue from which requests are consumed.
/// if not set the client will create a temporary queue.
/// </summary>
/// <param name="replyToQueueName"> The queue name</param>
/// <returns></returns>
IRpcClientBuilder ReplyToQueue(string replyToQueueName);
IRequesterBuilder ReplyToQueue(string replyToQueueName);

IRpcClientBuilder ReplyToQueue(IQueueSpecification replyToQueue);
IRequesterBuilder ReplyToQueue(IQueueSpecification replyToQueue);

/// <summary>
/// Extracts the correlation id from the request message.
Expand All @@ -45,7 +44,7 @@ public interface IRpcClientBuilder
/// </summary>
/// <param name="correlationIdExtractor"></param>
/// <returns></returns>
IRpcClientBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
IRequesterBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

/// <summary>
/// Post processes the reply message before sending it to the server.
Expand All @@ -56,7 +55,7 @@ public interface IRpcClientBuilder
/// </summary>
/// <param name="requestPostProcessor"></param>
/// <returns></returns>
IRpcClientBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);
IRequesterBuilder RequestPostProcessor(Func<IMessage, object, IMessage>? requestPostProcessor);

/// <summary>
/// Client and Server must agree on the correlation id.
Expand All @@ -66,27 +65,27 @@ public interface IRpcClientBuilder
/// </summary>
/// <param name="correlationIdSupplier"></param>
/// <returns></returns>

IRpcClientBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);
IRequesterBuilder CorrelationIdSupplier(Func<object>? correlationIdSupplier);

/// <summary>
/// The time to wait for a reply from the server.
/// </summary>
/// <param name="timeout"></param>
/// <returns></returns>
IRpcClientBuilder Timeout(TimeSpan timeout);
IRequesterBuilder Timeout(TimeSpan timeout);

/// <summary>
/// Build and return the RPC client.
/// </summary>
/// <returns></returns>
Task<IRpcClient> BuildAsync();
Task<IRequester> BuildAsync();
}

/// <summary>
/// IRpcClient is the interface for an RPC client.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// See also <seealso cref="IResponder"/> and <seealso cref="IRequesterBuilder"/>
/// </summary>
public interface IRpcClient : ILifeCycle
public interface IRequester : ILifeCycle
{
/// <summary>
/// PublishAsync sends a request message to the server and blocks the thread until the response is received.
Expand All @@ -98,5 +97,14 @@ public interface IRpcClient : ILifeCycle
/// <param name="cancellationToken">Cancellation token</param>
/// <returns></returns>
Task<IMessage> PublishAsync(IMessage message, CancellationToken cancellationToken = default);

/// <summary>
/// The ReplyTo queue address can be created by:
/// - the client providing a specific queue name
/// - the client creating a temporary queue
/// - The server uses this address to send the reply message. with direct-reply-to
/// </summary>
/// <returns></returns>
public string GetReplyToQueue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,22 +7,22 @@

namespace RabbitMQ.AMQP.Client
{

/// <summary>
/// IRpcServerBuilder is the interface for creating an RPC server.
/// The RPC server consumes requests from a queue and sends replies to a reply queue.
/// See also <seealso cref="IRpcServer"/> and <seealso cref="IRpcClientBuilder"/>
/// See also <seealso cref="IResponder"/> and <seealso cref="IRequesterBuilder"/>
/// </summary>
public interface IRpcServerBuilder
public interface IResponderBuilder
{
/// <summary>
/// The queue from which requests are consumed.
/// The client sends requests to this queue and the server consumes them.
/// </summary>
/// <param name="requestQueue"></param>
/// <returns></returns>
IRpcServerBuilder RequestQueue(string requestQueue);
IRpcServerBuilder RequestQueue(IQueueSpecification requestQueue);
IResponderBuilder RequestQueue(string requestQueue);

IResponderBuilder RequestQueue(IQueueSpecification requestQueue);

/// <summary>
/// Extracts the correlation id from the request message.
Expand All @@ -32,8 +32,7 @@ public interface IRpcServerBuilder
/// </summary>
/// <param name="correlationIdExtractor"></param>
/// <returns></returns>

IRpcServerBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);
IResponderBuilder CorrelationIdExtractor(Func<IMessage, object>? correlationIdExtractor);

/// <summary>
/// Post processes the reply message before sending it to the client.
Expand All @@ -44,38 +43,37 @@ public interface IRpcServerBuilder
/// </summary>
/// <param name="replyPostProcessor"></param>
/// <returns></returns>
IRpcServerBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);
IResponderBuilder ReplyPostProcessor(Func<IMessage, object, IMessage>? replyPostProcessor);

/// <summary>
/// Handle the request message and return the reply message.
/// </summary>
/// <param name="handler"></param>
/// <returns></returns>
IRpcServerBuilder Handler(RpcHandler handler);
IResponderBuilder Handler(RpcHandler handler);

/// <summary>
/// Build and return the RPC server.
/// </summary>
/// <returns></returns>
Task<IRpcServer> BuildAsync();
Task<IResponder> BuildAsync();
}

/// <summary>
/// Event handler for handling RPC requests.
/// </summary>
// TODO cancellation token
public delegate Task<IMessage> RpcHandler(IRpcServer.IContext context, IMessage request);
public delegate Task<IMessage> RpcHandler(IResponder.IContext context, IMessage request);

/// <summary>
/// IRpcServer interface for creating an RPC server.
/// IResponder interface for creating an RPC server.
/// The RPC is simulated by sending a request message and receiving a reply message.
/// Where the client sends the queue where wants to receive the reply.
/// RPC client ---> request queue ---> RPC server ---> reply queue ---> RPC client
/// See also <seealso cref="IRpcClient"/>
/// See also <seealso cref="IRequester"/>
/// </summary>
public interface IRpcServer : ILifeCycle
public interface IResponder : ILifeCycle
{

public interface IContext
{
IMessage Message(byte[] body);
Expand Down
47 changes: 21 additions & 26 deletions RabbitMQ.AMQP.Client/Impl/AddressBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,36 +21,21 @@ public T Exchange(IExchangeSpecification exchangeSpec)
public T Exchange(string? exchangeName)
{
_exchange = exchangeName;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
return _owner ?? throw new InvalidOperationException("Owner is null");
}

public T Queue(IQueueSpecification queueSpec) => Queue(queueSpec.QueueName);

public T Queue(string? queueName)
{
_queue = queueName;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
return _owner ?? throw new InvalidOperationException("Owner is null");
}

public T Key(string? key)
{
_key = key;
if (_owner == null)
{
throw new InvalidOperationException("Owner is null");
}

return _owner;
return _owner ?? throw new InvalidOperationException("Owner is null");
}

public string Address()
Expand Down Expand Up @@ -85,12 +70,21 @@ public string Address()
return "";
}

if (string.IsNullOrEmpty(_queue))
return string.IsNullOrEmpty(_queue)
? throw new InvalidAddressException("Queue must be set")
: $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
}

public string DecodeQueuePathSegment(string path)
{
string? v = Utils.DecodePathSegment(path);
if (v == null)
{
throw new InvalidAddressException("Queue must be set");
throw new InvalidAddressException("Invalid path segment");
}

return $"/{Consts.Queues}/{Utils.EncodePathSegment(_queue)}";
string prefix = $"/{Consts.Queues}/";
// Only remove the prefix if present; otherwise, return as-is
return v.StartsWith(prefix) ? v.Substring(prefix.Length) : v;
}
}

Expand Down Expand Up @@ -124,16 +118,17 @@ public IMessage Build()
}
}

public class RpcClientAddressBuilder : DefaultAddressBuilder<IRpcClientAddressBuilder>, IRpcClientAddressBuilder
public class RequesterAddressBuilder : DefaultAddressBuilder<IRequesterAddressBuilder>, IRequesterAddressBuilder
{
readonly AmqpRpcClientBuilder _builder;
public RpcClientAddressBuilder(AmqpRpcClientBuilder builder)
readonly AmqpRequesterBuilder _builder;

public RequesterAddressBuilder(AmqpRequesterBuilder builder)
{
_builder = builder;
_owner = this;
}

public IRpcClientBuilder RpcClient()
public IRequesterBuilder Requester()
{
return _builder;
}
Expand Down
Loading