-
Notifications
You must be signed in to change notification settings - Fork 970
Fixed an issue that could cause checksum mismatch errors in S3 uploads. #5836
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
48d19b5
18f00ca
03d0084
6a01566
4bd1ee1
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,6 @@ | ||
| { | ||
| "type": "bugfix", | ||
| "category": "Amazon S3", | ||
| "contributor": "", | ||
| "description": "Fixed an issue that could cause checksum mismatch errors when performing parallel uploads with the async S3 client and the SHA1 or SHA256 checksum algorithms selected." | ||
| } |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,7 +17,11 @@ | |
|
|
||
| import java.security.MessageDigest; | ||
| import java.security.NoSuchAlgorithmException; | ||
| import java.util.Deque; | ||
| import java.util.concurrent.ConcurrentLinkedDeque; | ||
| import java.util.function.Supplier; | ||
| import software.amazon.awssdk.annotations.SdkInternalApi; | ||
| import software.amazon.awssdk.utils.SdkAutoCloseable; | ||
|
|
||
| @SdkInternalApi | ||
| public enum DigestAlgorithm { | ||
|
|
@@ -27,41 +31,92 @@ public enum DigestAlgorithm { | |
| SHA256("SHA-256") | ||
| ; | ||
|
|
||
| private static final int MAX_CACHED_DIGESTS = 10_000; | ||
| private final String algorithmName; | ||
| private final DigestThreadLocal digestReference; | ||
| private final Deque<MessageDigest> digestCache = new ConcurrentLinkedDeque<>(); // Used as LIFO for cache-friendliness | ||
|
|
||
| DigestAlgorithm(String algorithmName) { | ||
| this.algorithmName = algorithmName; | ||
| digestReference = new DigestThreadLocal(algorithmName); | ||
| } | ||
|
|
||
| public String getAlgorithmName() { | ||
| return algorithmName; | ||
| } | ||
|
|
||
| /** | ||
| * Returns the thread local reference for the {@link MessageDigest} algorithm | ||
| * Returns a {@link CloseableMessageDigest} to use for this algorithm. | ||
| */ | ||
| public MessageDigest getDigest() { | ||
| MessageDigest digest = digestReference.get(); | ||
| digest.reset(); | ||
| return digest; | ||
| public CloseableMessageDigest getDigest() { | ||
| MessageDigest digest = digestCache.pollFirst(); | ||
| if (digest != null) { | ||
| digest.reset(); | ||
| return new CloseableMessageDigest(digest); | ||
| } | ||
| return new CloseableMessageDigest(newDigest()); | ||
| } | ||
|
|
||
| /** | ||
| * Creates a new {@link MessageDigest} for this algorithm's name via {@link MessageDigest#getInstance(String)}. | ||
| * | ||
| * @throws RuntimeException if no provider supplies the algorithm; the {@link NoSuchAlgorithmException} is kept as the cause | ||
| */ | ||
| private MessageDigest newDigest() { | ||
| try { | ||
| return MessageDigest.getInstance(algorithmName); | ||
| } catch (NoSuchAlgorithmException e) { | ||
| throw new RuntimeException("Unable to fetch message digest instance for Algorithm " | ||
| + algorithmName + ": " + e.getMessage(), e); | ||
| } | ||
| } | ||
|
|
||
| private static class DigestThreadLocal extends ThreadLocal<MessageDigest> { | ||
| private final String algorithmName; | ||
| public final class CloseableMessageDigest implements SdkAutoCloseable, Cloneable { | ||
| private final Supplier<MessageDigest> closedDigest = () -> { | ||
| throw new IllegalStateException("This message digest is closed."); | ||
| }; | ||
| private Supplier<MessageDigest> digest; | ||
| private byte[] messageDigest; | ||
|
|
||
| private CloseableMessageDigest(MessageDigest digest) { | ||
| this.digest = () -> digest; | ||
| } | ||
|
|
||
| /** | ||
| * Retrieve the message digest instance. | ||
| */ | ||
| public MessageDigest messageDigest() { | ||
| return digest.get(); | ||
| } | ||
|
|
||
| DigestThreadLocal(String algorithmName) { | ||
| this.algorithmName = algorithmName; | ||
| /** | ||
| * Retrieve the message digest bytes. This will close the message digest when invoked. This is because the underlying | ||
| * message digest is reset on read, and we'd rather fail future interactions with the digest than act on the wrong data. | ||
| */ | ||
| public byte[] digest() { | ||
| // Repeat calls return the bytes captured on the first read; the underlying digest is no longer consulted. | ||
| if (messageDigest != null) { | ||
| return messageDigest; | ||
| } | ||
| messageDigest = messageDigest().digest(); | ||
| // Reading resets the underlying MessageDigest, so close this wrapper now: the instance is handed back to the | ||
| // cache and any later messageDigest() call fails with IllegalStateException instead of acting on reset state. | ||
| close(); | ||
| return messageDigest; | ||
| } | ||
|
|
||
| /** | ||
| * Release this message digest back to the cache. Once released, you must not use the digest anymore. | ||
| */ | ||
| @Override | ||
| public void close() { | ||
| // Already closed: make repeated close() calls a no-op. | ||
| if (digest == closedDigest) { | ||
| return; | ||
| } | ||
|
|
||
| // Avoid over-caching after large traffic bursts. The maximum chosen here is arbitrary. It's also not strictly | ||
| // enforced, since these statements aren't synchronized. | ||
| // NOTE(review): ConcurrentLinkedDeque.size() is documented as not constant-time (it traverses the deque), so | ||
| // this check may visit up to ~MAX_CACHED_DIGESTS nodes per close(). The "<=" comparison also admits one entry | ||
| // beyond the maximum. Consider tracking the count separately - confirm whether this matters on hot paths. | ||
|
||
| if (digestCache.size() <= MAX_CACHED_DIGESTS) { | ||
|
||
| digestCache.addFirst(digest.get()); | ||
| } | ||
| // Swap in the throwing supplier so later messageDigest() calls fail with IllegalStateException. | ||
| digest = closedDigest; | ||
| } | ||
|
|
||
| @Override | ||
| protected MessageDigest initialValue() { | ||
| public CloseableMessageDigest clone() { | ||
| try { | ||
| return MessageDigest.getInstance(algorithmName); | ||
| } catch (NoSuchAlgorithmException e) { | ||
| throw new RuntimeException("Unable to fetch message digest instance for Algorithm " | ||
| + algorithmName + ": " + e.getMessage(), e); | ||
| return new CloseableMessageDigest((MessageDigest) digest.get().clone()); | ||
| } catch (CloneNotSupportedException e) { // should never occur | ||
|
||
| throw new IllegalStateException("unexpected", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
This file was deleted.
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Question: where do we close messageDigest in this method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, that needs a test added.