Skip to content

Commit c58e11b

Browse files
author
Gabor Bota
authored
HADOOP-16383. Pass ITtlTimeProvider instance in initialize method in MetadataStore interface. Contributed by Gabor Bota. (#1009)
1 parent 256fcc1 commit c58e11b

19 files changed

+159
-151
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -396,7 +396,7 @@ public void initialize(URI name, Configuration originalConf)
396396
DEFAULT_METADATASTORE_METADATA_TTL, TimeUnit.MILLISECONDS);
397397
ttlTimeProvider = new S3Guard.TtlTimeProvider(authDirTtl);
398398

399-
setMetadataStore(S3Guard.getMetadataStore(this));
399+
setMetadataStore(S3Guard.getMetadataStore(this, ttlTimeProvider));
400400
allowAuthoritativeMetadataStore = conf.getBoolean(METADATASTORE_AUTHORITATIVE,
401401
DEFAULT_METADATASTORE_AUTHORITATIVE);
402402
allowAuthoritativePaths = S3Guard.getAuthoritativePaths(this);
@@ -1767,7 +1767,7 @@ void deleteObjectAtPath(Path f, String key, boolean isFile)
17671767
instrumentation.directoryDeleted();
17681768
}
17691769
deleteObject(key);
1770-
metadataStore.delete(f, ttlTimeProvider);
1770+
metadataStore.delete(f);
17711771
}
17721772

17731773
/**
@@ -2293,7 +2293,7 @@ private boolean innerDelete(S3AFileStatus status, boolean recursive)
22932293
}
22942294
try(DurationInfo ignored =
22952295
new DurationInfo(LOG, false, "Delete metastore")) {
2296-
metadataStore.deleteSubtree(f, ttlTimeProvider);
2296+
metadataStore.deleteSubtree(f);
22972297
}
22982298
} else {
22992299
LOG.debug("delete: Path is a file: {}", key);
@@ -4066,6 +4066,7 @@ public ITtlTimeProvider getTtlTimeProvider() {
40664066
@VisibleForTesting
40674067
protected void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
40684068
this.ttlTimeProvider = ttlTimeProvider;
4069+
metadataStore.setTtlTimeProvider(ttlTimeProvider);
40694070
}
40704071

40714072
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/MultiObjectDeleteSupport.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ public static List<Path> convertToPaths(
178178
// metastore entries
179179
deleted.forEach(path -> {
180180
try {
181-
metadataStore.delete(path, getStoreContext().getTimeProvider());
181+
metadataStore.delete(path);
182182
} catch (IOException e) {
183183
// trouble: we failed to delete the far end entry
184184
// try with the next one.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DelayedUpdateRenameTracker.java

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -135,9 +135,7 @@ public synchronized void sourceObjectsDeleted(
135135

136136
@Override
137137
public void completeRename() throws IOException {
138-
metadataStore.move(sourcePaths, destMetas,
139-
getStoreContext().getTimeProvider(),
140-
getOperationState());
138+
metadataStore.move(sourcePaths, destMetas, getOperationState());
141139
super.completeRename();
142140
}
143141

@@ -147,12 +145,10 @@ public IOException renameFailed(final Exception ex) {
147145
try (DurationInfo ignored = new DurationInfo(LOG,
148146
"Cleaning up deleted paths")) {
149147
// the destination paths are updated; the source is left alone.
150-
metadataStore.move(new ArrayList<>(0), destMetas,
151-
getStoreContext().getTimeProvider(),
152-
getOperationState());
148+
metadataStore.move(new ArrayList<>(0), destMetas, getOperationState());
153149
for (Path deletedPath : deletedPaths) {
154150
// this is not ideal in that it may leave parent stuff around.
155-
metadataStore.delete(deletedPath, getStoreContext().getTimeProvider());
151+
metadataStore.delete(deletedPath);
156152
}
157153
deleteParentPaths();
158154
} catch (IOException | SdkBaseException e) {
@@ -185,7 +181,7 @@ private void deleteParentPaths() throws IOException {
185181
PathMetadata md = metadataStore.get(parent, true);
186182
if (md != null && md.isEmptyDirectory() == Tristate.TRUE) {
187183
// if were confident that this is empty: delete it.
188-
metadataStore.delete(parent, getStoreContext().getTimeProvider());
184+
metadataStore.delete(parent);
189185
}
190186
}
191187
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/s3guard/DynamoDBMetadataStore.java

Lines changed: 36 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -200,8 +200,8 @@
200200
* sub-tree.
201201
*
202202
* Some mutating operations, notably
203-
* {@link MetadataStore#deleteSubtree(Path, ITtlTimeProvider)} and
204-
* {@link MetadataStore#move(Collection, Collection, ITtlTimeProvider, BulkOperationState)}
203+
* {@link MetadataStore#deleteSubtree(Path)} and
204+
* {@link MetadataStore#move(Collection, Collection, BulkOperationState)}
205205
* are less efficient with this schema.
206206
* They require mutating multiple items in the DynamoDB table.
207207
*
@@ -356,7 +356,7 @@ public class DynamoDBMetadataStore implements MetadataStore,
356356
* Time source. This is used during writes when parent
357357
* entries need to be created.
358358
*/
359-
private ITtlTimeProvider timeProvider;
359+
private ITtlTimeProvider ttlTimeProvider;
360360

361361
/**
362362
* A utility function to create DynamoDB instance.
@@ -391,11 +391,13 @@ private DynamoDB createDynamoDB(
391391
* FS via {@link S3AFileSystem#shareCredentials(String)}; this will
392392
* increment the reference counter of these credentials.
393393
* @param fs {@code S3AFileSystem} associated with the MetadataStore
394+
* @param ttlTp the time provider to use for metadata expiry
394395
* @throws IOException on a failure
395396
*/
396397
@Override
397398
@Retries.OnceRaw
398-
public void initialize(FileSystem fs) throws IOException {
399+
public void initialize(FileSystem fs, ITtlTimeProvider ttlTp)
400+
throws IOException {
399401
Preconditions.checkNotNull(fs, "Null filesystem");
400402
Preconditions.checkArgument(fs instanceof S3AFileSystem,
401403
"DynamoDBMetadataStore only supports S3A filesystem.");
@@ -433,7 +435,7 @@ public void initialize(FileSystem fs) throws IOException {
433435
this::retryEvent
434436
);
435437

436-
timeProvider = new S3Guard.TtlTimeProvider(conf);
438+
this.ttlTimeProvider = ttlTp;
437439
initTable();
438440

439441
instrumentation.initialized();
@@ -453,7 +455,7 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
453455
instrumentation = context.getInstrumentation().getS3GuardInstrumentation();
454456
username = context.getUsername();
455457
executor = context.createThrottledExecutor();
456-
timeProvider = Preconditions.checkNotNull(
458+
ttlTimeProvider = Preconditions.checkNotNull(
457459
context.getTimeProvider(),
458460
"ttlTimeProvider must not be null");
459461
}
@@ -468,7 +470,8 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
468470
*
469471
* This is used to operate the metadata store directly beyond the scope of the
470472
* S3AFileSystem integration, e.g. command line tools.
471-
* Generally, callers should use {@link #initialize(FileSystem)}
473+
* Generally, callers should use
474+
* {@link MetadataStore#initialize(FileSystem, ITtlTimeProvider)}
472475
* with an initialized {@code S3AFileSystem} instance.
473476
*
474477
* Without a filesystem to act as a reference point, the configuration itself
@@ -479,13 +482,14 @@ void bindToOwnerFilesystem(final S3AFileSystem fs) {
479482
* using the base fs.s3a.* options, as there is no bucket to infer per-bucket
480483
* settings from.
481484
*
482-
* @see #initialize(FileSystem)
485+
* @see MetadataStore#initialize(FileSystem, ITtlTimeProvider)
483486
* @throws IOException if there is an error
484487
* @throws IllegalArgumentException if the configuration is incomplete
485488
*/
486489
@Override
487490
@Retries.OnceRaw
488-
public void initialize(Configuration config) throws IOException {
491+
public void initialize(Configuration config,
492+
ITtlTimeProvider ttlTp) throws IOException {
489493
conf = config;
490494
// use the bucket as the DynamoDB table name if not specified in config
491495
tableName = conf.getTrimmed(S3GUARD_DDB_TABLE_NAME_KEY);
@@ -512,7 +516,7 @@ public void initialize(Configuration config) throws IOException {
512516
TimeUnit.SECONDS,
513517
"s3a-ddb-" + tableName);
514518
initDataAccessRetries(conf);
515-
timeProvider = new S3Guard.TtlTimeProvider(conf);
519+
this.ttlTimeProvider = ttlTp;
516520
initTable();
517521
}
518522

@@ -540,16 +544,16 @@ private void initDataAccessRetries(Configuration config) {
540544

541545
@Override
542546
@Retries.RetryTranslated
543-
public void delete(Path path, ITtlTimeProvider ttlTimeProvider)
547+
public void delete(Path path)
544548
throws IOException {
545-
innerDelete(path, true, ttlTimeProvider, null);
549+
innerDelete(path, true, null);
546550
}
547551

548552
@Override
549553
@Retries.RetryTranslated
550554
public void forgetMetadata(Path path) throws IOException {
551555
LOG.debug("Forget metadata for {}", path);
552-
innerDelete(path, false, null, null);
556+
innerDelete(path, false, null);
553557
}
554558

555559
/**
@@ -558,15 +562,12 @@ public void forgetMetadata(Path path) throws IOException {
558562
* There is no check as to whether the entry exists in the table first.
559563
* @param path path to delete
560564
* @param tombstone flag to create a tombstone marker
561-
* @param ttlTimeProvider The time provider to set last_updated. Must not
562-
* be null if tombstone is true.
563565
* @param ancestorState ancestor state for logging
564566
* @throws IOException I/O error.
565567
*/
566568
@Retries.RetryTranslated
567569
private void innerDelete(final Path path,
568570
final boolean tombstone,
569-
final ITtlTimeProvider ttlTimeProvider,
570571
final AncestorState ancestorState)
571572
throws IOException {
572573
checkPath(path);
@@ -615,7 +616,7 @@ private void innerDelete(final Path path,
615616

616617
@Override
617618
@Retries.RetryTranslated
618-
public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
619+
public void deleteSubtree(Path path)
619620
throws IOException {
620621
checkPath(path);
621622
LOG.debug("Deleting subtree from table {} in region {}: {}",
@@ -639,7 +640,7 @@ public void deleteSubtree(Path path, ITtlTimeProvider ttlTimeProvider)
639640
desc.hasNext();) {
640641
final Path pathToDelete = desc.next().getPath();
641642
futures.add(submit(executor, () -> {
642-
innerDelete(pathToDelete, true, ttlTimeProvider, state);
643+
innerDelete(pathToDelete, true, state);
643644
return null;
644645
}));
645646
if (futures.size() > S3GUARD_DDB_SUBMITTED_TASK_LIMIT) {
@@ -823,13 +824,11 @@ DirListingMetadata getDirListingMetadataFromDirMetaAndList(Path path,
823824
* Callers are required to synchronize on ancestorState.
824825
* @param pathsToCreate paths to create
825826
* @param ancestorState ongoing ancestor state.
826-
* @param ttlTimeProvider Must not be null
827827
* @return the full ancestry paths
828828
*/
829829
private Collection<DDBPathMetadata> completeAncestry(
830830
final Collection<DDBPathMetadata> pathsToCreate,
831-
final AncestorState ancestorState,
832-
final ITtlTimeProvider ttlTimeProvider) throws PathIOException {
831+
final AncestorState ancestorState) throws PathIOException {
833832
// Key on path to allow fast lookup
834833
Map<Path, DDBPathMetadata> ancestry = new HashMap<>();
835834
LOG.debug("Completing ancestry for {} paths", pathsToCreate.size());
@@ -913,9 +912,7 @@ private Collection<DDBPathMetadata> completeAncestry(
913912
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
914913
@Override
915914
@Retries.RetryTranslated
916-
public void addAncestors(
917-
final Path qualifiedPath,
918-
final ITtlTimeProvider ttlTimeProvider,
915+
public void addAncestors(final Path qualifiedPath,
919916
@Nullable final BulkOperationState operationState) throws IOException {
920917

921918
Collection<DDBPathMetadata> newDirs = new ArrayList<>();
@@ -1000,10 +997,8 @@ public void addAncestors(
1000997
*/
1001998
@Override
1002999
@Retries.RetryTranslated
1003-
public void move(
1004-
@Nullable Collection<Path> pathsToDelete,
1000+
public void move(@Nullable Collection<Path> pathsToDelete,
10051001
@Nullable Collection<PathMetadata> pathsToCreate,
1006-
final ITtlTimeProvider ttlTimeProvider,
10071002
@Nullable final BulkOperationState operationState) throws IOException {
10081003
if (pathsToDelete == null && pathsToCreate == null) {
10091004
return;
@@ -1032,8 +1027,7 @@ public void move(
10321027
newItems.addAll(
10331028
completeAncestry(
10341029
pathMetaToDDBPathMeta(pathsToCreate),
1035-
ancestorState,
1036-
extractTimeProvider(ttlTimeProvider)));
1030+
ancestorState));
10371031
}
10381032
}
10391033
// sort all the new items topmost first.
@@ -1222,7 +1216,7 @@ public void put(
12221216
public void put(
12231217
final Collection<? extends PathMetadata> metas,
12241218
@Nullable final BulkOperationState operationState) throws IOException {
1225-
innerPut(pathMetaToDDBPathMeta(metas), operationState, timeProvider);
1219+
innerPut(pathMetaToDDBPathMeta(metas), operationState, ttlTimeProvider);
12261220
}
12271221

12281222
/**
@@ -1236,15 +1230,15 @@ public void put(
12361230
* create entries in the table without parents.
12371231
* @param metas metadata entries to write.
12381232
* @param operationState (nullable) operational state for a bulk update
1239-
* @param ttlTimeProvider
1233+
* @param ttlTp The time provider for metadata expiry
12401234
* @throws IOException failure.
12411235
*/
12421236
@SuppressWarnings("SynchronizationOnLocalVariableOrMethodParameter")
12431237
@Retries.RetryTranslated
12441238
private void innerPut(
12451239
final Collection<DDBPathMetadata> metas,
12461240
@Nullable final BulkOperationState operationState,
1247-
final ITtlTimeProvider ttlTimeProvider) throws IOException {
1241+
final ITtlTimeProvider ttlTp) throws IOException {
12481242
if (metas.isEmpty()) {
12491243
// Happens when someone calls put() with an empty list.
12501244
LOG.debug("Ignoring empty list of entries to put");
@@ -1258,7 +1252,7 @@ private void innerPut(
12581252
Item[] items;
12591253
synchronized (ancestorState) {
12601254
items = pathMetadataToItem(
1261-
completeAncestry(metas, ancestorState, ttlTimeProvider));
1255+
completeAncestry(metas, ancestorState));
12621256
}
12631257
LOG.debug("Saving batch of {} items to table {}, region {}", items.length,
12641258
tableName, region);
@@ -1644,7 +1638,7 @@ private void removeAuthoritativeDirFlag(
16441638
try {
16451639
LOG.debug("innerPut on metas: {}", metas);
16461640
if (!metas.isEmpty()) {
1647-
innerPut(metas, state, timeProvider);
1641+
innerPut(metas, state, ttlTimeProvider);
16481642
}
16491643
} catch (IOException e) {
16501644
String msg = String.format("IOException while setting false "
@@ -2320,15 +2314,20 @@ public AncestorState initiateBulkWrite(
23202314
return new AncestorState(this, operation, dest);
23212315
}
23222316

2317+
@Override
2318+
public void setTtlTimeProvider(ITtlTimeProvider ttlTimeProvider) {
2319+
this.ttlTimeProvider = ttlTimeProvider;
2320+
}
2321+
23232322
/**
23242323
* Extract a time provider from the argument or fall back to the
23252324
* one in the constructor.
2326-
* @param ttlTimeProvider nullable time source passed in as an argument.
2325+
* @param ttlTp nullable time source passed in as an argument.
23272326
* @return a non-null time source.
23282327
*/
23292328
private ITtlTimeProvider extractTimeProvider(
2330-
@Nullable ITtlTimeProvider ttlTimeProvider) {
2331-
return ttlTimeProvider != null ? ttlTimeProvider : timeProvider;
2329+
@Nullable ITtlTimeProvider ttlTp) {
2330+
return ttlTp != null ? ttlTp : this.ttlTimeProvider;
23322331
}
23332332

23342333
/**

0 commit comments

Comments
 (0)