2525import java .io .UncheckedIOException ;
2626import java .net .URI ;
2727import java .nio .file .AccessDeniedException ;
28- import java .text .DateFormat ;
29- import java .text .SimpleDateFormat ;
3028import java .time .Duration ;
31- import java .time .Instant ;
3229import java .util .ArrayList ;
3330import java .util .Collections ;
34- import java .util .Date ;
3531import java .util .EnumSet ;
3632import java .util .Iterator ;
3733import java .util .List ;
5349
5450import software .amazon .awssdk .core .exception .SdkException ;
5551import software .amazon .awssdk .services .s3 .S3Client ;
56- import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadRequest ;
57- import software .amazon .awssdk .services .s3 .model .CompleteMultipartUploadResponse ;
5852import software .amazon .awssdk .services .s3 .model .GetBucketLocationRequest ;
5953import software .amazon .awssdk .services .s3 .model .HeadBucketRequest ;
6054import software .amazon .awssdk .services .s3 .model .HeadBucketResponse ;
6155import software .amazon .awssdk .services .s3 .model .MultipartUpload ;
62- import software .amazon .awssdk .services .s3 .model .CreateMultipartUploadRequest ;
63- import software .amazon .awssdk .services .s3 .model .CreateMultipartUploadResponse ;
64- import software .amazon .awssdk .services .s3 .model .ListMultipartUploadsRequest ;
6556import software .amazon .awssdk .services .s3 .model .ListObjectsRequest ;
6657import software .amazon .awssdk .services .s3 .model .ListObjectsV2Request ;
6758import software .amazon .awssdk .awscore .exception .AwsServiceException ;
146137import org .apache .hadoop .fs .s3a .impl .StoreContext ;
147138import org .apache .hadoop .fs .s3a .impl .StoreContextBuilder ;
148139import org .apache .hadoop .fs .s3a .impl .StoreContextFactory ;
149- import org .apache .hadoop .fs .s3a .impl .UploadContentProviders ;
150140import org .apache .hadoop .fs .s3a .impl .CSEUtils ;
151141import org .apache .hadoop .fs .s3a .impl .streams .InputStreamType ;
152142import org .apache .hadoop .fs .s3a .impl .streams .ObjectReadParameters ;
253243import static org .apache .hadoop .fs .s3a .impl .CreateFileBuilder .OPTIONS_CREATE_FILE_OVERWRITE ;
254244import static org .apache .hadoop .fs .s3a .impl .CreateFileBuilder .OPTIONS_CREATE_FILE_PERFORMANCE ;
255245import static org .apache .hadoop .fs .s3a .impl .ErrorTranslation .isUnknownBucket ;
256- import static org .apache .hadoop .fs .s3a .impl .HeaderProcessing .CONTENT_TYPE_OCTET_STREAM ;
257246import static org .apache .hadoop .fs .s3a .impl .InternalConstants .AP_REQUIRED_EXCEPTION ;
258247import static org .apache .hadoop .fs .s3a .impl .InternalConstants .ARN_BUCKET_OPTION ;
259248import static org .apache .hadoop .fs .s3a .impl .InternalConstants .DEFAULT_UPLOAD_PART_COUNT_LIMIT ;
@@ -1364,9 +1353,13 @@ public FlagSet<PerformanceFlagEnum> getPerformanceFlags() {
13641353
13651354 /**
13661355 * Get the store for low-level operations.
1356+ * This is absolutely not for external access; it's a single method
1357+ * just to ease use throughout internal code.
13671358 * @return the store the S3A FS is working through.
13681359 */
1369- private S3AStore getStore () {
1360+
1361+ @ InterfaceAudience .Private
1362+ public S3AStore getStore () {
13701363 return store ;
13711364 }
13721365
@@ -1431,7 +1424,10 @@ private void initMultipartUploads(Configuration conf) throws IOException {
14311424 Duration .ofSeconds (DEFAULT_PURGE_EXISTING_MULTIPART_AGE ),
14321425 TimeUnit .SECONDS ,
14331426 Duration .ZERO );
1434- abortOutstandingMultipartUploads (purgeDuration .getSeconds ());
1427+ getStore ().getMultipartIO ().abortOutstandingMultipartUploads (
1428+ purgeDuration .getSeconds (),
1429+ "" ,
1430+ maxKeys );
14351431 } catch (AccessDeniedException e ) {
14361432 instrumentation .errorIgnored ();
14371433 LOG .debug ("Failed to purge multipart uploads against {}," +
@@ -1440,34 +1436,6 @@ private void initMultipartUploads(Configuration conf) throws IOException {
14401436 }
14411437 }
14421438
1443- /**
1444- * Abort all outstanding MPUs older than a given age.
1445- * @param seconds time in seconds
1446- * @throws IOException on any failure, other than 403 "permission denied"
1447- */
1448- @ Retries .RetryTranslated
1449- public void abortOutstandingMultipartUploads (long seconds )
1450- throws IOException {
1451- Preconditions .checkArgument (seconds >= 0 );
1452- Instant purgeBefore =
1453- Instant .now ().minusSeconds (seconds );
1454- LOG .debug ("Purging outstanding multipart uploads older than {}" ,
1455- purgeBefore );
1456- invoker .retry ("Purging multipart uploads" , bucket , true ,
1457- () -> {
1458- RemoteIterator <MultipartUpload > uploadIterator =
1459- MultipartUtils .listMultipartUploads (createStoreContext (),
1460- getS3Client (), null , maxKeys );
1461-
1462- while (uploadIterator .hasNext ()) {
1463- MultipartUpload upload = uploadIterator .next ();
1464- if (upload .initiated ().compareTo (purgeBefore ) < 0 ) {
1465- abortMultipartUpload (upload );
1466- }
1467- }
1468- });
1469- }
1470-
14711439 /**
14721440 * Return the protocol scheme for the FileSystem.
14731441 *
@@ -1934,33 +1902,6 @@ private ObjectInputStreamCallbacks createInputStreamCallbacks(
19341902 }
19351903
19361904
1937- /**
1938- * Callbacks for WriteOperationHelper.
1939- */
1940- private final class WriteOperationHelperCallbacksImpl
1941- implements WriteOperationHelper .WriteOperationHelperCallbacks {
1942-
1943- @ Override
1944- @ Retries .OnceRaw
1945- public CompleteMultipartUploadResponse completeMultipartUpload (
1946- CompleteMultipartUploadRequest request ) {
1947- checkRunning ();
1948- return getStore ().completeMultipartUpload (request );
1949- }
1950-
1951- @ Override
1952- @ Retries .OnceRaw
1953- public UploadPartResponse uploadPart (
1954- final UploadPartRequest request ,
1955- final RequestBody body ,
1956- final DurationTrackerFactory durationTrackerFactory )
1957- throws AwsServiceException , UncheckedIOException {
1958- checkRunning ();
1959- return getStore ().uploadPart (request , body , durationTrackerFactory );
1960- }
1961-
1962- }
1963-
19641905 /**
19651906 * Create the read context for reading from the referenced file,
19661907 * using FS state as well as the status.
@@ -2250,12 +2191,11 @@ public WriteOperationHelper getWriteOperationHelper() {
22502191 */
22512192 @ InterfaceAudience .Private
22522193 public WriteOperationHelper createWriteOperationHelper (AuditSpan auditSpan ) {
2253- return new WriteOperationHelper (this ,
2254- getConf (),
2255- statisticsContext ,
2194+ return new WriteOperationHelper (
22562195 getAuditSpanSource (),
22572196 auditSpan ,
2258- new WriteOperationHelperCallbacksImpl ());
2197+ getStore ().createWriteOperationHelperCallbacks ()
2198+ );
22592199 }
22602200
22612201 /**
@@ -2710,7 +2650,7 @@ private long abortMultipartUploadsUnderPrefix(StoreContext storeContext,
27102650 span .activate ();
27112651 return foreach (uploads , upload ->
27122652 invoker .retry ("Aborting multipart commit" , upload .key (), true , () ->
2713- abortMultipartUpload (upload )));
2653+ getStore (). getMultipartIO (). abortMultipartUpload (upload )));
27142654 }
27152655
27162656 /**
@@ -2942,14 +2882,7 @@ protected void incrementGauge(Statistic statistic, long count) {
29422882 * @param ex exception.
29432883 */
29442884 public void operationRetried (Exception ex ) {
2945- if (isThrottleException (ex )) {
2946- LOG .debug ("Request throttled" );
2947- incrementStatistic (STORE_IO_THROTTLED );
2948- statisticsContext .addValueToQuantiles (STORE_IO_THROTTLE_RATE , 1 );
2949- } else {
2950- incrementStatistic (STORE_IO_RETRY );
2951- incrementStatistic (IGNORED_ERRORS );
2952- }
2885+ getStore ().operationRetried (ex );
29532886 }
29542887
29552888 /**
@@ -3294,64 +3227,6 @@ public UploadInfo putObject(PutObjectRequest putObjectRequest, File file,
32943227 return getStore ().putObject (putObjectRequest , file , listener );
32953228 }
32963229
3297- /**
3298- * PUT an object directly (i.e. not via the transfer manager).
3299- * Byte length is calculated from the file length, or, if there is no
3300- * file, from the content length of the header.
3301- *
3302- * Retry Policy: none.
3303- * Auditing: must be inside an audit span.
3304- * <i>Important: this call will close any input stream in the request.</i>
3305- * @param putObjectRequest the request
3306- * @param putOptions put object options
3307- * @param uploadData data to be uploaded
3308- * @param durationTrackerFactory factory for duration tracking
3309- * @return the upload initiated
3310- * @throws SdkException on problems
3311- */
3312- @ VisibleForTesting
3313- @ Retries .OnceRaw ("For PUT; post-PUT actions are RetryExceptionsSwallowed" )
3314- PutObjectResponse putObjectDirect (PutObjectRequest putObjectRequest ,
3315- PutObjectOptions putOptions ,
3316- S3ADataBlocks .BlockUploadData uploadData ,
3317- DurationTrackerFactory durationTrackerFactory )
3318- throws SdkException {
3319- checkRunning ();
3320- long len = getPutRequestLength (putObjectRequest );
3321- LOG .debug ("PUT {} bytes to {}" , len , putObjectRequest .key ());
3322- incrementPutStartStatistics (len );
3323- final UploadContentProviders .BaseContentProvider provider =
3324- uploadData .getContentProvider ();
3325- try {
3326- PutObjectResponse response =
3327- trackDurationOfSupplier (nonNullDurationTrackerFactory (durationTrackerFactory ),
3328- OBJECT_PUT_REQUESTS .getSymbol (),
3329- () -> getS3Client ().putObject (putObjectRequest ,
3330- RequestBody .fromContentProvider (
3331- provider ,
3332- provider .getSize (),
3333- CONTENT_TYPE_OCTET_STREAM )));
3334- incrementPutCompletedStatistics (true , len );
3335- return response ;
3336- } catch (SdkException e ) {
3337- incrementPutCompletedStatistics (false , len );
3338- throw e ;
3339- }
3340- }
3341-
3342- /**
3343- * Get the length of the PUT, verifying that the length is known.
3344- * @param putObjectRequest a request bound to a file or a stream.
3345- * @return the request length
3346- * @throws IllegalArgumentException if the length is negative
3347- */
3348- private long getPutRequestLength (PutObjectRequest putObjectRequest ) {
3349- long len = putObjectRequest .contentLength ();
3350-
3351- Preconditions .checkState (len >= 0 , "Cannot PUT object of unknown length" );
3352- return len ;
3353- }
3354-
33553230 /**
33563231 * Upload part of a multi-partition file.
33573232 * Increments the write and put counters.
@@ -4653,23 +4528,6 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
46534528 return response ;
46544529 }
46554530
4656- /**
4657- * Initiate a multipart upload from the preconfigured request.
4658- * Retry policy: none + untranslated.
4659- * @param request request to initiate
4660- * @return the result of the call
4661- * @throws SdkException on failures inside the AWS SDK
4662- * @throws IOException Other IO problems
4663- */
4664- @ Retries .OnceRaw
4665- CreateMultipartUploadResponse initiateMultipartUpload (
4666- CreateMultipartUploadRequest request ) throws IOException {
4667- LOG .debug ("Initiate multipart upload to {}" , request .key ());
4668- return trackDurationOfSupplier (getDurationTrackerFactory (),
4669- OBJECT_MULTIPART_UPLOAD_INITIATED .getSymbol (),
4670- () -> getS3Client ().createMultipartUpload (request ));
4671- }
4672-
46734531 /**
46744532 * Perform post-write actions.
46754533 * <p>
@@ -4719,7 +4577,7 @@ private void createEmptyObject(final String objectName, PutObjectOptions putOpti
47194577 new byte [0 ], 0 , 0 , null );
47204578
47214579 invoker .retry ("PUT 0-byte object " , objectName , true ,
4722- () -> putObjectDirect (
4580+ () -> getStore (). putObjectDirect (
47234581 getRequestFactory ().newDirectoryMarkerRequest (objectName ).build (),
47244582 putOptions ,
47254583 uploadData ,
@@ -5299,13 +5157,7 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
52995157 final StoreContext storeContext ,
53005158 final @ Nullable String prefix )
53015159 throws IOException {
5302- // span is picked up retained in the listing.
5303- String p = prefix ;
5304- if (prefix != null && !prefix .isEmpty () && !prefix .endsWith ("/" )) {
5305- p = prefix + "/" ;
5306- }
5307- // duration tracking is done in iterator.
5308- return MultipartUtils .listMultipartUploads (storeContext , getS3Client (), p , maxKeys );
5160+ return getStore ().getMultipartIO ().listMultipartUploads (storeContext , prefix , maxKeys );
53095161 }
53105162
53115163 /**
@@ -5321,54 +5173,9 @@ public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
53215173 @ Retries .RetryTranslated
53225174 public List <MultipartUpload > listMultipartUploads (String prefix )
53235175 throws IOException {
5324- // add a trailing / if needed.
5325- if (prefix != null && !prefix .isEmpty () && !prefix .endsWith ("/" )) {
5326- prefix = prefix + "/" ;
5327- }
5328- String p = prefix ;
5329- return invoker .retry ("listMultipartUploads" , p , true , () -> {
5330- final ListMultipartUploadsRequest request = getRequestFactory ()
5331- .newListMultipartUploadsRequestBuilder (p ).build ();
5332- return trackDuration (getInstrumentation (), MULTIPART_UPLOAD_LIST .getSymbol (), () ->
5333- getS3Client ().listMultipartUploads (request ).uploads ());
5334- });
5335- }
5336-
5337- /**
5338- * Abort a multipart upload.
5339- * Retry policy: none.
5340- * @param destKey destination key
5341- * @param uploadId Upload ID
5342- * @throws IOException IO failure, including any uprated SdkException
5343- */
5344- @ Retries .OnceTranslated
5345- public void abortMultipartUpload (String destKey , String uploadId ) throws IOException {
5346- LOG .debug ("Aborting multipart upload {} to {}" , uploadId , destKey );
5347- trackDuration (getInstrumentation (), OBJECT_MULTIPART_UPLOAD_ABORTED .getSymbol (), () ->
5348- getS3Client ().abortMultipartUpload (
5349- getRequestFactory ().newAbortMultipartUploadRequestBuilder (
5350- destKey ,
5351- uploadId ).build ()));
5176+ return getStore ().getMultipartIO ().listMultipartUploads (prefix );
53525177 }
53535178
5354- /**
5355- * Abort a multipart upload.
5356- * Retry policy: none.
5357- * @param upload the listed upload to abort.
5358- * @throws IOException IO failure, including any uprated SdkException
5359- */
5360- @ Retries .OnceTranslated
5361- public void abortMultipartUpload (MultipartUpload upload ) throws IOException {
5362- String destKey = upload .key ();
5363- String uploadId = upload .uploadId ();
5364- if (LOG .isDebugEnabled ()) {
5365- DateFormat df = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" );
5366- LOG .debug ("Aborting multipart upload {} to {} initiated by {} on {}" ,
5367- uploadId , destKey , upload .initiator (),
5368- df .format (Date .from (upload .initiated ())));
5369- }
5370- abortMultipartUpload (destKey , uploadId );
5371- }
53725179
53735180 /**
53745181 * Create a new instance of the committer statistics.
0 commit comments