From 358ca62baed86268047cf0816b6d4d4ce871f473 Mon Sep 17 00:00:00 2001 From: Ashley Scopes <73482956+ascopes@users.noreply.github.com> Date: Wed, 11 Aug 2021 16:13:41 +0100 Subject: [PATCH 1/3] WebClient tests for socket and response format issues Added test case for malformed response chunk, which is now failing as expected. --- .../client/WebClientIntegrationTests.java | 145 +++++++++++++++++- 1 file changed, 142 insertions(+), 3 deletions(-) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index e8a082a6b628..4878ca78c2f9 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -18,11 +18,15 @@ import java.io.ByteArrayOutputStream; import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.UncheckedIOException; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; +import java.net.ServerSocket; +import java.net.Socket; import java.net.URI; import java.nio.charset.StandardCharsets; import java.nio.file.Files; @@ -31,16 +35,21 @@ import java.util.List; import java.util.Map; import java.util.function.Consumer; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; import okhttp3.mockwebserver.MockResponse; import okhttp3.mockwebserver.MockWebServer; import okhttp3.mockwebserver.RecordedRequest; +import okhttp3.mockwebserver.SocketPolicy; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; +import org.springframework.util.SocketUtils; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -83,7 +92,7 @@ class WebClientIntegrationTests { @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) - @ParameterizedTest(name = "[{index}] webClient [{0}]") + @ParameterizedTest(name = "[{index}] {displayName} [{0}]") @MethodSource("arguments") @interface ParameterizedWebClientTest { } @@ -113,7 +122,9 @@ private void startServer(ClientHttpConnector connector) { @AfterEach void shutdown() throws IOException { - this.server.shutdown(); + if (server != null) { + this.server.shutdown(); + } } @@ -1209,6 +1220,135 @@ void invalidDomain(ClientHttpConnector connector) { .verify(); } + static Stream socketFaultArguments() { + Stream.Builder argumentsBuilder = Stream.builder(); + arguments().forEach(arg -> { + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AT_START)); + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AFTER_REQUEST)); + }); + return argumentsBuilder.build(); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("socketFaultArguments") + void prematureClosureFault(ClientHttpConnector connector, SocketPolicy socketPolicy) { + startServer(connector); + + prepareResponse(response -> response + .setSocketPolicy(socketPolicy) + .setStatus("HTTP/1.1 200 OK") + .setHeader("Response-Header-1", "value 1") + .setHeader("Response-Header-2", "value 2") + .setBody("{\"message\": \"Hello, World!\"}")); + + String uri = "/test"; + Mono result = this.webClient + .post() + .uri(uri) + // Random non-empty body to allow us to interrupt. + .bodyValue("{\"action\": \"Say hello!\"}") + .retrieve() + .bodyToMono(String.class); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientRequestException.class); + WebClientRequestException ex = (WebClientRequestException) throwable; + // Varies between connector providers. + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + static Stream malformedResponseChunkArguments() { + return Stream.of( + Arguments.of(new ReactorClientHttpConnector(), true), + Arguments.of(new JettyClientHttpConnector(), true), + // Apache injects the Transfer-Encoding header for us, and complains with an exception if we also + // add it. The other two connectors do not add the header at all. We need this header for the test + // case to work correctly. + Arguments.of(new HttpComponentsClientHttpConnector(), false) + ); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("malformedResponseChunkArguments") + void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, boolean addTransferEncodingHeader) { + Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, ResponseSpec::toBodilessEntity); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientException.class); + WebClientException ex = (WebClientException) throwable; + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") + @MethodSource("malformedResponseChunkArguments") + void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, boolean addTransferEncodingHeader) { + Mono result = doMalformedResponseChunks(connector, addTransferEncodingHeader, spec -> spec.toEntity(String.class)); + + StepVerifier.create(result) + .expectErrorSatisfies(throwable -> { + assertThat(throwable).isInstanceOf(WebClientException.class); + WebClientException ex = (WebClientException) throwable; + assertThat(ex.getCause()).isInstanceOf(IOException.class); + }) + .verify(); + } + + private Mono doMalformedResponseChunks( + ClientHttpConnector connector, + boolean addTransferEncodingHeader, + Function> responseHandler + ) { + int port = SocketUtils.findAvailableTcpPort(); + + Thread serverThread = new Thread(() -> { + // This exists separately to the main mock server, as I had a really hard time getting that to send the + // chunked responses correctly, flushing the socket each time. This was the only way I was able to replicate + // the issue of the client not handling malformed response chunks correctly. + try (ServerSocket serverSocket = new ServerSocket(port)) { + Socket socket = serverSocket.accept(); + InputStream is = socket.getInputStream(); + + //noinspection ResultOfMethodCallIgnored + is.read(new byte[4096]); + + OutputStream os = socket.getOutputStream(); + os.write("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("Transfer-Encoding: chunked\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("\r\n".getBytes(StandardCharsets.UTF_8)); + os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8)); + socket.close(); + } catch (IOException ex) { + throw new RuntimeException(ex); + } + }); + + serverThread.setDaemon(true); + serverThread.start(); + + ResponseSpec spec = WebClient + .builder() + .clientConnector(connector) + .baseUrl("http://localhost:" + port) + .build() + .post() + .headers(headers -> { + if (addTransferEncodingHeader) { + headers.add(HttpHeaders.TRANSFER_ENCODING, "chunked"); + } + }) + .retrieve(); + + return responseHandler + .apply(spec) + .doFinally(signal -> serverThread.stop()); + } private void prepareResponse(Consumer consumer) { MockResponse response = new MockResponse(); @@ -1252,5 +1392,4 @@ public void setContainerValue(T containerValue) { this.containerValue = containerValue; } } - } From 5cdcc1d37e6f16999182c3da609bede9cfd71761 Mon Sep 17 00:00:00 2001 From: Ashley Scopes <73482956+ascopes@users.noreply.github.com> Date: Wed, 11 Aug 2021 20:22:28 +0100 Subject: [PATCH 2/3] Added error mapper to DefaultClientResponse#body This targets an edge case that can occur when the Reactor client fails to process a response body due to an issue such as a malformed HTTP/1.1 response chunk, but since this issue only occurred after the body began to be parsed, nothing existed to catch it. This previously resulted in an unhandled ReactorException being thrown out of the subscriber to WebClient calls. Upon testing this further, it also appears to fix edge cases for Jetty and Apache HTTP Components as well when parsing a "bodiless entity" that has a malformed response chunk present, as this would beforehand defer the exception until later, again leading it to be thrown without being decorated in the Spring exception. Without this fix, error handling can be somewhat problematic, since each HTTP connector implementation throws a different exception type, and it would rely on the developer using this interface to know exactly what the internals of their chosen HTTP client were doing prior to the exception occurring. --- .../client/DefaultClientResponse.java | 47 ++++++++++++++----- 1 file changed, 34 insertions(+), 13 deletions(-) diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java index 927fcdf205d5..d777452ac742 100644 --- a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/DefaultClientResponse.java @@ -40,11 +40,13 @@ import org.springframework.http.client.reactive.ClientHttpResponse; import org.springframework.http.codec.HttpMessageReader; import org.springframework.http.server.reactive.ServerHttpResponse; +import org.springframework.lang.Nullable; import org.springframework.util.MimeType; import org.springframework.util.MultiValueMap; import org.springframework.web.reactive.function.BodyExtractor; import org.springframework.web.reactive.function.BodyExtractors; + /** * Default implementation of {@link ClientResponse}. * @@ -130,13 +132,21 @@ public MultiValueMap cookies() { public T body(BodyExtractor extractor) { T result = extractor.extract(this.response, this.bodyExtractorContext); String description = "Body from " + this.requestDescription + " [DefaultClientResponse]"; + if (result instanceof Mono) { - return (T) ((Mono) result).checkpoint(description); + return (T) ((Mono) result) + .checkpoint(description) + .onErrorMap(WebClientUtils.WRAP_EXCEPTION_PREDICATE, t -> toException(t, null, null)); } + else if (result instanceof Flux) { - return (T) ((Flux) result).checkpoint(description); + return (T) ((Flux) result) + .checkpoint(description) + .onErrorMap(WebClientUtils.WRAP_EXCEPTION_PREDICATE, t -> toException(t, null, null)); } + else { + // XXX: is there a way to preemptively handle uncaught exceptions here? return result; } } @@ -205,18 +215,12 @@ public Mono createException() { .defaultIfEmpty(EMPTY) .onErrorReturn(IllegalStateException.class::isInstance, EMPTY) .map(bodyBytes -> { - HttpRequest request = this.requestSupplier.get(); - Charset charset = headers().contentType().map(MimeType::getCharset).orElse(null); int statusCode = rawStatusCode(); HttpStatus httpStatus = HttpStatus.resolve(statusCode); + Charset charset = headers().contentType().map(MimeType::getCharset).orElse(null); + if (httpStatus != null) { - return WebClientResponseException.create( - statusCode, - httpStatus.getReasonPhrase(), - headers().asHttpHeaders(), - bodyBytes, - charset, - request); + return toException(null, bodyBytes, charset); } else { return new UnknownHttpStatusCodeException( @@ -224,7 +228,8 @@ public Mono createException() { headers().asHttpHeaders(), bodyBytes, charset, - request); + this.requestSupplier.get() + ); } }); } @@ -239,6 +244,23 @@ HttpRequest request() { return this.requestSupplier.get(); } + private WebClientResponseException toException( + @Nullable Throwable cause, @Nullable byte[] bodyBytes, @Nullable Charset charset) { + + WebClientResponseException ex = new WebClientResponseException( + this.response.getRawStatusCode(), + this.response.getStatusCode().getReasonPhrase(), + headers().asHttpHeaders(), + bodyBytes, + charset, + this.requestSupplier.get() + ); + + ex.initCause(cause); + + return ex; + } + private class DefaultHeaders implements Headers { private final HttpHeaders httpHeaders = @@ -269,5 +291,4 @@ private OptionalLong toOptionalLong(long value) { return (value != -1 ? OptionalLong.of(value) : OptionalLong.empty()); } } - } From eb0274710ebdc7d0942faa2ee302c1d7690327a0 Mon Sep 17 00:00:00 2001 From: Ashley Scopes <73482956+ascopes@users.noreply.github.com> Date: Thu, 12 Aug 2021 13:54:04 +0100 Subject: [PATCH 3/3] Fixed formatting of integration tests. --- .../function/client/WebClientIntegrationTests.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java index 4878ca78c2f9..b7664b0c16e1 100644 --- a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/WebClientIntegrationTests.java @@ -48,8 +48,6 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; import org.junit.jupiter.params.provider.MethodSource; -import org.springframework.util.SocketUtils; -import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; import reactor.netty.http.client.HttpClient; @@ -73,7 +71,9 @@ import org.springframework.http.client.reactive.HttpComponentsClientHttpConnector; import org.springframework.http.client.reactive.JettyClientHttpConnector; import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.util.SocketUtils; import org.springframework.web.reactive.function.BodyExtractors; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; import org.springframework.web.testfixture.xml.Pojo; import static org.assertj.core.api.Assertions.assertThat; @@ -1324,7 +1324,8 @@ private Mono doMalformedResponseChunks( os.write("\r\n".getBytes(StandardCharsets.UTF_8)); os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8)); socket.close(); - } catch (IOException ex) { + } + catch (IOException ex) { throw new RuntimeException(ex); } });