Conversation

sitalkedia

Profiling some of our big jobs, we see that around 30% of the time is spent reading spill files from disk. To amortize the disk I/O cost, the idea is to implement a read-ahead input stream that asynchronously reads ahead from the underlying input stream once a specified amount of data has been consumed from the current buffer. It does this by maintaining two buffers: an active buffer and a read-ahead buffer. The active buffer holds the data returned by read() calls. The read-ahead buffer is used to asynchronously read from the underlying input stream; once the active buffer is exhausted, we flip the two buffers so that reading can continue from the read-ahead buffer without blocking on disk I/O.
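To make the double-buffer idea concrete, here is a minimal, illustrative sketch (not the code in this PR; the class and field names such as TwoBufferReadAhead are made up, and the real implementation also handles skip(), locking, error propagation and the read-ahead threshold):

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Illustrative only: a stripped-down two-buffer read-ahead wrapper.
class TwoBufferReadAhead extends InputStream {
  private final InputStream in;
  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  private ByteBuffer active;       // buffer that read() is served from
  private ByteBuffer readAhead;    // buffer being filled asynchronously
  private Future<Integer> pending; // result of the in-flight async fill

  TwoBufferReadAhead(InputStream in, int bufferSize) {
    this.in = in;
    this.active = ByteBuffer.allocate(bufferSize);
    this.active.limit(0);          // starts empty, so the first read() flips
    this.readAhead = ByteBuffer.allocate(bufferSize);
    this.pending = triggerReadAhead();
  }

  // Kick off an asynchronous fill of the current read-ahead buffer.
  private Future<Integer> triggerReadAhead() {
    final ByteBuffer target = readAhead;
    target.clear();
    return executor.submit(() -> {
      int n = in.read(target.array(), 0, target.capacity());
      target.limit(Math.max(n, 0));
      return n;
    });
  }

  @Override
  public int read() throws IOException {
    if (!active.hasRemaining()) {
      int n;
      try {
        n = pending.get();         // wait for the async fill to complete
      } catch (Exception e) {
        throw new IOException(e);
      }
      if (n < 0) {
        return -1;                 // underlying stream is exhausted
      }
      ByteBuffer tmp = active;
      active = readAhead;          // filled buffer becomes the active buffer
      readAhead = tmp;             // exhausted buffer becomes the next fill target
      pending = triggerReadAhead();
    }
    return active.get() & 0xFF;
  }

  @Override
  public void close() throws IOException {
    executor.shutdownNow();
    in.close();
  }
}
```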

How was this patch tested?

Tested by running a job on the cluster; we saw up to 8% CPU improvement.

@SparkQA

SparkQA commented Jun 15, 2017

Test build #78122 has finished for PR 18317 at commit d42000f.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public class ReadAheadInputStream extends InputStream

@sitalkedia
Author

cc - @rxin, @srowen, @sameeragarwal, @zsxwing, @mridulm

@SparkQA

SparkQA commented Jun 15, 2017

Test build #78124 has finished for PR 18317 at commit f687918.

  • This patch fails RAT tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

Jenkins retest this please.

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78125 has finished for PR 18317 at commit e138c0d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78127 has finished for PR 18317 at commit e138c0d.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 16, 2017

Test build #78132 has finished for PR 18317 at commit dbee3e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public abstract class GenericFileInputStreamSuite

Contributor

@jiangxb1987 jiangxb1987 left a comment

I like this idea, left a few comments.

*/
private int readInternal(byte[] b, int offset, int len) throws IOException {

if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
Contributor

Maybe offset < 0 || len < 0 || offset + len > b.length is enough?

Member

It may be there to take care of overflow of offset + len (e.g. offset = 0x40000000, len = 0x40000000, and b.length = 0x50000000).
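A tiny standalone demo of that wrap-around (values taken from the example above; the class name is just for illustration):

```java
public class OverflowCheckDemo {
  public static void main(String[] args) {
    int offset = 0x40000000;
    int len = 0x40000000;
    int bLength = 0x50000000;                     // stands in for b.length
    System.out.println(offset + len);             // -2147483648: the sum wrapped around
    System.out.println(offset + len > bLength);   // false: the simple bounds check passes
    System.out.println(offset + len < 0);         // true: the extra check rejects it
  }
}
```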

Author

+1

stateChangeLock.lock();
arr = byteBuffer.array();
stateChangeLock.unlock();
// Please note that it is safe to release the lock and read into the read ahead buffer
Contributor

The byteBuffer is not necessarily the read-ahead buffer; it can be the active buffer if read is called for the first time.

Author

Updated the comment accordingly.

if (offset < 0 || len < 0 || offset + len < 0 || offset + len > b.length) {
throw new IndexOutOfBoundsException();
}
if (!activeBuffer.hasRemaining() && !isReadInProgress) {
Contributor

How about:

     if (!activeBuffer.hasRemaining()) {
     	if (!isReadInProgress) {
     		// This condition will only be triggered for the first time read is called.
       		readAsync(activeBuffer);
     	}
     	waitForAsyncReadComplete();
     }

Author

done.

new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
new ReadAheadInputStream(
new NioBufferedFileInputStream(file, (int) bufferSizeBytes),
(int)bufferSizeBytes, (int)bufferSizeBytes / 2);
Contributor

Do we have a test on what the best threshold would be? Should we consider making this configurable?

Author

Our tests show the best result was obtained with threshold = 0.5 * buffer size. I made it configurable anyway in case someone wants to experiment with it.

@jiangxb1987
Contributor

retest this please.

@SparkQA

SparkQA commented Jul 7, 2017

Test build #79334 has started for PR 18317 at commit dbee3e1.

@shaneknapp
Contributor

I'm going to kill the current build and restart it once I finish doing some package upgrades.

@shaneknapp
Contributor

test this please

@SparkQA

SparkQA commented Jul 7, 2017

Test build #79336 has finished for PR 18317 at commit dbee3e1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • public abstract class GenericFileInputStreamSuite

@jiangxb1987
Contributor

ping @sitalkedia


private final byte[] oneByte = new byte[1];

public ReadAheadInputStream(InputStream inputStream, int bufferSizeInBytes, int readAheadThresholdInBytes) {
Member

Can we add @param for these parameters?

Author

done

* the stateChangeLock is already acquired in the caller before calling this function.
*/
private int readInternal(byte[] b, int offset, int len) throws IOException {

Member

Can we insert assert(stateChangeLock.isLocked()) after declaring stateChangeLock as ReentrantLock?

Author

done

* the stateChangeLock is already acquired in the caller before calling this function.
*/
private long skipInternal(long n) throws IOException {
if (n <= 0L) {
Member

Can we insert assert(stateChangeLock.isLocked()) after declaring stateChangeLock as ReentrantLock?

Author

done

}
if (available() >= n) {
// we can skip from the internal buffers
int toSkip = (int)n;
Member

May need to take care of overflow (e.g. n = 0x80000000) by using Ints.checkedCast().

Member

@sitalkedia How about this? Does this code ensure that n < 0x80000000?

Author

As @mridulm pointed out, since n <= available(), which returns an int, n is guaranteed to be within integer range. So we should not be worried about the cast here.

Member

Got it. I overlooked this comment.

@sitalkedia
Author

@jiangxb1987 - Sorry, I haven't had time to work on this lately. Will address the comments in the next few days.

@sitalkedia
Author

@jiangxb1987, @kiszk Addressed the review comments; let me know what you think.

BTW, this idea could be applied to other places where we block on reading an input stream, such as HDFS reads. What do you think?

}
if (available() >= n) {
// we can skip from the internal buffers
int toSkip = Ints.checkedCast(n);
Contributor

@mridulm mridulm Aug 4, 2017

In general, checkedCast must be used when the parameter must be an int as a precondition [1], not when it is being coerced into an int as an implementation detail.
It is valid for n to be > Integer.MAX_VALUE as a parameter value.

Having said that: available() is an int, so if n is less than or equal to available(), it is within int bounds.
Using Ints.checkedCast here would be incorrect and add noise.

[1] checkedCast will throw an exception, which would have been the wrong behavior here given the contract says n is a long.
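A small standalone illustration of that reasoning (the values and class name are made up for this example):

```java
public class SkipCastDemo {
  public static void main(String[] args) {
    long n = 0x80000000L;               // 2^31, just above Integer.MAX_VALUE
    int available = 1024;               // what available() might return
    System.out.println((int) n);        // -2147483648: a blind narrowing cast goes wrong
    // The cast in skipInternal only happens under "available() >= n", which can
    // never be true for n > Integer.MAX_VALUE because available() is an int:
    System.out.println(available >= n); // false, so that branch is never taken for such n
  }
}
```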

@SparkQA

SparkQA commented Aug 5, 2017

Test build #80267 has finished for PR 18317 at commit 42740be.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 5, 2017

Test build #80268 has finished for PR 18317 at commit a83a3d2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@mridulm mridulm left a comment

Given the sensitive nature of the change, can you take a look at it, @zsxwing, if you have the bandwidth?
I want to make sure we are not missing any corner cases!

return true;
}
return false;
}
Contributor

@mridulm mridulm Aug 29, 2017

I meant hasRemaining on the buffers instead of remaining() == 0.
The earlier method name isEndOfStream looks more appropriate (and is consistent with the return value).

Author

I see. My bad, changed accordingly

public void run() {
stateChangeLock.lock();
byte[] arr = byteBuffer.array();
stateChangeLock.unlock();
Contributor

The lock/unlock here can be removed, and final byte[] arr moved to the earlier stateChangeLock-protected block in readAsync.

Author

done

// So there is no race condition in both the situations.
boolean handled = false;
int read = 0;
Exception exception = new Exception("Unknown exception in ReadAheadInputStream");
Contributor

We should do this only in the !handled block, so that we create the Exception object only when required and not on every readAsync.

Author

good point, changed

Member

-> Exception exception = null

}

@Override
public synchronized int read(byte[] b, int offset, int len) throws IOException {
Contributor

As I mentioned above, as long as state is read/updated only with the lock held, I don't see how the release of the lock in waitForAsyncReadComplete causes issues.
stateChangeLock is a reentrant lock, so if the lock was held at entry, it will continue to be held at exit; and if it was not held at entry, it will not be held at exit, thanks to the try/finally acquire/release [1].
Did I miss something here? Perhaps I misunderstood your comment?

[1]

import java.util.concurrent.locks.ReentrantLock
val stateChangeLock = new ReentrantLock()
stateChangeLock.lock()
stateChangeLock.lock()
stateChangeLock.unlock()
stateChangeLock.isLocked // true: one unlock of the two nested locks does not release it
stateChangeLock.unlock()
stateChangeLock.isLocked // false: released once the unlocks match the locks

activeBuffer.flip();
readAheadBuffer.position(0);
readAheadBuffer.flip();
long skippedFromInputStream = underlyingInputStream.skip(toSkip);
Contributor

I am curious what the issue was; can you elaborate, please? Thanks!

@SparkQA

SparkQA commented Aug 29, 2017

Test build #81197 has finished for PR 18317 at commit 28e767d.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 29, 2017

Test build #81227 has finished for PR 18317 at commit c7b60a5.

  • This patch fails SparkR unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Aug 30, 2017

Test build #81238 has finished for PR 18317 at commit 8076e27.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@sitalkedia
Author

ping @zsxwing !

@sitalkedia
Author

Hi @zsxwing, could you find some time to take a look at this PR.

Member

@zsxwing zsxwing left a comment

Sorry for the delay. Made one pass. Will take another look tomorrow.

bufferSizeBytes = DEFAULT_BUFFER_SIZE_BYTES;
}

final Double readAheadFraction =
Member

nit: Double -> double

Author

done

new NioBufferedFileInputStream(file, (int) bufferSizeBytes);
try {
this.in = serializerManager.wrapStream(blockId, bs);
this.in = new ReadAheadInputStream(serializerManager.wrapStream(blockId, bs),
Member

Could you add an internal conf to disable it? That will let users turn it off if the new feature causes a regression.

Author

sure, done.
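A rough sketch of the kind of gating being asked for, following the variable names in the snippet above (the config key shown here is hypothetical; the actual patch may name it differently):

```java
// Illustrative only: wrap the spill stream with ReadAheadInputStream only when
// a (hypothetical) flag is enabled, so users can fall back on regressions.
boolean readAheadEnabled =
    SparkEnv.get().conf().getBoolean("spark.unsafe.sorter.spill.read.ahead.enabled", true);
InputStream wrapped = serializerManager.wrapStream(blockId, bs);
this.in = readAheadEnabled
    ? new ReadAheadInputStream(wrapped, (int) bufferSizeBytes, (int) bufferSizeBytes / 2)
    : wrapped;
```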

inputFile.delete();
}


Member

nit: extra empty line

try {
executorService.awaitTermination(10, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
}
Member

nit: please don't swallow InterruptedException.

Member

You can convert it to InterruptedIOException
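A minimal sketch of that conversion (illustrative only; the helper class and method names are made up):

```java
import java.io.InterruptedIOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative only: propagate interruption instead of swallowing it.
final class CloseHelper {
  static void awaitShutdown(ExecutorService executorService) throws InterruptedIOException {
    try {
      executorService.awaitTermination(10, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      InterruptedIOException iio = new InterruptedIOException(e.getMessage());
      iio.initCause(e);
      Thread.currentThread().interrupt(); // restore the interrupt flag
      throw iio;
    }
  }
}
```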

StorageUtils.dispose(activeBuffer);
StorageUtils.dispose(readAheadBuffer);
}
finally {
Member

nit: use } finally {

// Make sure we have no integer overflow.
int val = (int) Math.min((long) Integer.MAX_VALUE,
(long) activeBuffer.remaining() + readAheadBuffer.remaining());
stateChangeLock.unlock();
Member

nit: put stateChangeLock.unlock(); into finally. Then you can remove int val and just return the whole expression.
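A sketch of the shape being suggested (illustrative; it reuses the field names from the snippet above):

```java
@Override
public int available() throws IOException {
  stateChangeLock.lock();
  try {
    // Clamp to Integer.MAX_VALUE so summing the two buffers cannot overflow.
    return (int) Math.min((long) Integer.MAX_VALUE,
        (long) activeBuffer.remaining() + readAheadBuffer.remaining());
  } finally {
    stateChangeLock.unlock();
  }
}
```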

try {
len = readInternal(b, offset, len);
}
finally {
Member

nit: } finally {

}
stateChangeLock.lock();
try {
len = readInternal(b, offset, len);
Member

nit: you can just return readInternal(b, offset, len);

}
return false;
}
private void readAsync(final ByteBuffer byteBuffer) throws IOException {
Member

nit: please add an empty line between two methods

}
if (!activeBuffer.hasRemaining()) {
ByteBuffer temp = activeBuffer;
activeBuffer = readAheadBuffer;
Member

Looks like there is a race condition here: the next read may try to read from a buffer that is still being filled from the underlying input stream.

Author

I don't see any race condition here. The readInternal function is protected by the stateChangeLock, so the buffer flip is performed atomically by a single thread. Am I missing something?

@SparkQA

SparkQA commented Sep 12, 2017

Test build #81648 has finished for PR 18317 at commit ed426f3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 12, 2017

Test build #81649 has finished for PR 18317 at commit 36ae448.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Sep 13, 2017

Test build #81694 has finished for PR 18317 at commit f30117e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

*/
private int readInternal(byte[] b, int offset, int len) throws IOException {
assert (stateChangeLock.isLocked());
if (!activeBuffer.hasRemaining()) {
Member

This seems wrong when activeBuffer.hasRemaining() == false && readAheadBuffer.hasRemaining() == true. This may happen when read is called after skip(size of activeBuffer).

Author

Hmm... not really: if we skip(size of active buffer), we flip the active and read-ahead buffers, so the active buffer will actually have remaining bytes.

This commit includes the following changes:
- Close the underlying input stream only if it's not being read.
- Always read to `readAheadBuffer`.
@zsxwing
Member

zsxwing commented Sep 13, 2017

I opened sitalkedia#1 in your repo. Could you take a look at it?

@sitalkedia
Author

Thanks for the change. Left a few comments there.

Close the underlying input stream safely
@SparkQA

SparkQA commented Sep 14, 2017

Test build #81743 has finished for PR 18317 at commit 884c9d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

@zsxwing zsxwing left a comment

LGTM. Thanks! Merging to master.

@asfgit asfgit closed this in 1e978b1 Sep 18, 2017
asfgit pushed a commit that referenced this pull request Feb 15, 2018
## What changes were proposed in this pull request?

`ReadAheadInputStream` was introduced in #18317 to optimize reading spill files from disk.
However, from the profiles it seems that the hot path of reading small amounts of data (like readInt) is inefficient: it involves taking locks and multiple checks.

Optimize locking: a lock is not needed when simply accessing the active buffer. Lock only when swapping buffers, triggering an async read, or querying the async state.

Optimize the short path for single-byte reads, which is used e.g. by the Java library's DataInputStream.readInt.

The asyncReader used to call "read" only once on the underlying stream, which never filled the buffer when it was wrapping an LZ4BlockInputStream. If the buffer came back unfilled, the async reader would be triggered on every call to fill the read-ahead buffer, because the reader would always see the active buffer below the refill threshold.

However, always filling the full buffer could introduce extra latency, so also add an `AtomicBoolean` flag that lets the async reader return early when a reader is waiting for data.

Remove `readAheadThresholdInBytes` and instead trigger the async read immediately when switching buffers. This simplifies the code paths, especially the hot one, which then only has to check whether the active buffer has data available, without worrying about retriggering the async read. It seems to have a positive effect on performance.
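As a rough illustration of the single-byte fast path described above (this is a sketch, not the exact SPARK-23366 code; readSlowPath is a hypothetical helper):

```java
// Illustrative only: serve single-byte reads from the active buffer without
// taking stateChangeLock; in this sketch only the consumer thread touches
// activeBuffer, so the lock is needed only on the slow path that swaps
// buffers or waits for the async read.
@Override
public int read() throws IOException {
  if (activeBuffer.hasRemaining()) {
    return activeBuffer.get() & 0xFF;  // hot path: no locking, no extra checks
  }
  return readSlowPath();               // hypothetical helper: lock, swap, wait, retry
}
```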

## How was this patch tested?

It was noticed as a regression in some workloads after upgrading to Spark 2.3. 

It was particularly visible on TPCDS Q95 running on instances with fast disk (i3 AWS instances).
Running with profiling:
* Spark 2.2 - 5.2-5.3 minutes 9.5% in LZ4BlockInputStream.read
* Spark 2.3 - 6.4-6.6 minutes 31.1% in ReadAheadInputStream.read
* Spark 2.3 + fix - 5.3-5.4 minutes 13.3% in ReadAheadInputStream.read - very slightly slower, practically within noise.

We didn't see other regressions, and many workloads in general seem to be faster with Spark 2.3 (we did not investigate whether that is thanks to the async reader or unrelated).

Author: Juliusz Sompolski <[email protected]>

Closes #20555 from juliuszsompolski/SPARK-23366.
@cxzl25
Contributor

cxzl25 commented May 28, 2025

If we generate a large number of spill files, a task that uses ReadAhead in UnsafeExternalSorter is more likely to fail with OOM. By default, each spill file requires a 2MB on-heap buffer and creates a thread.
Perhaps ReadAhead is better and more stable for BytesToBytesMap.

Should we introduce a threshold configuration to control whether UnsafeExternalSorter uses ReadAhead to reduce memory usage on the executor? @mridulm @cloud-fan
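For a rough sense of scale (illustrative numbers, using the 2 MB default mentioned above): a task that has accumulated 1,000 spill files would need roughly 1000 × 2 MB ≈ 2 GB of heap for read-ahead buffers alone, plus 1,000 read-ahead threads, before any other allocations.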

org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter#getIterator

for (UnsafeSorterSpillWriter spillWriter : spillWriters) {
if (i + spillWriter.recordsSpilled() > startIndex) {
UnsafeSorterIterator iter = spillWriter.getReader(serializerManager);
moveOver(iter, startIndex - i);
queue.add(iter);
}

Closeables.close(reader, /* swallowIOException = */ false);
reader = spillWriters.getFirst().getReader(serializerManager);

Caused by: java.lang.OutOfMemoryError: Java heap space
	at java.base/java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:71)
	at java.base/java.nio.ByteBuffer.allocate(ByteBuffer.java:391)
	at org.apache.spark.io.ReadAheadInputStream.<init>(ReadAheadInputStream.java:105)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillReader.<init>(UnsafeSorterSpillReader.java:74)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeSorterSpillWriter.getReader(UnsafeSorterSpillWriter.java:159)
	at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.getIterator(UnsafeExternalSorter.java:733)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.generateIterator(ExternalAppendOnlyUnsafeRowArray.scala:173)
	at org.apache.spark.sql.execution.ExternalAppendOnlyUnsafeRowArray.generateIterator(ExternalAppendOnlyUnsafeRowArray.scala:177)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceBufferUntilBoundConditionSatisfied(SortMergeJoinExec.scala:1240)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceStream(SortMergeJoinExec.scala:1222)
	at org.apache.spark.sql.execution.joins.OneSideOuterIterator.advanceNext(SortMergeJoinExec.scala:1254)

@mridulm
Contributor

mridulm commented Jun 2, 2025

This is a fairly old change, so things could have evolved since then; can you create a PR for this proposal, @cxzl25? That will help us evaluate the impact.
