Skip to content

Commit 2ba5ded

Browse files
committed
Polish Jetty reactive HttpClient connector
1 parent 2eae37d commit 2ba5ded

File tree

2 files changed

+35
-25
lines changed

2 files changed

+35
-25
lines changed

spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpConnector.java

Lines changed: 24 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@
2121
import java.util.function.Function;
2222

2323
import org.eclipse.jetty.client.HttpClient;
24-
import org.eclipse.jetty.util.Callback;
24+
import org.eclipse.jetty.reactive.client.ContentChunk;
2525
import reactor.core.publisher.Flux;
2626
import reactor.core.publisher.Mono;
2727

@@ -33,12 +33,7 @@
3333
import org.springframework.util.Assert;
3434

3535
/**
36-
* Jetty ReactiveStreams HttpClient implementation of {@link ClientHttpConnector}.
37-
*
38-
* Implemented with buffer copy instead of optimized buffer wrapping because the latter
39-
* hangs since {@link Callback#succeeded()} doesn't allow releasing the buffer and
40-
* requesting more data at different times (required for {@code Mono<DataBuffer>} for example).
41-
* See https://github.com/eclipse/jetty.project/issues/2429 for more details.
36+
* {@link ClientHttpConnector} for the Jetty Reactive Streams HttpClient.
4237
*
4338
* @author Sebastien Deleuze
4439
* @since 5.1
@@ -63,7 +58,9 @@ public JettyClientHttpConnector() {
6358
* @param resourceFactory the {@link JettyResourceFactory} to use
6459
* @param customizer the lambda used to customize the {@link HttpClient}
6560
*/
66-
public JettyClientHttpConnector(JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
61+
public JettyClientHttpConnector(
62+
JettyResourceFactory resourceFactory, @Nullable Consumer<HttpClient> customizer) {
63+
6764
HttpClient httpClient = new HttpClient();
6865
httpClient.setExecutor(resourceFactory.getExecutor());
6966
httpClient.setByteBufferPool(resourceFactory.getByteBufferPool());
@@ -107,16 +104,27 @@ public Mono<ClientHttpResponse> connect(HttpMethod method, URI uri,
107104

108105
JettyClientHttpRequest clientHttpRequest = new JettyClientHttpRequest(
109106
this.httpClient.newRequest(uri).method(method.toString()), this.bufferFactory);
107+
110108
return requestCallback.apply(clientHttpRequest).then(Mono.from(
111-
clientHttpRequest.getReactiveRequest().response((reactiveResponse, contentChunks) -> {
112-
Flux<DataBuffer> content = Flux.from(contentChunks).map(chunk -> {
113-
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
114-
buffer.write(chunk.buffer);
115-
chunk.callback.succeeded();
116-
return buffer;
117-
});
118-
return Mono.just(new JettyClientHttpResponse(reactiveResponse, content));
109+
clientHttpRequest.getReactiveRequest().response((response, chunks) -> {
110+
Flux<DataBuffer> content = Flux.from(chunks).map(this::toDataBuffer);
111+
return Mono.just(new JettyClientHttpResponse(response, content));
119112
})));
120113
}
121114

115+
private DataBuffer toDataBuffer(ContentChunk chunk) {
116+
117+
// We must copy until this is resolved:
118+
// https://github.com/eclipse/jetty.project/issues/2429
119+
120+
// Use copy instead of buffer wrapping because Callback#succeeded() is
121+
// used not only to release the buffer but also to request more data
122+
// which is a problem for codecs that buffer data.
123+
124+
DataBuffer buffer = this.bufferFactory.allocateBuffer(chunk.buffer.capacity());
125+
buffer.write(chunk.buffer);
126+
chunk.callback.succeeded();
127+
return buffer;
128+
}
129+
122130
}

spring-web/src/main/java/org/springframework/http/client/reactive/JettyClientHttpRequest.java

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.util.function.Function;
2323

2424
import org.eclipse.jetty.client.api.Request;
25-
import org.eclipse.jetty.http.HttpHeader;
2625
import org.eclipse.jetty.reactive.client.ContentChunk;
2726
import org.eclipse.jetty.reactive.client.ReactiveRequest;
2827
import org.eclipse.jetty.util.Callback;
@@ -86,24 +85,26 @@ public DataBufferFactory bufferFactory() {
8685
}
8786

8887
@Override
89-
public Mono<Void> writeWith(Publisher<? extends DataBuffer> publisher) {
90-
Flux<ContentChunk> chunks = Flux.from(publisher).map(this::toContentChunk);
91-
MediaType contentType = getHeaders().getContentType();
92-
ReactiveRequest.Content requestContent = ReactiveRequest.Content.fromPublisher(chunks,
93-
(contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE));
94-
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(requestContent).build();
88+
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
89+
Flux<ContentChunk> chunks = Flux.from(body).map(this::toContentChunk);
90+
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
91+
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
9592
return doCommit(this::completes);
9693
}
9794

9895
@Override
9996
public Mono<Void> writeAndFlushWith(Publisher<? extends Publisher<? extends DataBuffer>> body) {
100-
String contentType = this.jettyRequest.getHeaders().getField(HttpHeader.CONTENT_TYPE).getValue();
10197
Flux<ContentChunk> chunks = Flux.from(body).flatMap(Function.identity()).map(this::toContentChunk);
102-
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, contentType);
98+
ReactiveRequest.Content content = ReactiveRequest.Content.fromPublisher(chunks, getContentType());
10399
this.reactiveRequest = ReactiveRequest.newBuilder(this.jettyRequest).content(content).build();
104100
return doCommit(this::completes);
105101
}
106102

103+
private String getContentType() {
104+
MediaType contentType = getHeaders().getContentType();
105+
return contentType != null ? contentType.toString() : MediaType.APPLICATION_OCTET_STREAM_VALUE;
106+
}
107+
107108
private Mono<Void> completes() {
108109
return Mono.empty();
109110
}
@@ -146,4 +147,5 @@ ReactiveRequest getReactiveRequest() {
146147
}
147148
return this.reactiveRequest;
148149
}
150+
149151
}

0 commit comments

Comments
 (0)