diff --git a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java index a498a33e5..f2a971f55 100644 --- a/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java +++ b/kafka-connect-s3/src/main/java/io/confluent/connect/s3/storage/S3OutputStream.java @@ -23,6 +23,7 @@ import com.amazonaws.services.s3.model.CannedAccessControlList; import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest; import com.amazonaws.services.s3.model.InitiateMultipartUploadRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; import com.amazonaws.services.s3.model.PartETag; import com.amazonaws.services.s3.model.SSEAlgorithm; import com.amazonaws.services.s3.model.SSEAwsKeyManagementParams; @@ -205,14 +206,26 @@ private void internalClose() throws IOException { super.close(); } + private ObjectMetadata newObjectMetadata() { + ObjectMetadata meta = new ObjectMetadata(); + if (StringUtils.isNotBlank(ssea)) { + meta.setSSEAlgorithm(ssea); + } + return meta; + } + private MultipartUpload newMultipartUpload() throws IOException { InitiateMultipartUploadRequest initRequest = new InitiateMultipartUploadRequest( bucket, key ).withCannedACL(cannedAcl); - if (SSEAlgorithm.KMS.toString().equalsIgnoreCase(ssea) - && StringUtils.isNotBlank(sseKmsKeyId)) { + if (SSEAlgorithm.AES256.toString().equalsIgnoreCase(ssea) + && sseCustomerKey == null) { + log.debug("Using SSE (AES256) without customer key"); + initRequest.setObjectMetadata(newObjectMetadata()); + } else if (SSEAlgorithm.KMS.toString().equalsIgnoreCase(ssea) + && StringUtils.isNotBlank(sseKmsKeyId)) { log.debug("Using KMS Key ID: {}", sseKmsKeyId); initRequest.setSSEAwsKeyManagementParams(new SSEAwsKeyManagementParams(sseKmsKeyId)); } else if (sseCustomerKey != null) {