HADOOP-17318. Support concurrent S3A commit jobs with same app attempt ID. #2399
Conversation
Tested OK with -Dparallel-tests -DtestsThreadCount=4 -Dmarkers=keep -Ds3guard -Ddynamo -Dfs.s3a.directory.marker.audit=true -Dscale; now retesting with delete and unguarded.
FYI @dongjoon-hyun
Thank you for pinging me, @steveloughran. cc @sunchao, too.
Related to this, I'm going to include work related to SPARK-33230: if setupJob finds no "spark.sql.sources.writeJobUUID", a UUID will be set; staging committers can use this to be confident they are getting separate dirs for jobs even when job IDs are the same.
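A minimal sketch of the setupJob behaviour just described, not the committer's actual code: the property key comes from the comment above, while the class and the pickJobUuid helper are illustrative names only.

```java
import java.util.UUID;

import org.apache.hadoop.conf.Configuration;

public final class JobUuidSketch {

  /** Key Spark uses to pass a per-query UUID down to committers. */
  public static final String SPARK_WRITE_UUID = "spark.sql.sources.writeJobUUID";

  private JobUuidSketch() {
  }

  /**
   * Return the Spark-supplied UUID if present, otherwise self-generate one,
   * so two jobs with identical job IDs still get distinct directories.
   */
  public static String pickJobUuid(Configuration conf) {
    String uuid = conf.getTrimmed(SPARK_WRITE_UUID, "");
    if (uuid.isEmpty()) {
      // no UUID passed down by the query engine: generate one for this job
      uuid = UUID.randomUUID().toString();
    }
    return uuid;
  }
}
```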
FYI, I merged SPARK-33230, @steveloughran.
@dongjoon-hyun thanks... doing a bit more on this, as the more tests I write, the more corner cases surface. Think I'm in control now.
d3dc4ed to 293c53f
New s3a test failing, and something in hadoop-common related to TestLdap. Filing a JIRA there.
#2427 HADOOP-17340. TestLdapGroupsMapping failing - string mismatch in exception validation, to cover the hadoop-common failure.
293c53f to 6cd4d83
Test run with: -Dparallel-tests -DtestsThreadCount=4 -Dmarkers=keep -Ds3guard -Ddynamo -Dfs.s3a.directory.marker.audit=true -Dscale. My next big bit of work is to do tests in Spark itself.
Running integration tests on this with Spark + patch and the 3.4.0-SNAPSHOT builds. Ignoring compilation issues with spark trunk, hadoop-trunk, scala versions and scalatest, I'm running tests in cloud-integration; that is: Spark is setting the UUID and the committer is picking it up and using it as appropriate.
Thank you for sharing, @steveloughran!
Some more detail for the watchers from my testing (hadoop-trunk + CDP spark 2.4). I could not get spark master and hadoop trunk to build together this week.
I'm not going near that, other than to add a para in troubleshooting.md saying "you're in classloader hell". Will need to be testing against spark master before worrying about WTF is going on there.
I'm also now worried that if anyone does >1 job with the same dest dir and overwrite=true, then there's a risk of hitting the same duplicate-app-attempt-ID race condition. It's tempting just to do something ambitious like using a random number to generate a timestamp for the cluster launch, or some random(year-month-day) + seconds-of-day, so that this problem goes away almost completely.
Latest test run against S3 London, no S3Guard; markers deleted (classic config). Everything, even the flaky read() tests, passed! -Dparallel-tests -DtestsThreadCount=4 -Dmarkers=delete -Dfs.s3a.directory.marker.audit=true -Dscale
b526e95 to 1f14f64
This is a big patch. To follow all the changes, I started from the doc and tests; after that I went back to how the code was changing. I guess I can follow it now. I'm +1 on this PR to the best of my knowledge, though a second review would be very much appreciated.
Thanks Steve! GREAT WORK.
hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java
nit: s/upload/upload ID/
I was thinking of consistent log keywords, so that for any retry log we can search for "upload ID" or "commit ID".
This is the S3 multipart upload ID, so I'll use "upload ID" for it; it's also used in BlockOutputStream.
nit: We can make it clear in the javadoc here that the default value is false. Same for the generate.uuid below.
+1. Adding two new constants and referring to them in the production code.
Is this SPARK app ID name constant still used, or did I miss something? 🤔
Cut it. This was a very old property passed down by Spark.
😄 👍
This is only for Java serialization, obviously. It's to make sure anyone (me) who might pass them around in Spark RDDs won't create serialization problems. FWIW I use the JSON format in those cloud committer tests, primarily to verify committer name correctness.
nit: just getUUID() without this.?
Relic of wrapping/pulling up the old code. Fixed. Also clarified the UUID javadocs, now that SPARK-33402 is generating more unique job IDs.
nit: s/./,
There is an empty line between the table header and this first row; I see the GitHub online viewer is not blessing this. Maybe we just remove LoC 552.
Done. Also reviewed both tables and removed the columns about which committer supports which option; they are now split into common and staging.
nit: maybe call this conf2, like jobConf2, to make it a bit clearer.
done
nit: add multipartInitiatedInWrite to the log message? Same below
done
mehakmeet left a comment
LGTM, just some small nits.
nit: Would this be "If disabled"? Also, what is the property we are talking about that is enabled or not? If it is FS_S3A_COMMITTER_GENERATE_UUID, then I think we should mention it here too.
added the extra details
nit: typo in "random"
Thanks. Will go through comments and apply before merging.
…t IDs
* ITests for the uuid generate/require logic
* Pendingset and _SUCCESS files add jobId field
* And on job commit, the jobID of each pendingset file is validated
The validation will detect and fail if a job with a different ID
has got a .pendingset file into the directory used by the current
job.
The _SUCCESS file is to aid auditing/testing
Change-Id: I07a6a2d00ac5598c8f961aebbc9c9fdbb70ab51a
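For readers following along, here is a hedged illustration of the jobId validation described in the commit message above. This is not the patch's code: verifySameJob and the class name are made-up, and the only grounded behaviour is "fail if a pendingset file's jobId differs from the committing job's ID".

```java
import java.io.IOException;

import org.apache.hadoop.fs.Path;

public final class JobIdValidationSketch {

  private JobIdValidationSketch() {
  }

  /**
   * Fail if a pendingset file in the job's directory was written by a
   * different job (e.g. a concurrent job with the same app attempt ID).
   */
  public static void verifySameJob(String currentJobId, String fileJobId, Path file)
      throws IOException {
    if (fileJobId != null && !fileJobId.isEmpty()
        && !fileJobId.equals(currentJobId)) {
      throw new IOException("Pending set file " + file
          + " belongs to job " + fileJobId
          + " but is being committed by job " + currentJobId);
    }
  }
}
```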
commit f2128bd1bf94de36ee1e3ed542de0a0e839307e3
Author: Steve Loughran <[email protected]>
Date: Tue Oct 27 17:05:12 2020 +0000
HADOOP-17318. Support concurrent S3A jobs with conflicting app attempt IDs.
Have a job UUID everywhere, which for spark must be passed in or
self-generated; for MR it will be the yarn app attempt.
The task attempts are still used under this where task work needs
to be differentiated.
Examples
* temp dir for staging
* magic path for magic committers
* HDFS dir for staging summary info
Change-Id: I17c641280d916ea1ad4ce4407215d07e488954af
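A hedged illustration of how a per-job UUID can be folded into the working paths listed in that commit message so that two jobs with the same app attempt ID never share directories. The exact path shapes and helper names here are my own examples, not the committer's real layout.

```java
import org.apache.hadoop.fs.Path;

public final class JobPathsSketch {

  private JobPathsSketch() {
  }

  /** Staging committer: cluster-FS temp dir made unique to this job. */
  public static Path stagingTempDir(Path tmpRoot, String user, String jobUuid) {
    // illustrative layout only: <tmpRoot>/<user>/staging-uploads/<uuid>
    return new Path(tmpRoot, user + "/staging-uploads/" + jobUuid);
  }

  /** Magic committer: job-specific directory under the destination's magic path. */
  public static Path magicJobDir(Path destDir, String jobUuid) {
    // illustrative layout only: <dest>/__magic/job-<uuid>
    return new Path(destDir, "__magic/job-" + jobUuid);
  }
}
```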
commit 0ae79187d6c821eef13872e4c384948729b9c72d
Author: Steve Loughran <[email protected]>
Date: Tue Oct 20 21:36:36 2020 +0100
HADOOP-17318. Support concurrent S3A commit jobs slightly better.
All S3A committers can have purging pending deletes on job commit
disabled. (new option; old one deprecated).
More logging of what is going on with individual file load/upload/commit
(including with duration)
Test of concurrent jobs designed to trigger the specific failure conditions
of Staging (job2 commit after task1 commit) and Magic (job2 commit before
task2 commit)
Change-Id: If560c7541c287dc6d4c2f1af395c93b838495139
Change-Id: I2374b904bfb65399e08084e6c2b78237ec1603cd
rdblue left a comment
Looks good to me. I looked through just the UUID-related parts.
Committers don't cancel just their own pending uploads?
In taskAbort, yes. JobAbort/cleanup is where things are more trouble, because the job doesn't know which specific task attempts have uploaded.
With the staging committer, there are no files uploaded until task commit. Tasks which fail before that moment don't have any pending uploads to cancel.
With the magic committer, because the files are written directly to S3, there is more risk of pending uploads collecting.
I'm not sure about Spark here, but on MR, when a task is considered to have failed, abortTask is called in the AM to abort that specific task; for the magic committer, the task's set of .pending files is determined by listing the task attempt dir, and those uploads are cancelled. If that operation is called reliably, only the current upload is pending.
Of course, if an entire job fails: no cleanup at all.
The best thing to do is simply to tell everyone to have a scheduled cleanup.
FWIW, the most leakage I see in the real world is actually from incomplete S3ABlockOutputStream writes as, again, they accrue bills. Everyone needs a lifecycle rule to delete old ones. The sole exception there is one which our QE team used and which (unknown to them) I'd use for testing the scalability of the "hadoop s3guard uploads" command - how well does it work when there are many, many incomplete uploads, can it still delete them all, etc. If they had a rule then it'd screw up my test runs.
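To make the taskAbort flow above concrete, a hedged sketch under my own assumptions, not the magic committer's actual code: it lists the .pending files under the task attempt dir and cancels each one. abortOneUpload is a placeholder; the real committer deserializes the pending commit data and aborts the recorded S3 multipart upload rather than just deleting the marker file.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public final class TaskAbortSketch {

  private TaskAbortSketch() {
  }

  /** Cancel every pending upload recorded under this task attempt's directory. */
  public static void abortPendingUploads(FileSystem fs, Path taskAttemptDir)
      throws IOException {
    FileStatus[] pending = fs.listStatus(taskAttemptDir,
        path -> path.getName().endsWith(".pending"));
    for (FileStatus st : pending) {
      abortOneUpload(fs, st.getPath());
    }
  }

  /** Placeholder for the committer's real abort of one recorded upload. */
  private static void abortOneUpload(FileSystem fs, Path pendingFile)
      throws IOException {
    // The real code aborts the S3 multipart upload described in the file;
    // here we only remove the marker file to keep the sketch self-contained.
    fs.delete(pendingFile, false);
  }
}
```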
Other places use this. as a prefix when setting fields. I find that helpful when reading to know that an instance field is being set, vs a local variable.
Makes sense in the constructor. Done
A lot of these changes don't seem related to the UUID change. I think it would be easier to review if only necessary changes were in this PR.
The IDE was whining about calling an override point in the constructor, so I turned it off at the same time. sorry
It seems odd to set the Spark property. Does anything else use this?
I was just trying to be rigorous; will roll back. While I'm there, I think I'll add the source attribute - I can then probe for it in the tests. I'm already saving it in the _SUCCESS file.
This is incorrect if this is self-generated but this method is called after setupJob. I think that method shouldn't set SPARK_WRITE_UUID.
Removed it there.
Why would a committer not want to generate a unique ID and use the job ID instead?
MR jobs where their updated config doesn't get through to the tasks: use a self-generated ID and things won't work. And as they know that the app ID is unique on that YARN cluster, that's all they need.
For my Spark integration tests I turned off auto-generate and enabled the fail-on-job-ID option, to verify that all operations (RDD, dataframe, dataset, SQL) were passing down the spark.sql option. Helped me find where it wasn't being set.
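A hedged sketch of the two test configurations just described. The property names fs.s3a.committer.generate.uuid and fs.s3a.committer.require.uuid are my reading of the options discussed in this thread, so treat them as assumptions rather than confirmed keys.

```java
import org.apache.hadoop.conf.Configuration;

public final class CommitterUuidOptionsSketch {

  private CommitterUuidOptionsSketch() {
  }

  /** Spark-style integration test: insist the query engine supplies the UUID. */
  public static Configuration requireEngineUuid(Configuration conf) {
    conf.setBoolean("fs.s3a.committer.generate.uuid", false);  // assumed key
    conf.setBoolean("fs.s3a.committer.require.uuid", true);    // assumed key
    return conf;
  }

  /** MR-style setup: fall back to the (cluster-unique) YARN app attempt ID. */
  public static Configuration allowJobIdFallback(Configuration conf) {
    conf.setBoolean("fs.s3a.committer.generate.uuid", false);  // assumed key
    conf.setBoolean("fs.s3a.committer.require.uuid", false);   // assumed key
    return conf;
  }
}
```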
hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md
Yes, I did a bit more than was needed, because I had to also let more than one magic committer commit work side-by-side (all that active-upload warning), and the IDE was trying to keep me in check too, on a piece of code which hasn't been revisited for a while. While I had the files open in the IDE, I moved to passing FileStatus down to line up with the changes in #2168 - if you open a file through the JsonSerializer by passing in the FileStatus, that will be handed off to the FileSystem's implementation of openFile(status.path).withFileStatus(status), and so be used by the S3A FS to skip the initial HEAD request. That means if we are reading 1000 .pendingset files in S3A, we eliminate 1000 HEAD calls, which should have tangible benefits for committers using S3 as the place to keep those files.
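A hedged sketch of the openFile()/withFileStatus() pattern referred to above, assuming the Hadoop 3.3+ FutureDataInputStreamBuilder API; the wrapper class and method names are illustrative, not the patch's code.

```java
import java.io.IOException;
import java.util.concurrent.ExecutionException;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;

public final class OpenWithStatusSketch {

  private OpenWithStatusSketch() {
  }

  /**
   * Open a file whose FileStatus is already known; S3A can then reuse the
   * status instead of issuing a HEAD request before the first read.
   */
  public static FSDataInputStream open(FileSystem fs, FileStatus status)
      throws IOException {
    try {
      return fs.openFile(status.getPath())
          .withFileStatus(status)    // lets the store skip its HEAD probe
          .build()                   // returns a CompletableFuture
          .get();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IOException("interrupted opening " + status.getPath(), e);
    } catch (ExecutionException e) {
      throw new IOException("failed to open " + status.getPath(), e.getCause());
    }
  }
}
```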
Also added config option fs.s3a.committer.uuid.source, which is set in the jobconf during job setup and used in tests to verify the source of the ID. Change-Id: I9eb44113bc6afd5826c8a51bdf16fb220f8fb111
1f14f64 to 9ba3f4e
Pushed up an iteration with all the feedback addressed. Testing: S3 London, unguarded, markers=keep.
Change-Id: Ic7106a43738a14eba59f81d892b7856e6596ad65
…t ID. (#2399) See also [SPARK-33402]: Jobs launched in same second have duplicate MapReduce JobIDs Contributed by Steve Loughran. Change-Id: Iae65333cddc84692997aae5d902ad8765b45772a
Merged to trunk, not yet 3.3. See #2473 for the test failure caused in code from a different PR, which this patch goes nowhere near.
Thank you, @steveloughran and guys!
…e app attempt ID. (apache#2399) See also [SPARK-33402]: Jobs launched in same second have duplicate MapReduce JobIDs Contributed by Steve Loughran. Change-Id: Iae65333cddc84692997aae5d902ad8765b45772a
This PR addresses concurrency issues in the S3A committers, mostly related to application attempt IDs not always being unique in Spark, but also some other issues.
Magic: concurrent jobs with the same attempt ID would conflict there
Staging: app attempt ID used for
All S3A committers can have purging pending deletes on job commit
disabled. (new option; old one deprecated).
New test of concurrent jobs designed to trigger the specific failure conditions
of Staging (job2 commit after task1 commit) and Magic (job2 commit before
task2 commit); it also verifies that paths for output are different.