Skip to content
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
279 changes: 279 additions & 0 deletions core/src/main/java/org/apache/spark/io/ReadAheadInputStream.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,279 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.io;

import com.google.common.base.Preconditions;
import org.apache.spark.storage.StorageUtils;

import javax.annotation.concurrent.GuardedBy;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* {@link InputStream} implementation which asynchronously reads ahead from the underlying input
* stream when specified amount of data has been read from the current buffer. It does it by maintaining
* two buffer - active buffer and read ahead buffer. Active buffer contains data which should be returned
* when a read() call is issued. The read ahead buffer is used to asynchronously read from the underlying
* input stream and once the current active buffer is exhausted, we flip the two buffers so that we can
* start reading from the read ahead buffer without being blocked in disk I/O.
*/
public class ReadAheadInputStream extends InputStream {

private Lock stateChangeLock = new ReentrantLock();

@GuardedBy("stateChangeLock")
private ByteBuffer activeBuffer;

@GuardedBy("stateChangeLock")
private ByteBuffer readAheadBuffer;

@GuardedBy("stateChangeLock")
private boolean endOfStream;

@GuardedBy("stateChangeLock")
// true if async read is in progress
private boolean isReadInProgress;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: isReadInProgress -> readInProgress
Similarly for isReadAborted ... java naming convention of fields vs methods.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@GuardedBy("stateChangeLock")
// true if read is aborted due to an exception in reading from underlying input stream.
private boolean isReadAborted;

@GuardedBy("stateChangeLock")
private Exception readException;

// If the remaining data size in the current buffer is below this threshold,
// we issue an async read from the underlying input stream.
private final int readAheadThresholdInBytes;

private final InputStream underlyingInputStream;

private final ExecutorService executorService = Executors.newSingleThreadExecutor();

private final Condition asyncReadComplete = stateChangeLock.newCondition();

private final byte[] oneByte = new byte[1];

public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add @param for these parameters?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Preconditions.checkArgument(bufferSizeInBytes > 0, "bufferSizeInBytes should be greater than 0");
Preconditions.checkArgument(readAheadThresholdInBytes > 0 && readAheadThresholdInBytes < bufferSizeInBytes,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Do we want to ensure that the line length is below 100? @cloud-fan

"readAheadThresholdInBytes should be greater than 0 and less than bufferSizeInBytes" );
activeBuffer = ByteBuffer.allocate(bufferSizeInBytes);
readAheadBuffer = ByteBuffer.allocate(bufferSizeInBytes);
this.readAheadThresholdInBytes = readAheadThresholdInBytes;
this.underlyingInputStream = inputStream;
activeBuffer.flip();
readAheadBuffer.flip();
}

private boolean isEndOfStream() {
if(activeBuffer.remaining() == 0 && readAheadBuffer.remaining() == 0 && endOfStream) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

tiny nit: hasRemaining instead ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return true;
}
return false;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: double spaces

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not just return !activeBuffer.hasRemaining() && !readAheadBuffer.hasRemaining() && endOfStream?

}
Copy link
Contributor

@mridulm mridulm Aug 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I meant hasRemaining on the buffer's instead of remaining() == 0.
The earlier method name isEndOfStream looks more appropriate (and is consistent with the return value).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. My bad, changed accordingly



private void readAsync(final ByteBuffer byteBuffer) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

assert null != byteBuffer

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

skip this, will just add to noise ... I added it because an NPE after lock is held can happen with byteBuffer = null for whatever reason.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: please add an empty line between two methods

stateChangeLock.lock();
if (endOfStream || isReadInProgress) {
stateChangeLock.unlock();
return;
}
byteBuffer.position(0);
byteBuffer.flip();
isReadInProgress = true;
stateChangeLock.unlock();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put stateChangeLock.unlock() in finally

executorService.execute(() -> {
byte[] arr;
stateChangeLock.lock();
arr = byteBuffer.array();
stateChangeLock.unlock();
// Please note that it is safe to release the lock and read into the read ahead buffer
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The byteBuffer is not necessarily the read ahead buffer, it can be the active buffer if read is called for the first time.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated the comment accordingly.

// because either of following two conditions will hold - 1. The active buffer has
// data available to read so the reader will not read from the read ahead buffer.
// 2. The active buffer is exhausted, in that case the reader waits for this async
// read to complete. So there is no race condition in both the situations.
int nRead = 0;
while (nRead == 0) {
try {
nRead = underlyingInputStream.read(arr);
if (nRead < 0) {
// We hit end of the underlying input stream
break;
}
} catch (Exception e) {
stateChangeLock.lock();
// We hit a read exception, which should be propagated to the reader
// in the next read() call.
isReadAborted = true;
readException = e;
stateChangeLock.unlock();
}
}
stateChangeLock.lock();
if (nRead < 0) {
endOfStream = true;
}
else {
// fill the byte buffer
byteBuffer.limit(nRead);
}
isReadInProgress = false;
signalAsyncReadComplete();
stateChangeLock.unlock();
Copy link
Contributor

@mridulm mridulm Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In order to prevent issues, and to make the code robust, it is better to wrap the body in try/finally and make all the state changes in the finally block atomically.
Something like (paraphrasing) :

// Remove local lock/unlock and move it into readAsync() before `stateChangeLock.unlock();` (so within stateChangeLock)
/*
stateChangeLock.lock();
final byte[] arr = byteBuffer.array();
stateChangeLock.unlock();
*/

boolean handled = false;
int read = 0;
Exception exception = null;
try {
  while (true) {
    read = underlyingInputStream.read(arr);
    if (0 != read) break;
  }
  handled = true;
} catch (Exception ex) {
  exception = ex;
} finally {
    stateChangeLock.lock();
    if (read < 0 || (exception instanceof EOFException) ) {
      endOfStream = true;
    } else if (null != exception || ! handled) {
      readAborted = true;
      readException = exception;
    } else {
      byteBuffer.limit(read);
    }
    readInProgress = false;
    signalAsyncReadComplete();
    stateChangeLock.unlock();
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If readAborted = true and readException = null will cause issues, we can set it to some stock exception (to handle non-Exception's being thrown)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious, is it even possible that a null exception in java? AFAIK throw null will actually throw an NPE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I made the comment, I maent it to guard against the cases where readException = null with readAborted = true causing NPE in case there is code like
if (readAborted) throw readException;
I dont think I saw some construct like that - but in case it existed, wanted to avoid it.

});
}

private void signalAsyncReadComplete() {
stateChangeLock.lock();
try {
asyncReadComplete.signalAll();
} finally {
stateChangeLock.unlock();
}
}

private void waitForAsyncReadComplete() {
stateChangeLock.lock();
try {
asyncReadComplete.await();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You need to put this in a while loop like this to handle spurious wakeup:

while (readInProgress) {
    asyncReadComplete.await();
}

} catch (InterruptedException e) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto: don't swallow InterruptedException

} finally {
stateChangeLock.unlock();
}
}

@Override
public synchronized int read() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am assuming this is synchronized to allow for reuse of oneByte ? Any other reason ?
If only for oneByte, we could make it static thread local and remove the additional sync ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All public apis are synchronized because we can not depend on stateChangeLock to synchronize the api calls because the stateChangeLock can be reliniquished by any read call in the waitForAsyncReadComplete function

Copy link
Contributor

@mridulm mridulm Aug 29, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I follow - if all state change and critical sections were protected by stateChangeLock - then why synchronized ? It looks like unnecessary overhead : having said that, it is possible that I have missed some flow where synchronized helps, I want to understand what those are.

I think use of oneByte was one such case IIRC ? (which is why I suggested moving it into a ThreadLocal variable shared across streams :-) )

int val = read(oneByte, 0, 1);
if (val == -1) {
return -1;
}
return oneByte[0] & 0xFF;
}

@Override
public synchronized int read(byte[] b, int offset, int len) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason this method is synchronized ? It is locking on stateChangeLock anyway ...

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm - This is synchronized because the stateChangeLock can be reliniquished by any read call in the waitForAsyncReadComplete function. so depending on stateChangeLock for synchronizing external apis are not thread-safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As I mention above, as long as state is read/updated only with the lock held, I dont see how release of lock in waitForAsyncReadComplete is causing issues.
stateChangeLock is a reentrant lock, so if at entry lock was held, on exit it will continue to be held; and if at entry it was not, at exit it will not be due to the try/finally acquired/release [1].
Did I miss something here ? Perhaps I misunderstood your comment ?

[1]

import java.util.concurrent.locks.ReentrantLock
val stateChangeLock = new ReentrantLock()
stateChangeLock.lock()
stateChangeLock.lock()
stateChangeLock.unlock()
stateChangeLock.isLocked
stateChangeLock.unlock()
stateChangeLock.isLocked

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are few reasons I am hesitant to remove the synchronized from the public apis.

  1. Some variables like executorService, underlyingInputStream are not protected by the statechange lock. There might be a race condition when we try to execute executorService.execute call at the same time shutdown the executor service in close function call.
  2. I am not sure what will be the behavior if multiple threads are waiting on asyncReadComplete.await() call. Is the signaling thread going to wake only one of them? If yes, is it going to do it in the order? If it is not guaranteed to wake up the callers in order, then the read calls which are issued from different threads at different point of time are not guaranteed to read them in order. Is that acceptable?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

executorService and underlyingInputStream are final right ? So there is no concurrent mods to them.
If you are worried about thread safety of these - executorService is thread safe (jdk contract).
In your code, underlyingInputStream must be invoked in order (if it is not, then we have a bug) : synchronized block is not protecting underlyingInputStream anyway.

For reads on ReadAheadInputStream: there is no gaurantee of ordering in case of parallel read's for InputStream's - did I miss some expectation that this class provides that gaurantee to its consumers ?

Note that I want to understand why we need synchronized, and if there is a need for it, sure we can keep it as such :-)
The only reason I see, as of now, is that use of oneByte array is problematic : if we eliminate that via a thread local or rewrite code to handle read() without using read(byte[]), then I dont see a current need for it. Please let me know if I am missing something.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mridulm - After iterating through the code several times and thinking about it I convinced myself that we do not need the synchronized for the public apis. I also changed the use the oneByte array to be thread local so that we do not have any race-condition there.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for clarifying @sitalkedia.
I was not sure if I was missing something, particularly for the way it is being used and the asyncRead vs normal read usecases, w.r.t synchronization requirements.

I will wait for @zsxwing to also take a look, but this is looking really good !

stateChangeLock.lock();
try {
len = readInternal(b, offset, len);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: you can just return readInternal(b, offset, len);

}
finally {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: } finally {

stateChangeLock.unlock();
}
return len;
}

/**
* Internal read function which should be called only from read() api. The assumption is that
* the stateChangeLock is already acquired in the caller before calling this function.
*/
private int readInternal(byte[] b, int offset, int len) throws IOException {

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we insert assert(stateChangeLock.isLocked()) after declaring stateChangeLock as ReentrantLock?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe offset < 0 || len < 0 || offset + len > b.length is enough?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It may take care an underflow of offset + len (e.g. offset = 0x40000000, len= 0x40000000, and b.length = 0x50000000).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

Copy link
Contributor

@mridulm mridulm Aug 23, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could move this to before lock is acquired - in read(byte[], int, int) itself.
readInternal is not exposed externally, and can take sanitized inputs (mirror'ing java.io.* streams for example).

We should also special case degenerate case of 0 == len in read(byte[], int, int).
Right now, even if len = 0, we can trigger a thread block until async read is completed.
Additionally, we can return -1 or throw exception when user asked to read 0 bytes.

IIRC this might be non standard - do we depend on this behavior ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

throw new IndexOutOfBoundsException();
}
if (!activeBuffer.hasRemaining() && !isReadInProgress) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about:

     if (!activeBuffer.hasRemaining()) {
     	if (!isReadInProgress) {
     		// This condition will only be triggered for the first time read is called.
       		readAsync(activeBuffer);
     	}
     	waitForAsyncReadComplete();
     }

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

// This condition will only be triggered for the first time read is called.
readAsync(activeBuffer);
waitForAsyncReadComplete();
}
if (!activeBuffer.hasRemaining() && isReadInProgress) {
waitForAsyncReadComplete();
}

if (isReadAborted) {
throw new IOException(readException);
}
if (isEndOfStream()) {
return -1;
}
len = Math.min(len, activeBuffer.remaining());
activeBuffer.get(b, offset, len);

if (activeBuffer.remaining() <= readAheadThresholdInBytes && !readAheadBuffer.hasRemaining()) {
readAsync(readAheadBuffer);
}
if (!activeBuffer.hasRemaining()) {
ByteBuffer temp = activeBuffer;
activeBuffer = readAheadBuffer;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like there is a race condition here: the next read will try to read from a buffer that is being read from the underlying input stream.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any race condition here. The readInternal function is protected by the stateChangeLock so only one thread is guaranteed to atomically flip the buffer. Am I missing something?

readAheadBuffer = temp;
}
return len;
}

@Override
public synchronized int available() throws IOException {
stateChangeLock.lock();
int val = activeBuffer.remaining() + readAheadBuffer.remaining();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we always ensure val >=0? I am afraid about overflow of + of two ints.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sitalkedia Is val always positive?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, added a check

stateChangeLock.unlock();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, looking at available() contract,
Returns an estimate of the number of bytes that can be read (or skipped over) from this input stream without blocking by the *next invocation of a method* for this input stream..

We cannot read activeBuffer.remaining() + readAheadBuffer.remaining() in a single invocation - only activeBuffer.remaining(), right ? We can read upto activeBuffer.remaining() + readAheadBuffer.remaining() (for example) in two non blocking invocations, but that is slightly different from the expectation.

This is a subtle distinction, and it might be fine to continue with what we have here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: put stateChangeLock.unlock(); into finally. Then you can remove int val and just return the whole expression.

return val;
}

@Override
public synchronized long skip(long n) throws IOException {
stateChangeLock.lock();
long skipped;
try {
skipped = skipInternal(n);
} finally {
stateChangeLock.unlock();
}
return skipped;
}

/**
* Internal skip function which should be called only from skip() api. The assumption is that
* the stateChangeLock is already acquired in the caller before calling this function.
*/
private long skipInternal(long n) throws IOException {
if (n <= 0L) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we insert assert(stateChangeLock.isLocked()) after declaring stateChangeLock as ReentrantLock?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

return 0L;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this into skip itself, before lock is acquired.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

if (isReadInProgress) {
waitForAsyncReadComplete();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not always required - for example:
bufferSizeInBytes = 10240
readAheadThresholdInBytes = 1024
buffer.remaining() = 512
n = 10

If this will complicate code too much, it is fine to avoid handling for now.

if (available() >= n) {
// we can skip from the internal buffers
int toSkip = (int)n;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

May need to take care for underflow (e.g. n = 0x80000000) by using Ints.checkedCast().

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@sitalkedia How about this? Is this code available to ensure n < 0x80000000?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As @mridulm pointed, since n < available() which returns an int, so n is guaranteed to be within integer range. So we should not be worried about the casting here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Got it. I overlooked this comment.

byte[] temp = new byte[toSkip];
while (toSkip > 0) {
int skippedBytes = read(temp, 0, toSkip);
toSkip -= skippedBytes;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to actually read (which involves data copy) ? Simply moving limit's should suffice ?
Note: the suffix invariants in readInternal should be handled - namely scheduling async read and buffer flipping - which could be a moved into a method in itself.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm actually that was making the code a little simpler at the expense of data copy. Changed it to avoid any data copy.

return n;
}
int skippedBytes = available();
long toSkip = n - skippedBytes;
activeBuffer.position(0);
activeBuffer.flip();
readAheadBuffer.position(0);
readAheadBuffer.flip();
long skippedFromInputStream = underlyingInputStream.skip(toSkip);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Trigger asyncRead here ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, actually after adding this code, I found out a critical bug in waitForAsyncReadComplete function call which I have fixed. Thanks!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am curious what the issue was, can you elaborate please ? Thanks !

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Earlier in waitForAsyncReadComplete function I was not actually checking if readInProgress. And this was causing some corner case where read was not in progress and we were just blocked on the signal. In readInternal function for example, we are issueing an async read call but it is possible that the async call finish before the main thread actually waits on the asyncReadComplete signal. so waitForAsyncReadComplete might be blocked forever without the check.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, excellent point - I missed this.
Thanks for debugging it !

return skippedBytes + skippedFromInputStream;
}

@Override
public synchronized void close() throws IOException {
executorService.shutdown();
StorageUtils.dispose(activeBuffer);
StorageUtils.dispose(readAheadBuffer);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

await termination of executorService ?
ReadAheadInputStream can continue to read after close is invoked : which will be happening on stream which might be closed and buffers which might be disposed.

Access to activeBuffer/readAheadBuffer should be done within stateChangeLock ?
Also, delegate close() to underlyingInputStream ?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, fixed it.

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@

import com.google.common.io.ByteStreams;
import com.google.common.io.Closeables;

import org.apache.spark.SparkEnv;
import org.apache.spark.TaskContext;
import org.apache.spark.io.NioBufferedFileInputStream;
import org.apache.spark.io.ReadAheadInputStream;
import org.apache.spark.serializer.SerializerManager;
import org.apache.spark.storage.BlockId;
import org.apache.spark.unsafe.Platform;
Expand Down Expand Up @@ -73,7 +73,9 @@ public UnsafeSorterSpillReader(
}

final InputStream bs =
new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
new ReadAheadInputStream(
new NioBufferedFileInputStream(file, (int) bufferSizeBytes),
(int)bufferSizeBytes, (int)bufferSizeBytes / 2);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we have a test on what would be the best threshold? Should we consider make this configurable?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Our test shows best result was obtained when threshold = 0.5* buffer size. I made it configurable anyways in case someone wants to play with it.

try {
this.in = serializerManager.wrapStream(blockId, bs);
this.din = new DataInputStream(this.in);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,13 @@
/**
* Tests functionality of {@link NioBufferedFileInputStream}
*/
public class NioBufferedFileInputStreamSuite {
public abstract class GenericFileInputStreamSuite {

private byte[] randomBytes;

private File inputFile;
protected File inputFile;

protected InputStream inputStream;

@Before
public void setUp() throws IOException {
Expand All @@ -52,15 +54,13 @@ public void tearDown() {

@Test
public void testReadOneByte() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
for (int i = 0; i < randomBytes.length; i++) {
assertEquals(randomBytes[i], (byte) inputStream.read());
}
}

@Test
public void testReadMultipleBytes() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
byte[] readBytes = new byte[8 * 1024];
int i = 0;
while (i < randomBytes.length) {
Expand All @@ -74,7 +74,6 @@ public void testReadMultipleBytes() throws IOException {

@Test
public void testBytesSkipped() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
assertEquals(1024, inputStream.skip(1024));
for (int i = 1024; i < randomBytes.length; i++) {
assertEquals(randomBytes[i], (byte) inputStream.read());
Expand All @@ -83,7 +82,6 @@ public void testBytesSkipped() throws IOException {

@Test
public void testBytesSkippedAfterRead() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
for (int i = 0; i < 1024; i++) {
assertEquals(randomBytes[i], (byte) inputStream.read());
}
Expand All @@ -95,7 +93,6 @@ public void testBytesSkippedAfterRead() throws IOException {

@Test
public void testNegativeBytesSkippedAfterRead() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
for (int i = 0; i < 1024; i++) {
assertEquals(randomBytes[i], (byte) inputStream.read());
}
Expand All @@ -111,7 +108,6 @@ public void testNegativeBytesSkippedAfterRead() throws IOException {

@Test
public void testSkipFromFileChannel() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile, 10);
// Since the buffer is smaller than the skipped bytes, this will guarantee
// we skip from underlying file channel.
assertEquals(1024, inputStream.skip(1024));
Expand All @@ -128,7 +124,6 @@ public void testSkipFromFileChannel() throws IOException {

@Test
public void testBytesSkippedAfterEOF() throws IOException {
InputStream inputStream = new NioBufferedFileInputStream(inputFile);
assertEquals(randomBytes.length, inputStream.skip(randomBytes.length + 1));
assertEquals(-1, inputStream.read());
}
Expand Down
Loading