Skip to content

Commit b6111d0

Browse files
committed
Ensure WebClientResponseException for malformed response
Closes gh-27262
1 parent 9bd989f commit b6111d0

File tree

3 files changed

+21
-91
lines changed

3 files changed

+21
-91
lines changed

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public Mono<WebClientResponseException> createException() {
203203
return bytes;
204204
})
205205
.defaultIfEmpty(EMPTY)
206-
.onErrorReturn(IllegalStateException.class::isInstance, EMPTY)
206+
.onErrorReturn(ex -> !(ex instanceof Error), EMPTY)
207207
.map(bodyBytes -> {
208208
HttpRequest request = this.requestSupplier.get();
209209
Charset charset = headers().contentType().map(MimeType::getCharset).orElse(null);

spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultWebClient.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,9 @@ public <T> Mono<ResponseEntity<Flux<T>>> toEntityFlux(BodyExtractor<Flux<T>, ? s
612612
public Mono<ResponseEntity<Void>> toBodilessEntity() {
613613
return this.responseMono.flatMap(response ->
614614
WebClientUtils.mapToEntity(response, handleBodyMono(response, Mono.<Void>empty()))
615-
.flatMap(entity -> response.releaseBody().thenReturn(entity))
615+
.flatMap(entity -> response.releaseBody()
616+
.onErrorResume(WebClientUtils.WRAP_EXCEPTION_PREDICATE, exceptionWrappingFunction(response))
617+
.thenReturn(entity))
616618
);
617619
}
618620

spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java

Lines changed: 17 additions & 89 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,10 @@
4242
import okhttp3.mockwebserver.MockResponse;
4343
import okhttp3.mockwebserver.MockWebServer;
4444
import okhttp3.mockwebserver.RecordedRequest;
45-
import okhttp3.mockwebserver.SocketPolicy;
4645
import org.junit.jupiter.api.AfterEach;
4746
import org.junit.jupiter.api.Test;
4847
import org.junit.jupiter.params.ParameterizedTest;
49-
import org.junit.jupiter.params.provider.Arguments;
5048
import org.junit.jupiter.params.provider.MethodSource;
51-
import org.springframework.util.SocketUtils;
52-
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
5349
import reactor.core.publisher.Flux;
5450
import reactor.core.publisher.Mono;
5551
import reactor.netty.http.client.HttpClient;
@@ -73,7 +69,9 @@
7369
import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector;
7470
import org.springframework.http.client.reactive.JettyClientHttpConnector;
7571
import org.springframework.http.client.reactive.ReactorClientHttpConnector;
72+
import org.springframework.util.SocketUtils;
7673
import org.springframework.web.reactive.function.BodyExtractors;
74+
import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
7775
import org.springframework.web.testfixture.xml.Pojo;
7876

7977
import static org.assertj.core.api.Assertions.assertThat;
@@ -1220,63 +1218,9 @@ void invalidDomain(ClientHttpConnector connector) {
12201218
.verify();
12211219
}
12221220

1223-
static Stream<Arguments> socketFaultArguments() {
1224-
Stream.Builder<Arguments> argumentsBuilder = Stream.builder();
1225-
arguments().forEach(arg -> {
1226-
argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AT_START));
1227-
argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_DURING_REQUEST_BODY));
1228-
argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AFTER_REQUEST));
1229-
});
1230-
return argumentsBuilder.build();
1231-
}
1232-
1233-
@ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]")
1234-
@MethodSource("socketFaultArguments")
1235-
void prematureClosureFault(ClientHttpConnector connector, SocketPolicy socketPolicy) {
1236-
startServer(connector);
1237-
1238-
prepareResponse(response -> response
1239-
.setSocketPolicy(socketPolicy)
1240-
.setStatus("HTTP/1.1 200 OK")
1241-
.setHeader("Response-Header-1", "value 1")
1242-
.setHeader("Response-Header-2", "value 2")
1243-
.setBody("{\"message\": \"Hello, World!\"}"));
1244-
1245-
String uri = "/test";
1246-
Mono<String> result = this.webClient
1247-
.post()
1248-
.uri(uri)
1249-
// Random non-empty body to allow us to interrupt.
1250-
.bodyValue("{\"action\": \"Say hello!\"}")
1251-
.retrieve()
1252-
.bodyToMono(String.class);
1253-
1254-
StepVerifier.create(result)
1255-
.expectErrorSatisfies(throwable -> {
1256-
assertThat(throwable).isInstanceOf(WebClientRequestException.class);
1257-
WebClientRequestException ex = (WebClientRequestException) throwable;
1258-
// Varies between connector providers.
1259-
assertThat(ex.getCause()).isInstanceOf(IOException.class);
1260-
})
1261-
.verify();
1262-
}
1263-
1264-
static Stream<Arguments> malformedResponseChunkArguments() {
1265-
return Stream.of(
1266-
Arguments.of(new ReactorClientHttpConnector(), true),
1267-
Arguments.of(new JettyClientHttpConnector(), true),
1268-
// Apache injects the Transfer-Encoding header for us, and complains with an exception if we also
1269-
// add it. The other two connectors do not add the header at all. We need this header for the test
1270-
// case to work correctly.
1271-
Arguments.of(new HttpComponentsClientHttpConnector(), false)
1272-
);
1273-
}
1274-
1275-
@ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]")
1276-
@MethodSource("malformedResponseChunkArguments")
1277-
void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, boolean addTransferEncodingHeader) {
1278-
Mono<?> result = doMalformedResponseChunks(connector, addTransferEncodingHeader, ResponseSpec::toBodilessEntity);
1279-
1221+
@ParameterizedWebClientTest
1222+
void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector) {
1223+
Mono<?> result = doMalformedChunkedResponseTest(connector, ResponseSpec::toBodilessEntity);
12801224
StepVerifier.create(result)
12811225
.expectErrorSatisfies(throwable -> {
12821226
assertThat(throwable).isInstanceOf(WebClientException.class);
@@ -1286,11 +1230,9 @@ void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, bool
12861230
.verify();
12871231
}
12881232

1289-
@ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]")
1290-
@MethodSource("malformedResponseChunkArguments")
1291-
void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, boolean addTransferEncodingHeader) {
1292-
Mono<?> result = doMalformedResponseChunks(connector, addTransferEncodingHeader, spec -> spec.toEntity(String.class));
1293-
1233+
@ParameterizedWebClientTest
1234+
void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector) {
1235+
Mono<?> result = doMalformedChunkedResponseTest(connector, spec -> spec.toEntity(String.class));
12941236
StepVerifier.create(result)
12951237
.expectErrorSatisfies(throwable -> {
12961238
assertThat(throwable).isInstanceOf(WebClientException.class);
@@ -1300,17 +1242,13 @@ void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, bool
13001242
.verify();
13011243
}
13021244

1303-
private <T> Mono<T> doMalformedResponseChunks(
1304-
ClientHttpConnector connector,
1305-
boolean addTransferEncodingHeader,
1306-
Function<ResponseSpec, Mono<T>> responseHandler
1307-
) {
1245+
private <T> Mono<T> doMalformedChunkedResponseTest(
1246+
ClientHttpConnector connector, Function<ResponseSpec, Mono<T>> handler) {
1247+
13081248
int port = SocketUtils.findAvailableTcpPort();
13091249

13101250
Thread serverThread = new Thread(() -> {
1311-
// This exists separately to the main mock server, as I had a really hard time getting that to send the
1312-
// chunked responses correctly, flushing the socket each time. This was the only way I was able to replicate
1313-
// the issue of the client not handling malformed response chunks correctly.
1251+
// No way to simulate a malformed chunked response through MockWebServer.
13141252
try (ServerSocket serverSocket = new ServerSocket(port)) {
13151253
Socket socket = serverSocket.accept();
13161254
InputStream is = socket.getInputStream();
@@ -1324,30 +1262,20 @@ private <T> Mono<T> doMalformedResponseChunks(
13241262
os.write("\r\n".getBytes(StandardCharsets.UTF_8));
13251263
os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8));
13261264
socket.close();
1327-
} catch (IOException ex) {
1265+
}
1266+
catch (IOException ex) {
13281267
throw new RuntimeException(ex);
13291268
}
13301269
});
13311270

1332-
serverThread.setDaemon(true);
13331271
serverThread.start();
13341272

1335-
ResponseSpec spec = WebClient
1336-
.builder()
1273+
WebClient client = WebClient.builder()
13371274
.clientConnector(connector)
13381275
.baseUrl("http://localhost:" + port)
1339-
.build()
1340-
.post()
1341-
.headers(headers -> {
1342-
if (addTransferEncodingHeader) {
1343-
headers.add(HttpHeaders.TRANSFER_ENCODING, "chunked");
1344-
}
1345-
})
1346-
.retrieve();
1276+
.build();
13471277

1348-
return responseHandler
1349-
.apply(spec)
1350-
.doFinally(signal -> serverThread.stop());
1278+
return handler.apply(client.post().retrieve());
13511279
}
13521280

13531281
private void prepareResponse(Consumer<MockResponse> consumer) {

0 commit comments

Comments
 (0)