Skip to content

Commit a20587d

Browse files
committed
HADOOP-19567. Moving writing classes into impl.write package
1 parent 1038637 commit a20587d

File tree

6 files changed

+70
-49
lines changed

6 files changed

+70
-49
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/S3AStoreImpl.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -278,6 +278,7 @@ protected void serviceInit(final Configuration conf) throws Exception {
278278

279279
// init all child services, including the stream factory
280280
super.serviceInit(conf);
281+
// complete store writer binding
281282
storeWriter.bind(this, clientManager, this);
282283

283284
// pass down extra information to the stream factory.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/StoreWriter.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -105,8 +105,7 @@ UploadInfo putObject(
105105
* @return the upload initiated
106106
* @throws SdkException on problems
107107
*/
108-
@VisibleForTesting
109-
@Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
108+
@Retries.OnceRaw
110109
PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
111110
PutObjectOptions putOptions,
112111
S3ADataBlocks.BlockUploadData uploadData,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/StoreWriterService.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,10 @@
9292

9393
/**
9494
* Store Writing Operations.
95+
* <p>
96+
* Everything related to writing objects should go through here,
97+
* including listing and aborting multipart uploads.
98+
* <p>
9599
* The service is not ready to use until
96100
* {@link #bind(S3AStore, ClientManager, IORateLimiting)}
97101
* is invoked and the service started.
@@ -206,9 +210,10 @@ public UploadInfo putObject(
206210
return new UploadInfo(upload, len);
207211
}
208212

209-
@Retries.OnceRaw("For PUT; post-PUT actions are RetryExceptionsSwallowed")
210213
@Override
211-
public PutObjectResponse putObjectDirect(PutObjectRequest putObjectRequest,
214+
@Retries.OnceRaw
215+
public PutObjectResponse putObjectDirect(
216+
PutObjectRequest putObjectRequest,
212217
PutObjectOptions putOptions,
213218
S3ADataBlocks.BlockUploadData uploadData,
214219
DurationTrackerFactory trackerFactory)

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/WriteOperationHelper.java

Lines changed: 53 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -136,7 +136,7 @@ public class WriteOperationHelper implements WriteOperations {
136136
* @param auditSpan span to activate
137137
* @param callbacks callbacks used by writeOperationHelper
138138
*/
139-
protected WriteOperationHelper(
139+
public WriteOperationHelper(
140140
final AuditSpanSource auditSpanSource,
141141
final AuditSpan auditSpan,
142142
final WriteOperationHelperCallbacks callbacks) {
@@ -157,7 +157,10 @@ protected WriteOperationHelper(
157157
* @param retries number of retries
158158
* @param idempotent is the method idempotent
159159
*/
160-
void operationRetried(String text, Exception ex, int retries,
160+
void operationRetried(
161+
String text,
162+
Exception ex,
163+
int retries,
161164
boolean idempotent) {
162165
LOG.info("{}: Retried {}: {}", text, retries, ex.toString());
163166
LOG.debug("Stack", ex);
@@ -176,7 +179,8 @@ void operationRetried(String text, Exception ex, int retries,
176179
* @return the result of the call
177180
* @throws IOException any IOE raised, or translated exception
178181
*/
179-
public <T> T retry(String action,
182+
public <T> T retry(
183+
String action,
180184
String path,
181185
boolean idempotent,
182186
CallableRaisingIOE<T> operation)
@@ -285,6 +289,7 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
285289
long length,
286290
PutObjectOptions putOptions,
287291
Retried retrying) throws IOException {
292+
288293
if (partETags.isEmpty()) {
289294
throw new PathIOException(destKey,
290295
"No upload parts in multipart upload");
@@ -303,43 +308,6 @@ private CompleteMultipartUploadResponse finalizeMultipartUpload(
303308
}
304309
}
305310

306-
/**
307-
* This completes a multipart upload to the destination key via
308-
* {@code finalizeMultipartUpload()}.
309-
* Retry policy: retrying, translated.
310-
* Retries increment the {@code errorCount} counter.
311-
* @param destKey destination
312-
* @param uploadId multipart operation Id
313-
* @param partETags list of partial uploads
314-
* @param length length of the upload
315-
* @param errorCount a counter incremented by 1 on every error; for
316-
* use in statistics
317-
* @param putOptions put object options
318-
* @return the result of the operation.
319-
* @throws IOException if problems arose which could not be retried, or
320-
* the retry count was exceeded
321-
*/
322-
@Retries.RetryTranslated
323-
public CompleteMultipartUploadResponse completeMPUwithRetries(
324-
String destKey,
325-
String uploadId,
326-
List<CompletedPart> partETags,
327-
long length,
328-
AtomicInteger errorCount,
329-
PutObjectOptions putOptions)
330-
throws IOException {
331-
requireNonNull(uploadId);
332-
requireNonNull(partETags);
333-
LOG.debug("Completing multipart upload {} with {} parts",
334-
uploadId, partETags.size());
335-
return finalizeMultipartUpload(destKey,
336-
uploadId,
337-
partETags,
338-
length,
339-
putOptions,
340-
(text, e, r, i) -> errorCount.incrementAndGet());
341-
}
342-
343311
/**
344312
* Abort a multipart upload operation.
345313
* @param destKey destination key of the upload
@@ -350,8 +318,11 @@ public CompleteMultipartUploadResponse completeMPUwithRetries(
350318
* @throws FileNotFoundException if the abort ID is unknown
351319
*/
352320
@Retries.RetryTranslated
353-
public void abortMultipartUpload(String destKey, String uploadId,
354-
boolean shouldRetry, Retried retrying)
321+
public void abortMultipartUpload(
322+
String destKey,
323+
String uploadId,
324+
boolean shouldRetry,
325+
Retried retrying)
355326
throws IOException {
356327
if (shouldRetry) {
357328
// retrying option
@@ -537,8 +508,8 @@ public CompleteMultipartUploadResponse commitUpload(
537508
throws IOException {
538509
requireNonNull(uploadId);
539510
requireNonNull(partETags);
540-
LOG.debug("Completing multipart upload {} with {} parts",
541-
uploadId, partETags.size());
511+
LOG.debug("Committing multipart upload {} to {} with {} parts",
512+
uploadId, destKey, partETags.size());
542513
return finalizeMultipartUpload(destKey,
543514
uploadId,
544515
partETags,
@@ -547,6 +518,44 @@ public CompleteMultipartUploadResponse commitUpload(
547518
Invoker.NO_OP);
548519
}
549520

521+
/**
522+
* This completes a multipart upload to the destination key via
523+
* {@code finalizeMultipartUpload()}.
524+
* Retry policy: retrying, translated.
525+
* Retries increment the {@code errorCount} counter.
526+
* @param destKey destination
527+
* @param uploadId multipart operation Id
528+
* @param partETags list of partial uploads
529+
* @param length length of the upload
530+
* @param errorCount a counter incremented by 1 on every error; for
531+
* use in statistics
532+
* @param putOptions put object options
533+
* @return the result of the operation.
534+
* @throws IOException if problems arose which could not be retried, or
535+
* the retry count was exceeded
536+
*/
537+
@Retries.RetryTranslated
538+
public CompleteMultipartUploadResponse completeMPUwithRetries(
539+
String destKey,
540+
String uploadId,
541+
List<CompletedPart> partETags,
542+
long length,
543+
AtomicInteger errorCount,
544+
PutObjectOptions putOptions)
545+
throws IOException {
546+
requireNonNull(uploadId);
547+
requireNonNull(partETags);
548+
LOG.debug("Completing multipart upload {} to {} with {} parts",
549+
uploadId, destKey, partETags.size());
550+
return finalizeMultipartUpload(destKey,
551+
uploadId,
552+
partETags,
553+
length,
554+
putOptions,
555+
(text, e, r, i) -> errorCount.incrementAndGet());
556+
}
557+
558+
550559
/**
551560
* Upload part of a multi-partition file.
552561
* @param durationTrackerFactory duration tracker factory for operation

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/write/package-info.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@
1818

1919
/**
2020
* Classes related to writing objects.
21+
* <p>
22+
* {@link org.apache.hadoop.fs.s3a.impl.write.StoreWriter} and
23+
* its implementation {@link org.apache.hadoop.fs.s3a.impl.write.StoreWriterService}
24+
* export the object model of S3 itself.
25+
* <p>
26+
* The {@code Write*} classes bridge from the filesystem APIs to the store, being
27+
* invoked by filesystem, output stream and committer services.
2128
*/
2229
@InterfaceAudience.Private
2330
package org.apache.hadoop.fs.s3a.impl.write;

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/impl/ITestStoreClose.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
/**
3737
* Test for store closure.
3838
*/
39-
public class ITestStoreClose extends AbstractS3ATestBase {Ò
39+
public class ITestStoreClose extends AbstractS3ATestBase {
4040

4141
/**
4242
* Open a file in forced multipart, then close the fs.

0 commit comments

Comments
 (0)