Skip to content

Commit 69eedd9

Browse files
committed
HADOOP-17833. Rebase and add provisional awareness of storage class
rebase onto trunk with HADOOP-12020/storage class; use PutObjectOptions as the place to pass the option from createFile() to the requests. The feature itself is not implemented, just prepared for. Change-Id: I5d2e57e672f49021c3cb8bfa3b1a3daf09c61a38
1 parent 2c5f5a1 commit 69eedd9

File tree

10 files changed

+74
-42
lines changed

10 files changed

+74
-42
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3ABlockOutputStream.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -559,12 +559,12 @@ private int putObject() throws IOException {
559559
writeOperationHelper.createPutObjectRequest(
560560
key,
561561
uploadData.getFile(),
562-
builder.putOptions.getHeaders())
562+
builder.putOptions)
563563
: writeOperationHelper.createPutObjectRequest(
564564
key,
565565
uploadData.getUploadStream(),
566566
size,
567-
builder.putOptions.getHeaders());
567+
builder.putOptions);
568568
BlockUploadProgress callback =
569569
new BlockUploadProgress(
570570
block, progressListener, now());

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1691,11 +1691,11 @@ private FSDataOutputStream innerCreateFile(
16911691
committerIntegration.createTracker(path, key, outputStreamStatistics);
16921692
String destKey = putTracker.getDestKey();
16931693

1694-
// put options are from the path, unless performance is requested,
1695-
// where it is always keeping dirs.
1694+
// put options are derived from the path and the
1695+
// option builder.
16961696
boolean keep = performance || keepDirectoryMarkers(path);
16971697
final PutObjectOptions putOptions =
1698-
new PutObjectOptions(keep, options.getHeaders());
1698+
new PutObjectOptions(keep, null, options.getHeaders());
16991699

17001700
final S3ABlockOutputStream.BlockOutputStreamBuilder builder =
17011701
S3ABlockOutputStream.builder()
@@ -2739,7 +2739,7 @@ private DeleteObjectsResult deleteObjects(DeleteObjectsRequest deleteRequest)
27392739
*/
27402740
public PutObjectRequest newPutObjectRequest(String key,
27412741
ObjectMetadata metadata, File srcfile) {
2742-
return requestFactory.newPutObjectRequest(key, metadata, srcfile);
2742+
return requestFactory.newPutObjectRequest(key, metadata, null, srcfile);
27432743
}
27442744

27452745
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperationHelper.java

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.io.IOException;
2525
import java.io.InputStream;
2626
import java.util.List;
27-
import java.util.Map;
2827
import java.util.concurrent.atomic.AtomicInteger;
2928

3029
import com.amazonaws.services.s3.model.AmazonS3Exception;
@@ -234,49 +233,47 @@ private void deactivateAuditSpan() {
234233
* @param destKey destination key
235234
* @param inputStream source data.
236235
* @param length size, if known. Use -1 for not known
237-
* @param headers optional map of custom headers.
236+
* @param options options for the request
238237
* @return the request
239238
*/
240239
@Retries.OnceRaw
241240
public PutObjectRequest createPutObjectRequest(String destKey,
242241
InputStream inputStream,
243242
long length,
244-
@Nullable final Map<String, String> headers) {
243+
final PutObjectOptions options) {
245244
activateAuditSpan();
246245
ObjectMetadata objectMetadata = newObjectMetadata(length);
247-
if (headers != null) {
248-
objectMetadata.setUserMetadata(headers);
249-
}
250246
return getRequestFactory().newPutObjectRequest(
251247
destKey,
252248
objectMetadata,
249+
options,
253250
inputStream);
254251
}
255252

256253
/**
257254
* Create a {@link PutObjectRequest} request to upload a file.
258255
* @param dest key to PUT to.
259256
* @param sourceFile source file
260-
* @param headers optional map of custom headers.
257+
* @param options options for the request
261258
* @return the request
262259
*/
263260
@Retries.OnceRaw
264261
public PutObjectRequest createPutObjectRequest(
265262
String dest,
266263
File sourceFile,
267-
@Nullable final Map<String, String> headers) {
264+
final PutObjectOptions options) {
268265
Preconditions.checkState(sourceFile.length() < Integer.MAX_VALUE,
269266
"File length is too big for a single PUT upload");
270267
activateAuditSpan();
271268
final ObjectMetadata objectMetadata =
272269
newObjectMetadata((int) sourceFile.length());
273-
if (headers != null) {
274-
objectMetadata.setUserMetadata(headers);
275-
}
276-
return getRequestFactory().
270+
271+
PutObjectRequest putObjectRequest = getRequestFactory().
277272
newPutObjectRequest(dest,
278273
objectMetadata,
274+
options,
279275
sourceFile);
276+
return putObjectRequest;
280277
}
281278

282279
/**
@@ -319,7 +316,7 @@ public String initiateMultiPartUpload(
319316
() -> {
320317
final InitiateMultipartUploadRequest initiateMPURequest =
321318
getRequestFactory().newMultipartUploadRequest(
322-
destKey, options.getHeaders());
319+
destKey, options);
323320
return owner.initiateMultipartUpload(initiateMPURequest)
324321
.getUploadId();
325322
});

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/WriteOperations.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.io.IOException;
2626
import java.io.InputStream;
2727
import java.util.List;
28-
import java.util.Map;
2928
import java.util.concurrent.atomic.AtomicInteger;
3029

3130
import com.amazonaws.services.s3.model.CompleteMultipartUploadResult;
@@ -79,25 +78,25 @@ <T> T retry(String action,
7978
* @param destKey destination key
8079
* @param inputStream source data.
8180
* @param length size, if known. Use -1 for not known
82-
* @param headers optional map of custom headers.
81+
* @param options
8382
* @return the request
8483
*/
8584
PutObjectRequest createPutObjectRequest(String destKey,
8685
InputStream inputStream,
8786
long length,
88-
@Nullable Map<String, String> headers);
87+
@Nullable PutObjectOptions options);
8988

9089
/**
9190
* Create a {@link PutObjectRequest} request to upload a file.
9291
* @param dest key to PUT to.
9392
* @param sourceFile source file
94-
* @param headers optional map of custom headers.
93+
* @param options
9594
* @return the request
9695
*/
9796
PutObjectRequest createPutObjectRequest(
9897
String dest,
9998
File sourceFile,
100-
@Nullable Map<String, String> headers);
99+
@Nullable PutObjectOptions options);
101100

102101
/**
103102
* Callback on a successful write.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/api/RequestFactory.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import java.io.File;
2323
import java.io.InputStream;
2424
import java.util.List;
25-
import java.util.Map;
2625
import java.util.Optional;
2726

2827
import com.amazonaws.services.s3.model.AbortMultipartUploadRequest;
@@ -51,6 +50,7 @@
5150
import org.apache.hadoop.fs.PathIOException;
5251
import org.apache.hadoop.fs.s3a.S3AEncryptionMethods;
5352
import org.apache.hadoop.fs.s3a.auth.delegation.EncryptionSecrets;
53+
import org.apache.hadoop.fs.s3a.impl.PutObjectOptions;
5454

5555
/**
5656
* Factory for S3 objects.
@@ -142,23 +142,26 @@ CopyObjectRequest newCopyObjectRequest(String srcKey,
142142
* Adds the ACL and metadata
143143
* @param key key of object
144144
* @param metadata metadata header
145+
* @param options
145146
* @param srcfile source file
146147
* @return the request
147148
*/
148149
PutObjectRequest newPutObjectRequest(String key,
149-
ObjectMetadata metadata, File srcfile);
150+
ObjectMetadata metadata, final PutObjectOptions options, File srcfile);
150151

151152
/**
152153
* Create a {@link PutObjectRequest} request.
153154
* The metadata is assumed to have been configured with the size of the
154155
* operation.
155156
* @param key key of object
156157
* @param metadata metadata header
158+
* @param options options for the request
157159
* @param inputStream source data.
158160
* @return the request
159161
*/
160162
PutObjectRequest newPutObjectRequest(String key,
161163
ObjectMetadata metadata,
164+
PutObjectOptions options,
162165
InputStream inputStream);
163166

164167
/**
@@ -196,7 +199,7 @@ AbortMultipartUploadRequest newAbortMultipartUploadRequest(
196199
*/
197200
InitiateMultipartUploadRequest newMultipartUploadRequest(
198201
String destKey,
199-
@Nullable Map<String, String> headers);
202+
@Nullable PutObjectOptions options);
200203

201204
/**
202205
* Complete a multipart upload.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/commit/magic/MagicCommitTracker.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -146,7 +146,7 @@ public boolean aboutToComplete(String uploadId,
146146
originalDestKey,
147147
new ByteArrayInputStream(EMPTY),
148148
0,
149-
headers);
149+
new PutObjectOptions(true, null, headers));
150150
upload(originalDestPut);
151151

152152
// build the commit summary

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/PutObjectOptions.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,11 @@ public final class PutObjectOptions {
3131
*/
3232
private final boolean keepMarkers;
3333

34+
/**
35+
* Storage class, if not null.
36+
*/
37+
private final String storageClass;
38+
3439
/**
3540
* Headers; may be null.
3641
*/
@@ -39,12 +44,15 @@ public final class PutObjectOptions {
3944
/**
4045
* Constructor.
4146
* @param keepMarkers Can the PUT operation skip marker deletion?
47+
* @param storageClass Storage class, if not null.
4248
* @param headers Headers; may be null.
4349
*/
4450
public PutObjectOptions(
4551
final boolean keepMarkers,
52+
@Nullable final String storageClass,
4653
@Nullable final Map<String, String> headers) {
4754
this.keepMarkers = keepMarkers;
55+
this.storageClass = storageClass;
4856
this.headers = headers;
4957
}
5058

@@ -68,11 +76,14 @@ public Map<String, String> getHeaders() {
6876
public String toString() {
6977
return "PutObjectOptions{" +
7078
"keepMarkers=" + keepMarkers +
79+
", storageClass='" + storageClass + '\'' +
7180
'}';
7281
}
7382

74-
private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true, null);
75-
private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false, null);
83+
private static final PutObjectOptions KEEP_DIRS = new PutObjectOptions(true,
84+
null, null);
85+
private static final PutObjectOptions DELETE_DIRS = new PutObjectOptions(false,
86+
null, null);
7687

7788
/**
7889
* Get the options to keep directories.
@@ -89,4 +100,5 @@ public static PutObjectOptions keepingDirs() {
89100
public static PutObjectOptions deletingDirs() {
90101
return DELETE_DIRS;
91102
}
103+
92104
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/RequestFactoryImpl.java

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525
import java.util.List;
2626
import java.util.Map;
2727
import java.util.Optional;
28-
2928
import javax.annotation.Nullable;
3029

3130
import com.amazonaws.AmazonWebServiceRequest;
@@ -365,15 +364,19 @@ protected void copyEncryptionParameters(
365364
* Adds the ACL, storage class and metadata
366365
* @param key key of object
367366
* @param metadata metadata header
367+
* @param options options for the request, including headers
368368
* @param srcfile source file
369369
* @return the request
370370
*/
371371
@Override
372372
public PutObjectRequest newPutObjectRequest(String key,
373-
ObjectMetadata metadata, File srcfile) {
373+
ObjectMetadata metadata,
374+
final PutObjectOptions options,
375+
File srcfile) {
374376
Preconditions.checkNotNull(srcfile);
375377
PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
376378
srcfile);
379+
maybeSetMetadata(options, metadata);
377380
setOptionalPutRequestParameters(putObjectRequest);
378381
putObjectRequest.setCannedAcl(cannedACL);
379382
if (storageClass != null) {
@@ -389,15 +392,18 @@ public PutObjectRequest newPutObjectRequest(String key,
389392
* operation.
390393
* @param key key of object
391394
* @param metadata metadata header
395+
* @param options options for the request
392396
* @param inputStream source data.
393397
* @return the request
394398
*/
395399
@Override
396400
public PutObjectRequest newPutObjectRequest(String key,
397401
ObjectMetadata metadata,
402+
@Nullable final PutObjectOptions options,
398403
InputStream inputStream) {
399404
Preconditions.checkNotNull(inputStream);
400405
Preconditions.checkArgument(isNotEmpty(key), "Null/empty key");
406+
maybeSetMetadata(options, metadata);
401407
PutObjectRequest putObjectRequest = new PutObjectRequest(getBucket(), key,
402408
inputStream, metadata);
403409
setOptionalPutRequestParameters(putObjectRequest);
@@ -421,7 +427,7 @@ public int read() throws IOException {
421427
final ObjectMetadata md = createObjectMetadata(0L, true);
422428
md.setContentType(HeaderProcessing.CONTENT_TYPE_X_DIRECTORY);
423429
PutObjectRequest putObjectRequest =
424-
newPutObjectRequest(key, md, im);
430+
newPutObjectRequest(key, md, null, im);
425431
return putObjectRequest;
426432
}
427433

@@ -447,12 +453,10 @@ public AbortMultipartUploadRequest newAbortMultipartUploadRequest(
447453

448454
@Override
449455
public InitiateMultipartUploadRequest newMultipartUploadRequest(
450-
String destKey,
451-
@Nullable final Map<String, String> headers) {
456+
final String destKey,
457+
@Nullable final PutObjectOptions options) {
452458
final ObjectMetadata objectMetadata = newObjectMetadata(-1);
453-
if (headers != null) {
454-
objectMetadata.setUserMetadata(headers);
455-
}
459+
maybeSetMetadata(options, objectMetadata);
456460
final InitiateMultipartUploadRequest initiateMPURequest =
457461
new InitiateMultipartUploadRequest(getBucket(),
458462
destKey,
@@ -609,6 +613,23 @@ public void setEncryptionSecrets(final EncryptionSecrets secrets) {
609613
encryptionSecrets = secrets;
610614
}
611615

616+
/**
617+
* Set the metadata from the options if the options are not
618+
* null and the metadata contains headers.
619+
* @param options options for the request
620+
* @param objectMetadata metadata to patch
621+
*/
622+
private void maybeSetMetadata(
623+
@Nullable PutObjectOptions options,
624+
final ObjectMetadata objectMetadata) {
625+
if (options != null) {
626+
Map<String, String> headers = options.getHeaders();
627+
if (headers != null) {
628+
objectMetadata.setUserMetadata(headers);
629+
}
630+
}
631+
}
632+
612633
/**
613634
* Create a builder.
614635
* @return new builder.

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/TestRequestFactory.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ public void testRequestFactoryWithCannedACL() throws Throwable {
9090
ObjectMetadata md = factory.newObjectMetadata(128);
9191
Assertions.assertThat(
9292
factory.newPutObjectRequest(path, md,
93-
new ByteArrayInputStream(new byte[0]))
93+
null, new ByteArrayInputStream(new byte[0]))
9494
.getCannedAcl())
9595
.describedAs("ACL of PUT")
9696
.isEqualTo(acl);
@@ -176,9 +176,9 @@ private void createFactoryObjects(RequestFactory factory) {
176176
a(factory.newMultipartUploadRequest(path, null));
177177
File srcfile = new File("/tmp/a");
178178
a(factory.newPutObjectRequest(path,
179-
factory.newObjectMetadata(-1), srcfile));
179+
factory.newObjectMetadata(-1), null, srcfile));
180180
ByteArrayInputStream stream = new ByteArrayInputStream(new byte[0]);
181-
a(factory.newPutObjectRequest(path, md, stream));
181+
a(factory.newPutObjectRequest(path, md, null, stream));
182182
a(factory.newSelectRequest(path));
183183
}
184184

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/scale/ITestS3ADirectoryPerformance.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,7 @@ public void testMultiPagesListingPerformanceAndCorrectness()
258258
ObjectMetadata om = fs.newObjectMetadata(0L);
259259
PutObjectRequest put = requestFactory
260260
.newPutObjectRequest(fs.pathToKey(file), om,
261-
new FailingInputStream());
261+
null, new FailingInputStream());
262262
futures.add(submit(executorService, () ->
263263
writeOperationHelper.putObject(put, PutObjectOptions.keepingDirs())));
264264
}

0 commit comments

Comments
 (0)