Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/bugfix-AmazonS3-6cefd9c.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "bugfix",
"category": "Amazon S3",
"contributor": "",
"description": "Fixed an issue that could cause checksum mismatch errors when performing parallel uploads with the async S3 client and the SHA1 or SHA256 checksum algorithms selected."
}
5 changes: 5 additions & 0 deletions core/checksums/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,8 @@
import software.amazon.awssdk.checksums.internal.Crc32Checksum;
import software.amazon.awssdk.checksums.internal.Crc64NvmeChecksum;
import software.amazon.awssdk.checksums.internal.CrcChecksumProvider;
import software.amazon.awssdk.checksums.internal.Md5Checksum;
import software.amazon.awssdk.checksums.internal.Sha1Checksum;
import software.amazon.awssdk.checksums.internal.Sha256Checksum;
import software.amazon.awssdk.checksums.internal.DigestAlgorithm;
import software.amazon.awssdk.checksums.internal.DigestAlgorithmChecksum;
import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm;

/**
Expand All @@ -43,11 +42,11 @@ static SdkChecksum forAlgorithm(ChecksumAlgorithm algorithm) {
case "CRC32":
return new Crc32Checksum();
case "SHA1":
return new Sha1Checksum();
return new DigestAlgorithmChecksum(DigestAlgorithm.SHA1);
case "SHA256":
return new Sha256Checksum();
return new DigestAlgorithmChecksum(DigestAlgorithm.SHA256);
case "MD5":
return new Md5Checksum();
return new DigestAlgorithmChecksum(DigestAlgorithm.MD5);
case "CRC64NVME":
return new Crc64NvmeChecksum();
default:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,51 +17,115 @@

import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Deque;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Supplier;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.annotations.SdkTestInternalApi;
import software.amazon.awssdk.utils.SdkAutoCloseable;

@SdkInternalApi
public enum DigestAlgorithm {

SHA1("SHA-1"),

MD5("MD5"),
SHA256("SHA-256")
;

private static final Supplier<MessageDigest> CLOSED_DIGEST = () -> {
throw new IllegalStateException("This message digest is closed.");
};

private static final int MAX_CACHED_DIGESTS = 10_000;
private final String algorithmName;
private final DigestThreadLocal digestReference;
private final Deque<MessageDigest> digestCache = new LinkedBlockingDeque<>(MAX_CACHED_DIGESTS); // LIFO

DigestAlgorithm(String algorithmName) {
this.algorithmName = algorithmName;
digestReference = new DigestThreadLocal(algorithmName);
}

public String getAlgorithmName() {
return algorithmName;
}

/**
* Returns the thread local reference for the {@link MessageDigest} algorithm
* Returns a {@link CloseableMessageDigest} to use for this algorithm.
*/
public MessageDigest getDigest() {
MessageDigest digest = digestReference.get();
digest.reset();
return digest;
public CloseableMessageDigest getDigest() {
MessageDigest digest = digestCache.pollFirst();
if (digest != null) {
digest.reset();
return new CloseableMessageDigest(digest);
}
return new CloseableMessageDigest(newDigest());
}

private static class DigestThreadLocal extends ThreadLocal<MessageDigest> {
private final String algorithmName;
private MessageDigest newDigest() {
try {
return MessageDigest.getInstance(algorithmName);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Unable to fetch message digest instance for Algorithm "
+ algorithmName + ": " + e.getMessage(), e);
}
}

@SdkTestInternalApi
static void clearCaches() {
for (DigestAlgorithm value : values()) {
value.digestCache.clear();
}
}

public final class CloseableMessageDigest implements SdkAutoCloseable, Cloneable {

private Supplier<MessageDigest> digest;
private byte[] messageDigest;

private CloseableMessageDigest(MessageDigest digest) {
this.digest = () -> digest;
}

/**
* Retrieve the message digest instance.
*/
public MessageDigest messageDigest() {
return digest.get();
}

/**
* Retrieve the message digest bytes. This will close the message digest when invoked. This is because the underlying
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: where do we close messageDigest in this method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, that needs a test added.

* message digest is reset on read, and we'd rather fail future interactions with the digest than act on the wrong data.
*/
public byte[] digest() {
if (messageDigest != null) {
return messageDigest;
}
messageDigest = messageDigest().digest();
close();
return messageDigest;
}

/**
* Release this message digest back to the cache. Once released, you must not use the digest anymore.
*/
@Override
public void close() {
if (digest == CLOSED_DIGEST) {
return;
}

// Drop this digest is the cache is full.
digestCache.offerFirst(digest.get());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice


DigestThreadLocal(String algorithmName) {
this.algorithmName = algorithmName;
digest = CLOSED_DIGEST;
}

@Override
protected MessageDigest initialValue() {
public CloseableMessageDigest clone() {
try {
return MessageDigest.getInstance(algorithmName);
} catch (NoSuchAlgorithmException e) {
throw new RuntimeException("Unable to fetch message digest instance for Algorithm "
+ algorithmName + ": " + e.getMessage(), e);
return new CloseableMessageDigest((MessageDigest) digest.get().clone());
} catch (CloneNotSupportedException e) {
throw new IllegalStateException("Clone was not supported by this digest type.", e);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,39 @@

package software.amazon.awssdk.checksums.internal;

import java.security.MessageDigest;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.checksums.SdkChecksum;
import software.amazon.awssdk.checksums.internal.DigestAlgorithm.CloseableMessageDigest;

/**
* Implementation of {@link SdkChecksum} to calculate an MD5 checksum.
* An implementation of {@link SdkChecksum} that uses a {@link DigestAlgorithm}.
*/
@SdkInternalApi
public class Md5Checksum implements SdkChecksum {
public class DigestAlgorithmChecksum implements SdkChecksum {

private MessageDigest digest;
private final DigestAlgorithm algorithm;

private MessageDigest digestLastMarked;
private CloseableMessageDigest digest;

public Md5Checksum() {
this.digest = getDigest();
private CloseableMessageDigest digestLastMarked;

public DigestAlgorithmChecksum(DigestAlgorithm algorithm) {
this.algorithm = algorithm;
this.digest = newDigest();
}

private CloseableMessageDigest newDigest() {
return algorithm.getDigest();
}

@Override
public void update(int b) {
digest.update((byte) b);
digest.messageDigest().update((byte) b);
}

@Override
public void update(byte[] b, int off, int len) {
digest.update(b, off, len);
digest.messageDigest().update(b, off, len);
}

@Override
Expand All @@ -50,15 +57,12 @@ public long getValue() {

@Override
public void reset() {
digest = (digestLastMarked == null)
// This is necessary so that should there be a reset without a
// preceding mark, the MD5 would still be computed correctly.
? getDigest()
: cloneFrom(digestLastMarked);
}

private MessageDigest getDigest() {
return DigestAlgorithm.MD5.getDigest();
digest.close();
if (digestLastMarked == null) {
digest = newDigest();
} else {
digest = digestLastMarked;
}
}

@Override
Expand All @@ -68,14 +72,6 @@ public byte[] getChecksumBytes() {

@Override
public void mark(int readLimit) {
digestLastMarked = cloneFrom(digest);
}

private MessageDigest cloneFrom(MessageDigest from) {
try {
return (MessageDigest) from.clone();
} catch (CloneNotSupportedException e) { // should never occur
throw new IllegalStateException("unexpected", e);
}
digestLastMarked = digest.clone();
}
}

This file was deleted.

Loading
Loading