Skip to content

Commit ce76f5d

Browse files
steveloughranHarshitGupta11
authored andcommitted
HADOOP-17833. Improve Magic Committer performance (apache#3289)
Speed up the magic committer with key changes being * Writes under __magic always retain directory markers * File creation under __magic skips all overwrite checks, including the LIST call intended to stop files being created over dirs. * mkdirs under __magic probes the path for existence but does not look any further. Extra parallelism in task and job commit directory scanning Use of createFile and openFile with parameters which all for HEAD checks to be skipped. The committer can write the summary _SUCCESS file to the path `fs.s3a.committer.summary.report.directory`, which can be in a different file system/bucket if desired, using the job id as the filename. Also: HADOOP-15460. S3A FS to add `fs.s3a.create.performance` Application code can set the createFile() option fs.s3a.create.performance to true to disable the same safety checks when writing under magic directories. Use with care. The createFile option prefix `fs.s3a.create.header.` can be used to add custom headers to S3 objects when created. Contributed by Steve Loughran.
1 parent fbf7c2a commit ce76f5d

File tree

89 files changed

+4372
-2335
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

89 files changed

+4372
-2335
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/AuditConstants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -90,6 +90,11 @@ private AuditConstants() {
9090
*/
9191
public static final String PARAM_PROCESS = "ps";
9292

93+
/**
94+
* Task Attempt ID query header: {@value}.
95+
*/
96+
public static final String PARAM_TASK_ATTEMPT_ID = "ta";
97+
9398
/**
9499
* Thread 0: the thread which created a span {@value}.
95100
*/

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,11 +124,15 @@ private CommonAuditContext() {
124124
/**
125125
* Put a context entry.
126126
* @param key key
127-
* @param value new value
127+
* @param value new value., If null, triggers removal.
128128
* @return old value or null
129129
*/
130130
public Supplier<String> put(String key, String value) {
131-
return evaluatedEntries.put(key, () -> value);
131+
if (value != null) {
132+
return evaluatedEntries.put(key, () -> value);
133+
} else {
134+
return evaluatedEntries.remove(key);
135+
}
132136
}
133137

134138
/**

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,9 @@ public final class StoreStatisticNames {
5353
/** {@value}. */
5454
public static final String OP_CREATE = "op_create";
5555

56+
/** {@value}. */
57+
public static final String OP_CREATE_FILE = "op_createfile";
58+
5659
/** {@value}. */
5760
public static final String OP_CREATE_NON_RECURSIVE =
5861
"op_create_non_recursive";

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -297,11 +297,12 @@ public void save(FileSystem fs, Path path, T instance,
297297
}
298298

299299
/**
300-
* Write the JSON as bytes, then close the file.
300+
* Write the JSON as bytes, then close the stream.
301+
* @param instance instance to write
301302
* @param dataOutputStream an output stream that will always be closed
302303
* @throws IOException on any failure
303304
*/
304-
private void writeJsonAsBytes(T instance,
305+
public void writeJsonAsBytes(T instance,
305306
OutputStream dataOutputStream) throws IOException {
306307
try {
307308
dataOutputStream.write(toBytes(instance));

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/functional/CloseableTaskPoolSubmitter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
*/
3535
@InterfaceAudience.Public
3636
@InterfaceStability.Unstable
37-
public final class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
37+
public class CloseableTaskPoolSubmitter implements TaskPool.Submitter,
3838
Closeable {
3939

4040
/** Executors. */

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ create a new file or open an existing file on `FileSystem` for write.
2626
## Invariants
2727

2828
The `FSDataOutputStreamBuilder` interface does not validate parameters
29-
and modify the state of `FileSystem` until [`build()`](#Builder.build) is
29+
and modify the state of `FileSystem` until `build()` is
3030
invoked.
3131

3232
## Implementation-agnostic parameters.
@@ -110,7 +110,7 @@ of `FileSystem`.
110110
#### Implementation Notes
111111

112112
The concrete `FileSystem` and/or `FSDataOutputStreamBuilder` implementation
113-
MUST verify that implementation-agnostic parameters (i.e., "syncable") or
113+
MUST verify that implementation-agnostic parameters (i.e., "syncable`) or
114114
implementation-specific parameters (i.e., "foofs:cache")
115115
are supported. `FileSystem` will satisfy optional parameters (via `opt(key, ...)`)
116116
on best effort. If the mandatory parameters (via `must(key, ...)`) can not be satisfied
@@ -182,3 +182,58 @@ see `FileSystem#create(path, ...)` and `FileSystem#append()`.
182182
result = FSDataOutputStream
183183

184184
The result is `FSDataOutputStream` to be used to write data to filesystem.
185+
186+
187+
## <a name="s3a"></a> S3A-specific options
188+
189+
Here are the custom options which the S3A Connector supports.
190+
191+
| Name | Type | Meaning |
192+
|-----------------------------|-----------|----------------------------------------|
193+
| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
194+
| `fs.s3a.create.header` | `string` | prefix for user supplied headers |
195+
196+
### `fs.s3a.create.performance`
197+
198+
Prioritize file creation performance over safety checks for filesystem consistency.
199+
200+
This:
201+
1. Skips the `LIST` call which makes sure a file is being created over a directory.
202+
Risk: a file is created over a directory.
203+
1. Ignores the overwrite flag.
204+
1. Never issues a `DELETE` call to delete parent directory markers.
205+
206+
It is possible to probe an S3A Filesystem instance for this capability through
207+
the `hasPathCapability(path, "fs.s3a.create.performance")` check.
208+
209+
Creating files with this option over existing directories is likely
210+
to make S3A filesystem clients behave inconsistently.
211+
212+
Operations optimized for directories (e.g. listing calls) are likely
213+
to see the directory tree not the file; operations optimized for
214+
files (`getFileStatus()`, `isFile()`) more likely to see the file.
215+
The exact form of the inconsistencies, and which operations/parameters
216+
trigger this are undefined and may change between even minor releases.
217+
218+
Using this option is the equivalent of pressing and holding down the
219+
"Electronic Stability Control"
220+
button on a rear-wheel drive car for five seconds: the safety checks are off.
221+
Things wil be faster if the driver knew what they were doing.
222+
If they didn't, the fact they had held the button down will
223+
be used as evidence at the inquest as proof that they made a
224+
conscious decision to choose speed over safety and
225+
that the outcome was their own fault.
226+
227+
Accordingly: *Use if and only if you are confident that the conditions are met.*
228+
229+
### `fs.s3a.create.header` User-supplied header support
230+
231+
Options with the prefix `fs.s3a.create.header.` will be added to to the
232+
S3 object metadata as "user defined metadata".
233+
This metadata is visible to all applications. It can also be retrieved through the
234+
FileSystem/FileContext `listXAttrs()` and `getXAttrs()` API calls with the prefix `header.`
235+
236+
When an object is renamed, the metadata is propagated the copy created.
237+
238+
It is possible to probe an S3A Filesystem instance for this capability through
239+
the `hasPathCapability(path, "fs.s3a.create.header")` check.

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import org.apache.hadoop.fs.RemoteIterator;
3030
import org.apache.hadoop.fs.StreamCapabilities;
3131
import org.apache.hadoop.io.IOUtils;
32+
import org.apache.hadoop.util.functional.RemoteIterators;
33+
3234
import org.junit.Assert;
3335
import org.junit.AssumptionViolatedException;
3436
import org.slf4j.Logger;
@@ -1446,11 +1448,7 @@ public static TreeScanResults treeWalk(FileSystem fs, Path path)
14461448
*/
14471449
public static List<LocatedFileStatus> toList(
14481450
RemoteIterator<LocatedFileStatus> iterator) throws IOException {
1449-
ArrayList<LocatedFileStatus> list = new ArrayList<>();
1450-
while (iterator.hasNext()) {
1451-
list.add(iterator.next());
1452-
}
1453-
return list;
1451+
return RemoteIterators.toList(iterator);
14541452
}
14551453

14561454
/**
@@ -1464,11 +1462,7 @@ public static List<LocatedFileStatus> toList(
14641462
*/
14651463
public static <T extends FileStatus> List<T> iteratorToList(
14661464
RemoteIterator<T> iterator) throws IOException {
1467-
List<T> list = new ArrayList<>();
1468-
while (iterator.hasNext()) {
1469-
list.add(iterator.next());
1470-
}
1471-
return list;
1465+
return RemoteIterators.toList(iterator);
14721466
}
14731467

14741468

hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/committer/manifest/files/AbstractManifestData.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,7 +70,6 @@ public static Path unmarshallPath(String path) {
7070
throw new RuntimeException(
7171
"Failed to parse \"" + path + "\" : " + e,
7272
e);
73-
7473
}
7574
}
7675

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -405,6 +405,39 @@
405405
</execution>
406406
</executions>
407407
</plugin>
408+
<plugin>
409+
<groupId>org.apache.maven.plugins</groupId>
410+
<artifactId>maven-enforcer-plugin</artifactId>
411+
<executions>
412+
<execution>
413+
<id>banned-illegal-imports</id>
414+
<phase>process-sources</phase>
415+
<goals>
416+
<goal>enforce</goal>
417+
</goals>
418+
<configuration>
419+
<rules>
420+
<restrictImports>
421+
<includeTestCode>false</includeTestCode>
422+
<reason>Restrict mapreduce imports to committer code</reason>
423+
<exclusions>
424+
<exclusion>org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter</exclusion>
425+
<exclusion>org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory</exclusion>
426+
<exclusion>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</exclusion>
427+
<exclusion>org.apache.hadoop.fs.s3a.commit.impl.*</exclusion>
428+
<exclusion>org.apache.hadoop.fs.s3a.commit.magic.*</exclusion>
429+
<exclusion>org.apache.hadoop.fs.s3a.commit.staging.*</exclusion>
430+
</exclusions>
431+
<bannedImports>
432+
<bannedImport>org.apache.hadoop.mapreduce.**</bannedImport>
433+
<bannedImport>org.apache.hadoop.mapred.**</bannedImport>
434+
</bannedImports>
435+
</restrictImports>
436+
</rules>
437+
</configuration>
438+
</execution>
439+
</executions>
440+
</plugin>
408441
</plugins>
409442
</build>
410443

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1159,4 +1159,22 @@ private Constants() {
11591159
* Require that all S3 access is made through Access Points.
11601160
*/
11611161
public static final String AWS_S3_ACCESSPOINT_REQUIRED = "fs.s3a.accesspoint.required";
1162+
1163+
/**
1164+
* Flag for create performance.
1165+
* This is *not* a configuration option; it is for use in the
1166+
* {code createFile()} builder.
1167+
* Value {@value}.
1168+
*/
1169+
public static final String FS_S3A_CREATE_PERFORMANCE = "fs.s3a.create.performance";
1170+
1171+
/**
1172+
* Prefix for adding a header to the object when created.
1173+
* The actual value must have a "." suffix and then the actual header.
1174+
* This is *not* a configuration option; it is only for use in the
1175+
* {code createFile()} builder.
1176+
* Value {@value}.
1177+
*/
1178+
public static final String FS_S3A_CREATE_HEADER = "fs.s3a.create.header";
1179+
11621180
}

0 commit comments

Comments
 (0)