
Commit 5227239

steveloughran and deepakdamri authored and committed
HADOOP-17833. Improve Magic Committer performance (apache#3289) (apache#4470)
Speed up the magic committer, with the key changes being:

* Writes under __magic always retain directory markers.
* File creation under __magic skips all overwrite checks, including the LIST call intended to stop files being created over dirs.
* mkdirs under __magic probes the path for existence but does not look any further.
* Extra parallelism in task and job commit directory scanning.
* Use of createFile and openFile with parameters which allow HEAD checks to be skipped.

The committer can write the summary _SUCCESS file to the path `fs.s3a.committer.summary.report.directory`, which can be in a different file system/bucket if desired, using the job id as the filename.

Also: HADOOP-15460. S3A FS to add `fs.s3a.create.performance`.

Application code can set the createFile() option fs.s3a.create.performance to true to disable the same safety checks when writing under magic directories. Use with care.

The createFile option prefix `fs.s3a.create.header.` can be used to add custom headers to S3 objects when created.

Contributed by Steve Loughran.
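A minimal sketch of how a client might opt in to the behaviour described above. The bucket names, paths, and class name are hypothetical; the option names (`fs.s3a.committer.summary.report.directory`, `fs.s3a.create.performance`) are the ones this change introduces.

import java.net.URI;
import java.nio.charset.StandardCharsets;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MagicCommitterOptionsSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Ask the committer to write its _SUCCESS summary report, named after the
    // job id, to a separate location (hypothetical bucket).
    conf.set("fs.s3a.committer.summary.report.directory",
        "s3a://reports-bucket/job-summaries/");

    FileSystem fs = FileSystem.get(new URI("s3a://data-bucket/"), conf);
    Path dest = new Path("s3a://data-bucket/output/part-00000");

    // createFile() option which trades the creation-time safety checks
    // (overwrite probe, LIST for a directory at the path) for speed.
    try (FSDataOutputStream out = fs.createFile(dest)
        .opt("fs.s3a.create.performance", true)
        .build()) {
      out.write("example".getBytes(StandardCharsets.UTF_8));
    }
  }
}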
1 parent 9b068b6, commit 5227239

33 files changed: +2437 -71 lines

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/AuditConstants.java

Lines changed: 5 additions & 0 deletions
@@ -90,6 +90,11 @@ private AuditConstants() {
    */
   public static final String PARAM_PROCESS = "ps";
 
+  /**
+   * Task Attempt ID query header: {@value}.
+   */
+  public static final String PARAM_TASK_ATTEMPT_ID = "ta";
+
   /**
    * Thread 0: the thread which created a span {@value}.
    */

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/audit/CommonAuditContext.java

Lines changed: 6 additions & 2 deletions
@@ -116,11 +116,15 @@ private CommonAuditContext() {
   /**
    * Put a context entry.
    * @param key key
-   * @param value new value
+   * @param value new value. If null, triggers removal.
    * @return old value or null
    */
   public Supplier<String> put(String key, String value) {
-    return evaluatedEntries.put(key, () -> value);
+    if (value != null) {
+      return evaluatedEntries.put(key, () -> value);
+    } else {
+      return evaluatedEntries.remove(key);
+    }
   }
 
   /**
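An illustrative sketch, not part of the patch, combining the new `PARAM_TASK_ATTEMPT_ID` constant with the changed `put()` semantics: a non-null value registers the entry, while a null value now removes it instead of registering a supplier that evaluates to null. The helper class and method names are invented for the example.

import static org.apache.hadoop.fs.audit.AuditConstants.PARAM_TASK_ATTEMPT_ID;

import org.apache.hadoop.fs.audit.CommonAuditContext;

public final class TaskAttemptAuditSketch {

  /** Tag the current thread's audit context with a task attempt ID ("ta"). */
  public static void enterTaskAttempt(String taskAttemptId) {
    CommonAuditContext.currentAuditContext()
        .put(PARAM_TASK_ATTEMPT_ID, taskAttemptId);
  }

  /**
   * Clear the tag. With this change, passing null removes the entry
   * rather than storing a supplier that would later return null.
   */
  public static void exitTaskAttempt() {
    CommonAuditContext.currentAuditContext()
        .put(PARAM_TASK_ATTEMPT_ID, null);
  }
}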

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 3 additions & 0 deletions
@@ -47,6 +47,9 @@ public final class StoreStatisticNames {
   /** {@value}. */
   public static final String OP_CREATE = "op_create";
 
+  /** {@value}. */
+  public static final String OP_CREATE_FILE = "op_createfile";
+
   /** {@value}. */
   public static final String OP_CREATE_NON_RECURSIVE =
       "op_create_non_recursive";

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

Lines changed: 3 additions & 2 deletions
@@ -270,11 +270,12 @@ public void save(FileSystem fs, Path path, T instance,
   }
 
   /**
-   * Write the JSON as bytes, then close the file.
+   * Write the JSON as bytes, then close the stream.
+   * @param instance instance to write
    * @param dataOutputStream an output stream that will always be closed
    * @throws IOException on any failure
    */
-  private void writeJsonAsBytes(T instance,
+  public void writeJsonAsBytes(T instance,
       OutputStream dataOutputStream) throws IOException {
     try {
       dataOutputStream.write(toBytes(instance));
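With `writeJsonAsBytes` now public, a caller that already holds an open stream can serialize straight to it without going through `save()`. A hedged sketch, assuming the (class, failOnUnknownProperties, pretty) constructor of `JsonSerialization` and using a hypothetical `SummaryReport` POJO:

import java.io.IOException;
import java.io.OutputStream;

import org.apache.hadoop.util.JsonSerialization;

public final class JsonReportWriter {

  /** Hypothetical Jackson-serializable report type. */
  public static class SummaryReport {
    public String jobId;
    public long filesCommitted;
  }

  /**
   * Serialize the report as JSON bytes onto a stream the caller opened;
   * per the javadoc above, writeJsonAsBytes always closes the stream.
   */
  public static void write(SummaryReport report, OutputStream out)
      throws IOException {
    JsonSerialization<SummaryReport> serializer =
        new JsonSerialization<>(SummaryReport.class, false, true);
    serializer.writeJsonAsBytes(report, out);
  }
}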

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md

Lines changed: 57 additions & 2 deletions
@@ -26,7 +26,7 @@ create a new file or open an existing file on `FileSystem` for write.
 ## Invariants
 
 The `FSDataOutputStreamBuilder` interface does not validate parameters
-and modify the state of `FileSystem` until [`build()`](#Builder.build) is
+and modify the state of `FileSystem` until `build()` is
 invoked.
 
 ## Implementation-agnostic parameters.
@@ -110,7 +110,7 @@ of `FileSystem`.
 #### Implementation Notes
 
 The concrete `FileSystem` and/or `FSDataOutputStreamBuilder` implementation
-MUST verify that implementation-agnostic parameters (i.e., "syncable") or
+MUST verify that implementation-agnostic parameters (i.e., "syncable`) or
 implementation-specific parameters (i.e., "foofs:cache")
 are supported. `FileSystem` will satisfy optional parameters (via `opt(key, ...)`)
 on best effort. If the mandatory parameters (via `must(key, ...)`) can not be satisfied
@@ -182,3 +182,58 @@ see `FileSystem#create(path, ...)` and `FileSystem#append()`.
 result = FSDataOutputStream
 
 The result is `FSDataOutputStream` to be used to write data to filesystem.
+
+
+## <a name="s3a"></a> S3A-specific options
+
+Here are the custom options which the S3A Connector supports.
+
+| Name                        | Type      | Meaning                                 |
+|-----------------------------|-----------|-----------------------------------------|
+| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance  |
+| `fs.s3a.create.header`      | `string`  | prefix for user-supplied headers        |
+
+### `fs.s3a.create.performance`
+
+Prioritize file creation performance over safety checks for filesystem consistency.
+
+This:
+1. Skips the `LIST` call which makes sure a file is not being created over a directory.
+   Risk: a file is created over a directory.
+1. Ignores the overwrite flag.
+1. Never issues a `DELETE` call to delete parent directory markers.
+
+It is possible to probe an S3A Filesystem instance for this capability through
+the `hasPathCapability(path, "fs.s3a.create.performance")` check.
+
+Creating files with this option over existing directories is likely
+to make S3A filesystem clients behave inconsistently.
+
+Operations optimized for directories (e.g. listing calls) are likely
+to see the directory tree, not the file; operations optimized for
+files (`getFileStatus()`, `isFile()`) are more likely to see the file.
+The exact form of the inconsistencies, and which operations/parameters
+trigger this, are undefined and may change between even minor releases.
+
+Using this option is the equivalent of pressing and holding down the
+"Electronic Stability Control"
+button on a rear-wheel drive car for five seconds: the safety checks are off.
+Things will be faster if the driver knew what they were doing.
+If they didn't, the fact they had held the button down will
+be used as evidence at the inquest as proof that they made a
+conscious decision to choose speed over safety and
+that the outcome was their own fault.
+
+Accordingly: *Use if and only if you are confident that the conditions are met.*
+
+### `fs.s3a.create.header` User-supplied header support
+
+Options with the prefix `fs.s3a.create.header.` will be added to the
+S3 object metadata as "user defined metadata".
+This metadata is visible to all applications. It can also be retrieved through the
+FileSystem/FileContext `listXAttrs()` and `getXAttrs()` API calls with the prefix `header.`.
+
+When an object is renamed, the metadata is propagated to the copy created.
+
+It is possible to probe an S3A Filesystem instance for this capability through
+the `hasPathCapability(path, "fs.s3a.create.header")` check.
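A hedged sketch of the documented header support in use: attach user metadata at create time, then list it back through the XAttr API. The bucket, path, and header name are made up; the option prefix, path capability, and `header.` XAttr prefix are those documented above.

import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateHeaderSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(
        new URI("s3a://data-bucket/"), new Configuration());
    Path path = new Path("s3a://data-bucket/reports/report.json");

    // Probe first: older releases do not offer this capability.
    boolean headersSupported = fs.hasPathCapability(path, "fs.s3a.create.header");

    // Attach user metadata at create time via the documented option prefix.
    try (FSDataOutputStream out = fs.createFile(path)
        .opt("fs.s3a.create.header.source-job", "job_0001")
        .build()) {
      out.write("{}".getBytes(StandardCharsets.UTF_8));
    }

    if (headersSupported) {
      // The metadata comes back through the XAttr API under the "header." prefix.
      Map<String, byte[]> attrs = fs.getXAttrs(path);
      attrs.keySet().stream()
          .filter(name -> name.startsWith("header."))
          .forEach(System.out::println);
    }
  }
}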

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 33 additions & 0 deletions
@@ -396,6 +396,39 @@
           </execution>
         </executions>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-enforcer-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>banned-illegal-imports</id>
+            <phase>process-sources</phase>
+            <goals>
+              <goal>enforce</goal>
+            </goals>
+            <configuration>
+              <rules>
+                <restrictImports>
+                  <includeTestCode>false</includeTestCode>
+                  <reason>Restrict mapreduce imports to committer code</reason>
+                  <exclusions>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitter</exclusion>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.AbstractS3ACommitterFactory</exclusion>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory</exclusion>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.impl.*</exclusion>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.magic.*</exclusion>
+                    <exclusion>org.apache.hadoop.fs.s3a.commit.staging.*</exclusion>
+                  </exclusions>
+                  <bannedImports>
+                    <bannedImport>org.apache.hadoop.mapreduce.**</bannedImport>
+                    <bannedImport>org.apache.hadoop.mapred.**</bannedImport>
+                  </bannedImports>
+                </restrictImports>
+              </rules>
+            </configuration>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
   </build>
 

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java

Lines changed: 8 additions & 2 deletions
@@ -49,6 +49,7 @@
 import org.apache.hadoop.fs.PathIOException;
 import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
 import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
+import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
 
 /**
  * Factory for S3 objects.
@@ -128,23 +129,26 @@ CopyObjectRequest newCopyObjectRequest(String srcKey,
    * Adds the ACL and metadata
    * @param key key of object
    * @param metadata metadata header
+   * @param options options for the request
    * @param srcfile source file
    * @return the request
    */
   PutObjectRequest newPutObjectRequest(String key,
-      ObjectMetadata metadata, File srcfile);
+      ObjectMetadata metadata, PutObjectOptions options, File srcfile);
 
   /**
    * Create a {@link PutObjectRequest} request.
    * The metadata is assumed to have been configured with the size of the
    * operation.
    * @param key key of object
    * @param metadata metadata header
+   * @param options options for the request
    * @param inputStream source data.
    * @return the request
    */
   PutObjectRequest newPutObjectRequest(String key,
       ObjectMetadata metadata,
+      PutObjectOptions options,
       InputStream inputStream);
 
   /**
@@ -177,10 +181,12 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
   /**
    * Start a multipart upload.
    * @param destKey destination object key
+   * @param options options for the request
    * @return the request.
    */
   InitiateMultipartUploadRequest newMultipartUploadRequest(
-      String destKey);
+      String destKey,
+      @Nullable PutObjectOptions options);
 
   /**
    * Complete a multipart upload.