Skip to content

Commit 46f2aaf

Browse files
committed
Extract Netty WebSocket session + adapter base classes
Issue: SPR-14527
1 parent 30bd3d8 commit 46f2aaf

File tree

4 files changed

+198
-103
lines changed

4 files changed

+198
-103
lines changed
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.adapter;
17+
18+
import java.net.URI;
19+
import java.util.HashMap;
20+
import java.util.List;
21+
import java.util.Map;
22+
23+
import io.netty.buffer.ByteBuf;
24+
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
25+
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
26+
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
27+
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
28+
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
29+
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
30+
import reactor.core.publisher.Flux;
31+
32+
import org.springframework.core.io.buffer.NettyDataBuffer;
33+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
34+
import org.springframework.util.Assert;
35+
import org.springframework.util.ObjectUtils;
36+
import org.springframework.web.reactive.socket.WebSocketMessage;
37+
import org.springframework.web.reactive.socket.WebSocketSession;
38+
39+
/**
40+
* Base class for Netty-based {@link WebSocketSession} adapters.
41+
*
42+
* @author Rossen Stoyanchev
43+
* @since 5.0
44+
*/
45+
public abstract class NettyWebSocketSessionSupport<T> extends WebSocketSessionSupport<T> {
46+
47+
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;
48+
49+
static {
50+
MESSAGE_TYPES = new HashMap<>(4);
51+
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
52+
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
53+
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
54+
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
55+
}
56+
57+
58+
protected final String id;
59+
60+
protected final URI uri;
61+
62+
protected final NettyDataBufferFactory bufferFactory;
63+
64+
65+
protected NettyWebSocketSessionSupport(T delegate, URI uri, NettyDataBufferFactory factory) {
66+
super(delegate);
67+
Assert.notNull(uri, "'uri' is required.");
68+
Assert.notNull(uri, "'bufferFactory' is required.");
69+
this.uri = uri;
70+
this.bufferFactory = factory;
71+
this.id = ObjectUtils.getIdentityHexString(getDelegate());
72+
}
73+
74+
75+
@Override
76+
public String getId() {
77+
return this.id;
78+
}
79+
80+
@Override
81+
public URI getUri() {
82+
return this.uri;
83+
}
84+
85+
protected Flux<WebSocketMessage> toMessageFlux(Flux<WebSocketFrame> frameFlux) {
86+
return frameFlux
87+
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
88+
.window()
89+
.concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
90+
.map(this::toMessage);
91+
}
92+
93+
@SuppressWarnings("OptionalGetWithoutIsPresent")
94+
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
95+
Class<?> frameType = frames.get(0).getClass();
96+
if (frames.size() == 1) {
97+
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
98+
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
99+
}
100+
return frames.stream()
101+
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
102+
.reduce(NettyDataBuffer::write)
103+
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
104+
.get();
105+
}
106+
107+
protected WebSocketFrame toFrame(WebSocketMessage message) {
108+
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
109+
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
110+
return new TextWebSocketFrame(byteBuf);
111+
}
112+
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
113+
return new BinaryWebSocketFrame(byteBuf);
114+
}
115+
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
116+
return new PingWebSocketFrame(byteBuf);
117+
}
118+
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
119+
return new PongWebSocketFrame(byteBuf);
120+
}
121+
else {
122+
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
123+
}
124+
}
125+
126+
}

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketHandlerAdapter.java

Lines changed: 9 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515
*/
1616
package org.springframework.web.reactive.socket.adapter;
1717

18-
import java.net.URI;
19-
2018
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
2119
import reactor.core.publisher.Mono;
2220
import rx.Observable;
@@ -35,37 +33,30 @@
3533
* @author Rossen Stoyanchev
3634
* @since 5.0
3735
*/
38-
public class RxNettyWebSocketHandlerAdapter
36+
public class RxNettyWebSocketHandlerAdapter extends WebSocketHandlerAdapterSupport
3937
implements io.reactivex.netty.protocol.http.ws.server.WebSocketHandler {
4038

41-
private final URI uri;
42-
4339
private final NettyDataBufferFactory bufferFactory;
4440

45-
private final WebSocketHandler handler;
46-
4741

4842
public RxNettyWebSocketHandlerAdapter(ServerHttpRequest request, ServerHttpResponse response,
4943
WebSocketHandler handler) {
5044

51-
Assert.notNull("'request' is required");
45+
super(request, handler);
5246
Assert.notNull("'response' is required");
53-
Assert.notNull("'handler' handler is required");
54-
55-
this.uri = request.getURI();
5647
this.bufferFactory = (NettyDataBufferFactory) response.bufferFactory();
57-
this.handler = handler;
5848
}
5949

6050

61-
@Override
62-
public Observable<Void> handle(WebSocketConnection connection) {
63-
Mono<Void> result = this.handler.handle(createSession(connection));
64-
return RxReactiveStreams.toObservable(result);
51+
public NettyDataBufferFactory getBufferFactory() {
52+
return this.bufferFactory;
6553
}
6654

67-
private RxNettyWebSocketSession createSession(WebSocketConnection conn) {
68-
return new RxNettyWebSocketSession(conn, this.uri, this.bufferFactory);
55+
@Override
56+
public Observable<Void> handle(WebSocketConnection conn) {
57+
RxNettyWebSocketSession session = new RxNettyWebSocketSession(conn, getUri(), getBufferFactory());
58+
Mono<Void> result = getDelegate().handle(session);
59+
return RxReactiveStreams.toObservable(result);
6960
}
7061

7162
}

spring-web-reactive/src/main/java/org/springframework/web/reactive/socket/adapter/RxNettyWebSocketSession.java

Lines changed: 10 additions & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -16,16 +16,7 @@
1616
package org.springframework.web.reactive.socket.adapter;
1717

1818
import java.net.URI;
19-
import java.util.HashMap;
20-
import java.util.List;
21-
import java.util.Map;
2219

23-
import io.netty.buffer.ByteBuf;
24-
import io.netty.handler.codec.http.websocketx.BinaryWebSocketFrame;
25-
import io.netty.handler.codec.http.websocketx.CloseWebSocketFrame;
26-
import io.netty.handler.codec.http.websocketx.PingWebSocketFrame;
27-
import io.netty.handler.codec.http.websocketx.PongWebSocketFrame;
28-
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
2920
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
3021
import io.reactivex.netty.protocol.http.ws.WebSocketConnection;
3122
import org.reactivestreams.Publisher;
@@ -34,79 +25,31 @@
3425
import rx.Observable;
3526
import rx.RxReactiveStreams;
3627

37-
import org.springframework.core.io.buffer.NettyDataBuffer;
3828
import org.springframework.core.io.buffer.NettyDataBufferFactory;
39-
import org.springframework.util.Assert;
40-
import org.springframework.util.ObjectUtils;
4129
import org.springframework.web.reactive.socket.CloseStatus;
4230
import org.springframework.web.reactive.socket.WebSocketMessage;
31+
import org.springframework.web.reactive.socket.WebSocketSession;
4332

4433
/**
34+
* Spring {@link WebSocketSession} adapter for RxNetty's
35+
* {@link io.reactivex.netty.protocol.http.ws.WebSocketConnection}.
4536
*
4637
* @author Rossen Stoyanchev
4738
* @since 5.0
4839
*/
49-
public class RxNettyWebSocketSession extends WebSocketSessionSupport<WebSocketConnection> {
50-
51-
private static final Map<Class<?>, WebSocketMessage.Type> MESSAGE_TYPES;
52-
53-
static {
54-
MESSAGE_TYPES = new HashMap<>(4);
55-
MESSAGE_TYPES.put(TextWebSocketFrame.class, WebSocketMessage.Type.TEXT);
56-
MESSAGE_TYPES.put(BinaryWebSocketFrame.class, WebSocketMessage.Type.BINARY);
57-
MESSAGE_TYPES.put(PingWebSocketFrame.class, WebSocketMessage.Type.PING);
58-
MESSAGE_TYPES.put(PongWebSocketFrame.class, WebSocketMessage.Type.PONG);
59-
}
60-
61-
62-
private final String id;
63-
64-
private final URI uri;
65-
66-
private final NettyDataBufferFactory bufferFactory;
40+
public class RxNettyWebSocketSession extends NettyWebSocketSessionSupport<WebSocketConnection> {
6741

6842

6943
public RxNettyWebSocketSession(WebSocketConnection conn, URI uri, NettyDataBufferFactory factory) {
70-
super(conn);
71-
Assert.notNull(uri, "'uri' is required.");
72-
Assert.notNull(uri, "'bufferFactory' is required.");
73-
this.id = ObjectUtils.getIdentityHexString(getDelegate());
74-
this.uri = uri;
75-
this.bufferFactory = factory;
76-
}
77-
78-
79-
@Override
80-
public String getId() {
81-
return this.id;
44+
super(conn, uri, factory);
8245
}
8346

84-
@Override
85-
public URI getUri() {
86-
return this.uri;
87-
}
8847

8948
@Override
9049
public Flux<WebSocketMessage> receive() {
91-
return Flux.from(RxReactiveStreams.toPublisher(getDelegate().getInput()))
92-
.filter(frame -> !(frame instanceof CloseWebSocketFrame))
93-
.window()
94-
.concatMap(flux -> flux.takeUntil(WebSocketFrame::isFinalFragment).buffer())
95-
.map(this::toMessage);
96-
}
97-
98-
@SuppressWarnings("OptionalGetWithoutIsPresent")
99-
private WebSocketMessage toMessage(List<WebSocketFrame> frames) {
100-
Class<?> frameType = frames.get(0).getClass();
101-
if (frames.size() == 1) {
102-
NettyDataBuffer buffer = this.bufferFactory.wrap(frames.get(0).content());
103-
return WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer);
104-
}
105-
return frames.stream()
106-
.map(socketFrame -> bufferFactory.wrap(socketFrame.content()))
107-
.reduce(NettyDataBuffer::write)
108-
.map(buffer -> WebSocketMessage.create(MESSAGE_TYPES.get(frameType), buffer))
109-
.get();
50+
Observable<WebSocketFrame> observable = getDelegate().getInput();
51+
Flux<WebSocketFrame> flux = Flux.from(RxReactiveStreams.toPublisher(observable));
52+
return toMessageFlux(flux);
11053
}
11154

11255
@Override
@@ -116,28 +59,10 @@ public Mono<Void> send(Publisher<WebSocketMessage> messages) {
11659
return Mono.from(RxReactiveStreams.toPublisher(completion));
11760
}
11861

119-
private WebSocketFrame toFrame(WebSocketMessage message) {
120-
ByteBuf byteBuf = NettyDataBufferFactory.toByteBuf(message.getPayload());
121-
if (WebSocketMessage.Type.TEXT.equals(message.getType())) {
122-
return new TextWebSocketFrame(byteBuf);
123-
}
124-
else if (WebSocketMessage.Type.BINARY.equals(message.getType())) {
125-
return new BinaryWebSocketFrame(byteBuf);
126-
}
127-
else if (WebSocketMessage.Type.PING.equals(message.getType())) {
128-
return new PingWebSocketFrame(byteBuf);
129-
}
130-
else if (WebSocketMessage.Type.PONG.equals(message.getType())) {
131-
return new PongWebSocketFrame(byteBuf);
132-
}
133-
else {
134-
throw new IllegalArgumentException("Unexpected message type: " + message.getType());
135-
}
136-
}
137-
13862
@Override
13963
protected Mono<Void> closeInternal(CloseStatus status) {
140-
return Mono.from(RxReactiveStreams.toPublisher(getDelegate().close()));
64+
Observable<Void> completion = getDelegate().close();
65+
return Mono.from(RxReactiveStreams.toPublisher(completion));
14166
}
14267

14368
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/*
2+
* Copyright 2002-2016 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.socket.adapter;
17+
18+
import java.net.URI;
19+
20+
import org.springframework.http.server.reactive.ServerHttpRequest;
21+
import org.springframework.util.Assert;
22+
import org.springframework.web.reactive.socket.WebSocketHandler;
23+
24+
/**
25+
* Base class for {@link WebSocketHandler} implementations.
26+
*
27+
* @author Rossen Stoyanchev
28+
* @since 5.0
29+
*/
30+
public abstract class WebSocketHandlerAdapterSupport {
31+
32+
private final URI uri;
33+
34+
private final WebSocketHandler delegate;
35+
36+
37+
protected WebSocketHandlerAdapterSupport(ServerHttpRequest request, WebSocketHandler handler) {
38+
Assert.notNull("'request' is required");
39+
Assert.notNull("'handler' handler is required");
40+
this.uri = request.getURI();
41+
this.delegate = handler;
42+
}
43+
44+
45+
public URI getUri() {
46+
return this.uri;
47+
}
48+
49+
public WebSocketHandler getDelegate() {
50+
return this.delegate;
51+
}
52+
53+
}

0 commit comments

Comments
 (0)