-
Notifications
You must be signed in to change notification settings - Fork 28.9k
[SPARK-23366] Improve hot reading path in ReadAheadInputStream #20555
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
|
Test build #87243 has finished for PR 20555 at commit
|
| while (readInProgress) { | ||
| isWaiting.set(true); | ||
| asyncReadComplete.await(); | ||
| isWaiting.set(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What happens if await() throws an exception? Is it ok not to update isWaiting?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, I added isWaiting.set(false) to the finally branch.
Actually, since the whole implementation assumes that there is only one reader, I removed the while() loop, since there is no other reader to race with us to trigger another read.
In practice I think not updating isWaiting it would have been benign, as after the exception the query will be going down with an InterruptedException, or elsewise anyone upstream handling that exception would most probably declare that stream as unusable afterwards anyway.
| return -1; | ||
| } | ||
| } | ||
| // Swap the newly read read ahead buffer in place of empty active buffer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it good to use read-ahead instead of read ahead in comments for ease of reading?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Other existing places in comments in the file use read ahead.
|
Test build #87339 has finished for PR 20555 at commit
|
|
Test build #87340 has finished for PR 20555 at commit
|
|
Also cc @jiangxb1987 @cloud-fan |
|
Test build #87337 has finished for PR 20555 at commit
|
|
Test build #87346 has finished for PR 20555 at commit
|
|
LGTM |
| if (isEndOfStream()) { | ||
| return -1; | ||
| if (!readAheadBuffer.hasRemaining()) { | ||
| // The first read or activeBuffer is skipped. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unrelated question: what does activeBuffer is skipped mean?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
skipped using skip().
I moved the comment over from a few lines above, but looking at skip() now I don't think it can happen - the skip would trigger an readAsync read in that case.
I'll update the comment.
|
cc @sameeragarwal @sitalkedia , shall we have this in Spark 2.3? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
|
It would also be great if we can add some unit tests on the read ahead stream model. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. Thanks for fixing this @juliuszsompolski .
|
@jiangxb1987 there is ReadAheadInputStreamSuite that extends GenericFileInputStreamSuite. |
|
Test build #87415 has finished for PR 20555 at commit
|
|
Test build #87416 has finished for PR 20555 at commit
|
| isWaiting.set(true); | ||
| try { | ||
| while (readInProgress) { | ||
| if (readInProgress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The while loop here is to handle spurious wakeup.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch, thanks!
| // we issue an async read from the underlying input stream. | ||
| private final int readAheadThresholdInBytes; | ||
| // whether there is a reader waiting for data. | ||
| private AtomicBoolean isWaiting = new AtomicBoolean(false); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You can just use volatile here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'll leave it be - should compile to basically the same, and with using AtomicBoolean the intent seems more readable to me.
|
Looks pretty good. Left two minor comments. |
|
Test build #87422 has finished for PR 20555 at commit
|
|
LGTM |
| stateChangeLock.lock(); | ||
| isWaiting.set(true); | ||
| try { | ||
| while (readInProgress) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shall we add a comment about spurious wakeup? Otherwise someone else may still mistakenly remove it in the future.
|
Test build #87435 has finished for PR 20555 at commit
|
|
Test build #87438 has finished for PR 20555 at commit
|
|
LGTM |
|
thanks, merging to master! |
What changes were proposed in this pull request?
ReadAheadInputStreamwas introduced in #18317 to optimize reading spill files from disk.However, from the profiles it seems that the hot path of reading small amounts of data (like readInt) is inefficient - it involves taking locks, and multiple checks.
Optimize locking: Lock is not needed when simply accessing the active buffer. Only lock when needing to swap buffers or trigger async reading, or get information about the async state.
Optimize short-path single byte reads, that are used e.g. by Java library DataInputStream.readInt.
The asyncReader used to call "read" only once on the underlying stream, that never filled the underlying buffer when it was wrapping an LZ4BlockInputStream. If the buffer was returned unfilled, that would trigger the async reader to be triggered to fill the read ahead buffer on each call, because the reader would see that the active buffer is below the refill threshold all the time.
However, filling the full buffer all the time could introduce increased latency, so also add an
AtomicBooleanflag for the async reader to return earlier if there is a reader waiting for data.Remove
readAheadThresholdInBytesand instead immediately trigger async read when switching the buffers. It allows to simplify code paths, especially the hot one that then only has to check if there is available data in the active buffer, without worrying if it needs to retrigger async read. It seems to have positive effect on perf.How was this patch tested?
It was noticed as a regression in some workloads after upgrading to Spark 2.3.
It was particularly visible on TPCDS Q95 running on instances with fast disk (i3 AWS instances).
Running with profiling:
* Spark 2.2 - 5.2-5.3 minutes 9.5% in LZ4BlockInputStream.read
* Spark 2.3 - 6.4-6.6 minutes 31.1% in ReadAheadInputStream.read
* Spark 2.3 + fix - 5.3-5.4 minutes 13.3% in ReadAheadInputStream.read - very slightly slower, practically within noise.
We didn't see other regressions, and many workloads in general seem to be faster with Spark 2.3 (not investigated if thanks to async readed, or unrelated).