From 2be7e89863ff45ecae750ee00b39fc5e476e16a4 Mon Sep 17 00:00:00 2001 From: Zoe Wang <33073555+zoewangg@users.noreply.github.com> Date: Fri, 14 Jul 2023 17:10:26 -0700 Subject: [PATCH] Implement multipart copy in Java-based S3 async client --- ...S3ClientMultiPartCopyIntegrationTest.java} | 59 ++++++++++++------- .../internal/crt/DefaultS3CrtAsyncClient.java | 5 +- .../{crt => multipart}/CopyObjectHelper.java | 9 ++- .../multipart/MultipartS3AsyncClient.java | 14 +++++ .../multipart/SdkPojoConversionUtils.java | 27 ++++++++- .../s3/internal/crt/CopyObjectHelperTest.java | 25 +++++++- 6 files changed, 110 insertions(+), 29 deletions(-) rename services/s3/src/it/java/software/amazon/awssdk/services/s3/{crt/S3CrtClientCopyIntegrationTest.java => multipart/S3ClientMultiPartCopyIntegrationTest.java} (77%) rename services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/{crt => multipart}/CopyObjectHelper.java (97%) diff --git a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java similarity index 77% rename from services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java rename to services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java index d0f92bb5b29a..46cc06e3b415 100644 --- a/services/s3/src/it/java/software/amazon/awssdk/services/s3/crt/S3CrtClientCopyIntegrationTest.java +++ b/services/s3/src/it/java/software/amazon/awssdk/services/s3/multipart/S3ClientMultiPartCopyIntegrationTest.java @@ -13,7 +13,7 @@ * permissions and limitations under the License. */ -package software.amazon.awssdk.services.s3.crt; +package software.amazon.awssdk.services.s3.multipart; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Fail.fail; @@ -24,26 +24,32 @@ import java.nio.ByteBuffer; import java.security.SecureRandom; import java.util.Base64; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.stream.Stream; import javax.crypto.KeyGenerator; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.Disabled; +import org.junit.jupiter.api.Timeout; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; import software.amazon.awssdk.core.ResponseBytes; import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.sync.ResponseTransformer; import software.amazon.awssdk.services.s3.S3AsyncClient; import software.amazon.awssdk.services.s3.S3IntegrationTestBase; import software.amazon.awssdk.services.s3.internal.crt.S3CrtAsyncClient; +import software.amazon.awssdk.services.s3.internal.multipart.MultipartS3AsyncClient; import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.GetObjectResponse; import software.amazon.awssdk.services.s3.model.MetadataDirective; import software.amazon.awssdk.utils.Md5Utils; -public class S3CrtClientCopyIntegrationTest extends S3IntegrationTestBase { - private static final String BUCKET = temporaryBucketName(S3CrtClientCopyIntegrationTest.class); +@Timeout(value = 3, unit = TimeUnit.MINUTES) +public class S3ClientMultiPartCopyIntegrationTest extends S3IntegrationTestBase { + private static final String BUCKET = temporaryBucketName(S3ClientMultiPartCopyIntegrationTest.class); private static final String ORIGINAL_OBJ = "test_file.dat"; private static final String COPIED_OBJ = "test_file_copy.dat"; private static final String ORIGINAL_OBJ_SPECIAL_CHARACTER = "original-special-chars-@$%"; @@ -51,6 +57,7 @@ public class S3CrtClientCopyIntegrationTest extends S3IntegrationTestBase { private static final long OBJ_SIZE = ThreadLocalRandom.current().nextLong(8 * 1024 * 1024, 16 * 1024 * 1024 + 1); private static final long SMALL_OBJ_SIZE = 1024 * 1024; private static S3AsyncClient s3CrtAsyncClient; + private static S3AsyncClient s3MpuClient; @BeforeAll public static void setUp() throws Exception { S3IntegrationTestBase.setUp(); @@ -59,40 +66,50 @@ public static void setUp() throws Exception { .credentialsProvider(CREDENTIALS_PROVIDER_CHAIN) .region(DEFAULT_REGION) .build(); + s3MpuClient = new MultipartS3AsyncClient(s3Async); } @AfterAll public static void teardown() throws Exception { s3CrtAsyncClient.close(); + s3MpuClient.close(); deleteBucketAndAllContents(BUCKET); } - @Test - void copy_singlePart_hasSameContent() { + public static Stream s3AsyncClient() { + return Stream.of(s3MpuClient, s3CrtAsyncClient); + } + + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("s3AsyncClient") + void copy_singlePart_hasSameContent(S3AsyncClient s3AsyncClient) { byte[] originalContent = randomBytes(SMALL_OBJ_SIZE); createOriginalObject(originalContent, ORIGINAL_OBJ); - copyObject(ORIGINAL_OBJ, COPIED_OBJ); + copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient); validateCopiedObject(originalContent, ORIGINAL_OBJ); } - @Test - void copy_copiedObject_hasSameContent() { + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("s3AsyncClient") + void copy_copiedObject_hasSameContent(S3AsyncClient s3AsyncClient) { byte[] originalContent = randomBytes(OBJ_SIZE); createOriginalObject(originalContent, ORIGINAL_OBJ); - copyObject(ORIGINAL_OBJ, COPIED_OBJ); + copyObject(ORIGINAL_OBJ, COPIED_OBJ, s3AsyncClient); validateCopiedObject(originalContent, ORIGINAL_OBJ); } - @Test - void copy_specialCharacters_hasSameContent() { + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("s3AsyncClient") + void copy_specialCharacters_hasSameContent(S3AsyncClient s3AsyncClient) { byte[] originalContent = randomBytes(OBJ_SIZE); createOriginalObject(originalContent, ORIGINAL_OBJ_SPECIAL_CHARACTER); - copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER); + copyObject(ORIGINAL_OBJ_SPECIAL_CHARACTER, COPIED_OBJ_SPECIAL_CHARACTER, s3AsyncClient); validateCopiedObject(originalContent, COPIED_OBJ_SPECIAL_CHARACTER); } - @Test - void copy_ssecServerSideEncryption_shouldSucceed() { + @ParameterizedTest(autoCloseArguments = false) + @MethodSource("s3AsyncClient") + void copy_ssecServerSideEncryption_shouldSucceed(S3AsyncClient s3AsyncClient) { byte[] originalContent = randomBytes(OBJ_SIZE); byte[] secretKey = generateSecretKey(); String b64Key = Base64.getEncoder().encodeToString(secretKey); @@ -102,8 +119,8 @@ void copy_ssecServerSideEncryption_shouldSucceed() { String newB64Key = Base64.getEncoder().encodeToString(newSecretKey); String newB64KeyMd5 = Md5Utils.md5AsBase64(newSecretKey); - // Java S3 client is used because CRT S3 client putObject fails with SSE-C - // TODO: change back to S3CrtClient once the issue is fixed in CRT + // MPU S3 client gets stuck + // TODO: change back to s3AsyncClient once the issue is fixed in MPU S3 client s3Async.putObject(r -> r.bucket(BUCKET) .key(ORIGINAL_OBJ) .sseCustomerKey(b64Key) @@ -111,7 +128,7 @@ void copy_ssecServerSideEncryption_shouldSucceed() { .sseCustomerKeyMD5(b64KeyMd5), AsyncRequestBody.fromBytes(originalContent)).join(); - CompletableFuture future = s3CrtAsyncClient.copyObject(c -> c + CompletableFuture future = s3AsyncClient.copyObject(c -> c .sourceBucket(BUCKET) .sourceKey(ORIGINAL_OBJ) .metadataDirective(MetadataDirective.REPLACE) @@ -147,8 +164,8 @@ private void createOriginalObject(byte[] originalContent, String originalKey) { AsyncRequestBody.fromBytes(originalContent)).join(); } - private void copyObject(String original, String destination) { - CompletableFuture future = s3CrtAsyncClient.copyObject(c -> c + private void copyObject(String original, String destination, S3AsyncClient s3AsyncClient) { + CompletableFuture future = s3AsyncClient.copyObject(c -> c .sourceBucket(BUCKET) .sourceKey(original) .destinationBucket(BUCKET) diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java index 860ac509932e..21c85520db9b 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/DefaultS3CrtAsyncClient.java @@ -51,6 +51,7 @@ import software.amazon.awssdk.services.s3.S3CrtAsyncClientBuilder; import software.amazon.awssdk.services.s3.crt.S3CrtHttpConfiguration; import software.amazon.awssdk.services.s3.crt.S3CrtRetryConfiguration; +import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper; import software.amazon.awssdk.services.s3.model.CopyObjectRequest; import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.GetObjectRequest; @@ -67,7 +68,9 @@ private DefaultS3CrtAsyncClient(DefaultS3CrtClientBuilder builder) { super(initializeS3AsyncClient(builder)); long partSizeInBytes = builder.minimalPartSizeInBytes == null ? DEFAULT_PART_SIZE_IN_BYTES : builder.minimalPartSizeInBytes; - this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(), partSizeInBytes); + this.copyObjectHelper = new CopyObjectHelper((S3AsyncClient) delegate(), + partSizeInBytes, + partSizeInBytes); } @Override diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelper.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java similarity index 97% rename from services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelper.java rename to services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java index 9070eb7192c5..31b947bb89c5 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelper.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/CopyObjectHelper.java @@ -13,7 +13,7 @@ * permissions and limitations under the License. */ -package software.amazon.awssdk.services.s3.internal.crt; +package software.amazon.awssdk.services.s3.internal.multipart; import java.util.ArrayList; @@ -23,6 +23,7 @@ import java.util.stream.IntStream; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.internal.crt.UploadPartCopyRequestIterable; import software.amazon.awssdk.services.s3.internal.multipart.GenericMultipartHelper; import software.amazon.awssdk.services.s3.internal.multipart.SdkPojoConversionUtils; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -50,13 +51,15 @@ public final class CopyObjectHelper { private final S3AsyncClient s3AsyncClient; private final long partSizeInBytes; private final GenericMultipartHelper genericMultipartHelper; + private final long uploadThreshold; - public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes) { + public CopyObjectHelper(S3AsyncClient s3AsyncClient, long partSizeInBytes, long uploadThreshold) { this.s3AsyncClient = s3AsyncClient; this.partSizeInBytes = partSizeInBytes; this.genericMultipartHelper = new GenericMultipartHelper<>(s3AsyncClient, SdkPojoConversionUtils::toAbortMultipartUploadRequest, SdkPojoConversionUtils::toCopyObjectResponse); + this.uploadThreshold = uploadThreshold; } public CompletableFuture copyObject(CopyObjectRequest copyObjectRequest) { @@ -89,7 +92,7 @@ private void doCopyObject(CopyObjectRequest copyObjectRequest, CompletableFuture HeadObjectResponse headObjectResponse) { Long contentLength = headObjectResponse.contentLength(); - if (contentLength <= partSizeInBytes) { + if (contentLength <= partSizeInBytes || contentLength <= uploadThreshold) { log.debug(() -> "Starting the copy as a single copy part request"); copyInOneChunk(copyObjectRequest, returnFuture); } else { diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java index f2895d65fcd2..869eb4048144 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/MultipartS3AsyncClient.java @@ -21,6 +21,8 @@ import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.services.s3.DelegatingS3AsyncClient; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.model.CopyObjectRequest; +import software.amazon.awssdk.services.s3.model.CopyObjectResponse; import software.amazon.awssdk.services.s3.model.PutObjectRequest; import software.amazon.awssdk.services.s3.model.PutObjectResponse; @@ -33,15 +35,27 @@ public class MultipartS3AsyncClient extends DelegatingS3AsyncClient { private static final long DEFAULT_MAX_MEMORY = DEFAULT_PART_SIZE_IN_BYTES * 2; private final MultipartUploadHelper mpuHelper; + private final CopyObjectHelper copyObjectHelper; public MultipartS3AsyncClient(S3AsyncClient delegate) { super(delegate); // TODO: pass a config object to the upload helper instead mpuHelper = new MultipartUploadHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD, DEFAULT_MAX_MEMORY); + copyObjectHelper = new CopyObjectHelper(delegate, DEFAULT_PART_SIZE_IN_BYTES, DEFAULT_THRESHOLD); } @Override public CompletableFuture putObject(PutObjectRequest putObjectRequest, AsyncRequestBody requestBody) { return mpuHelper.uploadObject(putObjectRequest, requestBody); } + + @Override + public CompletableFuture copyObject(CopyObjectRequest copyObjectRequest) { + return copyObjectHelper.copyObject(copyObjectRequest); + } + + @Override + public void close() { + delegate().close(); + } } diff --git a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/SdkPojoConversionUtils.java b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/SdkPojoConversionUtils.java index 70512084150b..a99c16670d17 100644 --- a/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/SdkPojoConversionUtils.java +++ b/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/SdkPojoConversionUtils.java @@ -38,12 +38,14 @@ import software.amazon.awssdk.services.s3.model.UploadPartCopyRequest; import software.amazon.awssdk.services.s3.model.UploadPartRequest; import software.amazon.awssdk.services.s3.model.UploadPartResponse; +import software.amazon.awssdk.utils.Logger; /** * Request conversion utility method for POJO classes associated with multipart feature. */ @SdkInternalApi public final class SdkPojoConversionUtils { + private static final Logger log = Logger.loggerFor(SdkPojoConversionUtils.class); private static final HashSet PUT_OBJECT_REQUEST_TO_UPLOAD_PART_FIELDS_TO_IGNORE = new HashSet<>(Arrays.asList("ChecksumSHA1", "ChecksumSHA256", "ContentMD5", "ChecksumCRC32C", "ChecksumCRC32")); @@ -68,9 +70,22 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(PutObj } public static HeadObjectRequest toHeadObjectRequest(CopyObjectRequest copyObjectRequest) { - HeadObjectRequest.Builder builder = HeadObjectRequest.builder(); - setSdkFields(builder, copyObjectRequest); - return builder.build(); + + // We can't set SdkFields directly because the fields in CopyObjectRequest do not match 100% with the ones in + // HeadObjectRequest + return HeadObjectRequest.builder() + .bucket(copyObjectRequest.sourceBucket()) + .key(copyObjectRequest.sourceKey()) + .versionId(copyObjectRequest.sourceVersionId()) + .ifMatch(copyObjectRequest.copySourceIfMatch()) + .ifModifiedSince(copyObjectRequest.copySourceIfModifiedSince()) + .ifNoneMatch(copyObjectRequest.copySourceIfNoneMatch()) + .ifUnmodifiedSince(copyObjectRequest.copySourceIfUnmodifiedSince()) + .expectedBucketOwner(copyObjectRequest.expectedSourceBucketOwner()) + .sseCustomerAlgorithm(copyObjectRequest.copySourceSSECustomerAlgorithm()) + .sseCustomerKey(copyObjectRequest.copySourceSSECustomerKey()) + .sseCustomerKeyMD5(copyObjectRequest.copySourceSSECustomerKeyMD5()) + .build(); } public static CompletedPart toCompletedPart(CopyPartResult copyPartResult, int partNumber) { @@ -106,6 +121,8 @@ public static CreateMultipartUploadRequest toCreateMultipartUploadRequest(CopyOb CreateMultipartUploadRequest.Builder builder = CreateMultipartUploadRequest.builder(); setSdkFields(builder, copyObjectRequest); + builder.bucket(copyObjectRequest.destinationBucket()); + builder.key(copyObjectRequest.destinationKey()); return builder.build(); } @@ -136,6 +153,8 @@ private static CopyObjectResult toCopyObjectResult(CompleteMultipartUploadRespon public static AbortMultipartUploadRequest.Builder toAbortMultipartUploadRequest(CopyObjectRequest copyObjectRequest) { AbortMultipartUploadRequest.Builder builder = AbortMultipartUploadRequest.builder(); setSdkFields(builder, copyObjectRequest); + builder.bucket(copyObjectRequest.destinationBucket()); + builder.key(copyObjectRequest.destinationKey()); return builder; } @@ -154,6 +173,8 @@ public static UploadPartCopyRequest toUploadPartCopyRequest(CopyObjectRequest co return builder.copySourceRange(range) .partNumber(partNumber) .uploadId(uploadId) + .bucket(copyObjectRequest.destinationBucket()) + .key(copyObjectRequest.destinationKey()) .build(); } diff --git a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelperTest.java b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelperTest.java index ec78d7b15eb6..acca503a352f 100644 --- a/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelperTest.java +++ b/services/s3/src/test/java/software/amazon/awssdk/services/s3/internal/crt/CopyObjectHelperTest.java @@ -33,6 +33,7 @@ import org.mockito.stubbing.Answer; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.services.s3.S3AsyncClient; +import software.amazon.awssdk.services.s3.internal.multipart.CopyObjectHelper; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadRequest; import software.amazon.awssdk.services.s3.model.AbortMultipartUploadResponse; import software.amazon.awssdk.services.s3.model.CompleteMultipartUploadRequest; @@ -59,10 +60,13 @@ class CopyObjectHelperTest { private S3AsyncClient s3AsyncClient; private CopyObjectHelper copyHelper; + private static final long PART_SIZE = 1024L; + private static final long UPLOAD_THRESHOLD = 2048L; + @BeforeEach public void setUp() { s3AsyncClient = Mockito.mock(S3AsyncClient.class); - copyHelper = new CopyObjectHelper(s3AsyncClient, 1024L); + copyHelper = new CopyObjectHelper(s3AsyncClient, PART_SIZE, UPLOAD_THRESHOLD); } @Test @@ -114,6 +118,25 @@ void singlePartCopy_happyCase_shouldSucceed() { assertThat(future.join()).isEqualTo(expectedResponse); } + @Test + void copy_doesNotExceedThreshold_shouldUseSingleObjectCopy() { + + CopyObjectRequest copyObjectRequest = copyObjectRequest(); + + stubSuccessfulHeadObjectCall(2000L); + + CopyObjectResponse expectedResponse = CopyObjectResponse.builder().build(); + CompletableFuture copyFuture = + CompletableFuture.completedFuture(expectedResponse); + + when(s3AsyncClient.copyObject(copyObjectRequest)).thenReturn(copyFuture); + + CompletableFuture future = + copyHelper.copyObject(copyObjectRequest); + + assertThat(future.join()).isEqualTo(expectedResponse); + } + @Test void multiPartCopy_fourPartsHappyCase_shouldSucceed() { CopyObjectRequest copyObjectRequest = copyObjectRequest();