Skip to content

Commit 220711d

Browse files
committed
Reactor2TcpClient cleans up TcpClient instances
Issue: SPR-14231
1 parent 9778408 commit 220711d

File tree

1 file changed

+29
-5
lines changed

1 file changed

+29
-5
lines changed

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/Reactor2TcpClient.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,8 @@ private static NioEventLoopGroup initEventLoopGroup() {
163163
public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHandler) {
164164
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
165165

166-
TcpClient<Message<P>, Message<P>> tcpClient;
166+
final TcpClient<Message<P>, Message<P>> tcpClient;
167+
Runnable cleanupTask;
167168
synchronized (this.tcpClients) {
168169
if (this.stopping) {
169170
IllegalStateException ex = new IllegalStateException("Shutting down.");
@@ -172,9 +173,18 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> connectionHa
172173
}
173174
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
174175
this.tcpClients.add(tcpClient);
176+
cleanupTask = new Runnable() {
177+
@Override
178+
public void run() {
179+
synchronized (tcpClients) {
180+
tcpClients.remove(tcpClient);
181+
}
182+
}
183+
};
175184
}
176185

177-
Promise<Void> promise = tcpClient.start(new MessageChannelStreamHandler<P>(connectionHandler));
186+
Promise<Void> promise = tcpClient.start(
187+
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask));
178188

179189
return new PassThroughPromiseToListenableFutureAdapter<Void>(
180190
promise.onError(new Consumer<Throwable>() {
@@ -191,7 +201,8 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler,
191201
Assert.notNull(connectionHandler, "TcpConnectionHandler must not be null");
192202
Assert.notNull(strategy, "ReconnectStrategy must not be null");
193203

194-
TcpClient<Message<P>, Message<P>> tcpClient;
204+
final TcpClient<Message<P>, Message<P>> tcpClient;
205+
Runnable cleanupTask;
195206
synchronized (this.tcpClients) {
196207
if (this.stopping) {
197208
IllegalStateException ex = new IllegalStateException("Shutting down.");
@@ -200,10 +211,18 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> connectionHandler,
200211
}
201212
tcpClient = NetStreams.tcpClient(REACTOR_TCP_CLIENT_TYPE, this.tcpClientSpecFactory);
202213
this.tcpClients.add(tcpClient);
214+
cleanupTask = new Runnable() {
215+
@Override
216+
public void run() {
217+
synchronized (tcpClients) {
218+
tcpClients.remove(tcpClient);
219+
}
220+
}
221+
};
203222
}
204223

205224
Stream<Tuple2<InetSocketAddress, Integer>> stream = tcpClient.start(
206-
new MessageChannelStreamHandler<P>(connectionHandler),
225+
new MessageChannelStreamHandler<P>(connectionHandler, cleanupTask),
207226
new ReactorReconnectAdapter(strategy));
208227

209228
return new PassThroughPromiseToListenableFutureAdapter<Void>(stream.next().after());
@@ -249,6 +268,7 @@ public void operationComplete(Future<Object> future) throws Exception {
249268
});
250269
promise = eventLoopPromise;
251270
}
271+
252272
return new PassThroughPromiseToListenableFutureAdapter<Void>(promise);
253273
}
254274

@@ -278,8 +298,11 @@ private static class MessageChannelStreamHandler<P>
278298

279299
private final TcpConnectionHandler<P> connectionHandler;
280300

281-
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler) {
301+
private final Runnable cleanupTask;
302+
303+
public MessageChannelStreamHandler(TcpConnectionHandler<P> connectionHandler, Runnable cleanupTask) {
282304
this.connectionHandler = connectionHandler;
305+
this.cleanupTask = cleanupTask;
283306
}
284307

285308
@Override
@@ -290,6 +313,7 @@ public Publisher<Void> apply(ChannelStream<Message<P>, Message<P>> channelStream
290313
.finallyDo(new Consumer<Signal<Message<P>>>() {
291314
@Override
292315
public void accept(Signal<Message<P>> signal) {
316+
cleanupTask.run();
293317
if (signal.isOnError()) {
294318
connectionHandler.handleFailure(signal.getThrowable());
295319
}

0 commit comments

Comments
 (0)