Skip to content

Commit ffbc75a

Browse files
violetaggrstoyanchev
authored andcommitted
Upgrade to Reactor Netty 0.8
Issue: SPR-16387
1 parent 61ffbe5 commit ffbc75a

File tree

20 files changed

+144
-178
lines changed

20 files changed

+144
-178
lines changed

spring-messaging/spring-messaging.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ dependencies {
1818
compile(project(":spring-core"))
1919
optional(project(":spring-context"))
2020
optional(project(":spring-oxm"))
21-
optional("io.projectreactor.ipc:reactor-netty")
21+
optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
2222
optional("org.eclipse.jetty.websocket:websocket-server:${jettyVersion}") {
2323
exclude group: "javax.servlet", module: "javax.servlet-api"
2424
}

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpClient.java

+28-64
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,13 @@
3939
import reactor.core.publisher.MonoProcessor;
4040
import reactor.core.scheduler.Scheduler;
4141
import reactor.core.scheduler.Schedulers;
42-
import reactor.ipc.netty.FutureMono;
43-
import reactor.ipc.netty.NettyContext;
44-
import reactor.ipc.netty.NettyInbound;
45-
import reactor.ipc.netty.NettyOutbound;
46-
import reactor.ipc.netty.options.ClientOptions;
47-
import reactor.ipc.netty.resources.LoopResources;
48-
import reactor.ipc.netty.resources.PoolResources;
49-
import reactor.ipc.netty.tcp.TcpClient;
42+
import reactor.netty.Connection;
43+
import reactor.netty.FutureMono;
44+
import reactor.netty.NettyInbound;
45+
import reactor.netty.NettyOutbound;
46+
import reactor.netty.resources.ConnectionProvider;
47+
import reactor.netty.resources.LoopResources;
48+
import reactor.netty.tcp.TcpClient;
5049

5150
import org.springframework.lang.Nullable;
5251
import org.springframework.messaging.Message;
@@ -83,7 +82,7 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
8382
private LoopResources loopResources;
8483

8584
@Nullable
86-
private PoolResources poolResources;
85+
private ConnectionProvider poolResources;
8786

8887
private final Scheduler scheduler = Schedulers.newParallel("tcp-client-scheduler");
8988

@@ -98,57 +97,18 @@ public class ReactorNettyTcpClient<P> implements TcpOperations<P> {
9897
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
9998
*/
10099
public ReactorNettyTcpClient(String host, int port, ReactorNettyCodec<P> codec) {
101-
this(builder -> builder.host(host).port(port), codec);
102-
}
103-
104-
/**
105-
* Constructor with a {@link ClientOptions.Builder} that can be used to
106-
* customize Reactor Netty client options.
107-
*
108-
* <p><strong>Note: </strong> this constructor manages the lifecycle of the
109-
* {@link TcpClient} and its underlying resources. Please do not customize
110-
* any of the following options:
111-
* {@link ClientOptions.Builder#channelGroup(ChannelGroup) ChannelGroup},
112-
* {@link ClientOptions.Builder#loopResources(LoopResources) LoopResources}, and
113-
* {@link ClientOptions.Builder#poolResources(PoolResources) PoolResources}.
114-
* You may set the {@link ClientOptions.Builder#disablePool() disablePool}
115-
* option if you simply want to turn off pooling.
116-
*
117-
* <p>For full control over the initialization and lifecycle of the TcpClient,
118-
* see {@link #ReactorNettyTcpClient(TcpClient, ReactorNettyCodec)}.
119-
*
120-
* @param optionsConsumer consumer to customize client options
121-
* @param codec the code to use
122-
* @see org.springframework.messaging.simp.stomp.StompReactorNettyCodec
123-
*/
124-
public ReactorNettyTcpClient(Consumer<ClientOptions.Builder<?>> optionsConsumer,
125-
ReactorNettyCodec<P> codec) {
126-
127-
Assert.notNull(optionsConsumer, "Consumer<ClientOptions.Builder<?> is required");
100+
Assert.notNull(host, "host is required");
101+
Assert.notNull(port, "port is required");
128102
Assert.notNull(codec, "ReactorNettyCodec is required");
129103

130104
this.channelGroup = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE);
131-
132-
Consumer<ClientOptions.Builder<?>> builtInConsumer = builder -> {
133-
134-
Assert.isTrue(!builder.isLoopAvailable() && !builder.isPoolAvailable(),
135-
"The provided ClientOptions.Builder contains LoopResources and/or PoolResources. " +
136-
"Please, use the constructor that accepts a TcpClient instance " +
137-
"for full control over initialization and lifecycle.");
138-
139-
builder.channelGroup(this.channelGroup);
140-
builder.preferNative(false);
141-
142-
this.loopResources = LoopResources.create("tcp-client-loop");
143-
builder.loopResources(this.loopResources);
144-
145-
if (!builder.isPoolDisabled()) {
146-
this.poolResources = PoolResources.elastic("tcp-client-pool");
147-
builder.poolResources(this.poolResources);
148-
}
149-
};
150-
151-
this.tcpClient = TcpClient.create(optionsConsumer.andThen(builtInConsumer));
105+
this.loopResources = LoopResources.create("tcp-client-loop");
106+
this.poolResources = ConnectionProvider.elastic("tcp-client-pool");
107+
this.tcpClient = TcpClient.create(poolResources)
108+
.host(host)
109+
.port(port)
110+
.runOn(loopResources, false)
111+
.doOnConnected(c -> channelGroup.add(c.channel()));
152112
this.codec = codec;
153113
}
154114

@@ -181,7 +141,8 @@ public ListenableFuture<Void> connect(final TcpConnectionHandler<P> handler) {
181141
}
182142

183143
Mono<Void> connectMono = this.tcpClient
184-
.newHandler(new ReactorNettyHandler(handler))
144+
.handle(new ReactorNettyHandler(handler))
145+
.connect()
185146
.doOnError(handler::afterConnectFailure)
186147
.then();
187148

@@ -201,11 +162,12 @@ public ListenableFuture<Void> connect(TcpConnectionHandler<P> handler, Reconnect
201162
MonoProcessor<Void> connectMono = MonoProcessor.create();
202163

203164
this.tcpClient
204-
.newHandler(new ReactorNettyHandler(handler))
165+
.handle(new ReactorNettyHandler(handler))
166+
.connect()
205167
.doOnNext(updateConnectMono(connectMono))
206168
.doOnError(updateConnectMono(connectMono))
207169
.doOnError(handler::afterConnectFailure) // report all connect failures to the handler
208-
.flatMap(NettyContext::onClose) // post-connect issues
170+
.flatMap(Connection::onDispose) // post-connect issues
209171
.retryWhen(reconnectFunction(strategy))
210172
.repeatWhen(reconnectFunction(strategy))
211173
.subscribe();
@@ -302,14 +264,16 @@ private class ReactorNettyHandler implements BiFunction<NettyInbound, NettyOutbo
302264
@Override
303265
@SuppressWarnings("unchecked")
304266
public Publisher<Void> apply(NettyInbound inbound, NettyOutbound outbound) {
305-
if (logger.isDebugEnabled()) {
306-
logger.debug("Connected to " + inbound.remoteAddress());
307-
}
267+
inbound.withConnection(c -> {
268+
if (logger.isDebugEnabled()) {
269+
logger.debug("Connected to " + c.address());
270+
}
271+
});
308272
DirectProcessor<Void> completion = DirectProcessor.create();
309273
TcpConnection<P> connection = new ReactorNettyTcpConnection<>(inbound, outbound, codec, completion);
310274
scheduler.schedule(() -> connectionHandler.afterConnected(connection));
311275

312-
inbound.context().addHandler(new StompMessageDecoder<>(codec));
276+
inbound.withConnection(c -> c.addHandler(new StompMessageDecoder<>(codec)));
313277

314278
inbound.receiveObject()
315279
.cast(Message.class)

spring-messaging/src/main/java/org/springframework/messaging/tcp/reactor/ReactorNettyTcpConnection.java

+4-13
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,10 @@
1717
package org.springframework.messaging.tcp.reactor;
1818

1919
import io.netty.buffer.ByteBuf;
20-
import io.netty.channel.ChannelPipeline;
2120
import reactor.core.publisher.DirectProcessor;
2221
import reactor.core.publisher.Mono;
23-
import reactor.ipc.netty.NettyInbound;
24-
import reactor.ipc.netty.NettyOutbound;
25-
import reactor.ipc.netty.NettyPipeline;
22+
import reactor.netty.NettyInbound;
23+
import reactor.netty.NettyOutbound;
2624

2725
import org.springframework.messaging.Message;
2826
import org.springframework.messaging.tcp.TcpConnection;
@@ -66,20 +64,13 @@ public ListenableFuture<Void> send(Message<P> message) {
6664
@Override
6765
@SuppressWarnings("deprecation")
6866
public void onReadInactivity(Runnable runnable, long inactivityDuration) {
69-
// TODO: workaround for https://github.com/reactor/reactor-netty/issues/22
70-
ChannelPipeline pipeline = this.inbound.context().channel().pipeline();
71-
String name = NettyPipeline.OnChannelReadIdle;
72-
if (pipeline.context(name) != null) {
73-
pipeline.remove(name);
74-
}
75-
76-
this.inbound.onReadIdle(inactivityDuration, runnable);
67+
this.inbound.withConnection(c -> c.onReadIdle(inactivityDuration, runnable));
7768
}
7869

7970
@Override
8071
@SuppressWarnings("deprecation")
8172
public void onWriteInactivity(Runnable runnable, long inactivityDuration) {
82-
this.outbound.onWriteIdle(inactivityDuration, runnable);
73+
this.inbound.withConnection(c -> c.onWriteIdle(inactivityDuration, runnable));
8374
}
8475

8576
@Override

spring-test/spring-test.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ dependencies {
8080
testCompile("org.apache.httpcomponents:httpclient:4.5.5") {
8181
exclude group: "commons-logging", module: "commons-logging"
8282
}
83-
testCompile('io.projectreactor.ipc:reactor-netty')
83+
testCompile('io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT')
8484
testCompile('de.bechte.junit:junit-hierarchicalcontextrunner:4.12.1')
8585
// Pull in the latest JUnit 5 Launcher API and the Vintage engine as well
8686
// so that we can run JUnit 4 tests in IntelliJ IDEA.

spring-web/spring-web.gradle

+1-1
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ dependencies {
3434
optional("io.reactivex:rxjava-reactive-streams:${rxjavaAdapterVersion}")
3535
optional("io.reactivex.rxjava2:rxjava:${rxjava2Version}")
3636
optional("io.netty:netty-all")
37-
optional("io.projectreactor.ipc:reactor-netty")
37+
optional("io.projectreactor.netty:reactor-netty:0.8.0.BUILD-SNAPSHOT")
3838
optional("org.apache.tomcat.embed:tomcat-embed-core:${tomcatVersion}")
3939
optional("org.eclipse.jetty:jetty-server:${jettyVersion}") {
4040
exclude group: "javax.servlet", module: "javax.servlet-api"

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpConnector.java

+24-22
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,25 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.net.URI;
20-
import java.util.function.Consumer;
2120
import java.util.function.Function;
2221

2322
import reactor.core.publisher.Mono;
24-
import reactor.ipc.netty.http.client.HttpClient;
25-
import reactor.ipc.netty.http.client.HttpClientOptions;
26-
import reactor.ipc.netty.http.client.HttpClientRequest;
27-
import reactor.ipc.netty.http.client.HttpClientResponse;
28-
import reactor.ipc.netty.options.ClientOptions;
23+
import reactor.netty.NettyInbound;
24+
import reactor.netty.NettyOutbound;
25+
import reactor.netty.http.client.HttpClient;
26+
import reactor.netty.http.client.HttpClientRequest;
27+
import reactor.netty.http.client.HttpClientResponse;
2928

3029
import org.springframework.http.HttpMethod;
3130

31+
import io.netty.buffer.ByteBufAllocator;
32+
3233
/**
3334
* Reactor-Netty implementation of {@link ClientHttpConnector}.
3435
*
3536
* @author Brian Clozel
3637
* @since 5.0
37-
* @see reactor.ipc.netty.http.client.HttpClient
38+
* @see reactor.netty.http.client.HttpClient
3839
*/
3940
public class ReactorClientHttpConnector implements ClientHttpConnector {
4041

@@ -43,20 +44,19 @@ public class ReactorClientHttpConnector implements ClientHttpConnector {
4344

4445
/**
4546
* Create a Reactor Netty {@link ClientHttpConnector}
46-
* with default {@link ClientOptions} and HTTP compression support enabled.
47+
* with a default configuration and HTTP compression support enabled.
4748
*/
4849
public ReactorClientHttpConnector() {
49-
this.httpClient = HttpClient.builder()
50-
.options(options -> options.compression(true))
51-
.build();
50+
this.httpClient = HttpClient.create()
51+
.compress();
5252
}
5353

5454
/**
5555
* Create a Reactor Netty {@link ClientHttpConnector} with the given
56-
* {@link HttpClientOptions.Builder}
56+
* {@link HttpClient}
5757
*/
58-
public ReactorClientHttpConnector(Consumer<? super HttpClientOptions.Builder> clientOptions) {
59-
this.httpClient = HttpClient.create(clientOptions);
58+
public ReactorClientHttpConnector(HttpClient httpClient) {
59+
this.httpClient = httpClient;
6060
}
6161

6262

@@ -69,22 +69,24 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
6969
}
7070

7171
return this.httpClient
72-
.request(adaptHttpMethod(method),
73-
uri.toString(),
74-
request -> requestCallback.apply(adaptRequest(method, uri, request)))
75-
.map(this::adaptResponse);
72+
.request(adaptHttpMethod(method))
73+
.uri(uri.toString())
74+
.send((req, out) -> requestCallback.apply(adaptRequest(method, uri, req, out)))
75+
.responseConnection((res, con) -> Mono.just(adaptResponse(res, con.inbound(), con.outbound().alloc())))
76+
.next();
7677
}
7778

7879
private io.netty.handler.codec.http.HttpMethod adaptHttpMethod(HttpMethod method) {
7980
return io.netty.handler.codec.http.HttpMethod.valueOf(method.name());
8081
}
8182

82-
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request) {
83-
return new ReactorClientHttpRequest(method, uri, request);
83+
private ReactorClientHttpRequest adaptRequest(HttpMethod method, URI uri, HttpClientRequest request, NettyOutbound out) {
84+
return new ReactorClientHttpRequest(method, uri, request, out);
8485
}
8586

86-
private ClientHttpResponse adaptResponse(HttpClientResponse response) {
87-
return new ReactorClientHttpResponse(response);
87+
private ClientHttpResponse adaptResponse(HttpClientResponse response, NettyInbound nettyInbound,
88+
ByteBufAllocator alloc) {
89+
return new ReactorClientHttpResponse(response, nettyInbound, alloc);
8890
}
8991

9092
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpRequest.java

+13-9
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,8 @@
2525
import org.reactivestreams.Publisher;
2626
import reactor.core.publisher.Flux;
2727
import reactor.core.publisher.Mono;
28-
import reactor.ipc.netty.http.client.HttpClientRequest;
28+
import reactor.netty.NettyOutbound;
29+
import reactor.netty.http.client.HttpClientRequest;
2930

3031
import org.springframework.core.io.buffer.DataBuffer;
3132
import org.springframework.core.io.buffer.DataBufferFactory;
@@ -38,7 +39,7 @@
3839
*
3940
* @author Brian Clozel
4041
* @since 5.0
41-
* @see reactor.ipc.netty.http.client.HttpClient
42+
* @see reactor.netty.http.client.HttpClient
4243
*/
4344
class ReactorClientHttpRequest extends AbstractClientHttpRequest implements ZeroCopyHttpOutputMessage {
4445

@@ -48,15 +49,18 @@ class ReactorClientHttpRequest extends AbstractClientHttpRequest implements Zero
4849

4950
private final HttpClientRequest httpRequest;
5051

52+
private final NettyOutbound out;
53+
5154
private final NettyDataBufferFactory bufferFactory;
5255

5356

5457
public ReactorClientHttpRequest(HttpMethod httpMethod, URI uri,
55-
HttpClientRequest httpRequest) {
58+
HttpClientRequest httpRequest, NettyOutbound out) {
5659
this.httpMethod = httpMethod;
5760
this.uri = uri;
58-
this.httpRequest = httpRequest.failOnClientError(false).failOnServerError(false);
59-
this.bufferFactory = new NettyDataBufferFactory(httpRequest.alloc());
61+
this.httpRequest = httpRequest;
62+
this.out = out;
63+
this.bufferFactory = new NettyDataBufferFactory(out.alloc());
6064
}
6165

6266

@@ -77,14 +81,14 @@ public URI getURI() {
7781

7882
@Override
7983
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
80-
return doCommit(() -> this.httpRequest
84+
return doCommit(() -> this.out
8185
.send(Flux.from(body).map(NettyDataBufferFactory::toByteBuf)).then());
8286
}
8387

8488
@Override
8589
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
8690
Publisher<Publisher<ByteBuf>> byteBufs = Flux.from(body).map(ReactorClientHttpRequest::toByteBufs);
87-
return doCommit(() -> this.httpRequest.sendGroups(byteBufs).then());
91+
return doCommit(() -> this.out.sendGroups(byteBufs).then());
8892
}
8993

9094
private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dataBuffers) {
@@ -93,12 +97,12 @@ private static Publisher<ByteBuf> toByteBufs(Publisher<? extends DataBuffer> dat
9397

9498
@Override
9599
public Mono<Void> writeWith(File file, long position, long count) {
96-
return doCommit(() -> this.httpRequest.sendFile(file.toPath(), position, count).then());
100+
return doCommit(() -> this.out.sendFile(file.toPath(), position, count).then());
97101
}
98102

99103
@Override
100104
public Mono<Void> setComplete() {
101-
return doCommit(() -> httpRequest.sendHeaders().then());
105+
return doCommit(() -> out.then());
102106
}
103107

104108
@Override

0 commit comments

Comments
 (0)