Skip to content

Commit 0a12f28

Browse files
committed
Introduce StompReactorNettyTcpClient
Issue: SPR-11153
1 parent 2e4f38f commit 0a12f28

File tree

3 files changed

+49
-4
lines changed

3 files changed

+49
-4
lines changed

spring-messaging/src/main/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandler.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -250,12 +250,22 @@ public String getVirtualHost() {
250250
}
251251

252252
/**
253-
* Used for unit testing.
253+
* Configure a TCP client for managing TCP connections to the STOMP broker. By default
254+
* {@link org.springframework.messaging.simp.stomp.StompReactorNettyTcpClient} is used.
254255
*/
255-
void setTcpClient(TcpOperations<byte[]> tcpClient) {
256+
public void setTcpClient(TcpOperations<byte[]> tcpClient) {
256257
this.tcpClient = tcpClient;
257258
}
258259

260+
/**
261+
* Get the configured TCP client. Never {@code null} unless not configured
262+
* invoked and this method is invoked before the handler is started and
263+
* hence a default implementation initialized.
264+
*/
265+
public TcpOperations<byte[]> getTcpClient() {
266+
return this.tcpClient;
267+
}
268+
259269

260270
@Override
261271
protected void startInternal() {
@@ -264,7 +274,7 @@ protected void startInternal() {
264274
this.brokerChannel.subscribe(this);
265275

266276
if (this.tcpClient == null) {
267-
this.tcpClient = new ReactorNettyTcpClient<byte[]>(this.relayHost, this.relayPort, new StompCodec());
277+
this.tcpClient = new StompReactorNettyTcpClient(this.relayHost, this.relayPort);
268278
}
269279

270280
if (logger.isDebugEnabled()) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2002-2013 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.messaging.simp.stomp;
18+
19+
import org.springframework.messaging.tcp.reactor.ReactorNettyTcpClient;
20+
import reactor.tcp.encoding.Codec;
21+
22+
/**
23+
* A variation of {@link ReactorNettyTcpClient} for sending and receiving STOMP frames.
24+
*
25+
* @author Rossen Stoyanchev
26+
* @since 4.0.1
27+
*/
28+
public class StompReactorNettyTcpClient extends ReactorNettyTcpClient<byte[]> {
29+
30+
31+
public StompReactorNettyTcpClient(String host, int port) {
32+
super(host, port, new StompCodec());
33+
}
34+
35+
}

spring-messaging/src/test/java/org/springframework/messaging/simp/stomp/StompBrokerRelayMessageHandlerTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public void setup() {
5555

5656
this.brokerRelay = new StompBrokerRelayMessageHandler(new StubMessageChannel(),
5757
new StubMessageChannel(), new StubMessageChannel(), Arrays.asList("/topic"));
58-
this.brokerRelay.setTcpClient(tcpClient);
58+
this.brokerRelay.setTcpClient(this.tcpClient);
5959
}
6060

6161

0 commit comments

Comments
 (0)