Skip to content

Commit 77cb778

Browse files
mukund-thakur authored and steveloughran committed
HADOOP-18460. checkIfVectoredIOStopped before populating the buffers (#4986)
Contributed by Mukund Thakur
1 parent 8052561 commit 77cb778

File tree

1 file changed

+28
-15
lines changed

1 file changed

+28
-15
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 28 additions & 15 deletions
Original file line number | Diff line number | Diff line change
@@ -910,21 +910,15 @@ public void readVectored(List<? extends FileRange> ranges,
910910
private void readCombinedRangeAndUpdateChildren(CombinedFileRange combinedFileRange,
911911
IntFunction<ByteBuffer> allocate) {
912912
LOG.debug("Start reading combined range {} from path {} ", combinedFileRange, pathStr);
913-
// This reference is must be kept till all buffers are populated as this is a
913+
// This reference must be kept till all buffers are populated as this is a
914914
// finalizable object which closes the internal stream when gc triggers.
915915
S3Object objectRange = null;
916916
S3ObjectInputStream objectContent = null;
917917
try {
918-
checkIfVectoredIOStopped();
919-
final String operationName = "readCombinedFileRange";
920-
objectRange = getS3Object(operationName,
918+
objectRange = getS3ObjectAndValidateNotNull("readCombinedFileRange",
921919
combinedFileRange.getOffset(),
922920
combinedFileRange.getLength());
923921
objectContent = objectRange.getObjectContent();
924-
if (objectContent == null) {
925-
throw new PathIOException(uri,
926-
"Null IO stream received during " + operationName);
927-
}
928922
populateChildBuffers(combinedFileRange, objectContent, allocate);
929923
} catch (Exception ex) {
930924
LOG.debug("Exception while reading a range {} from path {} ", combinedFileRange, pathStr, ex);
@@ -1019,19 +1013,15 @@ private void validateRangeRequest(FileRange range) throws EOFException {
10191013
*/
10201014
private void readSingleRange(FileRange range, ByteBuffer buffer) {
10211015
LOG.debug("Start reading range {} from path {} ", range, pathStr);
1016+
// This reference must be kept till all buffers are populated as this is a
1017+
// finalizable object which closes the internal stream when gc triggers.
10221018
S3Object objectRange = null;
10231019
S3ObjectInputStream objectContent = null;
10241020
try {
1025-
checkIfVectoredIOStopped();
10261021
long position = range.getOffset();
10271022
int length = range.getLength();
1028-
final String operationName = "readRange";
1029-
objectRange = getS3Object(operationName, position, length);
1023+
objectRange = getS3ObjectAndValidateNotNull("readSingleRange", position, length);
10301024
objectContent = objectRange.getObjectContent();
1031-
if (objectContent == null) {
1032-
throw new PathIOException(uri,
1033-
"Null IO stream received during " + operationName);
1034-
}
10351025
populateBuffer(length, buffer, objectContent);
10361026
range.getData().complete(buffer);
10371027
} catch (Exception ex) {
@@ -1043,6 +1033,29 @@ private void readSingleRange(FileRange range, ByteBuffer buffer) {
10431033
LOG.debug("Finished reading range {} from path {} ", range, pathStr);
10441034
}
10451035

1036+
/**
1037+
* Get the S3 object from the S3 server for a specified range.
1038+
* Also checks if the vectored io operation has been stopped before and after
1039+
* the http get request such that we don't waste time populating the buffers.
1040+
* @param operationName name of the operation for which get object on S3 is called.
1041+
* @param position position of the object to be read from S3.
1042+
* @param length length from position of the object to be read from S3.
1043+
* @return result s3 object.
1044+
* @throws IOException exception if any.
1045+
*/
1046+
private S3Object getS3ObjectAndValidateNotNull(final String operationName,
1047+
final long position,
1048+
final int length) throws IOException {
1049+
checkIfVectoredIOStopped();
1050+
S3Object objectRange = getS3Object(operationName, position, length);
1051+
if (objectRange.getObjectContent() == null) {
1052+
throw new PathIOException(uri,
1053+
"Null IO stream received during " + operationName);
1054+
}
1055+
checkIfVectoredIOStopped();
1056+
return objectRange;
1057+
}
1058+
10461059
/**
10471060
* Populates the buffer with data from objectContent
10481061
* till length. Handles both direct and heap byte buffers.

0 commit comments

Comments
 (0)