@@ -87,6 +87,7 @@ public class S3AInputStream extends FSInputStream implements CanSetReadahead,
    * set
    */
   private volatile boolean closed;
+  private S3Object object;
Contributor:
can you add a javadoc comment explaining why this is needed "we have to keep a reference to this to stop garbage collection breaking the input stream"
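A rough sketch of the javadoc being asked for, adapting the wording of the comment above (illustrative only, not code from the final patch):

    /**
     * The S3 object the stream is reading.
     * A strong reference is kept here so that garbage collection of the
     * S3Object cannot break the wrapped input stream while it is in use.
     */
    private S3Object object;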

   private S3ObjectInputStream wrappedStream;
   private final S3AReadOpContext context;
   private final AmazonS3 client;
@@ -202,7 +203,7 @@ private synchronized void reopen(String reason, long targetPos, long length,
     String text = String.format("%s %s at %d",
         operation, uri, targetPos);
     changeTracker.maybeApplyConstraint(request);
-    S3Object object = Invoker.once(text, uri,
+    object = Invoker.once(text, uri,
         () -> client.getObject(request));

     changeTracker.processResponse(object, operation,
@@ -431,8 +432,9 @@ public synchronized int read() throws IOException {
   private void onReadFailure(IOException ioe, int length, boolean forceAbort)
       throws IOException {

-    LOG.info("Got exception while trying to read from stream {}" +
-        " trying to recover: " + ioe, uri);
+    LOG.info("Got exception while trying to read from stream " + uri
+        + ", client: " + client + " object: " + object
Contributor:
why the change? The previous one used SLF4J expansion, and deliberately only printed the IOE's message, not the stack.
If you want the stack, add a separate log at debug with the full details, and use SLF4J expansion of {} elements in the log string.

Contributor Author:
Thanks a lot for the review Steve! Will address other comments asap.

The reason I made the change here is that the symptom is intermittent, and the stack is helpful when we need to diagnose issues. I hope to catch it when it occurs. My worry is that turning on debug will introduce a lot of other debug messages to the logs. But if exceptions happen here very often, we can move the stack printing to debug. Is that the case here, though? Thanks.

Contributor:
leave at debug. We collect statistics on it, so if you see it a lot in the stats you can turn on debug logging for this class only.

+ " trying to recover.", ioe);
streamStatistics.readException();
reopen("failure recovery", pos, length, forceAbort);
}
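A rough sketch of the approach the reviewer describes above: keep the info-level message on SLF4J {} expansion, and attach the stack trace only to a separate debug-level log. Illustrative only, not the code in this PR; uri, client, object and ioe are the names used in onReadFailure:

    LOG.info("Got exception while trying to read from stream {}, "
        + "client: {} object: {}, trying to recover: {}",
        uri, client, object, ioe.toString());
    // SLF4J prints the full stack trace when the last argument is a Throwable
    LOG.debug("Full exception details of read failure on {}", uri, ioe);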
@@ -550,14 +552,17 @@ public synchronized void close() throws IOException {
    */
   @Retries.OnceRaw
   private void closeStream(String reason, long length, boolean forceAbort) {
-    if (isObjectStreamOpen()) {
+    if (!isObjectStreamOpen())
Contributor:
nit: wrap in { }. Also add a comment "stream is already closed"

+      return;
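For reference, the braced form being requested would look roughly like this (a sketch, not the merged code):

    if (!isObjectStreamOpen()) {
      // stream is already closed
      return;
    }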

-      // if the amount of data remaining in the current request is greater
-      // than the readahead value: abort.
-      long remaining = remainingInCurrentRequest();
-      LOG.debug("Closing stream {}: {}", reason,
-          forceAbort ? "abort" : "soft");
-      boolean shouldAbort = forceAbort || remaining > readahead;
+    // if the amount of data remaining in the current request is greater
+    // than the readahead value: abort.
+    long remaining = remainingInCurrentRequest();
+    LOG.debug("Closing stream {}: {}", reason,
+        forceAbort ? "abort" : "soft");
+    boolean shouldAbort = forceAbort || remaining > readahead;

+    try {
       if (!shouldAbort) {
         try {
           // clean close. This will read to the end of the stream,
@@ -578,25 +583,31 @@ private void closeStream(String reason, long length, boolean forceAbort) {
           streamStatistics.streamClose(false, drained);
         } catch (Exception e) {
           // exception escalates to an abort
-          LOG.debug("When closing {} stream for {}", uri, reason, e);
+          LOG.warn("When closing {} stream for {}, will abort the stream", uri, reason, e);
           shouldAbort = true;
         }
       }
       if (shouldAbort) {
         // Abort, rather than just close, the underlying stream. Otherwise, the
         // remaining object payload is read from S3 while closing the stream.
-        LOG.debug("Aborting stream");
-        wrappedStream.abort();
+        LOG.warn("Aborting stream {}", uri);
Contributor:
revert to debug. We abort() a lot as we seek around files if read policy != random, and don't want application logs overloaded with messages.
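Illustratively, the downgraded line the reviewer is asking for would be roughly:

    // keep at debug: abort() happens frequently when seeking around a file
    // with a non-random read policy, and should not flood application logs
    LOG.debug("Aborting stream {}", uri);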

+        try {
+          wrappedStream.abort();
+        } catch (Exception e) {
+          LOG.warn("When aborting {} stream after failing to close it for {}", uri, reason, e);
+        }
         streamStatistics.streamClose(true, remaining);
       }
       LOG.debug("Stream {} {}: {}; remaining={} streamPos={},"
           + " nextReadPos={}," +
-          " request range {}-{} length={}",
+          " request range {}-{} length={}",
           uri, (shouldAbort ? "aborted" : "closed"), reason,
           remaining, pos, nextReadPos,
           contentRangeStart, contentRangeFinish,
           length);
+    } finally {
       wrappedStream = null;
+      object = null;
     }
   }
