|
18 | 18 |
|
19 | 19 | import java.io.ByteArrayOutputStream;
|
20 | 20 | import java.io.IOException;
|
| 21 | +import java.io.InputStream; |
| 22 | +import java.io.OutputStream; |
21 | 23 | import java.io.UncheckedIOException;
|
22 | 24 | import java.lang.annotation.ElementType;
|
23 | 25 | import java.lang.annotation.Retention;
|
24 | 26 | import java.lang.annotation.RetentionPolicy;
|
25 | 27 | import java.lang.annotation.Target;
|
| 28 | +import java.net.ServerSocket; |
| 29 | +import java.net.Socket; |
26 | 30 | import java.net.URI;
|
27 | 31 | import java.nio.charset.StandardCharsets;
|
28 | 32 | import java.nio.file.Files;
|
|
31 | 35 | import java.util.List;
|
32 | 36 | import java.util.Map;
|
33 | 37 | import java.util.function.Consumer;
|
| 38 | +import java.util.function.Function; |
34 | 39 | import java.util.stream.Collectors;
|
35 | 40 | import java.util.stream.Stream;
|
36 | 41 |
|
37 | 42 | import okhttp3.mockwebserver.MockResponse;
|
38 | 43 | import okhttp3.mockwebserver.MockWebServer;
|
39 | 44 | import okhttp3.mockwebserver.RecordedRequest;
|
| 45 | +import okhttp3.mockwebserver.SocketPolicy; |
40 | 46 | import org.junit.jupiter.api.AfterEach;
|
41 | 47 | import org.junit.jupiter.api.Test;
|
42 | 48 | import org.junit.jupiter.params.ParameterizedTest;
|
| 49 | +import org.junit.jupiter.params.provider.Arguments; |
43 | 50 | import org.junit.jupiter.params.provider.MethodSource;
|
| 51 | +import org.springframework.util.SocketUtils; |
| 52 | +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; |
44 | 53 | import reactor.core.publisher.Flux;
|
45 | 54 | import reactor.core.publisher.Mono;
|
46 | 55 | import reactor.netty.http.client.HttpClient;
|
@@ -83,7 +92,7 @@ class WebClientIntegrationTests {
|
83 | 92 |
|
84 | 93 | @Retention(RetentionPolicy.RUNTIME)
|
85 | 94 | @Target(ElementType.METHOD)
|
86 |
| - @ParameterizedTest(name = "[{index}] webClient [{0}]") |
| 95 | + @ParameterizedTest(name = "[{index}] {displayName} [{0}]") |
87 | 96 | @MethodSource("arguments")
|
88 | 97 | @interface ParameterizedWebClientTest {
|
89 | 98 | }
|
@@ -113,7 +122,9 @@ private void startServer(ClientHttpConnector connector) {
|
113 | 122 |
|
114 | 123 | @AfterEach
|
115 | 124 | void shutdown() throws IOException {
|
116 |
| - this.server.shutdown(); |
| 125 | + if (server != null) { |
| 126 | + this.server.shutdown(); |
| 127 | + } |
117 | 128 | }
|
118 | 129 |
|
119 | 130 |
|
@@ -1209,6 +1220,135 @@ void invalidDomain(ClientHttpConnector connector) {
|
1209 | 1220 | .verify();
|
1210 | 1221 | }
|
1211 | 1222 |
|
| 1223 | + static Stream<Arguments> socketFaultArguments() { |
| 1224 | + Stream.Builder<Arguments> argumentsBuilder = Stream.builder(); |
| 1225 | + arguments().forEach(arg -> { |
| 1226 | + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AT_START)); |
| 1227 | + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_DURING_REQUEST_BODY)); |
| 1228 | + argumentsBuilder.accept(Arguments.of(arg, SocketPolicy.DISCONNECT_AFTER_REQUEST)); |
| 1229 | + }); |
| 1230 | + return argumentsBuilder.build(); |
| 1231 | + } |
| 1232 | + |
| 1233 | + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") |
| 1234 | + @MethodSource("socketFaultArguments") |
| 1235 | + void prematureClosureFault(ClientHttpConnector connector, SocketPolicy socketPolicy) { |
| 1236 | + startServer(connector); |
| 1237 | + |
| 1238 | + prepareResponse(response -> response |
| 1239 | + .setSocketPolicy(socketPolicy) |
| 1240 | + .setStatus("HTTP/1.1 200 OK") |
| 1241 | + .setHeader("Response-Header-1", "value 1") |
| 1242 | + .setHeader("Response-Header-2", "value 2") |
| 1243 | + .setBody("{\"message\": \"Hello, World!\"}")); |
| 1244 | + |
| 1245 | + String uri = "/test"; |
| 1246 | + Mono<String> result = this.webClient |
| 1247 | + .post() |
| 1248 | + .uri(uri) |
| 1249 | + // Random non-empty body to allow us to interrupt. |
| 1250 | + .bodyValue("{\"action\": \"Say hello!\"}") |
| 1251 | + .retrieve() |
| 1252 | + .bodyToMono(String.class); |
| 1253 | + |
| 1254 | + StepVerifier.create(result) |
| 1255 | + .expectErrorSatisfies(throwable -> { |
| 1256 | + assertThat(throwable).isInstanceOf(WebClientRequestException.class); |
| 1257 | + WebClientRequestException ex = (WebClientRequestException) throwable; |
| 1258 | + // Varies between connector providers. |
| 1259 | + assertThat(ex.getCause()).isInstanceOf(IOException.class); |
| 1260 | + }) |
| 1261 | + .verify(); |
| 1262 | + } |
| 1263 | + |
| 1264 | + static Stream<Arguments> malformedResponseChunkArguments() { |
| 1265 | + return Stream.of( |
| 1266 | + Arguments.of(new ReactorClientHttpConnector(), true), |
| 1267 | + Arguments.of(new JettyClientHttpConnector(), true), |
| 1268 | + // Apache injects the Transfer-Encoding header for us, and complains with an exception if we also |
| 1269 | + // add it. The other two connectors do not add the header at all. We need this header for the test |
| 1270 | + // case to work correctly. |
| 1271 | + Arguments.of(new HttpComponentsClientHttpConnector(), false) |
| 1272 | + ); |
| 1273 | + } |
| 1274 | + |
| 1275 | + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") |
| 1276 | + @MethodSource("malformedResponseChunkArguments") |
| 1277 | + void malformedResponseChunksOnBodilessEntity(ClientHttpConnector connector, boolean addTransferEncodingHeader) { |
| 1278 | + Mono<?> result = doMalformedResponseChunks(connector, addTransferEncodingHeader, ResponseSpec::toBodilessEntity); |
| 1279 | + |
| 1280 | + StepVerifier.create(result) |
| 1281 | + .expectErrorSatisfies(throwable -> { |
| 1282 | + assertThat(throwable).isInstanceOf(WebClientException.class); |
| 1283 | + WebClientException ex = (WebClientException) throwable; |
| 1284 | + assertThat(ex.getCause()).isInstanceOf(IOException.class); |
| 1285 | + }) |
| 1286 | + .verify(); |
| 1287 | + } |
| 1288 | + |
| 1289 | + @ParameterizedTest(name = "[{index}] {displayName} [{0}, {1}]") |
| 1290 | + @MethodSource("malformedResponseChunkArguments") |
| 1291 | + void malformedResponseChunksOnEntityWithBody(ClientHttpConnector connector, boolean addTransferEncodingHeader) { |
| 1292 | + Mono<?> result = doMalformedResponseChunks(connector, addTransferEncodingHeader, spec -> spec.toEntity(String.class)); |
| 1293 | + |
| 1294 | + StepVerifier.create(result) |
| 1295 | + .expectErrorSatisfies(throwable -> { |
| 1296 | + assertThat(throwable).isInstanceOf(WebClientException.class); |
| 1297 | + WebClientException ex = (WebClientException) throwable; |
| 1298 | + assertThat(ex.getCause()).isInstanceOf(IOException.class); |
| 1299 | + }) |
| 1300 | + .verify(); |
| 1301 | + } |
| 1302 | + |
| 1303 | + private <T> Mono<T> doMalformedResponseChunks( |
| 1304 | + ClientHttpConnector connector, |
| 1305 | + boolean addTransferEncodingHeader, |
| 1306 | + Function<ResponseSpec, Mono<T>> responseHandler |
| 1307 | + ) { |
| 1308 | + int port = SocketUtils.findAvailableTcpPort(); |
| 1309 | + |
| 1310 | + Thread serverThread = new Thread(() -> { |
| 1311 | + // This exists separately to the main mock server, as I had a really hard time getting that to send the |
| 1312 | + // chunked responses correctly, flushing the socket each time. This was the only way I was able to replicate |
| 1313 | + // the issue of the client not handling malformed response chunks correctly. |
| 1314 | + try (ServerSocket serverSocket = new ServerSocket(port)) { |
| 1315 | + Socket socket = serverSocket.accept(); |
| 1316 | + InputStream is = socket.getInputStream(); |
| 1317 | + |
| 1318 | + //noinspection ResultOfMethodCallIgnored |
| 1319 | + is.read(new byte[4096]); |
| 1320 | + |
| 1321 | + OutputStream os = socket.getOutputStream(); |
| 1322 | + os.write("HTTP/1.1 200 OK\r\n".getBytes(StandardCharsets.UTF_8)); |
| 1323 | + os.write("Transfer-Encoding: chunked\r\n".getBytes(StandardCharsets.UTF_8)); |
| 1324 | + os.write("\r\n".getBytes(StandardCharsets.UTF_8)); |
| 1325 | + os.write("lskdu018973t09sylgasjkfg1][]'./.sdlv".getBytes(StandardCharsets.UTF_8)); |
| 1326 | + socket.close(); |
| 1327 | + } catch (IOException ex) { |
| 1328 | + throw new RuntimeException(ex); |
| 1329 | + } |
| 1330 | + }); |
| 1331 | + |
| 1332 | + serverThread.setDaemon(true); |
| 1333 | + serverThread.start(); |
| 1334 | + |
| 1335 | + ResponseSpec spec = WebClient |
| 1336 | + .builder() |
| 1337 | + .clientConnector(connector) |
| 1338 | + .baseUrl("http://localhost:" + port) |
| 1339 | + .build() |
| 1340 | + .post() |
| 1341 | + .headers(headers -> { |
| 1342 | + if (addTransferEncodingHeader) { |
| 1343 | + headers.add(HttpHeaders.TRANSFER_ENCODING, "chunked"); |
| 1344 | + } |
| 1345 | + }) |
| 1346 | + .retrieve(); |
| 1347 | + |
| 1348 | + return responseHandler |
| 1349 | + .apply(spec) |
| 1350 | + .doFinally(signal -> serverThread.stop()); |
| 1351 | + } |
1212 | 1352 |
|
1213 | 1353 | private void prepareResponse(Consumer<MockResponse> consumer) {
|
1214 | 1354 | MockResponse response = new MockResponse();
|
@@ -1252,5 +1392,4 @@ public void setContainerValue(T containerValue) {
|
1252 | 1392 | this.containerValue = containerValue;
|
1253 | 1393 | }
|
1254 | 1394 | }
|
1255 |
| - |
1256 | 1395 | }
|
0 commit comments