2121
2222import java .io .IOException ;
2323import java .nio .ByteBuffer ;
24+ import java .time .Duration ;
25+ import java .time .Instant ;
2426import java .util .concurrent .CompletableFuture ;
2527import java .util .concurrent .Future ;
2628import java .util .concurrent .TimeUnit ;
3133import org .slf4j .Logger ;
3234import org .slf4j .LoggerFactory ;
3335
36+ import org .apache .hadoop .fs .statistics .DurationTracker ;
37+
38+ import static java .util .Objects .requireNonNull ;
39+
3440import static org .apache .hadoop .io .IOUtils .cleanupWithLogger ;
3541
3642/**
@@ -70,33 +76,37 @@ public abstract class CachingBlockManager extends BlockManager {
7076 // Once set to true, any further caching requests will be ignored.
7177 private final AtomicBoolean cachingDisabled ;
7278
79+ private final PrefetchingStatistics prefetchingStatistics ;
80+
7381 /**
7482 * Constructs an instance of a {@code CachingBlockManager}.
7583 *
7684 * @param futurePool asynchronous tasks are performed in this pool.
7785 * @param blockData information about each block of the underlying file.
7886 * @param bufferPoolSize size of the in-memory cache in terms of number of blocks.
87+ * @param prefetchingStatistics statistics for this stream.
7988 *
80- * @throws IllegalArgumentException if futurePool is null.
8189 * @throws IllegalArgumentException if bufferPoolSize is zero or negative.
8290 */
8391 public CachingBlockManager (
8492 ExecutorServiceFuturePool futurePool ,
8593 BlockData blockData ,
86- int bufferPoolSize ) {
94+ int bufferPoolSize ,
95+ PrefetchingStatistics prefetchingStatistics ) {
8796 super (blockData );
8897
89- Validate .checkNotNull (futurePool , "futurePool" );
9098 Validate .checkPositiveInteger (bufferPoolSize , "bufferPoolSize" );
9199
92- this .futurePool = futurePool ;
100+ this .futurePool = requireNonNull ( futurePool ) ;
93101 this .bufferPoolSize = bufferPoolSize ;
94102 this .numCachingErrors = new AtomicInteger ();
95103 this .numReadErrors = new AtomicInteger ();
96104 this .cachingDisabled = new AtomicBoolean ();
105+ this .prefetchingStatistics = requireNonNull (prefetchingStatistics );
97106
98107 if (this .getBlockData ().getFileSize () > 0 ) {
99- this .bufferPool = new BufferPool (bufferPoolSize , this .getBlockData ().getBlockSize ());
108+ this .bufferPool = new BufferPool (bufferPoolSize , this .getBlockData ().getBlockSize (),
109+ this .prefetchingStatistics );
100110 this .cache = this .createCache ();
101111 }
102112
@@ -249,7 +259,7 @@ public void requestPrefetch(int blockNumber) {
249259 }
250260
251261 BlockOperations .Operation op = this .ops .requestPrefetch (blockNumber );
252- PrefetchTask prefetchTask = new PrefetchTask (data , this );
262+ PrefetchTask prefetchTask = new PrefetchTask (data , this , Instant . now () );
253263 Future <Void > prefetchFuture = this .futurePool .executeFunction (prefetchTask );
254264 data .setPrefetch (prefetchFuture );
255265 this .ops .end (op );
@@ -279,8 +289,10 @@ private void read(BufferData data) throws IOException {
279289 }
280290 }
281291
282- private void prefetch (BufferData data ) throws IOException {
292+ private void prefetch (BufferData data , Instant taskQueuedStartTime ) throws IOException {
283293 synchronized (data ) {
294+ prefetchingStatistics .executorAcquired (
295+ Duration .between (taskQueuedStartTime , Instant .now ()));
284296 this .readBlock (
285297 data ,
286298 true ,
@@ -297,6 +309,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
297309 }
298310
299311 BlockOperations .Operation op = null ;
312+ DurationTracker tracker = null ;
300313
301314 synchronized (data ) {
302315 try {
@@ -318,6 +331,7 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
318331 }
319332
320333 if (isPrefetch ) {
334+ tracker = prefetchingStatistics .prefetchOperationStarted ();
321335 op = this .ops .prefetch (data .getBlockNumber ());
322336 } else {
323337 op = this .ops .getRead (data .getBlockNumber ());
@@ -333,13 +347,25 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
333347 } catch (Exception e ) {
334348 String message = String .format ("error during readBlock(%s)" , data .getBlockNumber ());
335349 LOG .error (message , e );
350+
351+ if (isPrefetch && tracker != null ) {
352+ tracker .failed ();
353+ }
354+
336355 this .numReadErrors .incrementAndGet ();
337356 data .setDone ();
338357 throw e ;
339358 } finally {
340359 if (op != null ) {
341360 this .ops .end (op );
342361 }
362+
363+ if (isPrefetch ) {
364+ prefetchingStatistics .prefetchOperationCompleted ();
365+ if (tracker != null ) {
366+ tracker .close ();
367+ }
368+ }
343369 }
344370 }
345371 }
@@ -350,16 +376,18 @@ private void readBlock(BufferData data, boolean isPrefetch, BufferData.State...
350376 private static class PrefetchTask implements Supplier <Void > {
351377 private final BufferData data ;
352378 private final CachingBlockManager blockManager ;
379+ private final Instant taskQueuedStartTime ;
353380
354- PrefetchTask (BufferData data , CachingBlockManager blockManager ) {
381+ PrefetchTask (BufferData data , CachingBlockManager blockManager , Instant taskQueuedStartTime ) {
355382 this .data = data ;
356383 this .blockManager = blockManager ;
384+ this .taskQueuedStartTime = taskQueuedStartTime ;
357385 }
358386
359387 @ Override
360388 public Void get () {
361389 try {
362- this .blockManager .prefetch (data );
390+ this .blockManager .prefetch (data , taskQueuedStartTime );
363391 } catch (Exception e ) {
364392 LOG .error ("error during prefetch" , e );
365393 }
@@ -420,14 +448,18 @@ public void requestCaching(BufferData data) {
420448 blockFuture = cf ;
421449 }
422450
423- CachePutTask task = new CachePutTask (data , blockFuture , this );
451+ CachePutTask task = new CachePutTask (data , blockFuture , this , Instant . now () );
424452 Future <Void > actionFuture = this .futurePool .executeFunction (task );
425453 data .setCaching (actionFuture );
426454 this .ops .end (op );
427455 }
428456 }
429457
430- private void addToCacheAndRelease (BufferData data , Future <Void > blockFuture ) {
458+ private void addToCacheAndRelease (BufferData data , Future <Void > blockFuture ,
459+ Instant taskQueuedStartTime ) {
460+ prefetchingStatistics .executorAcquired (
461+ Duration .between (taskQueuedStartTime , Instant .now ()));
462+
431463 if (this .closed ) {
432464 return ;
433465 }
@@ -493,7 +525,7 @@ private void addToCacheAndRelease(BufferData data, Future<Void> blockFuture) {
493525 }
494526
495527 protected BlockCache createCache () {
496- return new SingleFilePerBlockCache ();
528+ return new SingleFilePerBlockCache (prefetchingStatistics );
497529 }
498530
499531 protected void cachePut (int blockNumber , ByteBuffer buffer ) throws IOException {
@@ -513,18 +545,22 @@ private static class CachePutTask implements Supplier<Void> {
513545 // Block manager that manages this block.
514546 private final CachingBlockManager blockManager ;
515547
548+ private final Instant taskQueuedStartTime ;
549+
516550 CachePutTask (
517551 BufferData data ,
518552 Future <Void > blockFuture ,
519- CachingBlockManager blockManager ) {
553+ CachingBlockManager blockManager ,
554+ Instant taskQueuedStartTime ) {
520555 this .data = data ;
521556 this .blockFuture = blockFuture ;
522557 this .blockManager = blockManager ;
558+ this .taskQueuedStartTime = taskQueuedStartTime ;
523559 }
524560
525561 @ Override
526562 public Void get () {
527- this .blockManager .addToCacheAndRelease (this .data , this .blockFuture );
563+ this .blockManager .addToCacheAndRelease (this .data , this .blockFuture , taskQueuedStartTime );
528564 return null ;
529565 }
530566 }
0 commit comments