diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterFunction.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterFunction.java new file mode 100644 index 000000000000..76146b1eda97 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterFunction.java @@ -0,0 +1,88 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.function.client; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import org.springframework.core.io.buffer.DataBuffer; +import org.springframework.core.io.buffer.DataBufferUtils; +import org.springframework.http.client.ClientHttpRequestInterceptor; + +/** + * {@link ClientHttpRequestInterceptor} to limit response body. + * Filter will throw {@link TooLargeResponseBodyException} on response body exceed or truncate body to specified limit + * depending on `throwOnExceed` parameter. + * + * @author Sergey Galkin + * @since 5.1 + */ +public class ResponseBodyLimitFilterFunction implements ExchangeFilterFunction { + + private final int bodyByteLimit; + private final boolean throwOnExceed; + + public ResponseBodyLimitFilterFunction(int bodyByteLimit, boolean throwOnExceed) { + if (bodyByteLimit < 0) { + throw new IllegalArgumentException( + "Response body limit should be non-negative, but '" + bodyByteLimit + "' given" + ); + } + + this.bodyByteLimit = bodyByteLimit; + this.throwOnExceed = throwOnExceed; + } + + @Override + public Mono filter(ClientRequest request, ExchangeFunction next) { + if (this.throwOnExceed) { + return next.exchange(request).flatMap(this::throwOnExceed); + } + + return next.exchange(request).flatMap(this::truncateOnExceed); + } + + private Mono throwOnExceed(ClientResponse response) { + Flux buffers = response.body( + (message, ctx) -> DataBufferUtils + .takeUntilByteCount(message.getBody(), this.bodyByteLimit + 1) + ); + + Mono buffer = DataBufferUtils + .join(buffers) + .map(buf -> { + if (buf.readableByteCount() > this.bodyByteLimit) { + byte[] truncatedBody = new byte[this.bodyByteLimit]; + buf.read(truncatedBody, 0, this.bodyByteLimit); + DataBufferUtils.release(buf); + throw new TooLargeResponseBodyException(truncatedBody); + } + return buf; + }); + + return Mono.just(ClientResponse.create(response.statusCode()).body(Flux.from(buffer)).build()); + } + + private Mono truncateOnExceed(ClientResponse response) { + Flux buffers = response.body( + (message, ctx) -> DataBufferUtils + .takeUntilByteCount(message.getBody(), this.bodyByteLimit) + ); + + return Mono.just(ClientResponse.create(response.statusCode()).body(buffers).build()); + } +} diff --git a/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/TooLargeResponseBodyException.java b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/TooLargeResponseBodyException.java new file mode 100644 index 000000000000..c408c17a0546 --- /dev/null +++ b/spring-webflux/src/main/java/org/springframework/web/reactive/function/client/TooLargeResponseBodyException.java @@ -0,0 +1,39 @@ +/* + * Copyright 2002-2018 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.web.reactive.function.client; + +/** + * Exception thrown by {@link ResponseBodyLimitFilterFunction} when response body is greater then configured + * limit. + * + * @author Sergey Galkin + * @since 5.1 + */ +public class TooLargeResponseBodyException extends RuntimeException { + + private static final long serialVersionUID = 1L; + private final byte[] truncatedBody; + + TooLargeResponseBodyException(byte[] truncatedBody) { + super("Too large response body"); + this.truncatedBody = truncatedBody; + } + + public byte[] getTruncatedBody() { + return this.truncatedBody; + } +} diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterIntegrationTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterIntegrationTests.java new file mode 100644 index 000000000000..7826abcb092a --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterIntegrationTests.java @@ -0,0 +1,93 @@ +package org.springframework.web.reactive.function.client; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasProperty; + +import com.google.common.base.Strings; +import java.io.IOException; +import java.time.Duration; +import okhttp3.mockwebserver.MockResponse; +import okhttp3.mockwebserver.MockWebServer; +import org.junit.After; +import org.junit.Test; +import reactor.core.publisher.Mono; +import reactor.test.StepVerifier; + +public class ResponseBodyLimitFilterIntegrationTests { + + private static final int BODY_BYTES_LIMIT = 1500; + private static final String EXACT_BODY = Strings.repeat("1", BODY_BYTES_LIMIT); + private static final String BIG_BODY = Strings.repeat("1", BODY_BYTES_LIMIT * 1000); + + private final MockWebServer server = new MockWebServer(); + + @After + public void shutdown() throws IOException { + this.server.shutdown(); + } + + @Test + public void responseSizeBelowLimitThrowOnExceedConfigured() { + enqueueResponse(EXACT_BODY); + + Mono result = runWithFilter(new ResponseBodyLimitFilterFunction(BODY_BYTES_LIMIT, true)); + + StepVerifier.create(result) + .expectNext(EXACT_BODY) + .expectComplete() + .verify(Duration.ofSeconds(3)); + } + + @Test + public void responseSizeBelowLimitNoThrowOnExceedConfigured() { + enqueueResponse(EXACT_BODY); + + Mono result = runWithFilter(new ResponseBodyLimitFilterFunction(BODY_BYTES_LIMIT, false)); + + StepVerifier.create(result) + .expectNext(EXACT_BODY) + .expectComplete() + .verify(Duration.ofSeconds(3)); + } + + @Test + public void responseSizeAboveLimitThrowOnExceedConfigured() { + enqueueResponse(BIG_BODY); + + Mono result = runWithFilter(new ResponseBodyLimitFilterFunction(BODY_BYTES_LIMIT, true)); + + StepVerifier.create(result) + .expectErrorSatisfies(e -> assertThat(e, hasProperty("truncatedBody", equalTo(EXACT_BODY.getBytes())))) + .verify(Duration.ofSeconds(3)); + } + + @Test + public void responseSizeAboveLimitNoThrowOnExceedConfigured() { + enqueueResponse(BIG_BODY); + + Mono result = runWithFilter(new ResponseBodyLimitFilterFunction(BODY_BYTES_LIMIT, false)); + + StepVerifier.create(result) + .expectNext(EXACT_BODY) + .expectComplete() + .verify(Duration.ofSeconds(3)); + } + + private Mono runWithFilter(ResponseBodyLimitFilterFunction filter) { + WebClient webClient = WebClient + .builder() + .baseUrl(this.server.url("/").toString()) + .filter(filter) + .build(); + + return webClient + .get() + .retrieve() + .bodyToMono(String.class); + } + + private void enqueueResponse(String body) { + server.enqueue(new MockResponse().setBody(body)); + } +} diff --git a/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterTests.java b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterTests.java new file mode 100644 index 000000000000..0f931eb17e90 --- /dev/null +++ b/spring-webflux/src/test/java/org/springframework/web/reactive/function/client/ResponseBodyLimitFilterTests.java @@ -0,0 +1,11 @@ +package org.springframework.web.reactive.function.client; + +import org.junit.Test; + +public class ResponseBodyLimitFilterTests { + + @Test(expected = IllegalArgumentException.class) + public void negativeLimit() { + new ResponseBodyLimitFilterFunction(-1, false); + } +}