Skip to content

Commit c187cb2

Browse files
committed
Ensure client response is drained with onStatus hook
Issue: SPR-17473
1 parent ed1d63d commit c187cb2

File tree

6 files changed

+224
-25
lines changed

6 files changed

+224
-25
lines changed

spring-core/src/test/java/org/springframework/core/io/buffer/AbstractDataBufferAllocatingTestCase.java

Lines changed: 41 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package org.springframework.core.io.buffer;
1818

1919
import java.nio.charset.StandardCharsets;
20+
import java.time.Duration;
21+
import java.time.Instant;
2022
import java.util.Arrays;
2123
import java.util.List;
2224
import java.util.function.Consumer;
@@ -37,7 +39,11 @@
3739
import static org.junit.Assert.*;
3840

3941
/**
42+
* Base class for tests that read or write data buffers with a rule to check
43+
* that allocated buffers have been released.
44+
*
4045
* @author Arjen Poutsma
46+
* @author Rossen Stoyanchev
4147
*/
4248
@RunWith(Parameterized.class)
4349
public abstract class AbstractDataBufferAllocatingTestCase {
@@ -62,6 +68,7 @@ public static Object[][] dataBufferFactories() {
6268
@Rule
6369
public final Verifier leakDetector = new LeakDetector();
6470

71+
6572
protected DataBuffer createDataBuffer(int capacity) {
6673
return this.bufferFactory.allocateBuffer(capacity);
6774
}
@@ -93,30 +100,45 @@ protected Consumer<DataBuffer> stringConsumer(String expected) {
93100
};
94101
}
95102

96-
97-
private class LeakDetector extends Verifier {
98-
99-
@Override
100-
protected void verify() throws Throwable {
101-
if (bufferFactory instanceof NettyDataBufferFactory) {
102-
ByteBufAllocator byteBufAllocator =
103-
((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
104-
if (byteBufAllocator instanceof PooledByteBufAllocator) {
105-
PooledByteBufAllocator pooledByteBufAllocator =
106-
(PooledByteBufAllocator) byteBufAllocator;
107-
PooledByteBufAllocatorMetric metric = pooledByteBufAllocator.metric();
108-
long allocations = calculateAllocations(metric.directArenas()) +
109-
calculateAllocations(metric.heapArenas());
110-
assertTrue("ByteBuf leak detected: " + allocations +
111-
" allocations were not released", allocations == 0);
112-
}
103+
/**
104+
* Wait until allocations are at 0, or the given duration elapses.
105+
*/
106+
protected void waitForDataBufferRelease(Duration duration) throws InterruptedException {
107+
Instant start = Instant.now();
108+
while (Instant.now().isBefore(start.plus(duration))) {
109+
try {
110+
verifyAllocations();
111+
break;
112+
}
113+
catch (AssertionError ex) {
114+
// ignore;
113115
}
116+
Thread.sleep(50);
114117
}
118+
}
115119

116-
private long calculateAllocations(List<PoolArenaMetric> metrics) {
117-
return metrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
120+
private void verifyAllocations() {
121+
if (this.bufferFactory instanceof NettyDataBufferFactory) {
122+
ByteBufAllocator allocator = ((NettyDataBufferFactory) this.bufferFactory).getByteBufAllocator();
123+
if (allocator instanceof PooledByteBufAllocator) {
124+
PooledByteBufAllocatorMetric metric = ((PooledByteBufAllocator) allocator).metric();
125+
long total = getAllocations(metric.directArenas()) + getAllocations(metric.heapArenas());
126+
assertEquals("ByteBuf Leak: " + total + " unreleased allocations", 0, total);
127+
}
118128
}
129+
}
130+
131+
private static long getAllocations(List<PoolArenaMetric> metrics) {
132+
return metrics.stream().mapToLong(PoolArenaMetric::numActiveAllocations).sum();
133+
}
134+
119135

136+
protected class LeakDetector extends Verifier {
137+
138+
@Override
139+
public void verify() {
140+
AbstractDataBufferAllocatingTestCase.this.verifyAllocations();
141+
}
120142
}
121143

122144
}

spring-web/src/main/java/org/springframework/http/client/reactive/ReactorClientHttpResponse.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package org.springframework.http.client.reactive;
1818

1919
import java.util.Collection;
20+
import java.util.concurrent.atomic.AtomicBoolean;
2021

2122
import io.netty.buffer.ByteBufAllocator;
2223
import reactor.core.publisher.Flux;
@@ -28,6 +29,7 @@
2829
import org.springframework.http.HttpHeaders;
2930
import org.springframework.http.HttpStatus;
3031
import org.springframework.http.ResponseCookie;
32+
import org.springframework.util.Assert;
3133
import org.springframework.util.CollectionUtils;
3234
import org.springframework.util.LinkedMultiValueMap;
3335
import org.springframework.util.MultiValueMap;
@@ -47,6 +49,8 @@ class ReactorClientHttpResponse implements ClientHttpResponse {
4749

4850
private final NettyInbound inbound;
4951

52+
private final AtomicBoolean bodyConsumed = new AtomicBoolean();
53+
5054

5155
public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbound, ByteBufAllocator alloc) {
5256
this.response = response;
@@ -58,6 +62,10 @@ public ReactorClientHttpResponse(HttpClientResponse response, NettyInbound inbou
5862
@Override
5963
public Flux<DataBuffer> getBody() {
6064
return this.inbound.receive()
65+
.doOnSubscribe(s ->
66+
// See https://github.com/reactor/reactor-netty/issues/503
67+
Assert.state(this.bodyConsumed.compareAndSet(false, true),
68+
"The client response body can only be consumed once."))
6169
.map(byteBuf -> {
6270
byteBuf.retain();
6371
return this.bufferFactory.wrap(byteBuf);

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -436,19 +436,30 @@ private <T> Flux<T> monoThrowableToFlux(Mono<? extends Throwable> mono) {
436436

437437
private <T extends Publisher<?>> T bodyToPublisher(ClientResponse response,
438438
T bodyPublisher, Function<Mono<? extends Throwable>, T> errorFunction) {
439+
439440
if (HttpStatus.resolve(response.rawStatusCode()) != null) {
440-
return this.statusHandlers.stream()
441-
.filter(statusHandler -> statusHandler.test(response.statusCode()))
442-
.findFirst()
443-
.map(statusHandler -> statusHandler.apply(response))
444-
.map(errorFunction::apply)
445-
.orElse(bodyPublisher);
441+
for (StatusHandler handler : this.statusHandlers) {
442+
if (handler.test(response.statusCode())) {
443+
Mono<? extends Throwable> exMono = handler.apply(response);
444+
exMono = exMono.flatMap(ex -> drainBody(response, ex));
445+
exMono = exMono.onErrorResume(ex -> drainBody(response, ex));
446+
return errorFunction.apply(exMono);
447+
}
448+
}
449+
return bodyPublisher;
446450
}
447451
else {
448452
return errorFunction.apply(createResponseException(response));
449453
}
450454
}
451455

456+
@SuppressWarnings("unchecked")
457+
private <T> Mono<T> drainBody(ClientResponse response, Throwable ex) {
458+
// Ensure the body is drained, even if the StatusHandler didn't consume it,
459+
// but ignore errors in case it did consume it.
460+
return (Mono<T>) response.bodyToMono(Void.class).onErrorMap(ex2 -> ex).thenReturn(ex);
461+
}
462+
452463
private static Mono<WebClientResponseException> createResponseException(ClientResponse response) {
453464
return DataBufferUtils.join(response.body(BodyExtractors.toDataBuffers()))
454465
.map(dataBuffer -> {

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/WebClient.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -595,6 +595,9 @@ interface ResponseSpec {
595595
* {@link WebClientResponseException} when the response status code is 4xx or 5xx.
596596
* @param statusPredicate a predicate that indicates whether {@code exceptionFunction}
597597
* applies
598+
* <p><strong>NOTE:</strong> if the response is expected to have content,
599+
* the exceptionFunction should consume it. If not, the content will be
600+
* automatically drained to ensure resources are released.
598601
* @param exceptionFunction the function that returns the exception
599602
* @return this builder
600603
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,151 @@
1+
/*
2+
* Copyright 2002-2018 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package org.springframework.web.reactive.function.client;
17+
18+
import java.time.Duration;
19+
import java.util.function.Function;
20+
21+
import io.netty.buffer.ByteBufAllocator;
22+
import io.netty.channel.ChannelOption;
23+
import okhttp3.mockwebserver.MockResponse;
24+
import okhttp3.mockwebserver.MockWebServer;
25+
import org.junit.After;
26+
import org.junit.Before;
27+
import org.junit.Test;
28+
import reactor.core.publisher.Mono;
29+
import reactor.test.StepVerifier;
30+
31+
import org.springframework.core.io.buffer.AbstractDataBufferAllocatingTestCase;
32+
import org.springframework.core.io.buffer.NettyDataBufferFactory;
33+
import org.springframework.http.HttpStatus;
34+
import org.springframework.http.MediaType;
35+
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
36+
import org.springframework.http.client.reactive.ReactorResourceFactory;
37+
38+
import static org.junit.Assert.*;
39+
40+
/**
41+
* WebClient integration tests focusing on data buffer management.
42+
* @author Rossen Stoyanchev
43+
*/
44+
public class WebClientDataBufferAllocatingTests extends AbstractDataBufferAllocatingTestCase {
45+
46+
private static final Duration DELAY = Duration.ofSeconds(5);
47+
48+
49+
private MockWebServer server;
50+
51+
private WebClient webClient;
52+
53+
private ReactorResourceFactory factory;
54+
55+
56+
@Before
57+
public void setUp() {
58+
59+
this.factory = new ReactorResourceFactory();
60+
this.factory.afterPropertiesSet();
61+
62+
this.server = new MockWebServer();
63+
this.webClient = WebClient
64+
.builder()
65+
.clientConnector(initConnector())
66+
.baseUrl(this.server.url("/").toString())
67+
.build();
68+
}
69+
70+
private ReactorClientHttpConnector initConnector() {
71+
if (bufferFactory instanceof NettyDataBufferFactory) {
72+
ByteBufAllocator allocator = ((NettyDataBufferFactory) bufferFactory).getByteBufAllocator();
73+
return new ReactorClientHttpConnector(this.factory, httpClient ->
74+
httpClient.tcpConfiguration(tcpClient -> tcpClient.option(ChannelOption.ALLOCATOR, allocator)));
75+
}
76+
else {
77+
return new ReactorClientHttpConnector();
78+
}
79+
}
80+
81+
@After
82+
public void shutDown() throws InterruptedException {
83+
waitForDataBufferRelease(Duration.ofSeconds(2));
84+
this.factory.destroy();
85+
}
86+
87+
88+
89+
@Test
90+
public void bodyToMonoVoid() {
91+
92+
this.server.enqueue(new MockResponse()
93+
.setResponseCode(201)
94+
.setHeader("Content-Type", "application/json")
95+
.setChunkedBody("{\"foo\" : {\"bar\" : \"123\", \"baz\" : \"456\"}}", 5));
96+
97+
Mono<Void> mono = this.webClient.get()
98+
.uri("/json").accept(MediaType.APPLICATION_JSON)
99+
.retrieve()
100+
.bodyToMono(Void.class);
101+
102+
StepVerifier.create(mono).expectComplete().verify(Duration.ofSeconds(3));
103+
assertEquals(1, this.server.getRequestCount());
104+
}
105+
106+
107+
@Test
108+
public void onStatusWithBodyNotConsumed() {
109+
RuntimeException ex = new RuntimeException("response error");
110+
testOnStatus(ex, response -> Mono.just(ex));
111+
}
112+
113+
@Test
114+
public void onStatusWithBodyConsumed() {
115+
RuntimeException ex = new RuntimeException("response error");
116+
testOnStatus(ex, response -> response.bodyToMono(Void.class).thenReturn(ex));
117+
}
118+
119+
@Test // SPR-17473
120+
public void onStatusWithMonoErrorAndBodyNotConsumed() {
121+
RuntimeException ex = new RuntimeException("response error");
122+
testOnStatus(ex, response -> Mono.error(ex));
123+
}
124+
125+
@Test
126+
public void onStatusWithMonoErrorAndBodyConsumed() {
127+
RuntimeException ex = new RuntimeException("response error");
128+
testOnStatus(ex, response -> response.bodyToMono(Void.class).then(Mono.error(ex)));
129+
}
130+
131+
private void testOnStatus(Throwable expected,
132+
Function<ClientResponse, Mono<? extends Throwable>> exceptionFunction) {
133+
134+
HttpStatus errorStatus = HttpStatus.BAD_GATEWAY;
135+
136+
this.server.enqueue(new MockResponse()
137+
.setResponseCode(errorStatus.value())
138+
.setHeader("Content-Type", "application/json")
139+
.setChunkedBody("{\"error\" : {\"status\" : 502, \"message\" : \"Bad gateway.\"}}", 5));
140+
141+
Mono<String> mono = this.webClient.get()
142+
.uri("/json").accept(MediaType.APPLICATION_JSON)
143+
.retrieve()
144+
.onStatus(status -> status.equals(errorStatus), exceptionFunction)
145+
.bodyToMono(String.class);
146+
147+
StepVerifier.create(mono).expectErrorSatisfies(actual -> assertSame(expected, actual)).verify(DELAY);
148+
assertEquals(1, this.server.getRequestCount());
149+
}
150+
151+
}

src/docs/asciidoc/web/webflux-webclient.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,6 +294,10 @@ as the following example shows:
294294
----
295295
====
296296

297+
When `onStatus` is used, if the response is expected to have content, then the `onStatus`
298+
callback should consume it. If not, the content will be automatically drained to ensure
299+
resources are released.
300+
297301

298302

299303

0 commit comments

Comments
 (0)