Olapplin/large object merge #6425
base: feature/master/large-object-dl
…in the onResponse callback. Keep track of all inflight requests.
This is a copy of the DownstreamSubscription inner class in SplittingTransformer that has been moved to its own class to be reused.
Can we reuse the same class?
The DownstreamSubscription refers to a few variables of SplittingTransformer; it is not a static inner class, so I don't think we can.
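To illustrate the point about the non-static inner class, here is a minimal sketch. The names `Outer`, `Inner` and `StandaloneInner` are hypothetical and are not SDK classes. It shows why an inner class that reads fields of its enclosing instance cannot be reused as-is, and that an extracted copy has to receive that state explicitly.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical stand-in for a class like SplittingTransformer.
class Outer {
    private final AtomicLong outstandingDemand = new AtomicLong();

    // Non-static inner class: each instance holds an implicit reference to
    // an Outer instance and can touch its private fields directly.
    class Inner {
        void request(long n) {
            outstandingDemand.addAndGet(n);
        }
    }

    long demand() {
        return outstandingDemand.get();
    }
}

// Extracted, reusable version: the state it needs is passed in explicitly
// instead of being captured from an enclosing instance.
class StandaloneInner {
    private final AtomicLong outstandingDemand;

    StandaloneInner(AtomicLong outstandingDemand) {
        this.outstandingDemand = outstandingDemand;
    }

    void request(long n) {
        outstandingDemand.addAndGet(n);
    }
}
```

An `Inner` can only be created from an existing `Outer` (`outer.new Inner()`), which is what ties it to that one class.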
Move to the utils module because it is used in sdk-core
…ansformerPublisher to prevent copying all headers
Still going through the PR.
/**
 * Amount of demand requested but not yet fulfilled by the subscription
 */
private final AtomicInteger outstandingDemand = new AtomicInteger(0);
Is there any reason we need to track outstanding demand in a subscriber?
We track the demand that we have requested but that has not yet been fulfilled by onNext. This prevents us from requesting more than maxInFlight.
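A rough sketch of that bookkeeping, under stated assumptions: `DemandTracker` and its method names are hypothetical, and it uses `synchronized` with plain counters for simplicity rather than the `AtomicInteger` field shown in the diff. The idea is that new demand is capped by `maxInFlight` minus both the parts currently being processed and the demand already requested but not yet delivered.

```java
// Hypothetical helper; not the PR's actual subscriber.
class DemandTracker {
    private final int maxInFlight;
    private int outstandingDemand; // requested upstream, not yet delivered via onNext
    private int inFlight;          // delivered via onNext, still being processed

    DemandTracker(int maxInFlight) {
        this.maxInFlight = maxInFlight;
    }

    /**
     * Returns how many more items may be requested right now and records
     * them as outstanding demand. Returns 0 when the cap is reached.
     */
    synchronized int reserveDemand() {
        int available = maxInFlight - inFlight - outstandingDemand;
        if (available <= 0) {
            return 0;
        }
        outstandingDemand += available;
        return available;
    }

    /** Called when upstream delivers an item: demand becomes in-flight work. */
    synchronized void onNext() {
        outstandingDemand--;
        inFlight++;
    }

    /** Called when one in-flight item has finished processing. */
    synchronized void onItemCompleted() {
        inFlight--;
    }
}
```

Without the `outstandingDemand` term, two calls to `reserveDemand()` made before any `onNext` arrives would each see zero in-flight items and together request twice the cap.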
 * is a 'one-shot' class, it should <em>NOT</em> be reused for more than one multipart download.
 */
@SdkInternalApi
public class NonLinearMultipartDownloaderSubscriber
Nit: ParallelMultipartDownloadSubscriber?
Since we use parallelSplitSupported, we can rename that to ParallelMultipartDownloadSubscriber, yeah.
    return false;
}

firstPartLock.lock();
Is there any reason we need this? We only request more in the whenComplete future of the first part, right?
MPU has similar logic where we need to wait for createMPU to finish; can we use similar logic here? https://github.com/aws/aws-sdk-java-v2/blob/master/services/s3/src/main/java/software/amazon/awssdk/services/s3/internal/multipart/UploadWithUnknownContentLengthHelper.java#L191
We can't really reuse that logic because we need to wait for the first part to complete before we know whether it's a multipart object or not. Or maybe I don't fully understand what you want to reuse; could you elaborate?
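For context, a minimal sketch of the constraint being discussed. It is not the PR's lock-based implementation: `FirstPartGate` and `remainingParts` are hypothetical names, and the first-part future is assumed to yield the total part count (null or 1 when the object is not multipart). It only shows that the remaining part numbers cannot be decided until the first part has completed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper illustrating "decide only after the first part".
class FirstPartGate {

    /**
     * Given a future that completes with the total part count reported by the
     * first part's response, returns a future of the part numbers still to
     * request. An empty list means the object was not multipart.
     */
    static CompletableFuture<List<Integer>> remainingParts(
            CompletableFuture<Integer> firstPartTotalParts) {
        return firstPartTotalParts.thenApply(total -> {
            List<Integer> parts = new ArrayList<>();
            if (total == null || total <= 1) {
                return parts; // single-part object: nothing more to request
            }
            for (int part = 2; part <= total; part++) {
                parts.add(part);
            }
            return parts;
        });
    }
}
```

If the first-part future completes exceptionally, `thenApply` propagates the failure, so no further parts are requested.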
…ents - try/catch in ThreadSafeEmittingSubscription - negative totalParts check - NonLinearMultipartDownloaderSubscriberTckTest tck test
private DefaultAsyncResponseTransformerSplitResult(Builder<ResponseT, ResultT> builder) {
    this.publisher = Validate.paramNotNull(
        builder.publisher(), "asyncResponseTransformerPublisher");
    this.future = Validate.paramNotNull(
        builder.resultFuture(), "future");
    this.supportsNonSerial = Validate.getOrDefault(builder.supportsNonSerial(), () -> false);
Since these classes are all related to async, and concurrency is good to have, why do we disable concurrent splits by default?
Because most implementations of AsyncResponseTransformer cannot support it. We send data back to customers serially, and changing that would be a breaking change. We would also probably need to update all the implementations of AsyncResponseTransformer we have, which is out of scope.
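A small sketch of the opt-in default described here. `SplitResultSketch` is a hypothetical stand-in, not the SDK's split result class, and it uses `java.util.Optional` in place of the SDK's `Validate.getOrDefault`. An unset flag resolves to false, so existing transformers keep their serial behavior unless they explicitly opt in.

```java
import java.util.Optional;

// Hypothetical stand-in for a split result with an opt-in capability flag.
class SplitResultSketch {
    private final boolean supportsNonSerial;

    private SplitResultSketch(Builder builder) {
        // An unset (null) flag means "serial only", which keeps the
        // behavior of existing implementations unchanged.
        this.supportsNonSerial =
            Optional.ofNullable(builder.supportsNonSerial).orElse(false);
    }

    boolean supportsNonSerial() {
        return supportsNonSerial;
    }

    static Builder builder() {
        return new Builder();
    }

    static class Builder {
        private Boolean supportsNonSerial;

        Builder supportsNonSerial(Boolean supportsNonSerial) {
            this.supportsNonSerial = supportsNonSerial;
            return this;
        }

        SplitResultSketch build() {
            return new SplitResultSketch(this);
        }
    }
}
```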
@Override
public void onResponse(T response) {
    Optional<String> contentRangeList = response.sdkHttpResponse().firstMatchingHeader("x-amz-content-range");
Can we do something like
String contentRange = response.sdkHttpResponse()
.firstMatchingHeader("x-amz-content-range")
.orElseThrow(() -> new IllegalStateException("Content range header is missing"));
and do try catch for entire function
public void onResponse(T response) {
try {
// existing
} catch (IllegalStateException e) {
handleError(e.getMessage(), future);
}
}
// Some common handleError function
private void handleError(String errorMessage, CompletableFuture<T> future) {
IllegalStateException exception = new IllegalStateException(errorMessage);
if (subscriber != null) {
subscriber.onError(exception);
}
if (future != null) {
future.completeExceptionally(exception);
}
}
I usually don't like using exceptions as control flow (remember the createURI thing we had a while ago), but I can use a handleError method 👍
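One possible shape for that compromise, as a sketch only: a shared `handleError` method called from explicit checks, with no exception thrown and caught inside `onResponse`. `ContentRangeHandler` is a hypothetical class, the header value is passed in directly as an `Optional<String>` instead of being read from an SDK response, and the `bytes start-end/total` format is an assumption about what the header contains.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical handler; completes its future with the total object size.
class ContentRangeHandler {
    // Assumed header format: "bytes <start>-<end>/<total>".
    // At most 18 digits for the total, so Long.parseLong cannot overflow.
    private static final Pattern CONTENT_RANGE =
        Pattern.compile("bytes \\d+-\\d+/(\\d{1,18})");

    private final CompletableFuture<Long> future = new CompletableFuture<>();

    CompletableFuture<Long> future() {
        return future;
    }

    void onResponse(Optional<String> contentRangeHeader) {
        if (!contentRangeHeader.isPresent()) {
            handleError("Content range header is missing");
            return;
        }
        Matcher matcher = CONTENT_RANGE.matcher(contentRangeHeader.get());
        if (!matcher.matches()) {
            handleError("Malformed content range header: " + contentRangeHeader.get());
            return;
        }
        future.complete(Long.parseLong(matcher.group(1)));
    }

    // Single place where failures are reported; a real subscriber would
    // also be signalled here via onError.
    private void handleError(String errorMessage) {
        future.completeExceptionally(new IllegalStateException(errorMessage));
    }
}
```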
@Override
public void subscribe(Subscriber<? super AsyncResponseTransformer<T, T>> s) {
    if (s == null) {
Can we use
this.subscriber = Validate.notNull(responseTransformer.path(), "subscriber");
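As a point of comparison, a minimal sketch of a null check in `subscribe` that satisfies the Reactive Streams rule that a null subscriber must result in a `NullPointerException`. It uses the JDK's `java.util.concurrent.Flow` interfaces and `Objects.requireNonNull` as stand-ins for `org.reactivestreams` and the SDK's `Validate`, and `NullCheckingPublisher` is a hypothetical class that emits nothing.

```java
import java.util.Objects;
import java.util.concurrent.Flow;

// Hypothetical publisher; only the null check is of interest here.
class NullCheckingPublisher implements Flow.Publisher<String> {
    private Flow.Subscriber<? super String> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super String> s) {
        // Throws NullPointerException with the given message when s is null.
        this.subscriber = Objects.requireNonNull(s, "subscriber");
        s.onSubscribe(new Flow.Subscription() {
            @Override
            public void request(long n) {
                // This sketch never emits any item.
            }

            @Override
            public void cancel() {
                // Nothing to release in this sketch.
            }
        });
    }
}
```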
 * transformer will retry independently based on the retry configuration of the client it is used with. We only need to verify
 * the completion state of the future of each individually
 */
private class IndividualFileTransformer implements AsyncResponseTransformer<T, T> {
Can we move it to a separate class file?
Mmmh, it is convenient to have it as an inner class because it references variables of the outer class. We use a similar pattern in other places for multipart operations.
@Override
public void onResponse(T response) {
    Optional<String> contentRangeList = response.sdkHttpResponse().firstMatchingHeader("x-amz-content-range");
Since we depend on a header sent by the service, will this cause issues for third-party S3-compatible services like MinIO or GCP? Because we error out if the header is not present, it would be good to know the impact when the SDK is used with them.
- renamed EmittingSubscription, mark it ThreadSafe - Added comments - some other renaming
Implement parallel download for multipart GetObject in the S3 async client and Transfer Manager.
Modifications
- NonLinearMultipartDownloaderSubscriber and FileAsyncResponseTransformerPublisher. Note for reviewer: this is the core of the PR's new functionality, and review should probably start with those two classes.
- FileAsyncResponseTransformerPublisher needs to be wrapped to publish progress to the progress updater. This is done in GenericS3TransferManager and TransferProgressUpdater.
- supportNonSerial on SplitResult
- ParallelConfiguration: new config class in MultipartConfiguration for the maxInFlightParts config
- position, path and FileTransformerConfiguration
Testing