5757import org .apache .hadoop .fs .impl .StoreImplementationUtils ;
5858import org .apache .hadoop .fs .permission .FsPermission ;
5959import org .apache .hadoop .fs .statistics .IOStatistics ;
60+ import org .apache .hadoop .fs .statistics .IOStatisticsAggregator ;
61+ import org .apache .hadoop .fs .statistics .IOStatisticsContext ;
6062import org .apache .hadoop .fs .statistics .IOStatisticsSource ;
6163import org .apache .hadoop .fs .statistics .BufferedIOStatisticsOutputStream ;
6264import org .apache .hadoop .fs .statistics .impl .IOStatisticsStore ;
@@ -156,11 +158,19 @@ class LocalFSFileInputStream extends FSInputStream implements
156158 /** Reference to the bytes read counter for slightly faster counting. */
157159 private final AtomicLong bytesRead ;
158160
161+ /**
162+ * Thread level IOStatistics aggregator to update in close().
163+ */
164+ private final IOStatisticsAggregator
165+ ioStatisticsAggregator ;
166+
159167 public LocalFSFileInputStream (Path f ) throws IOException {
160168 name = pathToFile (f );
161169 fis = new FileInputStream (name );
162170 bytesRead = ioStatistics .getCounterReference (
163171 STREAM_READ_BYTES );
172+ ioStatisticsAggregator =
173+ IOStatisticsContext .getCurrentIOStatisticsContext ().getAggregator ();
164174 }
165175
166176 @ Override
@@ -193,9 +203,13 @@ public boolean seekToNewSource(long targetPos) throws IOException {
193203
194204 @ Override
195205 public void close () throws IOException {
196- fis .close ();
197- if (asyncChannel != null ) {
198- asyncChannel .close ();
206+ try {
207+ fis .close ();
208+ if (asyncChannel != null ) {
209+ asyncChannel .close ();
210+ }
211+ } finally {
212+ ioStatisticsAggregator .aggregate (ioStatistics );
199213 }
200214 }
201215
@@ -278,6 +292,7 @@ public boolean hasCapability(String capability) {
278292 // new capabilities.
279293 switch (capability .toLowerCase (Locale .ENGLISH )) {
280294 case StreamCapabilities .IOSTATISTICS :
295+ case StreamCapabilities .IOSTATISTICS_CONTEXT :
281296 case StreamCapabilities .VECTOREDIO :
282297 return true ;
283298 default :
@@ -407,9 +422,19 @@ final class LocalFSFileOutputStream extends OutputStream implements
407422 STREAM_WRITE_EXCEPTIONS )
408423 .build ();
409424
425+ /**
426+ * Thread level IOStatistics aggregator to update in close().
427+ */
428+ private final IOStatisticsAggregator
429+ ioStatisticsAggregator ;
430+
410431 private LocalFSFileOutputStream (Path f , boolean append ,
411432 FsPermission permission ) throws IOException {
412433 File file = pathToFile (f );
434+ // store the aggregator before attempting any IO.
435+ ioStatisticsAggregator =
436+ IOStatisticsContext .getCurrentIOStatisticsContext ().getAggregator ();
437+
413438 if (!append && permission == null ) {
414439 permission = FsPermission .getFileDefault ();
415440 }
@@ -436,10 +461,17 @@ private LocalFSFileOutputStream(Path f, boolean append,
436461 }
437462
438463 /*
439- * Just forward to the fos
464+ * Close the fos; update the IOStatisticsContext.
440465 */
441466 @ Override
442- public void close () throws IOException { fos .close (); }
467+ public void close () throws IOException {
468+ try {
469+ fos .close ();
470+ } finally {
471+ ioStatisticsAggregator .aggregate (ioStatistics );
472+ }
473+ }
474+
443475 @ Override
444476 public void flush () throws IOException { fos .flush (); }
445477 @ Override
@@ -485,6 +517,7 @@ public boolean hasCapability(String capability) {
485517 // new capabilities.
486518 switch (capability .toLowerCase (Locale .ENGLISH )) {
487519 case StreamCapabilities .IOSTATISTICS :
520+ case StreamCapabilities .IOSTATISTICS_CONTEXT :
488521 return true ;
489522 default :
490523 return StoreImplementationUtils .isProbeForSyncable (capability );
0 commit comments