Skip to content

Commit dbc684b

Browse files
committed
adds in a cache_hit stat
1 parent 038a76f commit dbc684b

File tree

7 files changed

+75
-2
lines changed

7 files changed

+75
-2
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -499,6 +499,12 @@ public final class StreamStatisticNames {
499499
*/
500500
public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed";
501501

502+
/**
503+
* A cache hit occurs when the request range can be satisfied by the data in the cache.
504+
*/
505+
public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";
506+
507+
502508
private StreamStatisticNames() {
503509
}
504510

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
8484
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
8585
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
86+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
8687
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
8788
import static org.apache.hadoop.fs.s3a.Statistic.*;
8889

@@ -895,8 +896,9 @@ private InputStreamStatistics(
895896
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
896897
StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
897898
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
898-
STREAM_READ_PREFETCHED_BYTES,
899-
STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
899+
StreamStatisticNames.STREAM_READ_CACHE_HIT,
900+
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
901+
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
900902
)
901903
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
902904
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
@@ -1154,6 +1156,11 @@ public void footerParsingFailed() {
11541156
increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
11551157
}
11561158

1159+
@Override
1160+
public void streamReadCacheHit() {
1161+
increment(STREAM_READ_CACHE_HIT);
1162+
}
1163+
11571164

11581165
@Override
11591166
public void executorAcquired(Duration timeInQueue) {

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -458,6 +458,18 @@ public enum Statistic {
458458
StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
459459
"Gauge of active memory in use",
460460
TYPE_GAUGE),
461+
STREAM_READ_PREFETCH_BYTES(
462+
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
463+
"Bytes prefetched by AAL stream",
464+
TYPE_COUNTER),
465+
STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
466+
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
467+
"Count of Parquet footer parsing failures encountered by AAL",
468+
TYPE_COUNTER),
469+
STREAM_READ_CACHE_HIT(
470+
StreamStatisticNames.STREAM_READ_CACHE_HIT,
471+
"Count of cache hits in AAL stream",
472+
TYPE_COUNTER),
461473

462474

463475
/* Stream Write statistics */

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsRequestCallback.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,10 @@ public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
6060
statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
6161
}
6262

63+
@Override
64+
public void onCacheHit() {
65+
statistics.streamReadCacheHit();
66+
}
6367

6468
}
6569

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -140,6 +140,11 @@ void readVectoredOperationStarted(int numIncomingRanges,
140140
*/
141141
void footerParsingFailed();
142142

143+
/**
144+
* If the request data is already in the data cache.
145+
*/
146+
void streamReadCacheHit();
147+
143148
@Override
144149
void close();
145150

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,11 @@ public void footerParsingFailed() {
232232

233233
}
234234

235+
@Override
236+
public void streamReadCacheHit() {
237+
238+
}
239+
235240
@Override
236241
public void close() {
237242

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStreamReading.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,15 @@ public void testConnectorFrameWorkIntegration() throws Throwable {
139139

140140
@Test
141141
public void testSequentialPrefetching() throws IOException {
142+
143+
Configuration conf = getConfiguration();
144+
145+
// AAL uses a caffeine cache, and expires any prefetched data for a key 1s after it was last accessed by default.
146+
// While this works well when running on EC2, for local testing, it can take more than 1s to download large chunks
147+
// of data. Set this value to higher for testing to prevent early cache evictions.
148+
conf.setInt(ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX +
149+
"." + "physicalio.cache.timeout", 10000);
150+
142151
S3AFileSystem fs =
143152
(S3AFileSystem) FileSystem.get(externalTestFile.toUri(), getConfiguration());
144153
byte[] buffer = new byte[10 * ONE_MB];
@@ -162,9 +171,14 @@ public void testSequentialPrefetching() throws IOException {
162171
inputStream.readFully(buffer, 0, 2 * ONE_MB);
163172
inputStream.readFully(buffer, 0, ONE_MB);
164173
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 3 * ONE_MB);
174+
// Two cache hits, as the previous two reads were already prefetched.
175+
verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
165176

166177
// Another sequential read, GP will now prefetch the next 8MB of data.
167178
inputStream.readFully(buffer, 0, ONE_MB);
179+
// Cache hit is still 2, as the previous read required a new GET request as it was outside the previously fetched
180+
// 4MB.
181+
verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 2);
168182
// A total of 10MB is prefetched - 3MB and then 7MB.
169183
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB);
170184

@@ -173,7 +187,14 @@ public void testSequentialPrefetching() throws IOException {
173187
// Though the next GP should prefetch 16MB, since the file is ~23MB, only the bytes till EoF are prefetched:
174188
// 6291456 remaining bytes.
175189
verifyStatisticCounterValue(ioStats, STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
190+
inputStream.readFully(buffer, 0, 3 * ONE_MB);
191+
verifyStatisticCounterValue(ioStats, STREAM_READ_CACHE_HIT, 3);
176192
}
193+
194+
// verify all AAL stats are passed to the FS.
195+
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
196+
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 0);
197+
verifyStatisticCounterValue(fs.getIOStatistics(), STREAM_READ_PREFETCHED_BYTES, 10 * ONE_MB + 6291456);
177198
}
178199

179200
@Test
@@ -234,6 +255,9 @@ public void testMalformedParquetFooter() throws IOException {
234255

235256
// The footer is only prefetched once, but parsing is attempted on each stream open.
236257
verifyStatisticCounterValue(ioStats, STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 1);
258+
259+
// stat is passed up to the FS.
260+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_PARQUET_FOOTER_PARSING_FAILED, 2);
237261
}
238262

239263
/**
@@ -332,6 +356,9 @@ public void testRandomSeekPatternGets() throws Throwable {
332356
verifyStatisticCounterValue(ioStats, ACTION_HTTP_GET_REQUEST, 1);
333357
verifyStatisticCounterValue(ioStats, ACTION_HTTP_HEAD_REQUEST, 0);
334358
}
359+
360+
// We did 3 reads, and all of them were served from the cache
361+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
335362
}
336363

337364

@@ -364,5 +391,12 @@ public void testSequentialStreamsNoDuplicateGets() throws Throwable {
364391
// The second stream will not prefetch any bytes, as they have already been prefetched by stream 1.
365392
verifyStatisticCounterValue(stats2, STREAM_READ_PREFETCHED_BYTES, 0);
366393
}
394+
395+
// verify value is passed up to the FS
396+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_PREFETCHED_BYTES, 1048575);
397+
398+
// We did 3 reads, all of them were served from the small object cache. In this case, the whole object was
399+
// downloaded as soon as the stream to it was opened.
400+
verifyStatisticCounterValue(getFileSystem().getIOStatistics(), STREAM_READ_CACHE_HIT, 3);
367401
}
368402
}

0 commit comments

Comments
 (0)