Skip to content

Commit 0e208c8

Browse files
committed
HADOOP-19256. S3A: Support Conditional Overwrites
Amazon S3 now supports conditional overwrites, which can be used when creating files through the createFile() API with two new builder options: fs.option.create.conditional.overwrite: Write if and only if there is no object at the target path. This is an atomic PUT-no-overwrite, checked in close(), not create(). fs.option.create.conditional.overwrite.etag: Write a file if and only if it is overwriting a file with a specific etag. If the "fs.s3a.performance.flags" enumeration includes the flag "create" then file creation will use conditional creation to detect and reject overwrites. The configuration option "fs.s3a.create.conditional.enabled" can be set to false to disable these features on third-party stores. Contributed by Diljot Grewal, Saikat Roy and Steve Loughran
1 parent a38294d commit 0e208c8

31 files changed

+1625
-110
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/Options.java

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -710,4 +710,114 @@ private OpenFileOptions() {
710710
public static final String FS_OPTION_OPENFILE_EC_POLICY =
711711
FS_OPTION_OPENFILE + "ec.policy";
712712
}
713+
714+
/**
715+
* The standard {@code createFile()} options.
716+
* <p>
717+
* If an option is not supported during file creation and it is considered
718+
* part of a commit protocol, then, when supplied in a must() option,
719+
* it MUST be rejected.
720+
*/
721+
@InterfaceAudience.Public
722+
@InterfaceStability.Evolving
723+
public interface CreateFileOptionKeys {
724+
725+
/**
726+
* {@code createFile()} option to write a file in the close() operation iff
727+
* there is nothing at the destination.
728+
* This is the equivalent of {@code create(path, overwrite=false)}
729+
* <i>except that the existence check is postponed to the end of the write</i>.
730+
* <p>
731+
* Value {@value}.
732+
* </p>
733+
* <p>
734+
* This can be set in the builder.
735+
* </p>
736+
* <ol>
737+
* <li>It is for object stores which only upload/manifest files
738+
* at the end of the stream write.</li>
739+
* <li>Streams which support it SHALL NOT manifest any object to
740+
* the destination path until close()</li>
741+
* <li>It MUST be declared as a stream capability in streams for which
742+
* this overwrite is enabled.</li>
743+
* <li>It MUST be exported as a path capability for all stores where
744+
* the feature is available <i>and</i> enabled</li>
745+
* <li>If passed to a filesystem as a {@code must()} parameter where
746+
* the option value is {@code true}, and it is supported/enabled,
747+
* the FS SHALL omit all overwrite checks in {@code create},
748+
* including for the existence of an object or a directory underneath.
749+
* Instead, during {@code close()} the object will only be manifest
750+
* at the target path if there is no object at the destination.
751+
* </li>
752+
* <li>The existence check and object creation SHALL be atomic.</li>
753+
* <li>If passed to a filesystem as a {@code must()} parameter where
754+
* the option value is {@code true}, and the FS does not recognise
755+
* the feature, or it is recognized but disabled on this FS instance,
756+
* the filesystem SHALL reject the request.
757+
* </li>
758+
* <li>If passed to a filesystem as a {@code opt()} parameter where
759+
* the option value is {@code true}, the filesystem MAY ignore
760+
* the request, or it MAY enable the feature.
761+
* Any filesystem which does not support the feature, including
762+
* from older releases, SHALL ignore it.
763+
* </li>
764+
* </ol>
765+
*/
766+
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE = "fs.option.create.conditional.overwrite";
767+
768+
/**
769+
* Overwrite a file only if there is an Etag match. This option takes a string.
770+
*
771+
* Value {@value}.
772+
* <p>
773+
* This is similar to {@link #FS_OPTION_CREATE_CONDITIONAL_OVERWRITE}.
774+
* <ol>
775+
* <li>If supported and enabled, it SHALL be declared as a capability of the filesystem</li>
776+
* <li>If supported and enabled, it SHALL be declared as a capability of the stream</li>
777+
* <li>The string passed as the value SHALL be the etag value as returned by
778+
* {@code EtagSource.getEtag()}</li>
779+
* <li>This value MUST NOT be empty</li>
780+
* <li>If passed to a filesystem which supports it, then when the file is created,
781+
* the store SHALL check for the existence of a file/object at the destination
782+
* path.
783+
* </li>
784+
* <li>If there is no object there, the operation SHALL be rejected by raising
785+
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
786+
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
787+
* </li>
788+
* <li>If there is an object there, its Etag SHALL be compared to the
789+
* value passed here.</li>
790+
* <li>If there is no match, the operation SHALL be rejected by raising
791+
* either a {@code org.apache.hadoop.fs.FileAlreadyExistsException}
792+
* exception, or a {@code java.nio.file.FileAlreadyExistsException}
793+
* </li>
794+
* <li>If the etag does match, the file SHALL be created.</li>
795+
* <li>The check and create SHALL be atomic</li>
796+
* <li>The check and create MAY be at the end of the write, in {@code close()},
797+
* or it MAY be in the {@code create()} operation. That is: some stores
798+
* MAY perform the check early</li>
799+
* <li>If supported and enabled, stores MAY check for the existence of subdirectories;
800+
* this behavior is implementation-specific.</li>
801+
* </ol>
802+
*/
803+
String FS_OPTION_CREATE_CONDITIONAL_OVERWRITE_ETAG =
804+
"fs.option.create.conditional.overwrite.etag";
805+
806+
/**
807+
* A flag which requires the filesystem to create files/objects in close(),
808+
* rather than create/createFile.
809+
* <p>
810+
* Object stores with this behavior should also export it as a path capability.
811+
*
812+
* Value {@value}.
813+
*/
814+
String FS_OPTION_CREATE_IN_CLOSE = "fs.option.create.in.close";
815+
816+
/**
817+
* String to define the content filetype.
818+
* Value {@value}.
819+
*/
820+
String FS_OPTION_CREATE_CONTENT_TYPE = "fs.option.create.content.type";
821+
822+
}
713823
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -467,6 +467,12 @@ public final class StoreStatisticNames {
467467
public static final String MULTIPART_UPLOAD_LIST
468468
= "multipart_upload_list";
469469

470+
public static final String CONDITIONAL_CREATE
471+
= "conditional_create";
472+
473+
public static final String CONDITIONAL_CREATE_FAILED
474+
= "conditional_create_failed";
475+
470476
private StoreStatisticNames() {
471477
}
472478

hadoop-common-project/hadoop-common/src/site/markdown/filesystem/fsdataoutputstreambuilder.md

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,7 @@ Here are the custom options which the S3A Connector supports.
192192
|-----------------------------|-----------|----------------------------------------|
193193
| `fs.s3a.create.performance` | `boolean` | create a file with maximum performance |
194194
| `fs.s3a.create.header` | `string` | prefix for user supplied headers |
195+
| `fs.s3a.create.multipart` | `boolean` | create a multipart file |
195196

196197
### `fs.s3a.create.performance`
197198

@@ -200,7 +201,8 @@ Prioritize file creation performance over safety checks for filesystem consisten
200201
This:
201202
1. Skips the `LIST` call which makes sure a file is being created over a directory.
202203
Risk: a file is created over a directory.
203-
2. Ignores the overwrite flag.
204+
2. If the overwrite flag is false and filesystem flag `fs.s3a.create.conditional.enabled` is true,
205+
uses conditional creation to prevent the overwrite of any object at the destination.
204206
3. Never issues a `DELETE` call to delete parent directory markers.
205207

206208
It is possible to probe an S3A Filesystem instance for this capability through
@@ -243,3 +245,17 @@ When an object is renamed, the metadata is propagated the copy created.
243245

244246
It is possible to probe an S3A Filesystem instance for this capability through
245247
the `hasPathCapability(path, "fs.s3a.create.header")` check.
248+
249+
### `fs.s3a.create.multipart` Create a multipart file
250+
251+
Initiate a multipart upload when a file is created, rather
252+
than only when the amount of data buffered reaches the threshold
253+
set in `fs.s3a.multipart.size`.
254+
255+
This is only relevant during testing, as it allows for multipart
256+
operation to be initiated without writing any data, so
257+
reducing test time.
258+
259+
It is not recommended for production use, because as well as adding
260+
more network IO, it is not compatible with third-party stores which
261+
do not support multipart uploads.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1522,6 +1522,29 @@ private Constants() {
15221522
*/
15231523
public static final String FS_S3A_PERFORMANCE_FLAGS =
15241524
"fs.s3a.performance.flags";
1525+
1526+
/**
1527+
* Is the create overwrite feature enabled or not?
1528+
* A configuration option and a path status probe.
1529+
* Value {@value}.
1530+
*/
1531+
public static final String FS_S3A_CONDITIONAL_CREATE_ENABLED =
1532+
"fs.s3a.create.conditional.enabled";
1533+
1534+
/**
1535+
* Default value for {@link #FS_S3A_CONDITIONAL_CREATE_ENABLED}.
1536+
* Value {@value}.
1537+
*/
1538+
public static final boolean DEFAULT_FS_S3A_CONDITIONAL_CREATE_ENABLED = true;
1539+
1540+
/**
1541+
* createFile() boolean option to create a multipart file, always: {@value}.
1542+
* <p>
1543+
* This is inefficient and will not work on a store which doesn't support that feature,
1544+
* so is primarily for testing.
1545+
*/
1546+
public static final String FS_S3A_CREATE_MULTIPART = "fs.s3a.create.multipart";
1547+
15251548
/**
15261549
* Prefix for adding a header to the object when created.
15271550
* The actual value must have a "." suffix and then the actual header.
@@ -1845,4 +1868,11 @@ private Constants() {
18451868
public static final String ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX =
18461869
"fs.s3a.analytics.accelerator";
18471870

1871+
/**
1872+
* Value for the {@code If-None-Match} HTTP header in S3 requests.
1873+
* Value: {@value}.
1874+
* More information: <a href="https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html">
1875+
* AWS S3 PutObject API Documentation</a>
1876+
*/
1877+
public static final String IF_NONE_MATCH_STAR = "*";
18481878
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.time.Instant;
2727
import java.util.ArrayList;
2828
import java.util.Collections;
29+
import java.util.EnumSet;
2930
import java.util.List;
3031
import java.util.Locale;
3132
import java.util.Map;
@@ -52,6 +53,7 @@
5253
import org.apache.hadoop.fs.s3a.impl.ProgressListener;
5354
import org.apache.hadoop.fs.s3a.impl.ProgressListenerEvent;
5455
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
56+
import org.apache.hadoop.fs.s3a.impl.write.WriteObjectFlags;
5557
import org.apache.hadoop.fs.statistics.IOStatisticsAggregator;
5658
import org.apache.hadoop.util.Preconditions;
5759
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ListenableFuture;
@@ -224,6 +226,11 @@ class S3ABlockOutputStream extends OutputStream implements
224226
/** Is multipart upload enabled? */
225227
private final boolean isMultipartUploadEnabled;
226228

229+
/**
230+
* Object write option flags.
231+
*/
232+
private final EnumSet<WriteObjectFlags> writeObjectFlags;
233+
227234
/**
228235
* An S3A output stream which uploads partitions in a separate pool of
229236
* threads; different {@link S3ADataBlocks.BlockFactory}
@@ -249,6 +256,7 @@ class S3ABlockOutputStream extends OutputStream implements
249256
this.iostatistics = statistics.getIOStatistics();
250257
this.writeOperationHelper = builder.writeOperations;
251258
this.putTracker = builder.putTracker;
259+
this.writeObjectFlags = builder.putOptions.getWriteObjectFlags();
252260
this.executorService = MoreExecutors.listeningDecorator(
253261
builder.executorService);
254262
this.multiPartUpload = null;
@@ -266,9 +274,19 @@ class S3ABlockOutputStream extends OutputStream implements
266274
? builder.blockSize
267275
: -1;
268276

277+
// if required to be multipart by the committer put tracker or
278+
// write flags (i.e. createFile() options), initiate multipart uploads.
279+
// this will fail fast if the store doesn't support multipart uploads
269280
if (putTracker.initialize()) {
270281
LOG.debug("Put tracker requests multipart upload");
271282
initMultipartUpload();
283+
} else if (writeObjectFlags.contains(WriteObjectFlags.CreateMultipart)) {
284+
// this is not merged simply to avoid confusion
285+
// as to what to do if both are set, so as to guarantee
286+
// the put tracker initialization always takes priority
287+
// over any file flag.
288+
LOG.debug("Multipart initiated from createFile() options");
289+
initMultipartUpload();
272290
}
273291
this.isCSEEnabled = builder.isCSEEnabled;
274292
this.threadIOStatisticsAggregator = builder.ioStatisticsAggregator;
@@ -772,7 +790,8 @@ BlockOutputStreamStatistics getStatistics() {
772790
@SuppressWarnings("deprecation")
773791
@Override
774792
public boolean hasCapability(String capability) {
775-
switch (capability.toLowerCase(Locale.ENGLISH)) {
793+
final String cap = capability.toLowerCase(Locale.ENGLISH);
794+
switch (cap) {
776795

777796
// does the output stream have delayed visibility
778797
case CommitConstants.STREAM_CAPABILITY_MAGIC_OUTPUT:
@@ -797,6 +816,12 @@ public boolean hasCapability(String capability) {
797816
return true;
798817

799818
default:
819+
// scan flags for the capability
820+
for (WriteObjectFlags flag : writeObjectFlags) {
821+
if (flag.hasKey(cap)) {
822+
return true;
823+
}
824+
}
800825
return false;
801826
}
802827
}

0 commit comments

Comments
 (0)