diff --git a/.changes/next-release/bugfix-AWSSDKforJavav2-0da2191.json b/.changes/next-release/bugfix-AWSSDKforJavav2-0da2191.json new file mode 100644 index 000000000000..bc2ddf2bb4a2 --- /dev/null +++ b/.changes/next-release/bugfix-AWSSDKforJavav2-0da2191.json @@ -0,0 +1,6 @@ +{ + "type": "bugfix", + "category": "AWS SDK for Java v2", + "contributor": "", + "description": "Fix issue in `FileAsyncRequestBody` where the underlying file channel would only be closed when enough requests are sent to read *past* the end of the file; if just enough requests are sent to read to the end of the file, the file is not closed, leaving an open file descriptor around longer than it needs to be." +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java index fb36922d7f55..4dc230b54c47 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBody.java @@ -15,6 +15,8 @@ package software.amazon.awssdk.core.internal.async; +import static software.amazon.awssdk.utils.FunctionalUtils.runAndLogError; + import java.io.IOException; import java.io.UncheckedIOException; import java.nio.ByteBuffer; @@ -32,6 +34,7 @@ import software.amazon.awssdk.core.internal.util.Mimetype; import software.amazon.awssdk.core.internal.util.NoopSubscription; import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; import software.amazon.awssdk.utils.builder.SdkBuilder; /** @@ -80,17 +83,23 @@ public String contentType() { @Override public void subscribe(Subscriber s) { + AsynchronousFileChannel channel = null; try { - AsynchronousFileChannel channel = openInputChannel(this.path); + channel = openInputChannel(this.path); // We need to synchronize here because the subscriber could call // request() from within onSubscribe which would potentially // trigger onNext before onSubscribe is finished. - Subscription subscription = new FileSubscription(channel, s, chunkSizeInBytes); + // + // Note: size() can throw IOE here + Subscription subscription = new FileSubscription(channel, channel.size(), s, chunkSizeInBytes); synchronized (subscription) { s.onSubscribe(subscription); } } catch (IOException e) { + if (channel != null) { + runAndLogError(log.logger(), "Unable to close file channel", channel::close); + } // subscribe() must return normally, so we need to signal the // failure to open via onError() once onSubscribe() is signaled. s.onSubscribe(new NoopSubscription(s)); @@ -172,15 +181,20 @@ private static final class FileSubscription implements Subscription { private final int chunkSize; private final AtomicLong position = new AtomicLong(0); + private final AtomicLong remainingBytes = new AtomicLong(0); private long outstandingDemand = 0; private boolean readInProgress = false; private volatile boolean done = false; private final Object lock = new Object(); - private FileSubscription(AsynchronousFileChannel inputChannel, Subscriber subscriber, int chunkSize) { + private FileSubscription(AsynchronousFileChannel inputChannel, + long size, + Subscriber subscriber, + int chunkSize) { this.inputChannel = inputChannel; this.subscriber = subscriber; this.chunkSize = chunkSize; + this.remainingBytes.set(Validate.isNotNegative(size, "size")); } @Override @@ -239,12 +253,20 @@ private void readData() { inputChannel.read(buffer, position.get(), buffer, new CompletionHandler() { @Override public void completed(Integer result, ByteBuffer attachment) { - if (result > 0) { attachment.flip(); - position.addAndGet(attachment.remaining()); + + int readBytes = attachment.remaining(); + position.addAndGet(readBytes); + remainingBytes.addAndGet(-readBytes); + signalOnNext(attachment); + if (remainingBytes.get() == 0) { + closeFile(); + signalOnComplete(); + } + synchronized (lock) { // If we have more permits, queue up another read. if (--outstandingDemand > 0) { @@ -255,8 +277,8 @@ public void completed(Integer result, ByteBuffer attachment) { } } else { // Reached the end of the file, notify the subscriber and cleanup - signalOnComplete(); closeFile(); + signalOnComplete(); } } diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java new file mode 100644 index 000000000000..c633d6f1d43e --- /dev/null +++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/FileAsyncRequestBodyTest.java @@ -0,0 +1,95 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.async; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.channels.AsynchronousFileChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.StandardOpenOption; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.testutils.RandomTempFile; + +public class FileAsyncRequestBodyTest { + private static final long MiB = 1024 * 1024; + private static final long TEST_FILE_SIZE = 10 * MiB; + private static Path testFile; + + @BeforeClass + public static void setup() throws IOException { + testFile = new RandomTempFile(TEST_FILE_SIZE).toPath(); + } + + @AfterClass + public static void teardown() throws IOException { + Files.delete(testFile); + } + + // If we issue just enough requests to read the file entirely but not more (to go past EOF), we should still receive + // an onComplete + @Test + public void readFully_doesNotRequestPastEndOfFile_receivesComplete() throws InterruptedException, ExecutionException, TimeoutException { + int chunkSize = 16384; + AsyncRequestBody asyncRequestBody = FileAsyncRequestBody.builder() + .path(testFile) + .chunkSizeInBytes(chunkSize) + .build(); + + long totalRequests = TEST_FILE_SIZE / chunkSize; + + CompletableFuture completed = new CompletableFuture<>(); + asyncRequestBody.subscribe(new Subscriber() { + private Subscription sub; + private long requests = 0; + @Override + public void onSubscribe(Subscription subscription) { + this.sub = subscription; + if (requests++ < totalRequests) { + this.sub.request(1); + } + } + + @Override + public void onNext(ByteBuffer byteBuffer) { + if (requests++ < totalRequests) { + this.sub.request(1); + } + } + + @Override + public void onError(Throwable throwable) { + } + + @Override + public void onComplete() { + completed.complete(null); + } + }); + + completed.get(5, TimeUnit.SECONDS); + } +}