Skip to content

Commit 735e35d

Browse files
HADOOP-18347. S3A Vectored IO to use bounded thread pool. (#4918)
Part of HADOOP-18103. Also introduces a config option, fs.s3a.vectored.active.ranged.reads, to configure the maximum number of range reads a single input stream can have active (downloading, or queued) to the central FileSystem instance's pool of queued operations. This stops a single stream overloading the shared thread pool. Contributed by: Mukund Thakur
1 parent d9f435f commit 735e35d

File tree

5 files changed

+55
-16
lines changed

5 files changed

+55
-16
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/StreamCapabilities.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public interface StreamCapabilities {
8484
* Support for vectored IO api.
8585
* See {@code PositionedReadable#readVectored(List, IntFunction)}.
8686
*/
87-
String VECTOREDIO = "readvectored";
87+
String VECTOREDIO = "in:readvectored";
8888

8989
/**
9090
* Stream abort() capability implemented by {@link Abortable#abort()}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1206,6 +1206,24 @@ private Constants() {
12061206
*/
12071207
public static final int DEFAULT_AWS_S3_VECTOR_READS_MAX_MERGED_READ_SIZE = 1253376; //1M
12081208

1209+
/**
1210+
* Maximum number of range reads a single input stream can have
1211+
* active (downloading, or queued) to the central FileSystem
1212+
* instance's pool of queued operations.
1213+
* This stops a single stream overloading the shared thread pool.
1214+
* {@value}
1215+
* <p>
1216+
* Default is {@link #DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS}
1217+
*/
1218+
public static final String AWS_S3_VECTOR_ACTIVE_RANGE_READS =
1219+
"fs.s3a.vectored.active.ranged.reads";
1220+
1221+
/**
1222+
* Limit of queued range data download operations during vectored
1223+
* read. Value: {@value}
1224+
*/
1225+
public static final int DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS = 4;
1226+
12091227
/**
12101228
* Prefix of auth classes in AWS SDK V1.
12111229
*/

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,6 +339,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
339339
/** Vectored IO context. */
340340
private VectoredIOContext vectoredIOContext;
341341

342+
/**
343+
* Maximum number of active range read operations a single
344+
* input stream can have.
345+
*/
346+
private int vectoredActiveRangeReads;
347+
342348
private long readAhead;
343349
private ChangeDetectionPolicy changeDetectionPolicy;
344350
private final AtomicBoolean closed = new AtomicBoolean(false);
@@ -628,6 +634,8 @@ public void initialize(URI name, Configuration originalConf)
628634
longBytesOption(conf, ASYNC_DRAIN_THRESHOLD,
629635
DEFAULT_ASYNC_DRAIN_THRESHOLD, 0),
630636
inputPolicy);
637+
vectoredActiveRangeReads = intOption(conf,
638+
AWS_S3_VECTOR_ACTIVE_RANGE_READS, DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS, 1);
631639
vectoredIOContext = populateVectoredIOContext(conf);
632640
} catch (AmazonClientException e) {
633641
// amazon client exception: stop all services then throw the translation
@@ -1561,7 +1569,11 @@ private FSDataInputStream executeOpen(
15611569
createObjectAttributes(path, fileStatus),
15621570
createInputStreamCallbacks(auditSpan),
15631571
inputStreamStats,
1564-
unboundedThreadPool));
1572+
new SemaphoredDelegatingExecutor(
1573+
boundedThreadPool,
1574+
vectoredActiveRangeReads,
1575+
true,
1576+
inputStreamStats)));
15651577
}
15661578
}
15671579

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
import java.nio.ByteBuffer;
2828
import java.util.List;
2929
import java.util.concurrent.CompletableFuture;
30-
import java.util.concurrent.ThreadPoolExecutor;
30+
import java.util.concurrent.ExecutorService;
3131
import java.util.concurrent.atomic.AtomicBoolean;
3232
import java.util.function.IntFunction;
3333

@@ -139,7 +139,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
139139
/**
140140
* Thread pool used for vectored IO operation.
141141
*/
142-
private final ThreadPoolExecutor unboundedThreadPool;
142+
private final ExecutorService boundedThreadPool;
143143
private final String bucket;
144144
private final String key;
145145
private final String pathStr;
@@ -196,13 +196,13 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
196196
* @param s3Attributes object attributes
197197
* @param client S3 client to use
198198
* @param streamStatistics stream io stats.
199-
* @param unboundedThreadPool thread pool to use.
199+
* @param boundedThreadPool thread pool to use.
200200
*/
201201
public S3AInputStream(S3AReadOpContext ctx,
202202
S3ObjectAttributes s3Attributes,
203203
InputStreamCallbacks client,
204204
S3AInputStreamStatistics streamStatistics,
205-
ThreadPoolExecutor unboundedThreadPool) {
205+
ExecutorService boundedThreadPool) {
206206
Preconditions.checkArgument(isNotEmpty(s3Attributes.getBucket()),
207207
"No Bucket");
208208
Preconditions.checkArgument(isNotEmpty(s3Attributes.getKey()), "No Key");
@@ -224,7 +224,7 @@ public S3AInputStream(S3AReadOpContext ctx,
224224
setInputPolicy(ctx.getInputPolicy());
225225
setReadahead(ctx.getReadahead());
226226
this.asyncDrainThreshold = ctx.getAsyncDrainThreshold();
227-
this.unboundedThreadPool = unboundedThreadPool;
227+
this.boundedThreadPool = boundedThreadPool;
228228
this.vectoredIOContext = context.getVectoredIOContext();
229229
this.threadIOStatistics = requireNonNull(ctx.getIOStatisticsAggregator());
230230
}
@@ -882,7 +882,7 @@ public void readVectored(List<? extends FileRange> ranges,
882882
streamStatistics.readVectoredOperationStarted(sortedRanges.size(), sortedRanges.size());
883883
for (FileRange range: sortedRanges) {
884884
ByteBuffer buffer = allocate.apply(range.getLength());
885-
unboundedThreadPool.submit(() -> readSingleRange(range, buffer));
885+
boundedThreadPool.submit(() -> readSingleRange(range, buffer));
886886
}
887887
} else {
888888
LOG.debug("Trying to merge the ranges as they are not disjoint");
@@ -893,7 +893,7 @@ public void readVectored(List<? extends FileRange> ranges,
893893
LOG.debug("Number of original ranges size {} , Number of combined ranges {} ",
894894
ranges.size(), combinedFileRanges.size());
895895
for (CombinedFileRange combinedFileRange: combinedFileRanges) {
896-
unboundedThreadPool.submit(
896+
boundedThreadPool.submit(
897897
() -> readCombinedRangeAndUpdateChildren(combinedFileRange, allocate));
898898
}
899899
}

hadoop-tools/hadoop-aws/src/site/markdown/tools/hadoop-aws/performance.md

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,22 @@ on the client requirements.
7575
</description>
7676
</property>
7777
<property>
78-
<name>fs.s3a.vectored.read.max.merged.size</name>
79-
<value>1M</value>
80-
<description>
81-
What is the largest merged read size in bytes such
82-
that we group ranges together during vectored read.
83-
Setting this value to 0 will disable merging of ranges.
84-
</description>
78+
<name>fs.s3a.vectored.read.max.merged.size</name>
79+
<value>1M</value>
80+
<description>
81+
What is the largest merged read size in bytes such
82+
that we group ranges together during vectored read.
83+
Setting this value to 0 will disable merging of ranges.
84+
</description>
85+
</property>
<property>
86+
<name>fs.s3a.vectored.active.ranged.reads</name>
87+
<value>4</value>
88+
<description>
89+
Maximum number of range reads a single input stream can have
90+
active (downloading, or queued) to the central FileSystem
91+
instance's pool of queued operations.
92+
This stops a single stream overloading the shared thread pool.
93+
</description>
8594
</property>
8695
```
8796

0 commit comments

Comments
 (0)