Skip to content

Commit 76e7bfa

Browse files
committed
readVectored stata
1 parent b8fa648 commit 76e7bfa

File tree

3 files changed

+37
-8
lines changed

3 files changed

+37
-8
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,11 @@ public void footerParsingFailed() {
5555
statistics.footerParsingFailed();
5656
}
5757

58+
@Override
59+
public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
60+
statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
61+
}
62+
5863

5964
}
6065

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,6 @@ public void readVectored(final List<? extends FileRange> ranges,
202202
range.setData(result);
203203
}
204204

205-
// AAL does not do any range coalescing, so input and combined ranges are the same.
206-
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
207205
inputStream.readVectored(objectRanges, allocate, release);
208206
}
209207

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
package org.apache.hadoop.fs.contract.s3a;
2020

21+
import java.util.ArrayList;
2122
import java.util.List;
2223

2324
import org.apache.hadoop.conf.Configuration;
@@ -34,11 +35,11 @@
3435
import org.junit.jupiter.params.ParameterizedClass;
3536
import org.junit.jupiter.params.provider.MethodSource;
3637

37-
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
38-
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
39-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
40-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
38+
import static org.apache.hadoop.fs.contract.ContractTestUtils.*;
39+
import static org.apache.hadoop.fs.s3a.Constants.ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX;
40+
import static org.apache.hadoop.fs.s3a.S3ATestUtils.*;
4141
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
42+
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
4243

4344
/**
4445
* S3A contract tests for vectored reads with the Analytics stream.
@@ -53,6 +54,8 @@
5354
@MethodSource("params")
5455
public class ITestS3AContractAnalyticsStreamVectoredRead extends AbstractContractVectoredReadTest {
5556

57+
private static final int ONE_KB = 1024;
58+
5659
public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
5760
super(bufferType);
5861
}
@@ -64,6 +67,20 @@ public ITestS3AContractAnalyticsStreamVectoredRead(String bufferType) {
6467
@Override
6568
protected Configuration createConfiguration() {
6669
Configuration conf = super.createConfiguration();
70+
// Set the coalesce tolerance to 1KB, default is 1MB.
71+
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
72+
"." + "physicalio.request.coalesce.tolerance", 10 * ONE_KB);
73+
74+
// Set the minimum block size to 32KB. AAL uses a default block size of 128KB, which means the minimum size a S3
75+
// request will be is 128KB. Since the file being read is 128KB, we need to use this here to demonstrate that
76+
// separate GET requests are made for ranges that are not coalesced.
77+
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
78+
"." + "physicalio.readbuffersize", 32 * ONE_KB);
79+
80+
// Disable small object prefetched, otherwise anything less than 8MB is fetched in a single GET.
81+
conf.set(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
82+
"." + "physicalio.small.objects.prefetching.enabled", "false");
83+
6784
enableAnalyticsAccelerator(conf);
6885
// If encryption is set, some AAL tests will fail.
6986
// This is because AAL caches the head request response, and uses
@@ -102,21 +119,30 @@ public void testNullReleaseOperation() {
102119

103120
@Test
104121
public void testReadVectoredWithAALStatsCollection() throws Exception {
122+
List<FileRange> fileRanges = new ArrayList<>();
123+
fileRanges.add(FileRange.createFileRange(0, 100));
124+
fileRanges.add(FileRange.createFileRange(800, 200));
125+
fileRanges.add(FileRange.createFileRange(4 * ONE_KB , 4 * ONE_KB));
126+
fileRanges.add(FileRange.createFileRange(80 * ONE_KB , 4 * ONE_KB));
105127

106-
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
107128
try (FSDataInputStream in = openVectorFile()) {
108129
in.readVectored(fileRanges, getAllocate());
109130

110131
validateVectoredReadResult(fileRanges, DATASET, 0);
111132
IOStatistics st = in.getIOStatistics();
112133

113-
// Statistics such as GET requests will be added after IoStats support.
114134
verifyStatisticCounterValue(st,
115135
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
116136

117137
verifyStatisticCounterValue(st,
118138
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
119139
1);
140+
141+
// Verify ranges are coalesced, we are using a coalescing tolerance of 10KB, so [0-100, 800-200, 4KB-8KB] will
142+
// get coalesced into a single range.
143+
verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_INCOMING_RANGES, 4);
144+
verifyStatisticCounterValue(st, StreamStatisticNames.STREAM_READ_VECTORED_COMBINED_RANGES, 2);
145+
verifyStatisticCounterValue(st, ACTION_HTTP_GET_REQUEST, 2);
120146
}
121147
}
122148
}

0 commit comments

Comments
 (0)