Skip to content

Commit 13da64d

Browse files
committed
adds in support for additional IoStats
1 parent e57306e commit 13da64d

File tree

18 files changed

+580
-72
lines changed

18 files changed

+580
-72
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -489,6 +489,22 @@ public final class StreamStatisticNames {
489489
public static final String STREAM_FILE_CACHE_EVICTION
490490
= "stream_file_cache_eviction";
491491

492+
/**
493+
* Bytes that were prefetched by the stream.
494+
*/
495+
public static final String STREAM_READ_PREFETCHED_BYTES = "stream_read_prefetched_bytes";
496+
497+
/**
498+
* Count of failures when parsing Parquet footers.
499+
*/
500+
public static final String STREAM_READ_PARQUET_FOOTER_PARSING_FAILED = "stream_read_parquet_footer_parsing_failed";
501+
502+
/**
503+
* A cache hit occurs when the request range can be satisfied by the data in the cache.
504+
*/
505+
public static final String STREAM_READ_CACHE_HIT = "stream_read_cache_hit";
506+
507+
492508
private StreamStatisticNames() {
493509
}
494510

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 35 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,9 @@
8181
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.ACTION_HTTP_GET_REQUEST;
8282
import static org.apache.hadoop.fs.statistics.StoreStatisticNames.SUFFIX_FAILURES;
8383
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_UNBUFFERED;
84+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES;
85+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED;
86+
import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_CACHE_HIT;
8487
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.iostatisticsStore;
8588
import static org.apache.hadoop.fs.s3a.Statistic.*;
8689

@@ -891,7 +894,12 @@ private InputStreamStatistics(
891894
StreamStatisticNames.STREAM_READ_VECTORED_READ_BYTES_DISCARDED,
892895
StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
893896
StreamStatisticNames.STREAM_EVICT_BLOCKS_FROM_FILE_CACHE,
894-
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED)
897+
StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST,
898+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED,
899+
StreamStatisticNames.STREAM_READ_CACHE_HIT,
900+
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
901+
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED
902+
)
895903
.withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
896904
STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
897905
STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -1128,6 +1136,32 @@ public void readVectoredBytesDiscarded(int discarded) {
11281136
bytesDiscardedInVectoredIO.addAndGet(discarded);
11291137
}
11301138

1139+
@Override
1140+
public void getRequestInitiated() {
1141+
increment(ACTION_HTTP_GET_REQUEST);
1142+
}
1143+
1144+
@Override
1145+
public void headRequestInitiated() {
1146+
increment(StoreStatisticNames.ACTION_HTTP_HEAD_REQUEST);
1147+
}
1148+
1149+
@Override
1150+
public void bytesPrefetched(long size) {
1151+
increment(STREAM_READ_PREFETCHED_BYTES, size);
1152+
}
1153+
1154+
@Override
1155+
public void footerParsingFailed() {
1156+
increment(STREAM_READ_PARQUET_FOOTER_PARSING_FAILED);
1157+
}
1158+
1159+
@Override
1160+
public void streamReadCacheHit() {
1161+
increment(STREAM_READ_CACHE_HIT);
1162+
}
1163+
1164+
11311165
@Override
11321166
public void executorAcquired(Duration timeInQueue) {
11331167
// update the duration fields in the IOStatistics.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -458,9 +458,21 @@ public enum Statistic {
458458
StreamStatisticNames.STREAM_READ_ACTIVE_MEMORY_IN_USE,
459459
"Gauge of active memory in use",
460460
TYPE_GAUGE),
461+
STREAM_READ_PREFETCH_BYTES(
462+
StreamStatisticNames.STREAM_READ_PREFETCHED_BYTES,
463+
"Bytes prefetched by AAL stream",
464+
TYPE_COUNTER),
465+
STREAM_READ_PARQUET_FOOTER_PARSING_FAILED(
466+
StreamStatisticNames.STREAM_READ_PARQUET_FOOTER_PARSING_FAILED,
467+
"Count of Parquet footer parsing failures encountered by AAL",
468+
TYPE_COUNTER),
469+
STREAM_READ_CACHE_HIT(
470+
StreamStatisticNames.STREAM_READ_CACHE_HIT,
471+
"Count of cache hits in AAL stream",
472+
TYPE_COUNTER),
461473

462-
/* Stream Write statistics */
463474

475+
/* Stream Write statistics */
464476
STREAM_WRITE_EXCEPTIONS(
465477
StreamStatisticNames.STREAM_WRITE_EXCEPTIONS,
466478
"Count of stream write failures reported",
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.s3a.impl.streams;
20+
21+
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
22+
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
23+
24+
/**
25+
* Implementation of AAL's RequestCallback interface that tracks analytics operations.
26+
*/
27+
public class AnalyticsRequestCallback implements RequestCallback {
28+
private final S3AInputStreamStatistics statistics;
29+
30+
/**
31+
* Create a new callback instance.
32+
* @param statistics the statistics to update
33+
*/
34+
public AnalyticsRequestCallback(S3AInputStreamStatistics statistics) {
35+
this.statistics = statistics;
36+
}
37+
38+
@Override
39+
public void onGetRequest() {
40+
statistics.getRequestInitiated();
41+
}
42+
43+
@Override
44+
public void onHeadRequest() {
45+
statistics.headRequestInitiated();
46+
}
47+
48+
@Override
49+
public void onBlockPrefetch(long start, long end) {
50+
statistics.bytesPrefetched(end - start + 1);
51+
}
52+
53+
@Override
54+
public void footerParsingFailed() {
55+
statistics.footerParsingFailed();
56+
}
57+
58+
@Override
59+
public void onReadVectored(int numIncomingRanges, int numCombinedRanges) {
60+
statistics.readVectoredOperationStarted(numIncomingRanges, numCombinedRanges);
61+
}
62+
63+
@Override
64+
public void onCacheHit() {
65+
statistics.streamReadCacheHit();
66+
}
67+
68+
}
69+

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 41 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
4141
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
4242
import software.amazon.s3.analyticsaccelerator.util.S3URI;
43+
import software.amazon.s3.analyticsaccelerator.util.RequestCallback;
4344

4445
import org.slf4j.Logger;
4546
import org.slf4j.LoggerFactory;
@@ -72,6 +73,7 @@ public AnalyticsStream(final ObjectReadParameters parameters,
7273
final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
7374
super(InputStreamType.Analytics, parameters);
7475
S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
76+
7577
this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(),
7678
s3Attributes.getKey()), buildOpenStreamInformation(parameters));
7779
getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
@@ -80,13 +82,21 @@ public AnalyticsStream(final ObjectReadParameters parameters,
8082
@Override
8183
public int read() throws IOException {
8284
throwIfClosed();
85+
86+
getS3AStreamStatistics().readOperationStarted(getPos(), 1);
87+
8388
int bytesRead;
8489
try {
8590
bytesRead = inputStream.read();
8691
} catch (IOException ioe) {
8792
onReadFailure(ioe);
8893
throw ioe;
8994
}
95+
96+
if (bytesRead != -1) {
97+
incrementBytesRead(1);
98+
}
99+
90100
return bytesRead;
91101
}
92102

@@ -122,26 +132,41 @@ public synchronized long getPos() {
122132
*/
123133
public int readTail(byte[] buf, int off, int len) throws IOException {
124134
throwIfClosed();
135+
getS3AStreamStatistics().readOperationStarted(getPos(), len);
136+
125137
int bytesRead;
126138
try {
127139
bytesRead = inputStream.readTail(buf, off, len);
128140
} catch (IOException ioe) {
129141
onReadFailure(ioe);
130142
throw ioe;
131143
}
144+
145+
if (bytesRead > 0) {
146+
incrementBytesRead(bytesRead);
147+
}
148+
132149
return bytesRead;
133150
}
134151

135152
@Override
136153
public int read(byte[] buf, int off, int len) throws IOException {
137154
throwIfClosed();
155+
156+
getS3AStreamStatistics().readOperationStarted(getPos(), len);
157+
138158
int bytesRead;
139159
try {
140160
bytesRead = inputStream.read(buf, off, len);
141161
} catch (IOException ioe) {
142162
onReadFailure(ioe);
143163
throw ioe;
144164
}
165+
166+
if (bytesRead > 0) {
167+
incrementBytesRead(bytesRead);
168+
}
169+
145170
return bytesRead;
146171
}
147172

@@ -177,8 +202,6 @@ public void readVectored(final List<? extends FileRange> ranges,
177202
range.setData(result);
178203
}
179204

180-
// AAL does not do any range coalescing, so input and combined ranges are the same.
181-
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
182205
inputStream.readVectored(objectRanges, allocate, release);
183206
}
184207

@@ -247,10 +270,13 @@ private void onReadFailure(IOException ioe) throws IOException {
247270
}
248271

249272
private OpenStreamInformation buildOpenStreamInformation(ObjectReadParameters parameters) {
273+
274+
final RequestCallback requestCallback = new AnalyticsRequestCallback(getS3AStreamStatistics());
275+
250276
OpenStreamInformation.OpenStreamInformationBuilder openStreamInformationBuilder =
251277
OpenStreamInformation.builder()
252278
.inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext()
253-
.getInputPolicy()));
279+
.getInputPolicy())).requestCallback(requestCallback);
254280

255281
if (parameters.getObjectAttributes().getETag() != null) {
256282
openStreamInformationBuilder.objectMetadata(ObjectMetadata.builder()
@@ -300,4 +326,16 @@ protected void throwIfClosed() throws IOException {
300326
throw new IOException(getKey() + ": " + FSExceptionMessages.STREAM_IS_CLOSED);
301327
}
302328
}
329+
330+
/**
331+
* Record the bytes read in the stream statistics and, if the context has
332+
* a stats instance and more than zero bytes were read, increment that too.
333+
* @param bytesRead number of bytes read
334+
*/
335+
private void incrementBytesRead(long bytesRead) {
336+
getS3AStreamStatistics().bytesRead(bytesRead);
337+
if (getContext().getStats() != null && bytesRead > 0) {
338+
getContext().getStats().incrementBytesRead(bytesRead);
339+
}
340+
}
303341
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,6 @@ public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
4848

4949
private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
5050
private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
51-
private boolean requireCrt;
5251

5352
public AnalyticsStreamFactory() {
5453
super("AnalyticsStreamFactory");
@@ -61,7 +60,6 @@ protected void serviceInit(final Configuration conf) throws Exception {
6160
ANALYTICS_ACCELERATOR_CONFIGURATION_PREFIX);
6261
this.seekableInputStreamConfiguration =
6362
S3SeekableInputStreamConfiguration.fromConfiguration(configuration);
64-
this.requireCrt = false;
6563
}
6664

6765
@Override

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/S3AInputStreamStatistics.java

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,32 @@ void readVectoredOperationStarted(int numIncomingRanges,
119119
*/
120120
void readVectoredBytesDiscarded(int discarded);
121121

122+
/**
123+
* An S3 GET request was initiated by the stream.
124+
*/
125+
void getRequestInitiated();
126+
127+
/**
128+
* An S3 HEAD request was initiated by the stream.
129+
*/
130+
void headRequestInitiated();
131+
132+
/**
133+
* Bytes were prefetched by the stream.
134+
* @param size number of bytes prefetched.
135+
*/
136+
void bytesPrefetched(long size);
137+
138+
/**
139+
* Parsing of a Parquet footer failed.
140+
*/
141+
void footerParsingFailed();
142+
143+
/**
144+
* A cache hit occurred: the requested data was already in the data cache.
145+
*/
146+
void streamReadCacheHit();
147+
122148
@Override
123149
void close();
124150

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,31 @@ public void readVectoredBytesDiscarded(int discarded) {
212212

213213
}
214214

215+
@Override
216+
public void getRequestInitiated() {
217+
218+
}
219+
220+
@Override
221+
public void headRequestInitiated() {
222+
223+
}
224+
225+
@Override
226+
public void bytesPrefetched(long size) {
227+
228+
}
229+
230+
@Override
231+
public void footerParsingFailed() {
232+
233+
}
234+
235+
@Override
236+
public void streamReadCacheHit() {
237+
238+
}
239+
215240
@Override
216241
public void close() {
217242

0 commit comments

Comments
 (0)