
Commit 51c2291

HADOOP-17318. Support concurrent S3A jobs with conflicting app attempt IDs
* ITests for the uuid generate/require logic
* Pendingset and _SUCCESS files add a jobId field
* On job commit, the jobId of pendingset files is validated

The validation will detect and fail if a job with a different ID has got a
.pendingset file into the directory used by the current job. The _SUCCESS
file is to aid auditing/testing.

Change-Id: I07a6a2d00ac5598c8f961aebbc9c9fdbb70ab51a

commit f2128bd1bf94de36ee1e3ed542de0a0e839307e3
Author: Steve Loughran <[email protected]>
Date:   Tue Oct 27 17:05:12 2020 +0000

    HADOOP-17318. Support concurrent S3A jobs with conflicting app attempt IDs.

    Have a job UUID everywhere; for Spark it must be passed in or
    self-generated, for MR it will be the YARN app attempt. The task
    attempt IDs are still used under this where task work needs to be
    differentiated. Examples:
    * temp dir for staging
    * magic path for magic committers
    * HDFS dir for staging summary info

    Change-Id: I17c641280d916ea1ad4ce4407215d07e488954af

commit 0ae79187d6c821eef13872e4c384948729b9c72d
Author: Steve Loughran <[email protected]>
Date:   Tue Oct 20 21:36:36 2020 +0100

    HADOOP-17318. Support concurrent S3A commit jobs slightly better.

    All S3A committers can have the purging of pending uploads on job
    commit disabled (new option; old one deprecated). More logging of
    what is going on with individual file load/upload/commit (including
    duration). Test of concurrent jobs designed to trigger the specific
    failure conditions of Staging (job2 commit after task1 commit) and
    Magic (job2 commit before task2 commit).

    Change-Id: If560c7541c287dc6d4c2f1af395c93b838495139

Change-Id: I2374b904bfb65399e08084e6c2b78237ec1603cd
1 parent fc961b6 commit 51c2291

30 files changed: +1340 −298 lines
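The commit message above describes a single job UUID that is either passed in, derived from the YARN app attempt, or self-generated. A standalone sketch of that resolution order, not the Hadoop source; the property names `fs.s3a.committer.uuid` and `spark.sql.sources.writeJobUUID` are assumptions for illustration:

```java
import java.util.Map;
import java.util.UUID;

/**
 * Illustrative sketch of job-UUID resolution as the commit message
 * describes it. Property names are assumed, not taken from the patch.
 */
public class JobUuidSketch {
  static String resolveJobUuid(Map<String, String> conf,
      String yarnAppAttemptId) {
    // An explicitly supplied UUID wins (e.g. set by the job driver).
    String explicit = conf.get("fs.s3a.committer.uuid");
    if (explicit != null) {
      return explicit;
    }
    // Spark: pick up a write-job UUID passed in by the driver, if any.
    String spark = conf.get("spark.sql.sources.writeJobUUID");
    if (spark != null) {
      return spark;
    }
    // MR: the YARN app attempt ID is unique per job attempt.
    if (yarnAppAttemptId != null) {
      return yarnAppAttemptId;
    }
    // Last resort: self-generate, so two jobs never share an ID.
    return UUID.randomUUID().toString();
  }
}
```

Task attempt IDs would still be used beneath this UUID wherever per-task work (staging temp dirs, magic paths) must be differentiated.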

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 3 additions & 10 deletions
@@ -1925,20 +1925,13 @@
   </property>

   <property>
-    <name>fs.s3a.committer.staging.abort.pending.uploads</name>
+    <name>fs.s3a.committer.abort.pending.uploads</name>
     <value>true</value>
     <description>
-      Should the staging committers abort all pending uploads to the destination
+      Should the committers abort all pending uploads to the destination
       directory?

-      Changing this if more than one partitioned committer is
-      writing to the same destination tree simultaneously; otherwise
-      the first job to complete will cancel all outstanding uploads from the
-      others. However, it may lead to leaked outstanding uploads from failed
-      tasks. If disabled, configure the bucket lifecycle to remove uploads
-      after a time period, and/or set up a workflow to explicitly delete
-      entries. Otherwise there is a risk that uncommitted uploads may run up
-      bills.
+      Set to false if more than one job is writing to the same directory tree.
     </description>
   </property>
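Per the new description, a job that shares a destination tree with concurrent jobs would disable the purge. A sketch of that setting as it might appear in a site configuration (the deprecated staging-specific name maps to this key):

```xml
<property>
  <name>fs.s3a.committer.abort.pending.uploads</name>
  <value>false</value>
</property>
```

If disabled, a bucket lifecycle rule or explicit cleanup is needed to remove uploads leaked by failed tasks, or uncommitted uploads can accumulate charges.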

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 6 additions & 3 deletions
@@ -180,6 +180,8 @@
 import static org.apache.hadoop.fs.s3a.auth.RolePolicies.allowS3Operations;
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.TokenIssuingPolicy.NoTokensAvailable;
 import static org.apache.hadoop.fs.s3a.auth.delegation.S3ADelegationTokens.hasDelegationTokenBinding;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS;
+import static org.apache.hadoop.fs.s3a.commit.CommitConstants.FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.submit;
 import static org.apache.hadoop.fs.s3a.impl.CallableSupplier.waitForCompletionIgnoringExceptions;
 import static org.apache.hadoop.fs.s3a.impl.ErrorTranslation.isUnknownBucket;
@@ -314,9 +316,10 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
   /** Add any deprecated keys. */
   @SuppressWarnings("deprecation")
   private static void addDeprecatedKeys() {
-    // this is retained as a placeholder for when new deprecated keys
-    // need to be added.
     Configuration.DeprecationDelta[] deltas = {
+        new Configuration.DeprecationDelta(
+            FS_S3A_COMMITTER_STAGING_ABORT_PENDING_UPLOADS,
+            FS_S3A_COMMITTER_ABORT_PENDING_UPLOADS)
     };

     if (deltas.length > 0) {
@@ -4581,7 +4584,7 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
    */
   @Retries.OnceRaw
   void abortMultipartUpload(String destKey, String uploadId) {
-    LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
+    LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
     getAmazonS3Client().abortMultipartUpload(
         new AbortMultipartUploadRequest(getBucket(),
             destKey,
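The `DeprecationDelta` added above makes Hadoop's `Configuration` treat the old staging-only key as an alias for the new one. A toy standalone model of that old-name-to-new-name lookup, purely for illustration (this is not the Hadoop implementation):

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of deprecated-key resolution; not Hadoop's Configuration. */
public class DeprecationSketch {
  // old key -> replacement key, mirroring a single DeprecationDelta
  private static final Map<String, String> DEPRECATIONS = new HashMap<>();
  static {
    DEPRECATIONS.put("fs.s3a.committer.staging.abort.pending.uploads",
        "fs.s3a.committer.abort.pending.uploads");
  }

  private final Map<String, String> settings = new HashMap<>();

  void set(String key, String value) {
    // writes through the deprecated name land on the current name
    settings.put(DEPRECATIONS.getOrDefault(key, key), value);
  }

  String get(String key) {
    // reads through either name resolve to the same stored value
    return settings.get(DEPRECATIONS.getOrDefault(key, key));
  }
}
```

The effect: existing deployments that set the staging-specific option keep working, while new configurations use the committer-wide name.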

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 7 additions & 2 deletions
@@ -131,6 +131,8 @@ protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
    */
   void operationRetried(String text, Exception ex, int retries,
       boolean idempotent) {
+    LOG.info("{}: Retried {}: {}", retries, text, ex.toString());
+    LOG.debug("Stack", ex);
     owner.operationRetried(text, ex, retries, idempotent);
   }

@@ -323,7 +325,9 @@ public CompleteMultipartUploadResult completeMPUwithRetries(
   public void abortMultipartUpload(String destKey, String uploadId,
       Retried retrying)
       throws IOException {
-    invoker.retry("Aborting multipart upload", destKey, true,
+    invoker.retry("Aborting multipart upload ID " + uploadId,
+        destKey,
+        true,
         retrying,
         () -> owner.abortMultipartUpload(
             destKey,
@@ -585,7 +589,8 @@ public BulkOperationState initiateOperation(final Path path,
   @Retries.RetryTranslated
   public UploadPartResult uploadPart(UploadPartRequest request)
       throws IOException {
-    return retry("upload part",
+    return retry("upload part #" + request.getPartNumber()
+        + " upload "+ request.getUploadId(),
         request.getKey(),
         true,
         () -> owner.uploadPart(request));