diff --git a/.changes/next-release/bugfix-AWSCRTHTTPClient-32100be.json b/.changes/next-release/bugfix-AWSCRTHTTPClient-32100be.json new file mode 100644 index 000000000000..9af3c92a10ba --- /dev/null +++ b/.changes/next-release/bugfix-AWSCRTHTTPClient-32100be.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS CRT HTTP Client", + "contributor": "", + "description": "Fixed the issue in the AWS CRT sync HTTP client where the connection was left open after the stream was aborted." +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/AbortableInputStreamSubscriber.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/AbortableInputStreamSubscriber.java new file mode 100644 index 000000000000..c6c2ad6151b0 --- /dev/null +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/AbortableInputStreamSubscriber.java @@ -0,0 +1,87 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal.response; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.ByteBuffer; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.http.Abortable; +import software.amazon.awssdk.utils.async.InputStreamSubscriber; + +/** + * Wrapper of {@link InputStreamSubscriber} that also implements {@link Abortable} and closes the underlying connections when + * {@link #close()} or {@link #abort()} is invoked. + */ +@SdkInternalApi +public final class AbortableInputStreamSubscriber extends InputStream implements Subscriber, Abortable { + + private final InputStreamSubscriber delegate; + private final Runnable closeConnection; + + public AbortableInputStreamSubscriber(Runnable onClose, InputStreamSubscriber inputStreamSubscriber) { + this.delegate = inputStreamSubscriber; + this.closeConnection = onClose; + } + + @Override + public void abort() { + close(); + } + + @Override + public int read() throws IOException { + return delegate.read(); + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + return delegate.read(b, off, len); + } + + @Override + public int read(byte[] b) throws IOException { + return delegate.read(b); + } + + @Override + public void onSubscribe(Subscription s) { + delegate.onSubscribe(s); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + delegate.onNext(byteBuffer); + } + + @Override + public void onError(Throwable t) { + delegate.onError(t); + } + + @Override + public void onComplete() { + delegate.onComplete(); + } + + @Override + public void close() { + closeConnection.run(); + delegate.close(); + } +} diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java index bc03f23829bb..bfb75050e55a 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/CrtResponseAdapter.java @@ -126,7 +126,7 @@ private void onSuccessfulResponseComplete(HttpStream stream) { private void handlePublisherError(HttpStream stream, Throwable failure) { failResponseHandlerAndFuture(stream, failure); - responseHandlerHelper.releaseConnection(stream); + responseHandlerHelper.closeConnection(stream); } private void onFailedResponseComplete(HttpStream stream, HttpException error) { diff --git a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java index cd0f3183f427..939405db38b8 100644 --- a/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java +++ b/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/response/InputStreamAdaptingHttpStreamResponseHandler.java @@ -31,6 +31,7 @@ import software.amazon.awssdk.http.SdkHttpFullResponse; import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.crt.AwsCrtHttpClient; +import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.async.InputStreamSubscriber; import software.amazon.awssdk.utils.async.SimplePublisher; @@ -39,8 +40,8 @@ */ @SdkInternalApi public final class InputStreamAdaptingHttpStreamResponseHandler implements HttpStreamResponseHandler { - - private volatile InputStreamSubscriber inputStreamSubscriber; + private static final Logger log = Logger.loggerFor(InputStreamAdaptingHttpStreamResponseHandler.class); + private volatile AbortableInputStreamSubscriber inputStreamSubscriber; private final SimplePublisher simplePublisher = new SimplePublisher<>(); private final CompletableFuture requestCompletionFuture; @@ -66,12 +67,19 @@ public void onResponseHeaders(HttpStream stream, int responseStatusCode, int blo } responseBuilder.statusCode(responseStatusCode); } + + // Propagate cancellation + requestCompletionFuture.exceptionally(t -> { + responseHandlerHelper.closeConnection(stream); + return null; + }); } @Override public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { if (inputStreamSubscriber == null) { - inputStreamSubscriber = new InputStreamSubscriber(); + inputStreamSubscriber = new AbortableInputStreamSubscriber(() -> responseHandlerHelper.closeConnection(stream), + new InputStreamSubscriber()); simplePublisher.subscribe(inputStreamSubscriber); // For response with a payload, we need to complete the future here to allow downstream to retrieve the data from // the stream directly. @@ -88,7 +96,9 @@ public int onResponseBody(HttpStream stream, byte[] bodyBytesIn) { writeFuture.whenComplete((result, failure) -> { if (failure != null) { - failFutureAndReleaseConnection(stream, failure); + log.debug(() -> "The subscriber failed to receive the data, closing the connection and failing the future", + failure); + failFutureAndCloseConnection(stream, failure); return; } @@ -111,11 +121,6 @@ public void onResponseComplete(HttpStream stream, int errorCode) { } } - private void failFutureAndReleaseConnection(HttpStream stream, Throwable failure) { - requestCompletionFuture.completeExceptionally(failure); - responseHandlerHelper.releaseConnection(stream); - } - private void failFutureAndCloseConnection(HttpStream stream, Throwable failure) { requestCompletionFuture.completeExceptionally(failure); responseHandlerHelper.closeConnection(stream); diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/AbortableInputStreamSubscriberTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/AbortableInputStreamSubscriberTest.java new file mode 100644 index 000000000000..ca9ea61cecb2 --- /dev/null +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/AbortableInputStreamSubscriberTest.java @@ -0,0 +1,52 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.http.crt.internal; + +import static org.mockito.Mockito.verify; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import software.amazon.awssdk.http.crt.internal.response.AbortableInputStreamSubscriber; +import software.amazon.awssdk.utils.async.InputStreamSubscriber; + +@ExtendWith(MockitoExtension.class) +public class AbortableInputStreamSubscriberTest { + + private AbortableInputStreamSubscriber abortableInputStreamSubscriber; + + @Mock + private Runnable onClose; + + @BeforeEach + void setUp() { + abortableInputStreamSubscriber = new AbortableInputStreamSubscriber(onClose, new InputStreamSubscriber()); + } + + @Test + void close_shouldInvokeOnClose() { + abortableInputStreamSubscriber.close(); + verify(onClose).run(); + } + + @Test + void abort_shouldInvokeOnClose() { + abortableInputStreamSubscriber.abort(); + verify(onClose).run(); + } +} diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java index d3c223901614..10c51b1a6028 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/BaseHttpStreamResponseHandlerTest.java @@ -42,9 +42,9 @@ public abstract class BaseHttpStreamResponseHandlerTest { CompletableFuture requestFuture; @Mock - private HttpStream httpStream; + HttpStream httpStream; - private HttpStreamResponseHandler responseHandler; + HttpStreamResponseHandler responseHandler; abstract HttpStreamResponseHandler responseHandler(); @@ -113,7 +113,7 @@ void streamClosed_shouldNotIncreaseStreamWindow() throws InterruptedException { verify(httpStream, never()).incrementWindow(anyInt()); } - private static HttpHeader[] getHttpHeaders() { + static HttpHeader[] getHttpHeaders() { HttpHeader[] httpHeaders = new HttpHeader[1]; httpHeaders[0] = new HttpHeader("Content-Length", "1"); return httpHeaders; diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java index 2b18f684b728..8668e0fc0054 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/CrtResponseHandlerTest.java @@ -15,14 +15,29 @@ package software.amazon.awssdk.http.crt.internal; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; + +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.function.Function; +import org.junit.jupiter.api.Test; import org.mockito.Mockito; +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; import software.amazon.awssdk.core.http.HttpResponseHandler; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.internal.http.async.AsyncResponseHandler; +import software.amazon.awssdk.crt.http.HttpHeader; +import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.SdkHttpResponse; import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; @@ -37,4 +52,62 @@ HttpStreamResponseHandler responseHandler() { responseHandler.prepare(); return CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler); } + + @Test + void publisherFailedToDeliverEvents_shouldShutDownConnection() { + SdkAsyncHttpResponseHandler responseHandler = new TestAsyncHttpResponseHandler(); + + HttpStreamResponseHandler crtResponseHandler = CrtResponseAdapter.toCrtResponseHandler(crtConn, requestFuture, responseHandler); + HttpHeader[] httpHeaders = getHttpHeaders(); + crtResponseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), + httpHeaders); + crtResponseHandler.onResponseHeadersDone(httpStream, 0); + crtResponseHandler.onResponseBody(httpStream, "{}".getBytes(StandardCharsets.UTF_8)); + + crtResponseHandler.onResponseComplete(httpStream, 0); + assertThatThrownBy(() -> requestFuture.join()).isInstanceOf(CancellationException.class).hasMessageContaining( + "subscription has been cancelled"); + verify(crtConn).shutdown(); + verify(crtConn).close(); + verify(httpStream).close(); + } + + private static class TestAsyncHttpResponseHandler implements SdkAsyncHttpResponseHandler { + + @Override + public void onHeaders(SdkHttpResponse headers) { + } + + @Override + public void onStream(Publisher stream) { + stream.subscribe(new Subscriber() { + private Subscription subscription; + @Override + public void onSubscribe(Subscription s) { + subscription = s; + s.request(1); + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + subscription.cancel(); + } + + @Override + public void onError(Throwable t) { + + } + + @Override + public void onComplete() { + + } + }); + } + + @Override + public void onError(Throwable error) { + + } + } } diff --git a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java index f76e799d2cdb..b5ef71dbc5cd 100644 --- a/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java +++ b/http-clients/aws-crt-client/src/test/java/software/amazon/awssdk/http/crt/internal/InputStreamAdaptingHttpStreamResponseHandlerTest.java @@ -15,10 +15,19 @@ package software.amazon.awssdk.http.crt.internal; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; +import io.reactivex.Completable; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; +import net.bytebuddy.utility.RandomString; +import org.apache.commons.lang3.RandomStringUtils; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; @@ -31,7 +40,10 @@ import software.amazon.awssdk.crt.http.HttpHeaderBlock; import software.amazon.awssdk.crt.http.HttpStream; import software.amazon.awssdk.crt.http.HttpStreamResponseHandler; +import software.amazon.awssdk.http.AbortableInputStream; import software.amazon.awssdk.http.SdkHttpFullResponse; +import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler; +import software.amazon.awssdk.http.crt.internal.response.CrtResponseAdapter; import software.amazon.awssdk.http.crt.internal.response.InputStreamAdaptingHttpStreamResponseHandler; public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpStreamResponseHandlerTest { @@ -40,4 +52,61 @@ public class InputStreamAdaptingHttpStreamResponseHandlerTest extends BaseHttpSt HttpStreamResponseHandler responseHandler() { return new InputStreamAdaptingHttpStreamResponseHandler(crtConn, requestFuture); } + + @Test + void abortStream_shouldShutDownConnection() throws IOException { + HttpHeader[] httpHeaders = getHttpHeaders(); + + responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), + httpHeaders); + responseHandler.onResponseHeadersDone(httpStream, 0); + responseHandler.onResponseBody(httpStream, + RandomStringUtils.random(1 * 1024 * 1024).getBytes(StandardCharsets.UTF_8)); + + SdkHttpFullResponse response = ((CompletableFuture) requestFuture).join(); + assertThat(response.content()).isPresent(); + AbortableInputStream abortableInputStream = response.content().get(); + + abortableInputStream.read(); + abortableInputStream.abort(); + + verify(crtConn).shutdown(); + verify(crtConn).close(); + verify(httpStream).close(); + } + + @Test + void closeStream_shouldShutdownConnection() throws IOException { + HttpHeader[] httpHeaders = getHttpHeaders(); + + responseHandler.onResponseHeaders(httpStream, 500, HttpHeaderBlock.MAIN.getValue(), + httpHeaders); + responseHandler.onResponseHeadersDone(httpStream, 0); + responseHandler.onResponseBody(httpStream, + RandomStringUtils.random(1 * 1024 * 1024).getBytes(StandardCharsets.UTF_8)); + + SdkHttpFullResponse response = ((CompletableFuture) requestFuture).join(); + assertThat(response.content()).isPresent(); + AbortableInputStream abortableInputStream = response.content().get(); + + abortableInputStream.read(); + abortableInputStream.abort(); + + verify(crtConn).shutdown(); + verify(crtConn).close(); + verify(httpStream).close(); + } + + @Test + void cancelFuture_shouldCloseConnection() { + HttpHeader[] httpHeaders = getHttpHeaders(); + + responseHandler.onResponseHeaders(httpStream, 200, HttpHeaderBlock.MAIN.getValue(), + httpHeaders); + + requestFuture.completeExceptionally(new RuntimeException()); + verify(crtConn).shutdown(); + verify(crtConn).close(); + verify(httpStream).close(); + } }