Skip to content

Commit 0e3aafe

Browse files
authored
HADOOP-18399. S3A Prefetch - SingleFilePerBlockCache to use LocalDirAllocator (#5054)
Contributed by Viraj Jasani
1 parent 405ed1d commit 0e3aafe

File tree

13 files changed

+356
-70
lines changed

13 files changed

+356
-70
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/BlockCache.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@
2323
import java.io.IOException;
2424
import java.nio.ByteBuffer;
2525

26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.LocalDirAllocator;
28+
2629
/**
2730
* Provides functionality necessary for caching blocks of data read from FileSystem.
2831
*/
@@ -64,7 +67,10 @@ public interface BlockCache extends Closeable {
6467
*
6568
* @param blockNumber the id of the given block.
6669
* @param buffer contents of the given block to be added to this cache.
70+
* @param conf the configuration.
71+
* @param localDirAllocator the local dir allocator instance.
6772
* @throws IOException if there is an error writing the given block.
6873
*/
69-
void put(int blockNumber, ByteBuffer buffer) throws IOException;
74+
void put(int blockNumber, ByteBuffer buffer, Configuration conf,
75+
LocalDirAllocator localDirAllocator) throws IOException;
7076
}

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/CachingBlockManager.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,8 @@
3333
import org.slf4j.Logger;
3434
import org.slf4j.LoggerFactory;
3535

36+
import org.apache.hadoop.conf.Configuration;
37+
import org.apache.hadoop.fs.LocalDirAllocator;
3638
import org.apache.hadoop.fs.statistics.DurationTracker;
3739

3840
import static java.util.Objects.requireNonNull;
@@ -95,21 +97,28 @@ public abstract class CachingBlockManager extends BlockManager {
9597

9698
private final PrefetchingStatistics prefetchingStatistics;
9799

100+
private final Configuration conf;
101+
102+
private final LocalDirAllocator localDirAllocator;
103+
98104
/**
99105
* Constructs an instance of a {@code CachingBlockManager}.
100106
*
101107
* @param futurePool asynchronous tasks are performed in this pool.
102108
* @param blockData information about each block of the underlying file.
103109
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
104110
* @param prefetchingStatistics statistics for this stream.
105-
*
111+
* @param conf the configuration.
112+
* @param localDirAllocator the local dir allocator instance.
106113
* @throws IllegalArgumentException if bufferPoolSize is zero or negative.
107114
*/
108115
public CachingBlockManager(
109116
ExecutorServiceFuturePool futurePool,
110117
BlockData blockData,
111118
int bufferPoolSize,
112-
PrefetchingStatistics prefetchingStatistics) {
119+
PrefetchingStatistics prefetchingStatistics,
120+
Configuration conf,
121+
LocalDirAllocator localDirAllocator) {
113122
super(blockData);
114123

115124
Validate.checkPositiveInteger(bufferPoolSize, "bufferPoolSize");
@@ -129,6 +138,8 @@ public CachingBlockManager(
129138

130139
this.ops = new BlockOperations();
131140
this.ops.setDebug(false);
141+
this.conf = requireNonNull(conf);
142+
this.localDirAllocator = localDirAllocator;
132143
}
133144

134145
/**
@@ -468,7 +479,8 @@ public void requestCaching(BufferData data) {
468479
blockFuture = cf;
469480
}
470481

471-
CachePutTask task = new CachePutTask(data, blockFuture, this, Instant.now());
482+
CachePutTask task =
483+
new CachePutTask(data, blockFuture, this, Instant.now());
472484
Future<Void> actionFuture = futurePool.executeFunction(task);
473485
data.setCaching(actionFuture);
474486
ops.end(op);
@@ -554,7 +566,7 @@ protected void cachePut(int blockNumber, ByteBuffer buffer) throws IOException {
554566
return;
555567
}
556568

557-
cache.put(blockNumber, buffer);
569+
cache.put(blockNumber, buffer, conf, localDirAllocator);
558570
}
559571

560572
private static class CachePutTask implements Supplier<Void> {

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/impl/prefetch/SingleFilePerBlockCache.java

Lines changed: 66 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,9 @@
2727
import java.nio.file.Files;
2828
import java.nio.file.OpenOption;
2929
import java.nio.file.Path;
30+
import java.nio.file.Paths;
3031
import java.nio.file.StandardOpenOption;
31-
import java.nio.file.attribute.FileAttribute;
3232
import java.nio.file.attribute.PosixFilePermission;
33-
import java.nio.file.attribute.PosixFilePermissions;
3433
import java.util.ArrayList;
3534
import java.util.Collections;
3635
import java.util.EnumSet;
@@ -39,9 +38,13 @@
3938
import java.util.Set;
4039
import java.util.concurrent.ConcurrentHashMap;
4140

41+
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
4242
import org.slf4j.Logger;
4343
import org.slf4j.LoggerFactory;
4444

45+
import org.apache.hadoop.conf.Configuration;
46+
import org.apache.hadoop.fs.LocalDirAllocator;
47+
4548
import static java.util.Objects.requireNonNull;
4649
import static org.apache.hadoop.fs.impl.prefetch.Validate.checkNotNull;
4750

@@ -67,6 +70,12 @@ public class SingleFilePerBlockCache implements BlockCache {
6770

6871
private final PrefetchingStatistics prefetchingStatistics;
6972

73+
/**
74+
* File attributes attached to any intermediate temporary file created during index creation.
75+
*/
76+
private static final Set<PosixFilePermission> TEMP_FILE_ATTRS =
77+
ImmutableSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE);
78+
7079
/**
7180
* Cache entry.
7281
* Each block is stored as a separate file.
@@ -172,11 +181,17 @@ private Entry getEntry(int blockNumber) {
172181
/**
173182
* Puts the given block in this cache.
174183
*
175-
* @throws IllegalArgumentException if buffer is null.
176-
* @throws IllegalArgumentException if buffer.limit() is zero or negative.
184+
* @param blockNumber the block number, used as a key for blocks map.
185+
* @param buffer buffer contents of the given block to be added to this cache.
186+
* @param conf the configuration.
187+
* @param localDirAllocator the local dir allocator instance.
188+
* @throws IOException if either local dir allocator fails to allocate file or if IO error
189+
* occurs while writing the buffer content to the file.
190+
* @throws IllegalArgumentException if buffer is null, or if buffer.limit() is zero or negative.
177191
*/
178192
@Override
179-
public void put(int blockNumber, ByteBuffer buffer) throws IOException {
193+
public void put(int blockNumber, ByteBuffer buffer, Configuration conf,
194+
LocalDirAllocator localDirAllocator) throws IOException {
180195
if (closed) {
181196
return;
182197
}
@@ -191,7 +206,7 @@ public void put(int blockNumber, ByteBuffer buffer) throws IOException {
191206

192207
Validate.checkPositiveInteger(buffer.limit(), "buffer.limit()");
193208

194-
Path blockFilePath = getCacheFilePath();
209+
Path blockFilePath = getCacheFilePath(conf, localDirAllocator);
195210
long size = Files.size(blockFilePath);
196211
if (size != 0) {
197212
String message =
@@ -221,8 +236,19 @@ protected void writeFile(Path path, ByteBuffer buffer) throws IOException {
221236
writeChannel.close();
222237
}
223238

224-
protected Path getCacheFilePath() throws IOException {
225-
return getTempFilePath();
239+
/**
240+
* Return temporary file created based on the file path retrieved from local dir allocator.
241+
*
242+
* @param conf The configuration object.
243+
* @param localDirAllocator Local dir allocator instance.
244+
* @return Path of the temporary file created.
245+
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
246+
* from local FS or file creation fails or permission set fails.
247+
*/
248+
protected Path getCacheFilePath(final Configuration conf,
249+
final LocalDirAllocator localDirAllocator)
250+
throws IOException {
251+
return getTempFilePath(conf, localDirAllocator);
226252
}
227253

228254
@Override
@@ -323,9 +349,19 @@ private String getStats() {
323349

324350
private static final String CACHE_FILE_PREFIX = "fs-cache-";
325351

326-
public static boolean isCacheSpaceAvailable(long fileSize) {
352+
/**
353+
* Determine if the cache space is available on the local FS.
354+
*
355+
* @param fileSize The size of the file.
356+
* @param conf The configuration.
357+
* @param localDirAllocator Local dir allocator instance.
358+
* @return True if the given file size is less than the available free space on local FS,
359+
* False otherwise.
360+
*/
361+
public static boolean isCacheSpaceAvailable(long fileSize, Configuration conf,
362+
LocalDirAllocator localDirAllocator) {
327363
try {
328-
Path cacheFilePath = getTempFilePath();
364+
Path cacheFilePath = getTempFilePath(conf, localDirAllocator);
329365
long freeSpace = new File(cacheFilePath.toString()).getUsableSpace();
330366
LOG.info("fileSize = {}, freeSpace = {}", fileSize, freeSpace);
331367
Files.deleteIfExists(cacheFilePath);
@@ -339,16 +375,25 @@ public static boolean isCacheSpaceAvailable(long fileSize) {
339375
// The suffix (file extension) of each serialized index file.
340376
private static final String BINARY_FILE_SUFFIX = ".bin";
341377

342-
// File attributes attached to any intermediate temporary file created during index creation.
343-
private static final FileAttribute<Set<PosixFilePermission>> TEMP_FILE_ATTRS =
344-
PosixFilePermissions.asFileAttribute(EnumSet.of(PosixFilePermission.OWNER_READ,
345-
PosixFilePermission.OWNER_WRITE));
346-
347-
private static Path getTempFilePath() throws IOException {
348-
return Files.createTempFile(
349-
CACHE_FILE_PREFIX,
350-
BINARY_FILE_SUFFIX,
351-
TEMP_FILE_ATTRS
352-
);
378+
/**
379+
* Create temporary file based on the file path retrieved from local dir allocator
380+
* instance. The file is created with .bin suffix. The created file has been granted
381+
* posix file permissions available in TEMP_FILE_ATTRS.
382+
*
383+
* @param conf the configuration.
384+
* @param localDirAllocator the local dir allocator instance.
385+
* @return path of the file created.
386+
* @throws IOException if IO error occurs while local dir allocator tries to retrieve path
387+
* from local FS or file creation fails or permission set fails.
388+
*/
389+
private static Path getTempFilePath(final Configuration conf,
390+
final LocalDirAllocator localDirAllocator) throws IOException {
391+
org.apache.hadoop.fs.Path path =
392+
localDirAllocator.getLocalPathForWrite(CACHE_FILE_PREFIX, conf);
393+
File dir = new File(path.getParent().toUri().getPath());
394+
String prefix = path.getName();
395+
File tmpFile = File.createTempFile(prefix, BINARY_FILE_SUFFIX, dir);
396+
Path tmpFilePath = Paths.get(tmpFile.toURI());
397+
return Files.setPosixFilePermissions(tmpFilePath, TEMP_FILE_ATTRS);
353398
}
354399
}

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/impl/prefetch/TestBlockCache.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323

2424
import org.junit.Test;
2525

26+
import org.apache.hadoop.conf.Configuration;
27+
import org.apache.hadoop.fs.LocalDirAllocator;
2628
import org.apache.hadoop.test.AbstractHadoopTestBase;
2729

30+
import static org.apache.hadoop.fs.CommonConfigurationKeys.HADOOP_TMP_DIR;
2831
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
2932
import static org.junit.Assert.assertEquals;
3033
import static org.junit.Assert.assertFalse;
@@ -36,6 +39,8 @@ public class TestBlockCache extends AbstractHadoopTestBase {
3639

3740
private static final int BUFFER_SIZE = 16;
3841

42+
private static final Configuration CONF = new Configuration();
43+
3944
@Test
4045
public void testArgChecks() throws Exception {
4146
// Should not throw.
@@ -46,7 +51,7 @@ public void testArgChecks() throws Exception {
4651

4752
// Verify it throws correctly.
4853
intercept(IllegalArgumentException.class, "'buffer' must not be null",
49-
() -> cache.put(42, null));
54+
() -> cache.put(42, null, null, null));
5055

5156

5257
intercept(NullPointerException.class, null,
@@ -67,7 +72,7 @@ public void testPutAndGet() throws Exception {
6772

6873
assertEquals(0, cache.size());
6974
assertFalse(cache.containsBlock(0));
70-
cache.put(0, buffer1);
75+
cache.put(0, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
7176
assertEquals(1, cache.size());
7277
assertTrue(cache.containsBlock(0));
7378
ByteBuffer buffer2 = ByteBuffer.allocate(BUFFER_SIZE);
@@ -77,7 +82,7 @@ public void testPutAndGet() throws Exception {
7782

7883
assertEquals(1, cache.size());
7984
assertFalse(cache.containsBlock(1));
80-
cache.put(1, buffer1);
85+
cache.put(1, buffer1, CONF, new LocalDirAllocator(HADOOP_TMP_DIR));
8186
assertEquals(2, cache.size());
8287
assertTrue(cache.containsBlock(1));
8388
ByteBuffer buffer3 = ByteBuffer.allocate(BUFFER_SIZE);

hadoop-tools/hadoop-aws/pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -200,6 +200,9 @@
200200
<exclude>**/ITestMarkerToolRootOperations.java</exclude>
201201
<!-- leave this until the end for better statistics -->
202202
<exclude>**/ITestAggregateIOStatistics.java</exclude>
203+
<!-- cache file based assertions cannot be properly achieved with parallel
204+
execution, let this be sequential -->
205+
<exclude>**/ITestS3APrefetchingCacheFiles.java</exclude>
203206
</excludes>
204207
</configuration>
205208
</execution>
@@ -246,6 +249,8 @@
246249
<include>**/ITestS3AContractRootDir.java</include>
247250
<!-- leave this until the end for better statistics -->
248251
<include>**/ITestAggregateIOStatistics.java</include>
252+
<!-- sequential execution for the better cleanup -->
253+
<include>**/ITestS3APrefetchingCacheFiles.java</include>
249254
</includes>
250255
</configuration>
251256
</execution>

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 20 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1368,19 +1368,28 @@ public S3AEncryptionMethods getS3EncryptionAlgorithm() {
13681368
*/
13691369
File createTmpFileForWrite(String pathStr, long size,
13701370
Configuration conf) throws IOException {
1371+
initLocalDirAllocatorIfNotInitialized(conf);
1372+
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
1373+
size, conf);
1374+
File dir = new File(path.getParent().toUri().getPath());
1375+
String prefix = path.getName();
1376+
// create a temp file on this directory
1377+
return File.createTempFile(prefix, null, dir);
1378+
}
1379+
1380+
/**
1381+
* Initialize dir allocator if not already initialized.
1382+
*
1383+
* @param conf The Configuration object.
1384+
*/
1385+
private void initLocalDirAllocatorIfNotInitialized(Configuration conf) {
13711386
if (directoryAllocator == null) {
13721387
synchronized (this) {
13731388
String bufferDir = conf.get(BUFFER_DIR) != null
13741389
? BUFFER_DIR : HADOOP_TMP_DIR;
13751390
directoryAllocator = new LocalDirAllocator(bufferDir);
13761391
}
13771392
}
1378-
Path path = directoryAllocator.getLocalPathForWrite(pathStr,
1379-
size, conf);
1380-
File dir = new File(path.getParent().toUri().getPath());
1381-
String prefix = path.getName();
1382-
// create a temp file on this directory
1383-
return File.createTempFile(prefix, null, dir);
13841393
}
13851394

13861395
/**
@@ -1573,12 +1582,16 @@ private FSDataInputStream executeOpen(
15731582
LOG.debug("Opening '{}'", readContext);
15741583

15751584
if (this.prefetchEnabled) {
1585+
Configuration configuration = getConf();
1586+
initLocalDirAllocatorIfNotInitialized(configuration);
15761587
return new FSDataInputStream(
15771588
new S3APrefetchingInputStream(
15781589
readContext.build(),
15791590
createObjectAttributes(path, fileStatus),
15801591
createInputStreamCallbacks(auditSpan),
1581-
inputStreamStats));
1592+
inputStreamStats,
1593+
configuration,
1594+
directoryAllocator));
15821595
} else {
15831596
return new FSDataInputStream(
15841597
new S3AInputStream(

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3ACachingBlockManager.java

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,8 @@
2525
import org.slf4j.Logger;
2626
import org.slf4j.LoggerFactory;
2727

28+
import org.apache.hadoop.conf.Configuration;
29+
import org.apache.hadoop.fs.LocalDirAllocator;
2830
import org.apache.hadoop.fs.impl.prefetch.BlockData;
2931
import org.apache.hadoop.fs.impl.prefetch.CachingBlockManager;
3032
import org.apache.hadoop.fs.impl.prefetch.ExecutorServiceFuturePool;
@@ -52,16 +54,20 @@ public class S3ACachingBlockManager extends CachingBlockManager {
5254
* @param blockData information about each block of the S3 file.
5355
* @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
5456
* @param streamStatistics statistics for this stream.
55-
*
57+
* @param conf the configuration.
58+
* @param localDirAllocator the local dir allocator instance.
5659
* @throws IllegalArgumentException if reader is null.
5760
*/
5861
public S3ACachingBlockManager(
5962
ExecutorServiceFuturePool futurePool,
6063
S3ARemoteObjectReader reader,
6164
BlockData blockData,
6265
int bufferPoolSize,
63-
S3AInputStreamStatistics streamStatistics) {
64-
super(futurePool, blockData, bufferPoolSize, streamStatistics);
66+
S3AInputStreamStatistics streamStatistics,
67+
Configuration conf,
68+
LocalDirAllocator localDirAllocator) {
69+
70+
super(futurePool, blockData, bufferPoolSize, streamStatistics, conf, localDirAllocator);
6571

6672
Validate.checkNotNull(reader, "reader");
6773

0 commit comments

Comments
 (0)