
Commit 66fb591

addresses review comments, adds in duration tracker for prefetch ops
1 parent 57bbdc2 commit 66fb591

8 files changed: +51 -35 lines changed


hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StreamStatisticNames.java

Lines changed: 0 additions & 6 deletions
@@ -393,12 +393,6 @@ public final class StreamStatisticNames {
   public static final String STREAM_READ_PREFETCH_OPERATIONS
       = "stream_read_prefetch_operations";
 
-  /**
-   * Total number of failed prefetching operations.
-   */
-  public static final String STREAM_READ_FAILED_PREFETCH_OPERATIONS
-      = "stream_read_failed_prefetch_operations";
-
   /**
    * Total number of block in disk cache.
    */

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/CachingBlockManager.java

Lines changed: 14 additions & 5 deletions
@@ -33,6 +33,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
 import static java.util.Objects.requireNonNull;
 
 import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
@@ -307,6 +309,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
     }
 
     BlockOperations.Operation op = null;
+    DurationTracker tracker = null;
 
     synchronized (data) {
       try {
@@ -328,7 +331,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
         }
 
         if (isPrefetch) {
-          prefetchingStatistics.prefetchOperationStarted();
+          tracker = prefetchingStatistics.prefetchOperationStarted();
           op = this.ops.prefetch(data.getBlockNumber());
         } else {
           op = this.ops.getRead(data.getBlockNumber());
@@ -341,20 +344,26 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
         this.read(buffer, offset, size);
         buffer.flip();
         data.setReady(expectedState);
-
-        if (isPrefetch) {
-          prefetchingStatistics.prefetchOperationCompleted();
-        }
       } catch (Exception e) {
         String message = String.format("error during readBlock(%s)", data.getBlockNumber());
         LOG.error(message, e);
+
+        if(isPrefetch) {
+          tracker.failed();
+        }
+
         this.numReadErrors.incrementAndGet();
         data.setDone();
         throw e;
       } finally {
         if (op != null) {
          this.ops.end(op);
         }
+
+        if (isPrefetch) {
+          prefetchingStatistics.prefetchOperationCompleted();
+          tracker.close();
+        }
       }
     }
   }
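
Reviewer note: the readBlock() change above amounts to a start / failed / close contract on the returned DurationTracker. The following is a minimal sketch of that pattern outside the diff; the class name PrefetchTrackingSketch, the method prefetchOnce and the doRead placeholder are illustrative only and not part of the commit.

import org.apache.hadoop.fs.common.PrefetchingStatistics;
import org.apache.hadoop.fs.statistics.DurationTracker;

// Sketch of the tracking pattern used in readBlock(): start the tracker,
// mark it failed on error, always close it in finally.
final class PrefetchTrackingSketch {

  private final PrefetchingStatistics statistics;

  PrefetchTrackingSketch(PrefetchingStatistics statistics) {
    this.statistics = statistics;
  }

  void prefetchOnce(Runnable doRead) {
    // the no-op implementations in this commit return a stub tracker, never null
    DurationTracker tracker = statistics.prefetchOperationStarted();
    try {
      doRead.run();                       // the actual block read
    } catch (RuntimeException e) {
      tracker.failed();                   // record the failure before rethrowing
      throw e;
    } finally {
      statistics.prefetchOperationCompleted();
      tracker.close();                    // closing records the elapsed time
    }
  }
}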

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/common/PrefetchingStatistics.java

Lines changed: 3 additions & 1 deletion
@@ -21,14 +21,16 @@
 
 import java.time.Duration;
 
+import org.apache.hadoop.fs.statistics.DurationTracker;
 import org.apache.hadoop.fs.statistics.IOStatisticsSource;
 
 public interface PrefetchingStatistics extends IOStatisticsSource {
 
   /**
    * A prefetch operation has started.
+   * @return duration tracker
    */
-  void prefetchOperationStarted();
+  DurationTracker prefetchOperationStarted();
 
   /**
    * A block has been saved to the file cache.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInstrumentation.java

Lines changed: 4 additions & 11 deletions
@@ -803,8 +803,6 @@ private final class InputStreamStatistics
     private final AtomicLong readOperations;
     private final AtomicLong readFullyOperations;
     private final AtomicLong seekOperations;
-    private final AtomicLong prefetchReadOperations;
-    private final AtomicLong failedPrefetchReadOperations;
 
     /** Bytes read by the application and any when draining streams . */
     private final AtomicLong totalBytesRead;
@@ -838,9 +836,7 @@ private InputStreamStatistics(
           StreamStatisticNames.STREAM_READ_SEEK_BYTES_SKIPPED,
           StreamStatisticNames.STREAM_READ_TOTAL_BYTES,
           StreamStatisticNames.STREAM_READ_UNBUFFERED,
-          StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES,
-          StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
-          StreamStatisticNames.STREAM_READ_FAILED_PREFETCH_OPERATIONS)
+          StreamStatisticNames.STREAM_READ_VERSION_MISMATCHES)
          .withGauges(STREAM_READ_GAUGE_INPUT_POLICY,
              STREAM_READ_BLOCKS_IN_FILE_CACHE.getSymbol(),
              STREAM_READ_ACTIVE_PREFETCH_OPERATIONS.getSymbol(),
@@ -851,6 +847,7 @@ private InputStreamStatistics(
              StoreStatisticNames.ACTION_FILE_OPENED,
              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_ABORTED,
              StreamStatisticNames.STREAM_READ_REMOTE_STREAM_DRAINED,
+              StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS,
              StreamStatisticNames.STREAM_READ_REMOTE_BLOCK_READ,
              StreamStatisticNames.STREAM_READ_BLOCK_ACQUIRE_AND_READ)
          .build();
@@ -889,10 +886,6 @@ private InputStreamStatistics(
           StreamStatisticNames.STREAM_READ_SEEK_OPERATIONS);
       totalBytesRead = st.getCounterReference(
           StreamStatisticNames.STREAM_READ_TOTAL_BYTES);
-      prefetchReadOperations =
-          st.getCounterReference(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
-      failedPrefetchReadOperations =
-          st.getCounterReference(StreamStatisticNames.STREAM_READ_FAILED_PREFETCH_OPERATIONS);
       setIOStatistics(st);
       // create initial snapshot of merged statistics
       mergedStats = snapshotIOStatistics(st);
@@ -1316,9 +1309,9 @@ public DurationTracker initiateInnerStreamClose(final boolean abort) {
     }
 
     @Override
-    public void prefetchOperationStarted() {
+    public DurationTracker prefetchOperationStarted() {
       incAllGauges(STREAM_READ_ACTIVE_PREFETCH_OPERATIONS, 1);
-      prefetchReadOperations.incrementAndGet();
+      return trackDuration(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
     }
 
     @Override
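
Reviewer note: with prefetchOperationStarted() now delegating to trackDuration(...), the prefetch count and its duration statistics are published under stream_read_prefetch_operations in the stream's IOStatistics, replacing the removed AtomicLong counters. Below is a hedged sketch of reading that counter back from any IOStatisticsSource; the class PrefetchCounterProbe and the method name are illustrative and not part of the commit.

import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;

// Sketch: looking up the prefetch counter on a statistics source,
// e.g. an open input stream that exposes IOStatistics.
final class PrefetchCounterProbe {

  static long prefetchCount(IOStatisticsSource source) {
    IOStatistics stats = source.getIOStatistics();
    if (stats == null) {
      return 0L;           // sources without statistics may return null
    }
    Long value = stats.counters()
        .get(StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS);
    return value == null ? 0L : value;
  }
}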

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/statistics/impl/EmptyS3AStatisticsContext.java

Lines changed: 2 additions & 2 deletions
@@ -209,8 +209,8 @@ public void unbuffered() {
     }
 
     @Override
-    public void prefetchOperationStarted() {
-
+    public DurationTracker prefetchOperationStarted() {
+      return stubDurationTracker();
     }
 
     @Override

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/EmptyPrefetchingStatistics.java

Lines changed: 18 additions & 3 deletions
@@ -21,10 +21,25 @@
 
 import java.time.Duration;
 
-public final class EmptyPrefetchingStatistics implements PrefetchingStatistics {
-  @Override
-  public void prefetchOperationStarted() {
+import org.apache.hadoop.fs.statistics.DurationTracker;
+
+import static org.apache.hadoop.fs.statistics.IOStatisticsSupport.stubDurationTracker;
+
+public final class EmptyPrefetchingStatistics implements PrefetchingStatistics {
+
+  private static final EmptyPrefetchingStatistics emptyPrefetchingStatistics =
+      new EmptyPrefetchingStatistics();
 
+  private EmptyPrefetchingStatistics() {
+  }
+
+  public static EmptyPrefetchingStatistics getInstance() {
+    return emptyPrefetchingStatistics;
+  }
+
+  @Override
+  public DurationTracker prefetchOperationStarted() {
+    return stubDurationTracker();
   }
 
   @Override

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBlockCache.java

Lines changed: 1 addition & 1 deletion
@@ -40,7 +40,7 @@ public class TestBlockCache extends AbstractHadoopTestBase {
   public void testArgChecks() throws Exception {
     // Should not throw.
     BlockCache cache =
-        new SingleFilePerBlockCache(new EmptyPrefetchingStatistics());
+        new SingleFilePerBlockCache(EmptyPrefetchingStatistics.getInstance());
 
     ByteBuffer buffer = ByteBuffer.allocate(16);
 

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/common/TestBufferPool.java

Lines changed: 9 additions & 6 deletions
@@ -21,6 +21,7 @@
 
 import org.junit.Test;
 
+import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
 import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
 import org.apache.hadoop.test.AbstractHadoopTestBase;
 
@@ -37,30 +38,32 @@ public class TestBufferPool extends AbstractHadoopTestBase {
 
   @Test
   public void testArgChecks() throws Exception {
+    S3AInputStreamStatistics s3AInputStreamStatistics =
+        new EmptyS3AStatisticsContext().newInputStreamStatistics();
+
     // Should not throw.
-    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE,
-        EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS);
+    BufferPool pool = new BufferPool(POOL_SIZE, BUFFER_SIZE, s3AInputStreamStatistics);
 
     // Verify it throws correctly.
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'size' must be a positive integer",
-        () -> new BufferPool(0, 10, EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS));
+        () -> new BufferPool(0, 10, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'size' must be a positive integer",
-        () -> new BufferPool(-1, 10, EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS));
+        () -> new BufferPool(-1, 10, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferSize' must be a positive integer",
-        () -> new BufferPool(10, 0, EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS));
+        () -> new BufferPool(10, 0, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         IllegalArgumentException.class,
         "'bufferSize' must be a positive integer",
-        () -> new BufferPool(1, -10, EmptyS3AStatisticsContext.EMPTY_INPUT_STREAM_STATISTICS));
+        () -> new BufferPool(1, -10, s3AInputStreamStatistics));
 
     ExceptionAsserts.assertThrows(
         NullPointerException.class,
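
Reviewer note: the test now obtains its no-op statistics from EmptyS3AStatisticsContext instead of the EMPTY_INPUT_STREAM_STATISTICS constant. The sketch below, with illustrative names only, shows why that instance is safe with the new tracker-returning API, assuming S3AInputStreamStatistics exposes the PrefetchingStatistics contract, as the overridden prefetchOperationStarted() in EmptyS3AStatisticsContext suggests.

import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.s3a.statistics.impl.EmptyS3AStatisticsContext;
import org.apache.hadoop.fs.statistics.DurationTracker;

// Sketch: the no-op statistics used by the updated tests.
final class EmptyStatisticsSketch {

  static void demo() {
    // the same pattern the updated TestBufferPool uses
    S3AInputStreamStatistics stats =
        new EmptyS3AStatisticsContext().newInputStreamStatistics();

    // with this commit the empty implementation returns a stub tracker,
    // so callers can follow the start/close pattern without null checks
    DurationTracker tracker = stats.prefetchOperationStarted();
    tracker.close();   // no-op for the stub tracker
  }
}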
