Skip to content

Commit 511df1e

Browse files
committed
HADOOP-16430. S3AFilesystem.delete to incrementally update s3guard with deletions
Contributed by Steve Loughran. This overlaps the scanning for directory entries with batched calls to S3 DELETE and updates of the S3Guard tables. It also uses S3Guard to list the files to delete, so find newly created files even when S3 listings are not use consistent. For path which the client considers S3Guard to be authoritative, we also do a recursive LIST of the store and delete files; this is to find unindexed files and do guarantee that the delete(path, true) call really does delete everything underneath. Change-Id: Ice2f6e940c506e0b3a78fa534a99721b1698708e
1 parent 2b16d53 commit 511df1e

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

43 files changed

+1740
-578
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/FutureIOSupport.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -144,13 +144,13 @@ private static IOException unwrapInnerException(final Throwable e) {
144144
Throwable cause = e.getCause();
145145
if (cause instanceof IOException) {
146146
return (IOException) cause;
147-
} else if (cause instanceof WrappedIOException){
147+
} else if (cause instanceof WrappedIOException) {
148148
return ((WrappedIOException) cause).getCause();
149-
} else if (cause instanceof CompletionException){
149+
} else if (cause instanceof CompletionException) {
150150
return unwrapInnerException(cause);
151-
} else if (cause instanceof ExecutionException){
151+
} else if (cause instanceof ExecutionException) {
152152
return unwrapInnerException(cause);
153-
} else if (cause instanceof RuntimeException){
153+
} else if (cause instanceof RuntimeException) {
154154
throw (RuntimeException) cause;
155155
} else if (cause != null) {
156156
// other type: wrap with a new IOE

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/FileContextURIBase.java

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ public void testCreateFileWithNullName() throws IOException {
130130

131131
@Test
132132
public void testCreateExistingFile() throws IOException {
133-
String fileName = "testFile";
133+
String fileName = "testCreateExistingFile";
134134
Path testPath = qualifiedPath(fileName, fc2);
135135

136136
// Ensure file does not exist
@@ -153,7 +153,7 @@ public void testCreateExistingFile() throws IOException {
153153

154154
@Test
155155
public void testCreateFileInNonExistingDirectory() throws IOException {
156-
String fileName = "testDir/testFile";
156+
String fileName = "testCreateFileInNonExistingDirectory/testFile";
157157

158158
Path testPath = qualifiedPath(fileName, fc2);
159159

@@ -165,7 +165,8 @@ public void testCreateFileInNonExistingDirectory() throws IOException {
165165

166166
// Ensure using fc2 that file is created
167167
Assert.assertTrue(isDir(fc2, testPath.getParent()));
168-
Assert.assertEquals("testDir", testPath.getParent().getName());
168+
Assert.assertEquals("testCreateFileInNonExistingDirectory",
169+
testPath.getParent().getName());
169170
Assert.assertTrue(exists(fc2, testPath));
170171

171172
}
@@ -293,7 +294,7 @@ public void testIsDirectory() throws IOException {
293294

294295
@Test
295296
public void testDeleteFile() throws IOException {
296-
Path testPath = qualifiedPath("testFile", fc2);
297+
Path testPath = qualifiedPath("testDeleteFile", fc2);
297298

298299
// Ensure file does not exist
299300
Assert.assertFalse(exists(fc2, testPath));
@@ -314,7 +315,7 @@ public void testDeleteFile() throws IOException {
314315

315316
@Test
316317
public void testDeleteNonExistingFile() throws IOException {
317-
String testFileName = "testFile";
318+
String testFileName = "testDeleteNonExistingFile";
318319
Path testPath = qualifiedPath(testFileName, fc2);
319320

320321
// TestCase1 : Test delete on file never existed
@@ -341,7 +342,7 @@ public void testDeleteNonExistingFile() throws IOException {
341342

342343
@Test
343344
public void testDeleteNonExistingFileInDir() throws IOException {
344-
String testFileInDir = "testDir/testDir/TestFile";
345+
String testFileInDir = "testDeleteNonExistingFileInDir/testDir/TestFile";
345346
Path testPath = qualifiedPath(testFileInDir, fc2);
346347

347348
// TestCase1 : Test delete on file never existed
@@ -418,7 +419,7 @@ public void testDeleteDirectory() throws IOException {
418419

419420
@Test
420421
public void testDeleteNonExistingDirectory() throws IOException {
421-
String testDirName = "testFile";
422+
String testDirName = "testDeleteNonExistingDirectory";
422423
Path testPath = qualifiedPath(testDirName, fc2);
423424

424425
// TestCase1 : Test delete on directory never existed
@@ -445,7 +446,7 @@ public void testDeleteNonExistingDirectory() throws IOException {
445446

446447
@Test
447448
public void testModificationTime() throws IOException {
448-
String testFile = "file1";
449+
String testFile = "testModificationTime";
449450
long fc2ModificationTime, fc1ModificationTime;
450451

451452
Path testPath = qualifiedPath(testFile, fc2);
@@ -461,7 +462,7 @@ public void testModificationTime() throws IOException {
461462

462463
@Test
463464
public void testFileStatus() throws IOException {
464-
String fileName = "file1";
465+
String fileName = "testModificationTime";
465466
Path path2 = fc2.makeQualified(new Path(BASE, fileName));
466467

467468
// Create a file on fc2's file system using fc1

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractGetFileStatusTest.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,11 @@ public void testListFilesEmptyDirectoryRecursive() throws IOException {
112112
private void listFilesOnEmptyDir(boolean recursive) throws IOException {
113113
describe("Invoke listFiles(recursive=" + recursive + ")" +
114114
" on empty directories, expect nothing found");
115-
Path subfolder = createDirWithEmptySubFolder();
116115
FileSystem fs = getFileSystem();
117-
new TreeScanResults(fs.listFiles(getContract().getTestPath(), recursive))
116+
Path path = getContract().getTestPath();
117+
fs.delete(path, true);
118+
Path subfolder = createDirWithEmptySubFolder();
119+
new TreeScanResults(fs.listFiles(path, recursive))
118120
.assertSizeEquals("listFiles(test dir, " + recursive + ")", 0, 0, 0);
119121
describe("Test on empty subdirectory");
120122
new TreeScanResults(fs.listFiles(subfolder, recursive))
@@ -126,9 +128,11 @@ private void listFilesOnEmptyDir(boolean recursive) throws IOException {
126128
public void testListLocatedStatusEmptyDirectory() throws IOException {
127129
describe("Invoke listLocatedStatus() on empty directories;" +
128130
" expect directories to be found");
129-
Path subfolder = createDirWithEmptySubFolder();
130131
FileSystem fs = getFileSystem();
131-
new TreeScanResults(fs.listLocatedStatus(getContract().getTestPath()))
132+
Path path = getContract().getTestPath();
133+
fs.delete(path, true);
134+
Path subfolder = createDirWithEmptySubFolder();
135+
new TreeScanResults(fs.listLocatedStatus(path))
132136
.assertSizeEquals("listLocatedStatus(test dir)", 0, 1, 0);
133137
describe("Test on empty subdirectory");
134138
new TreeScanResults(fs.listLocatedStatus(subfolder))

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/ContractTestUtils.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1627,6 +1627,22 @@ public String toString() {
16271627
getFileCount() == 1 ? "" : "s");
16281628
}
16291629

1630+
/**
1631+
* Dump the files and directories to a multi-line string for error
1632+
* messages and assertions.
1633+
* @return a dump of the internal state
1634+
*/
1635+
private String dump() {
1636+
StringBuilder sb = new StringBuilder(toString());
1637+
sb.append("\nFiles:");
1638+
directories.forEach(p ->
1639+
sb.append("\n \"").append(p.toString()));
1640+
sb.append("\nDirectories:");
1641+
files.forEach(p ->
1642+
sb.append("\n \"").append(p.toString()));
1643+
return sb.toString();
1644+
}
1645+
16301646
/**
16311647
* Equality check compares files and directory counts.
16321648
* As these are non-final fields, this class cannot be used in
@@ -1667,7 +1683,7 @@ public int hashCode() {
16671683
* @param o expected other entries.
16681684
*/
16691685
public void assertSizeEquals(String text, long f, long d, long o) {
1670-
String self = toString();
1686+
String self = dump();
16711687
Assert.assertEquals(text + ": file count in " + self,
16721688
f, getFileCount());
16731689
Assert.assertEquals(text + ": directory count in " + self,

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InconsistentAmazonS3Client.java

Lines changed: 22 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -199,6 +199,7 @@ public DeleteObjectsResult deleteObjects(DeleteObjectsRequest
199199
deleteObjectsRequest)
200200
throws AmazonClientException, AmazonServiceException {
201201
maybeFail();
202+
LOG.info("registering bulk delete of objects");
202203
for (DeleteObjectsRequest.KeyVersion keyVersion :
203204
deleteObjectsRequest.getKeys()) {
204205
registerDeleteObject(keyVersion.getKey(),
@@ -278,6 +279,7 @@ private void addSummaryIfNotPresent(List<S3ObjectSummary> list,
278279
// Behavior of S3ObjectSummary
279280
String key = item.getKey();
280281
if (list.stream().noneMatch((member) -> member.getKey().equals(key))) {
282+
LOG.debug("Reinstate summary {}", key);
281283
list.add(item);
282284
}
283285
}
@@ -302,6 +304,7 @@ private void addPrefixIfNotPresent(List<String> prefixes, String ancestor,
302304
if (nextParent.equals(ancestorPath)) {
303305
String prefix = prefixCandidate.toString();
304306
if (!prefixes.contains(prefix)) {
307+
LOG.debug("Reinstate prefix {}", prefix);
305308
prefixes.add(prefix);
306309
}
307310
return;
@@ -401,6 +404,7 @@ private void restoreDeleted(List<S3ObjectSummary> summaries,
401404
}
402405
} else {
403406
// Clean up any expired entries
407+
LOG.debug("Remove expired key {}", key);
404408
delayedDeletes.remove(key);
405409
}
406410
}
@@ -467,16 +471,24 @@ private boolean isKeyDelayed(Long enqueueTime, String key) {
467471

468472
private void registerDeleteObject(String key, String bucket) {
469473
if (policy.shouldDelay(key)) {
470-
// Record summary so we can add it back for some time post-deletion
471-
ListObjectsRequest request = new ListObjectsRequest()
472-
.withBucketName(bucket)
473-
.withPrefix(key);
474-
S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries()
475-
.stream()
476-
.filter(result -> result.getKey().equals(key))
477-
.findFirst()
478-
.orElse(null);
479-
delayedDeletes.put(key, new Delete(System.currentTimeMillis(), summary));
474+
Delete delete = delayedDeletes.get(key);
475+
if (delete != null && isKeyDelayed(delete.time(), key)) {
476+
// there is already an entry in the delayed delete list,
477+
// so ignore the operation
478+
LOG.debug("Ignoring delete of already deleted object");
479+
} else {
480+
// Record summary so we can add it back for some time post-deletion
481+
ListObjectsRequest request = new ListObjectsRequest()
482+
.withBucketName(bucket)
483+
.withPrefix(key);
484+
S3ObjectSummary summary = innerlistObjects(request).getObjectSummaries()
485+
.stream()
486+
.filter(result -> result.getKey().equals(key))
487+
.findFirst()
488+
.orElse(null);
489+
delayedDeletes.put(key, new Delete(System.currentTimeMillis(),
490+
summary));
491+
}
480492
}
481493
}
482494

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/InternalConstants.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Listing.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
package org.apache.hadoop.fs.s3a;
2020

21+
import javax.annotation.Nullable;
22+
2123
import com.amazonaws.AmazonClientException;
2224
import com.amazonaws.services.s3.model.S3ObjectSummary;
2325
import com.google.common.annotations.VisibleForTesting;
@@ -58,6 +60,9 @@ public class Listing {
5860
private final S3AFileSystem owner;
5961
private static final Logger LOG = S3AFileSystem.LOG;
6062

63+
static final FileStatusAcceptor ACCEPT_ALL_BUT_S3N =
64+
new AcceptAllButS3nDirs();
65+
6166
public Listing(S3AFileSystem owner) {
6267
this.owner = owner;
6368
}
@@ -339,7 +344,8 @@ class FileStatusListingIterator
339344
FileStatusListingIterator(ObjectListingIterator source,
340345
PathFilter filter,
341346
FileStatusAcceptor acceptor,
342-
RemoteIterator<S3AFileStatus> providedStatus) throws IOException {
347+
@Nullable RemoteIterator<S3AFileStatus> providedStatus)
348+
throws IOException {
343349
this.source = source;
344350
this.filter = filter;
345351
this.acceptor = acceptor;

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileStatus.java

Lines changed: 48 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
@InterfaceAudience.Private
3232
@InterfaceStability.Evolving
3333
public class S3AFileStatus extends FileStatus {
34+
35+
private static final long serialVersionUID = -5955674081978903922L;
36+
3437
private Tristate isEmptyDirectory;
3538
private String eTag;
3639
private String versionId;
@@ -56,12 +59,16 @@ public S3AFileStatus(boolean isemptydir,
5659
public S3AFileStatus(Tristate isemptydir,
5760
Path path,
5861
String owner) {
59-
super(0, true, 1, 0, 0, 0,
60-
null, null, null, null,
61-
path, false, true, false);
62-
isEmptyDirectory = isemptydir;
63-
setOwner(owner);
64-
setGroup(owner);
62+
this(path,
63+
true,
64+
isemptydir,
65+
0,
66+
0,
67+
0,
68+
owner,
69+
null,
70+
null
71+
);
6572
}
6673

6774
/**
@@ -76,10 +83,43 @@ public S3AFileStatus(Tristate isemptydir,
7683
*/
7784
public S3AFileStatus(long length, long modification_time, Path path,
7885
long blockSize, String owner, String eTag, String versionId) {
79-
super(length, false, 1, blockSize, modification_time,
86+
this(path,
87+
false,
88+
Tristate.FALSE,
89+
length,
90+
modification_time,
91+
blockSize,
92+
owner,
93+
eTag,
94+
versionId
95+
);
96+
}
97+
98+
/**
99+
* Either a file or directory.
100+
* @param path path
101+
* @param isDir is this a directory?
102+
* @param isemptydir is this an empty directory?
103+
* @param length file length
104+
* @param modificationTime mod time
105+
* @param blockSize block size
106+
* @param owner owner
107+
* @param eTag eTag of the S3 object if available, else null
108+
* @param versionId versionId of the S3 object if available, else null
109+
*/
110+
S3AFileStatus(Path path,
111+
boolean isDir,
112+
Tristate isemptydir,
113+
long length,
114+
long modificationTime,
115+
long blockSize,
116+
String owner,
117+
String eTag,
118+
String versionId) {
119+
super(length, isDir, 1, blockSize, modificationTime,
80120
0, null, owner, owner, null,
81121
path, false, true, false);
82-
isEmptyDirectory = Tristate.FALSE;
122+
this.isEmptyDirectory = isemptydir;
83123
this.eTag = eTag;
84124
this.versionId = versionId;
85125
}

0 commit comments

Comments
 (0)