diff --git a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
index 542fe34e96c79..f9ed5b5af11c6 100644
--- a/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
+++ b/hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java
@@ -87,6 +87,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    * set
    */
   private volatile boolean closed;
+  private S3Object object;
   private S3ObjectInputStream wrappedStream;
   private final S3AReadOpContext context;
   private final AmazonS3 client;
@@ -202,7 +203,7 @@ private synchronized void reopen(String reason, long targetPos, long length,
     String text = String.format("%s %s at %d",
         operation, uri, targetPos);
     changeTracker.maybeApplyConstraint(request);
-    S3Object object = Invoker.once(text, uri,
+    object = Invoker.once(text, uri,
         () -> client.getObject(request));
 
     changeTracker.processResponse(object, operation,
@@ -431,8 +432,9 @@ public synchronized int read() throws IOException {
   private void onReadFailure(IOException ioe, int length, boolean forceAbort)
       throws IOException {
 
-    LOG.info("Got exception while trying to read from stream {}" +
-        " trying to recover: " + ioe, uri);
+    LOG.info("Got exception while trying to read from stream " + uri
+        + ", client: " + client + " object: " + object
+        + " trying to recover.", ioe);
     streamStatistics.readException();
     reopen("failure recovery", pos, length, forceAbort);
   }
@@ -550,14 +552,17 @@ public synchronized void close() throws IOException {
    */
   @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
-    if (isObjectStreamOpen()) {
+    if (!isObjectStreamOpen())
+      return;
 
-      // if the amount of data remaining in the current request is greater
-      // than the readahead value: abort.
-      long remaining = remainingInCurrentRequest();
-      LOG.debug("Closing stream {}: {}", reason,
-          forceAbort ? "abort" : "soft");
-      boolean shouldAbort = forceAbort || remaining > readahead;
+    // if the amount of data remaining in the current request is greater
+    // than the readahead value: abort.
+    long remaining = remainingInCurrentRequest();
+    LOG.debug("Closing stream {}: {}", reason,
+        forceAbort ? "abort" : "soft");
+    boolean shouldAbort = forceAbort || remaining > readahead;
+
+    try {
       if (!shouldAbort) {
         try {
           // clean close. This will read to the end of the stream,
@@ -578,25 +583,31 @@ private void closeStream(String reason, long length, boolean forceAbort) {
           streamStatistics.streamClose(false, drained);
         } catch (Exception e) {
           // exception escalates to an abort
-          LOG.debug("When closing {} stream for {}", uri, reason, e);
+          LOG.warn("When closing {} stream for {}, will abort the stream", uri, reason, e);
           shouldAbort = true;
         }
       }
 
       if (shouldAbort) {
         // Abort, rather than just close, the underlying stream. Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
-        LOG.debug("Aborting stream");
-        wrappedStream.abort();
+        LOG.warn("Aborting stream {}", uri);
+        try {
+          wrappedStream.abort();
+        } catch (Exception e) {
+          LOG.warn("When aborting {} stream after failing to close it for {}", uri, reason, e);
+        }
         streamStatistics.streamClose(true, remaining);
       }
 
       LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
             + " nextReadPos={},"
-            + " request range {}-{} length={}",
+          + " request range {}-{} length={}",
          uri, (shouldAbort ? "aborted" : "closed"), reason,
          remaining, pos, nextReadPos,
          contentRangeStart, contentRangeFinish, length);
+    } finally {
       wrappedStream = null;
+      object = null;
     }
   }