Skip to content

Commit c097c02

Browse files
committed
Reactor Netty WebSocket server-side support
Issue: SPR-14527
1 parent 46f2aaf commit c097c02

File tree

7 files changed

+232
-8
lines changed

7 files changed

+232
-8
lines changed

build.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -818,6 +818,7 @@ project("spring-web-reactive") {
818818
optional("org.freemarker:freemarker:${freemarkerVersion}")
819819
optional "org.apache.httpcomponents:httpclient:${httpclientVersion}"
820820
optional('org.webjars:webjars-locator:0.32')
821+
optional("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
821822
optional("io.reactivex:rxnetty-http:${rxnettyVersion}") {
822823
exclude group: 'io.reactivex', module: 'rxjava'
823824
}
@@ -830,7 +831,6 @@ project("spring-web-reactive") {
830831
testCompile("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
831832
testCompile("org.eclipse.jetty:jetty-server:${jettyVersion}")
832833
testCompile("org.eclipse.jetty:jetty-servlet:${jettyVersion}")
833-
testCompile("io.projectreactor.ipc:reactor-netty:${reactorNettyVersion}")
834834
testCompile "io.reactivex.rxjava2:rxjava:${rxjava2Version}"
835835
testCompile("io.undertow:undertow-core:${undertowVersion}")
836836
testCompile("org.jboss.xnio:xnio-api:${xnioVersion}")

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/WebSocketSession.java

-6
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,6 @@
2121
import reactor.core.publisher.Flux;
2222
import reactor.core.publisher.Mono;
2323

24-
import org.springframework.core.io.buffer.DataBuffer;
25-
2624
/**
2725
* Representation for a WebSocket session.
2826
*
@@ -43,10 +41,6 @@ public interface WebSocketSession {
4341

4442
/**
4543
* Get the flux of incoming messages.
46-
* <p><strong>Note:</strong> the caller of this method is responsible for
47-
* releasing the DataBuffer payload of each message after consuming it
48-
* on runtimes where a {@code PooledByteBuffer} is used such as Netty.
49-
* @see org.springframework.core.io.buffer.DataBufferUtils#release(DataBuffer)
5044
*/
5145
Flux<WebSocketMessage> receive();
5246

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.adapter;
17+
18+
import java.util.function.BiFunction;
19+
20+
import org.reactivestreams.Publisher;
21+
import reactor.ipc.netty.http.HttpInbound;
22+
import reactor.ipc.netty.http.HttpOutbound;
23+
24+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
25+
import org.springframework.http.server.reactive.ServerHttpRequest;
26+
import org.springframework.http.server.reactive.ServerHttpResponse;
27+
import org.springframework.util.Assert;
28+
import org.springframework.web.reactive.socket.WebSocketHandler;
29+
30+
/**
31+
* Reactor Netty {@code WebSocketHandler} implementation adapting and
32+
* delegating to a Spring {@link WebSocketHandler}.
33+
*
34+
* @author Rossen Stoyanchev
35+
* @since 5.0
36+
*/
37+
public class ReactorNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
38+
implements BiFunction<HttpInbound, HttpOutbound, Publisher<Void>> {
39+
40+
41+
private final NettyDataBufferFactory bufferFactory;
42+
43+
44+
public ReactorNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
45+
WebSocketHandler handler) {
46+
47+
super(request, handler);
48+
Assert.notNull("'response' is required");
49+
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
50+
}
51+
52+
53+
public NettyDataBufferFactory getBufferFactory() {
54+
return this.bufferFactory;
55+
}
56+
57+
@Override
58+
public Publisher<Void> apply(HttpInbound inbound, HttpOutbound outbound) {
59+
ReactorNettyWebSocketSession session =
60+
new ReactorNettyWebSocketSession(inbound, outbound, getUri(), getBufferFactory());
61+
return getDelegate().handle(session);
62+
}
63+
64+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.adapter;
17+
18+
import java.net.URI;
19+
20+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
21+
import org.reactivestreams.Publisher;
22+
import reactor.core.publisher.Flux;
23+
import reactor.core.publisher.Mono;
24+
import reactor.ipc.netty.http.HttpInbound;
25+
import reactor.ipc.netty.http.HttpOutbound;
26+
27+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
28+
import org.springframework.web.reactive.socket.CloseStatus;
29+
import org.springframework.web.reactive.socket.WebSocketMessage;
30+
import org.springframework.web.reactive.socket.WebSocketSession;
31+
32+
33+
/**
34+
* Spring {@link WebSocketSession} adapter for RxNetty's
35+
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
36+
*
37+
* @author Rossen Stoyanchev
38+
* @since 5.0
39+
*/
40+
public class ReactorNettyWebSocketSession
41+
extends NettyWebSocketSessionSupport<ReactorNettyWebSocketSession.WebSocketConnection> {
42+
43+
44+
protected ReactorNettyWebSocketSession(HttpInbound inbound, HttpOutbound outbound,
45+
URI uri, NettyDataBufferFactory factory) {
46+
47+
super(new WebSocketConnection(inbound, outbound), uri, factory);
48+
}
49+
50+
51+
@Override
52+
public Flux<WebSocketMessage> receive() {
53+
HttpInbound inbound = getDelegate().getHttpInbound();
54+
return toMessageFlux(inbound.receiveObject().cast(WebSocketFrame.class));
55+
}
56+
57+
@Override
58+
public Mono<Void> send(Publisher<WebSocketMessage> messages) {
59+
HttpOutbound outbound = getDelegate().getHttpOutbound();
60+
Flux<WebSocketFrame> frameFlux = Flux.from(messages).map(this::toFrame);
61+
return outbound.sendObject(frameFlux);
62+
}
63+
64+
@Override
65+
protected Mono<Void> closeInternal(CloseStatus status) {
66+
return Mono.error(new UnsupportedOperationException(
67+
"Currently in Reactor Netty applications are expected to use the Cancellation" +
68+
"returned from subscribing to the input Flux to close the WebSocket session."));
69+
}
70+
71+
72+
/**
73+
* Simple container for {@link HttpInbound} and {@link HttpOutbound}.
74+
*/
75+
public static class WebSocketConnection {
76+
77+
private final HttpInbound inbound;
78+
79+
private final HttpOutbound outbound;
80+
81+
82+
public WebSocketConnection(HttpInbound inbound, HttpOutbound outbound) {
83+
this.inbound = inbound;
84+
this.outbound = outbound;
85+
}
86+
87+
public HttpInbound getHttpInbound() {
88+
return this.inbound;
89+
}
90+
91+
public HttpOutbound getHttpOutbound() {
92+
return this.outbound;
93+
}
94+
}
95+
96+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.server.upgrade;
17+
18+
import java.util.List;
19+
20+
import reactor.core.publisher.Mono;
21+
22+
import org.springframework.http.server.reactive.ReactorServerHttpRequest;
23+
import org.springframework.http.server.reactive.ReactorServerHttpResponse;
24+
import org.springframework.util.StringUtils;
25+
import org.springframework.web.reactive.socket.WebSocketHandler;
26+
import org.springframework.web.reactive.socket.adapter.ReactorNettyWebSocketHandlerAdapter;
27+
import org.springframework.web.reactive.socket.server.RequestUpgradeStrategy;
28+
import org.springframework.web.server.ServerWebExchange;
29+
30+
/**
31+
* A {@link RequestUpgradeStrategy} for use with Reactor Netty.
32+
*
33+
* @author Rossen Stoyanchev
34+
* @since 5.0
35+
*/
36+
public class ReactorNettyRequestUpgradeStrategy implements RequestUpgradeStrategy {
37+
38+
@Override
39+
public Mono<Void> upgrade(ServerWebExchange exchange, WebSocketHandler webSocketHandler) {
40+
41+
ReactorServerHttpRequest request = (ReactorServerHttpRequest) exchange.getRequest();
42+
ReactorServerHttpResponse response = (ReactorServerHttpResponse) exchange.getResponse();
43+
44+
ReactorNettyWebSocketHandlerAdapter reactorHandler =
45+
new ReactorNettyWebSocketHandlerAdapter(request, response, webSocketHandler);
46+
47+
String protocols = StringUtils.arrayToCommaDelimitedString(getSubProtocols(webSocketHandler));
48+
protocols = (StringUtils.hasText(protocols) ? protocols : null);
49+
50+
return response.getReactorResponse().upgradeToWebsocket(protocols, false, reactorHandler);
51+
}
52+
53+
private static String[] getSubProtocols(WebSocketHandler webSocketHandler) {
54+
List<String> subProtocols = webSocketHandler.getSubProtocols();
55+
return subProtocols.toArray(new String[subProtocols.size()]);
56+
}
57+
58+
}

spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/AbstractWebSocketHandlerIntegrationTests.java

+12
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,13 @@
2727
import org.springframework.context.annotation.Configuration;
2828
import org.springframework.http.server.reactive.HttpHandler;
2929
import org.springframework.http.server.reactive.bootstrap.HttpServer;
30+
import org.springframework.http.server.reactive.bootstrap.ReactorHttpServer;
3031
import org.springframework.http.server.reactive.bootstrap.RxNettyHttpServer;
3132
import org.springframework.util.SocketUtils;
3233
import org.springframework.web.reactive.DispatcherHandler;
3334
import org.springframework.web.reactive.socket.server.support.HandshakeWebSocketService;
3435
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter;
36+
import org.springframework.web.reactive.socket.server.upgrade.ReactorNettyRequestUpgradeStrategy;
3537
import org.springframework.web.reactive.socket.server.upgrade.RxNettyRequestUpgradeStrategy;
3638

3739
/**
@@ -58,6 +60,7 @@ public abstract class AbstractWebSocketHandlerIntegrationTests {
5860
@Parameters
5961
public static Object[][] arguments() {
6062
return new Object[][] {
63+
{new ReactorHttpServer(), ReactorNettyConfig.class},
6164
{new RxNettyHttpServer(), RxNettyConfig.class}
6265
};
6366
}
@@ -110,6 +113,15 @@ public WebSocketHandlerAdapter handlerAdapter() {
110113

111114
}
112115

116+
@Configuration
117+
static class ReactorNettyConfig extends AbstractHandlerAdapterConfig {
118+
119+
@Override
120+
protected RequestUpgradeStrategy createUpgradeStrategy() {
121+
return new ReactorNettyRequestUpgradeStrategy();
122+
}
123+
}
124+
113125
@Configuration
114126
static class RxNettyConfig extends AbstractHandlerAdapterConfig {
115127

spring-web-reactive/src/test/java/org/springframework/web/reactive/socket/server/BasicWebSocketHandlerIntegrationTests.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@
3939
import static org.junit.Assert.assertEquals;
4040

4141
/**
42-
* Basic WebSocket integration
42+
* Basic WebSocket integration tests.
4343
* @author Rossen Stoyanchev
4444
*/
4545
@SuppressWarnings({"unused", "WeakerAccess"})

0 commit comments

Comments
 (0)