Commit 9ba3f4e

HADOOP-17318: address feedback.
Also added config option fs.s3a.committer.uuid.source which is set in the jobconf during job setup, used in test to verify source of ID. Change-Id: I9eb44113bc6afd5826c8a51bdf16fb220f8fb111
1 parent 51c2291 commit 9ba3f4e
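
For context, a minimal sketch of what the new option makes visible to downstream code after job setup (hypothetical snippet; the source label string is a placeholder, not the enum's actual `getText()` value):

```java
import org.apache.hadoop.conf.Configuration;

public class UuidSourceCheck {
  public static void main(String[] args) {
    // After AbstractS3ACommitter.setupJob() runs, the job configuration
    // carries both the job UUID and a record of where it came from.
    Configuration jobConf = new Configuration(false);
    // Normally written by job setup; set here only so the sketch runs.
    jobConf.set("fs.s3a.committer.uuid", "0613f169-example-uuid");
    jobConf.set("fs.s3a.committer.uuid.source", "example-source"); // placeholder label
    System.out.println("Job UUID " + jobConf.get("fs.s3a.committer.uuid")
        + " came from " + jobConf.get("fs.s3a.committer.uuid.source"));
  }
}
```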

File tree: 9 files changed, +108 −60 lines

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 2 additions & 2 deletions
@@ -131,7 +131,7 @@ protected WriteOperationHelper(S3AFileSystem owner, Configuration conf) {
    */
   void operationRetried(String text, Exception ex, int retries,
       boolean idempotent) {
-    LOG.info("{}: Retried {}: {}", retries, text, ex.toString());
+    LOG.info("{}: Retried {}: {}", text, retries, ex.toString());
     LOG.debug("Stack", ex);
     owner.operationRetried(text, ex, retries, idempotent);
   }
@@ -590,7 +590,7 @@ public BulkOperationState initiateOperation(final Path path,
   public UploadPartResult uploadPart(UploadPartRequest request)
       throws IOException {
     return retry("upload part #" + request.getPartNumber()
-        + " upload "+ request.getUploadId(),
+        + " upload ID "+ request.getUploadId(),
         request.getKey(),
         true,
         () -> owner.uploadPart(request));

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/AbstractS3ACommitter.java

Lines changed: 40 additions & 20 deletions
@@ -65,6 +65,7 @@
 import static org.apache.hadoop.fs.s3a.commit.CommitUtilsWithMR.*;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;

 /**
@@ -94,14 +95,25 @@
  * committer was large enough for more all the parallel POST requests.
  */
 public abstract class AbstractS3ACommitter extends PathOutputCommitter {
+
   private static final Logger LOG =
       LoggerFactory.getLogger(AbstractS3ACommitter.class);

   public static final String THREAD_PREFIX = "s3a-committer-pool-";

   /**
-   * Unique ID for a Job. On Spark this MUST NOT be the YARN JobID;
-   * on MapReduce it MUST BE that.
+   * Error string when task setup fails.
+   */
+  @VisibleForTesting
+  public static final String E_SELF_GENERATED_JOB_UUID
+      = "has a self-generated job UUID";
+
+  /**
+   * Unique ID for a Job.
+   * In MapReduce Jobs the YARN JobID suffices.
+   * On Spark this may only be the YARN JobID if
+   * it is known to be creating strongly unique IDs
+   * (i.e. SPARK-33402 is on the branch).
    */
   private final String uuid;

@@ -175,17 +187,17 @@ protected AbstractS3ACommitter(
     setConf(context.getConfiguration());
     Pair<String, JobUUIDSource> id = buildJobUUID(
         conf, context.getJobID());
-    uuid = id.getLeft();
-    uuidSource = id.getRight();
+    this.uuid = id.getLeft();
+    this.uuidSource = id.getRight();
     LOG.info("Job UUID {} source {}", getUUID(), getUUIDSource().getText());
     initOutput(outputPath);
     LOG.debug("{} instantiated for job \"{}\" ID {} with destination {}",
         role, jobName(context), jobIdString(context), outputPath);
     S3AFileSystem fs = getDestS3AFS();
-    createJobMarker = context.getConfiguration().getBoolean(
+    this.createJobMarker = context.getConfiguration().getBoolean(
         CREATE_SUCCESSFUL_JOB_OUTPUT_DIR_MARKER,
         DEFAULT_CREATE_SUCCESSFUL_JOB_DIR_MARKER);
-    commitOperations = new CommitOperations(fs);
+    this.commitOperations = new CommitOperations(fs);
   }

   /**
@@ -483,11 +495,8 @@ public void setupJob(JobContext context) throws IOException {
       jobSetup = true;
       // patch job conf with the job UUID.
       Configuration c = context.getConfiguration();
-      c.set(FS_S3A_COMMITTER_UUID, this.getUUID());
-      if (getUUIDSource() == JobUUIDSource.GeneratedLocally) {
-        // we set the UUID up locally. Save it back to the job configuration
-        c.set(SPARK_WRITE_UUID, this.getUUID());
-      }
+      c.set(FS_S3A_COMMITTER_UUID, getUUID());
+      c.set(FS_S3A_COMMITTER_UUID_SOURCE, getUUIDSource().getText());
       Path dest = getOutputPath();
       if (createJobMarker){
         commitOperations.deleteSuccessMarker(dest);
@@ -517,7 +526,7 @@ && getUUIDSource() == JobUUIDSource.GeneratedLocally) {
         // generated locally.
         throw new PathCommitException(getOutputPath().toString(),
             "Task attempt " + attemptID
-                + " only has a self-generated job UUID");
+                + " " + E_SELF_GENERATED_JOB_UUID);
       }
       Path taskAttemptPath = getTaskAttemptPath(context);
       FileSystem fs = taskAttemptPath.getFileSystem(getConf());
@@ -1209,16 +1218,25 @@ protected void warnOnActiveUploads(final Path path) {
    * </p>
    * <p>
    * Spark will use a fake app ID based on the current time.
-   * This can lead to collisions on busy clusters.
-   *
+   * This can lead to collisions on busy clusters unless
+   * the specific spark release has SPARK-33402 applied.
+   * This appends a random long value to the timestamp, so
+   * it is unique enough that the risk of collision is almost
+   * nonexistent.
+   * </p>
+   * <p>
+   * The order of selection of a uuid is:
    * </p>
    * <ol>
    *   <li>Value of
    *   {@link InternalCommitterConstants#FS_S3A_COMMITTER_UUID}.</li>
    *   <li>Value of
    *   {@link InternalCommitterConstants#SPARK_WRITE_UUID}.</li>
-   *   <li>If enabled: Self-generated uuid.</li>
-   *   <li>If not disabled: Application ID</li>
+   *   <li>If enabled through
+   *   {@link CommitConstants#FS_S3A_COMMITTER_GENERATE_UUID}:
+   *   Self-generated uuid.</li>
+   *   <li>If {@link CommitConstants#FS_S3A_COMMITTER_REQUIRE_UUID}
+   *   is not set: Application ID</li>
    * </ol>
    * The UUID bonding takes place during construction;
    * the staging committers use it to set up their wrapped
@@ -1263,16 +1281,18 @@ protected void warnOnActiveUploads(final Path path) {

     // Check the job hasn't declared a requirement for the UUID.
     // This allows or fail-fast validation of Spark behavior.
-    if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID, false)) {
+    if (conf.getBoolean(FS_S3A_COMMITTER_REQUIRE_UUID,
+        DEFAULT_S3A_COMMITTER_REQUIRE_UUID)) {
       throw new PathCommitException("", E_NO_SPARK_UUID);
     }

-    // see if the job can generate a random UUID
-    if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID, false)) {
+    // see if the job can generate a random UUID
+    if (conf.getBoolean(FS_S3A_COMMITTER_GENERATE_UUID,
+        DEFAULT_S3A_COMMITTER_GENERATE_UUID)) {
       // generate a random UUID. This is OK for a job, for a task
       // it means that the data may not get picked up.
       String newId = UUID.randomUUID().toString();
-      LOG.warn("No job ID in configuration; generating a randem ID: {}",
+      LOG.warn("No job ID in configuration; generating a random ID: {}",
           newId);
       return Pair.of(newId, JobUUIDSource.GeneratedLocally);
     }
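
The selection order in that javadoc can be read as a small decision chain. A minimal sketch under the commit's semantics (the class and method are invented for illustration; the real logic lives in `buildJobUUID`):

```java
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;

public final class JobUuidSelectionSketch {
  /** Sketch of the UUID selection order; not the committer's real method. */
  public static String select(Configuration conf, String applicationId) {
    // 1. An explicitly set committer UUID always wins.
    String id = conf.getTrimmed("fs.s3a.committer.uuid", "");
    if (!id.isEmpty()) {
      return id;
    }
    // 2. Next, the UUID passed down by Spark.
    id = conf.getTrimmed("spark.sql.sources.writeJobUUID", "");
    if (!id.isEmpty()) {
      return id;
    }
    // 3. If the job requires a UUID and none was found: fail fast.
    if (conf.getBoolean("fs.s3a.committer.require.uuid", false)) {
      throw new IllegalStateException("no job UUID supplied");
    }
    // 4. Optionally self-generate one (fine for a job; risky for tasks
    // set up outside that job, as the test changes below verify).
    if (conf.getBoolean("fs.s3a.committer.generate.uuid", false)) {
      return UUID.randomUUID().toString();
    }
    // 5. Fall back to the application/job ID.
    return applicationId;
  }
}
```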

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/CommitConstants.java

Lines changed: 12 additions & 0 deletions
@@ -294,6 +294,12 @@ private CommitConstants() {
   public static final String FS_S3A_COMMITTER_REQUIRE_UUID =
       "fs.s3a.committer.require.uuid";

+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_REQUIRE_UUID}: {@value}.
+   */
+  public static final boolean DEFAULT_S3A_COMMITTER_REQUIRE_UUID =
+      false;
+
   /**
    * Generate a UUID in job setup rather than fall back to
    * YARN Application attempt ID.
@@ -304,4 +310,10 @@ private CommitConstants() {
   public static final String FS_S3A_COMMITTER_GENERATE_UUID =
       "fs.s3a.committer.generate.uuid";

+  /**
+   * Default value for {@link #FS_S3A_COMMITTER_GENERATE_UUID}: {@value}.
+   */
+  public static final boolean DEFAULT_S3A_COMMITTER_GENERATE_UUID =
+      false;
+
 }

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/InternalCommitterConstants.java

Lines changed: 6 additions & 5 deletions
@@ -49,6 +49,12 @@ private InternalCommitterConstants() {
   public static final String FS_S3A_COMMITTER_UUID =
       "fs.s3a.committer.uuid";

+  /**
+   * Where did the UUID come from? {@value}.
+   */
+  public static final String FS_S3A_COMMITTER_UUID_SOURCE =
+      "fs.s3a.committer.uuid.source";
+
   /**
    * Directory committer factory: {@value}.
    */
@@ -105,11 +111,6 @@ private InternalCommitterConstants() {
   public static final String SPARK_WRITE_UUID =
       "spark.sql.sources.writeJobUUID";

-  /**
-   * The App ID for jobs: {@value}.
-   */
-  public static final String SPARK_APP_ID = "spark.app.id";
-
   /**
    * Java temp dir: {@value}.
    */

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/staging/StagingCommitter.java

Lines changed: 2 additions & 2 deletions
@@ -117,7 +117,7 @@ public StagingCommitter(Path outputPath,
     this.uniqueFilenames = conf.getBoolean(
         FS_S3A_COMMITTER_STAGING_UNIQUE_FILENAMES,
         DEFAULT_STAGING_COMMITTER_UNIQUE_FILENAMES);
-    setWorkPath(buildWorkPath(context, this.getUUID()));
+    setWorkPath(buildWorkPath(context, getUUID()));
     this.wrappedCommitter = createWrappedCommitter(context, conf);
     setOutputPath(constructorOutputPath);
     Path finalOutputPath = getOutputPath();
@@ -174,7 +174,7 @@ public String toString() {
     sb.append(", commitsDirectory=").append(commitsDirectory);
     sb.append(", uniqueFilenames=").append(uniqueFilenames);
     sb.append(", conflictResolution=").append(conflictResolution);
-    sb.append(". uploadPartSize=").append(uploadPartSize);
+    sb.append(", uploadPartSize=").append(uploadPartSize);
     if (wrappedCommitter != null) {
       sb.append(", wrappedCommitter=").append(wrappedCommitter);
     }

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/committers.md

Lines changed: 17 additions & 18 deletions
@@ -530,30 +530,29 @@ Amazon S3, that means S3Guard must *always* be enabled.

 Conflict management is left to the execution engine itself.

-## Committer Configuration Options
+## Common Committer Options


-| Option | Magic | Directory | Partitioned | Meaning | Default |
-|--------|-------|-----------|-------------|---------|---------|
-| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | X | X | X | Write a `_SUCCESS` file on the successful completion of the job. | `true` |
-| `fs.s3a.buffer.dir` | X | X | X | Local filesystem directory for data being written and/or staged. | `${hadoop.tmp.dir}/s3a` |
-| `fs.s3a.committer.magic.enabled` | X | | | Enable "magic committer" support in the filesystem. | `false` |
-| `fs.s3a.committer.abort.pending.uploads` | X | X | X | list and abort all pending uploads under the destination path when the job is committed or aborted. | `true` |
-| `fs.s3a.committer.threads` | X | X | X | Number of threads in committers for parallel operations on files. | 8 |
-| `fs.s3a.committer.generate.uuid` | | X | X | Generate a Job UUID if none is passed down from Spark | `false` |
-| `fs.s3a.committer.require.uuid` | | X | X | Require the Job UUID to be passed down from Spark | `false` |
+| Option | Meaning | Default |
+|--------|---------|---------|
+| `mapreduce.fileoutputcommitter.marksuccessfuljobs` | Write a `_SUCCESS` file on the successful completion of the job. | `true` |
+| `fs.s3a.buffer.dir` | Local filesystem directory for data being written and/or staged. | `${hadoop.tmp.dir}/s3a` |
+| `fs.s3a.committer.magic.enabled` | Enable "magic committer" support in the filesystem. | `false` |
+| `fs.s3a.committer.abort.pending.uploads` | List and abort all pending uploads under the destination path when the job is committed or aborted. | `true` |
+| `fs.s3a.committer.threads` | Number of threads in committers for parallel operations on files. | 8 |
+| `fs.s3a.committer.generate.uuid` | Generate a Job UUID if none is passed down from Spark. | `false` |
+| `fs.s3a.committer.require.uuid` | Require the Job UUID to be passed down from Spark. | `false` |


-Staging committer (Directory and Partitioned) options
+## Staging committer (Directory and Partitioned) options


-| Option | Magic | Directory | Partitioned | Meaning | Default |
-|--------|-------|-----------|-------------|---------|---------|
-
-| `fs.s3a.committer.staging.conflict-mode` | | X | X | Conflict resolution: `fail`, `append` or `replace`| `append` |
-| `fs.s3a.committer.staging.tmp.path` | | X | X | Path in the cluster filesystem for temporary data. | `tmp/staging` |
-| `fs.s3a.committer.staging.unique-filenames` | | X | X | Generate unique filenames. | `true` |
-| `fs.s3a.committer.staging.abort.pending.uploads` | | X | X | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | |
+| Option | Meaning | Default |
+|--------|---------|---------|
+| `fs.s3a.committer.staging.conflict-mode` | Conflict resolution: `fail`, `append` or `replace`. | `append` |
+| `fs.s3a.committer.staging.tmp.path` | Path in the cluster filesystem for temporary data. | `tmp/staging` |
+| `fs.s3a.committer.staging.unique-filenames` | Generate unique filenames. | `true` |
+| `fs.s3a.committer.staging.abort.pending.uploads` | Deprecated; replaced by `fs.s3a.committer.abort.pending.uploads`. | `(false)` |


 ### Common Committer Options
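
As a usage note for the two UUID rows above, a hedged sketch of setting these options on a job configuration (the property names come from the table; the surrounding job setup is assumed):

```java
import org.apache.hadoop.conf.Configuration;

public class CommitterUuidOptions {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Fail fast unless the engine (e.g. Spark) passes a job UUID down;
    // this guards against two concurrent jobs sharing a timestamp-derived ID.
    conf.setBoolean("fs.s3a.committer.require.uuid", true);
    // Or: let the committer generate its own UUID during job setup.
    // Safe for a single job; risky for tasks set up outside that job.
    conf.setBoolean("fs.s3a.committer.generate.uuid", false);
    System.out.println("require.uuid = "
        + conf.getBoolean("fs.s3a.committer.require.uuid", false));
  }
}
```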

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/troubleshooting_s3a.md

Lines changed: 12 additions & 1 deletion
@@ -104,6 +104,7 @@ Fixes:
 revert to the JVM SSL implementation when the wildfly
 or native openssl libraries cannot be loaded.

+
 ## <a name="authentication"></a> Authentication Failure

 If Hadoop cannot authenticate with the S3 service endpoint,
@@ -286,7 +287,17 @@ There's two main causes
 classloader, so the JVM does not consider it to be an implementation.
 Fix: learn the entire JVM classloader model and see if you can then debug it.
 Tip: having both the AWS Shaded SDK and individual AWS SDK modules on your classpath
-may be a cause of this
+may be a cause of this.
+
+If you see this and you are trying to use the S3A connector with Spark, then the cause can
+be that the isolated classloader used to load Hive classes is interfering with the S3A
+connector's dynamic loading of `com.amazonaws` classes. To fix this, declare that
+the classes in the aws SDK are loaded from the same classloader which instantiated
+the S3A FileSystem instance:
+
+```
+spark.sql.hive.metastore.sharedPrefixes com.amazonaws.
+```

 ## <a name="access_denied"></a> "The security token included in the request is invalid"
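
The same property can also be set programmatically when the Spark session is built; a minimal sketch (generic Spark API, not part of this commit):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.sql.SparkSession;

public class SharedPrefixesExample {
  public static void main(String[] args) {
    // Keep the AWS SDK classes on the same classloader as the S3A FileSystem
    // by marking them as "shared" with the isolated Hive metastore classloader.
    SparkConf conf = new SparkConf()
        .setAppName("s3a-shared-prefixes")
        .set("spark.sql.hive.metastore.sharedPrefixes", "com.amazonaws.");
    SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
    spark.stop();
  }
}
```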

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/AbstractITCommitProtocol.java

Lines changed: 16 additions & 11 deletions
@@ -71,9 +71,11 @@
 import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
 import static org.apache.hadoop.fs.s3a.S3AUtils.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
+import static org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter.E_SELF_GENERATED_JOB_UUID;
 import static org.apache.hadoop.fs.s3a.commit.CommitConstants.*;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.E_NO_SPARK_UUID;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID;
+import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.FS_S3A_COMMITTER_UUID_SOURCE;
 import static org.apache.hadoop.fs.s3a.commit.InternalCommitterConstants.SPARK_WRITE_UUID;
 import static org.apache.hadoop.test.LambdaTestUtils.*;

@@ -1499,16 +1501,16 @@ public void testParallelJobsToSameDestination() throws Throwable {
     Job job2 = newJob(outDir,
         c2,
         attempt2);
-    Configuration conf2 = job2.getConfiguration();
-    conf2.set("mapreduce.output.basename", "task2");
+    Configuration jobConf2 = job2.getConfiguration();
+    jobConf2.set("mapreduce.output.basename", "task2");
     String stage2Id = UUID.randomUUID().toString();
-    conf2.set(SPARK_WRITE_UUID,
+    jobConf2.set(SPARK_WRITE_UUID,
         stage2Id);

-    JobContext jContext2 = new JobContextImpl(conf2,
+    JobContext jContext2 = new JobContextImpl(jobConf2,
         taskAttempt2.getJobID());
     TaskAttemptContext tContext2 =
-        new TaskAttemptContextImpl(conf2, taskAttempt2);
+        new TaskAttemptContextImpl(jobConf2, taskAttempt2);
     AbstractS3ACommitter committer2 = createCommitter(outDir, tContext2);
     Assertions.assertThat(committer2.getJobAttemptPath(jContext2))
         .describedAs("Job attempt path of %s", committer2)
@@ -1548,7 +1550,7 @@ public void testParallelJobsToSameDestination() throws Throwable {
     if (multipartInitiatedInWrite) {
       // magic committer runs -commit job1 while a job2 TA has an open
       // writer (and hence: open MP Upload)
-      LOG.info("Commit Job 1");
+      LOG.info("With Multipart Initiated In Write: Commit Job 1");
       commitJob(committer1, jContext1);
     }

@@ -1567,7 +1569,7 @@ public void testParallelJobsToSameDestination() throws Throwable {
     if (!multipartInitiatedInWrite) {
       // if not a magic committer, commit the job now. Because at
       // this point the staging committer tasks from job2 will be pending
-      LOG.info("Commit Job 1");
+      LOG.info("With Multipart NOT Initiated In Write: Commit Job 1");
       assertJobAttemptPathExists(committer1, jContext1);
       commitJob(committer1, jContext1);
     }
@@ -1621,13 +1623,14 @@ public void testSelfGeneratedUUID() throws Throwable {
         .describedAs("UUID source of %s", committer)
         .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally);

+    // examine the job configuration and verify that it has been updated
     Configuration jobConf = jobData.conf;
     Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID, null))
         .describedAs("Config option " + FS_S3A_COMMITTER_UUID)
         .isEqualTo(uuid);
-    Assertions.assertThat(jobConf.get(SPARK_WRITE_UUID, null))
-        .describedAs("Config option " + SPARK_WRITE_UUID)
-        .isEqualTo(uuid);
+    Assertions.assertThat(jobConf.get(FS_S3A_COMMITTER_UUID_SOURCE, null))
+        .describedAs("Config option " + FS_S3A_COMMITTER_UUID_SOURCE)
+        .isEqualTo(AbstractS3ACommitter.JobUUIDSource.GeneratedLocally.getText());

     // because the task was set up in the job, it can have task
     // setup called, even though it had a random ID.
@@ -1643,7 +1646,9 @@ public void testSelfGeneratedUUID() throws Throwable {
     assertNotEquals("job UUIDs",
         committer.getUUID(),
         committer2.getUUID());
-    intercept(PathCommitException.class, () -> {
+    // Task setup MUST fail.
+    intercept(PathCommitException.class,
+        E_SELF_GENERATED_JOB_UUID, () -> {
       committer2.setupTask(tContext2);
       return committer2;
     });
hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/commit/staging/TestStagingCommitter.java

Lines changed: 1 addition & 1 deletion
@@ -35,7 +35,6 @@
 import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
 import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;

-import org.apache.hadoop.fs.s3a.commit.PathCommitException;
 import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
 import org.assertj.core.api.Assertions;
 import org.hamcrest.core.StringStartsWith;
@@ -57,6 +56,7 @@
 import org.apache.hadoop.fs.s3a.MockS3AFileSystem;
 import org.apache.hadoop.fs.s3a.S3AFileSystem;
 import org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter;
+import org.apache.hadoop.fs.s3a.commit.PathCommitException;
 import org.apache.hadoop.fs.s3a.commit.files.PendingSet;
 import org.apache.hadoop.fs.s3a.commit.files.SinglePendingCommit;
 import org.apache.hadoop.mapred.JobConf;
