Skip to content

Commit 24848ec

Browse files
committed
Configurable TcpClient for ReactorNettyTcpClient
Issue: SPR-17523
1 parent fef0e21 commit 24848ec

File tree

2 files changed

+29
-12
lines changed

2 files changed

+29
-12
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

+23-1
Original file line numberDiff line numberDiff line change
@@ -110,19 +110,41 @@ public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec)
110110
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
111111
this.loopResources = LoopResources.create("tcp-client-loop");
112112
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
113+
this.codec = codec;
113114

114115
this.tcpClient = TcpClient.create(this.poolResources)
115116
.host(host).port(port)
116117
.runOn(this.loopResources, false)
117118
.doOnConnected(conn -> this.channelGroup.add(conn.channel()));
119+
}
118120

121+
/**
122+
* A variant of {@link #ReactorNettyTcpClient(String, int, ReactorNettyCodec)}
123+
* that still manages the lifecycle of the {@link TcpClient} and underlying
124+
* resources, but allows for direct configuration of other properties of the
125+
* client through a {@code Function<TcpClient, TcpClient>}.
126+
* @param clientConfigurer the configurer function
127+
* @param codec for encoding and decoding the input/output byte streams
128+
* @since 5.1.3
129+
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
130+
*/
131+
public ReactorNettyTcpClient(Function<TcpClient, TcpClient> clientConfigurer, ReactorNettyCodec<P> codec) {
132+
Assert.notNull(codec, "ReactorNettyCodec is required");
133+
134+
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
135+
this.loopResources = LoopResources.create("tcp-client-loop");
136+
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
119137
this.codec = codec;
138+
139+
this.tcpClient = clientConfigurer.apply(TcpClient
140+
.create(this.poolResources)
141+
.runOn(this.loopResources, false)
142+
.doOnConnected(conn -> this.channelGroup.add(conn.channel())));
120143
}
121144

122145
/**
123146
* Constructor with an externally created {@link TcpClient} instance whose
124147
* lifecycle is expected to be managed externally.
125-
*
126148
* @param tcpClient the TcpClient instance to use
127149
* @param codec for encoding and decoding the input/output byte streams
128150
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec

src/docs/asciidoc/web/websocket.adoc

+6-11
Original file line numberDiff line numberDiff line change
@@ -1641,10 +1641,10 @@ to receive notifications when the "`system`" connection to the broker is lost an
16411641
re-established. For example, a Stock Quote service that broadcasts stock quotes can
16421642
stop trying to send messages when there is no active "`system`" connection.
16431643

1644-
By default, the STOMP broker relay always connects (and reconnects as needed if
1645-
connectivity is lost) to the same host and port. If you wish to supply multiple addresses,
1644+
By default, the STOMP broker relay always connects, and reconnects as needed if
1645+
connectivity is lost, to the same host and port. If you wish to supply multiple addresses,
16461646
on each attempt to connect, you can configure a supplier of addresses, instead of a
1647-
fixed host and port. The following example shows how to do so:
1647+
fixed host and port. The following example shows how to do that:
16481648

16491649
====
16501650
[source,java,indent=0]
@@ -1663,14 +1663,9 @@ public class WebSocketConfig extends AbstractWebSocketMessageBrokerConfigurer {
16631663
}
16641664
16651665
private ReactorNettyTcpClient<byte[]> createTcpClient() {
1666-
1667-
Consumer<ClientOptions.Builder<?>> builderConsumer = builder -> {
1668-
builder.connectAddress(()-> {
1669-
// Select address to connect to ...
1670-
});
1671-
};
1672-
1673-
return new ReactorNettyTcpClient<>(builderConsumer, new StompReactorNettyCodec());
1666+
return new ReactorNettyTcpClient<>(
1667+
client -> client.addressSupplier(() -> ... ),
1668+
new StompReactorNettyCodec());
16741669
}
16751670
}
16761671
----

0 commit comments

Comments
 (0)