[SPARK-37089][SQL] Do not register ParquetFileFormat completion listener lazily #34369
Conversation
…in ParquetFileFormat" This reverts commit 4badb76.
```scala
      iter.asInstanceOf[Iterator[InternalRow]]
    } catch {
      case e: Throwable =>
        // SPARK-23457: In case there is an exception in initialization, close the iterator to
```
Shall we let the caller `FileScanRDD` close the iterator when hitting errors?
In general, I think `FileScanRDD` does close the iterator when hitting exceptions, because it uses a task completion listener to do so. The only case where it will not close the iterator is when the exception prevents `FileScanRDD` from getting a reference to the iterator, as is the case here.
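For context, here is a minimal sketch of that close-via-listener pattern and the one window it cannot cover. The listener registry and names like `readFile` are hypothetical stand-ins, not the actual `FileScanRDD` internals:

```scala
import java.io.Closeable

object CloseOnCompletionSketch {
  // Stand-in for TaskContext.addTaskCompletionListener: prepending means the
  // listeners later run in reverse order of registration.
  private var listeners: List[() => Unit] = Nil
  def addTaskCompletionListener(f: () => Unit): Unit = listeners ::= f
  def runTaskCompletionListeners(): Unit = listeners.foreach(_.apply())

  def main(args: Array[String]): Unit = {
    var currentIterator: Iterator[Int] with Closeable = null
    // Registered eagerly, before any file is opened: closes whatever iterator
    // reference is held at task completion.
    addTaskCompletionListener(() => if (currentIterator != null) currentIterator.close())

    def readFile(): Iterator[Int] with Closeable =
      new Iterator[Int] with Closeable {
        private val rows = Iterator(1, 2, 3)
        def hasNext: Boolean = rows.hasNext
        def next(): Int = rows.next()
        def close(): Unit = println("iterator closed")
      }

    try {
      // If readFile() throws *here*, currentIterator is never assigned and the
      // listener has nothing to close -- the only uncovered case.
      currentIterator = readFile()
    } finally {
      runTaskCompletionListeners() // closes the iterator on success and failure alike
    }
  }
}
```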
ah I see!
Test build #144538 has finished for PR 34369 at commit

Kubernetes integration test starting
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure

Test build #144543 has finished for PR 34369 at commit

Thank you for pinging me, @ankurdave.

cc @sunchao
```scala
        iter.closeIfNeeded()
      case iter: Closeable =>
        iter.close()
      case _ => // do nothing
```
When does this happen? Only when `currentIterator` is null?
There are currently two cases aside from null:
- `OrcFileFormat` produces an ordinary non-`Closeable` `Iterator` due to `unwrapOrcStructs()`.
- The user can create a `FileScanRDD` with an arbitrary `readFunction` that does not return a `Closeable` `Iterator`.

It would be ideal if we could disallow these cases and require the iterator to be `Closeable`, but it seems that would require changing public APIs.
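For illustration, a small self-contained sketch (an assumed simplification, not the actual `OrcFileFormat` code) of how a transformation step silently drops the `Closeable` capability:

```scala
import java.io.Closeable

object NonCloseableIteratorSketch {
  def main(args: Array[String]): Unit = {
    // An iterator that owns a resource and exposes close():
    val source = new Iterator[Int] with Closeable {
      private val rows = Iterator(1, 2, 3)
      def hasNext: Boolean = rows.hasNext
      def next(): Int = rows.next()
      def close(): Unit = println("resources released")
    }

    // Iterator#map returns a plain Iterator, so a `case iter: Closeable`
    // match arm no longer fires for the transformed iterator.
    val transformed: Iterator[Int] = source.map(_ * 2)
    assert(!transformed.isInstanceOf[Closeable])
  }
}
```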
Test build #144544 has finished for PR 34369 at commit
sunchao left a comment:
LGTM - just one minor question
```diff
- private lazy val internalIter = readCurrentFile()
- // vectorized Parquet reader. Here we use a lazily initialized variable to delay the
- // creation of iterator so that we will throw exception in `getNext`.
+ private var internalIter: Iterator[InternalRow] = null
```
Hm, why is this change necessary?
If the downstream operator never pulls any rows from the iterator, then the first time we access `internalIter` will be when `close()` is called. If `internalIter` is a `lazy val`, this will trigger a call to `readCurrentFile()`, which is unnecessary and may throw. Changing `internalIter` from a `lazy val` to a `var` lets us avoid this unnecessary call.

Several tests fail without this change, including `AvroV1Suite`.
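A minimal reproduction of that pitfall, with hypothetical names rather than the actual Spark code:

```scala
object LazyValPitfall {
  class CurrentFileReader {
    private def readCurrentFile(): Iterator[Int] = {
      println("opening file (expensive, may throw)")
      Iterator(1, 2, 3)
    }

    // Before: any access, including from close(), forces readCurrentFile().
    private lazy val lazyIter: Iterator[Int] = readCurrentFile()
    // After: stays null until a row is actually requested.
    private var varIter: Iterator[Int] = null

    def closeLazy(): Unit = if (lazyIter != null) println("closed") // triggers the read
    def closeVar(): Unit = if (varIter != null) println("closed")   // safe no-op
  }

  def main(args: Array[String]): Unit = {
    new CurrentFileReader().closeVar()  // prints nothing
    new CurrentFileReader().closeLazy() // prints "opening file (expensive, may throw)" then "closed"
  }
}
```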
Got it. Thanks
Merged to master and branch-3.2.
### What changes were proposed in this pull request?

The previous PR #34245 assumed task completion listeners are registered bottom-up. `ParquetFileFormat#buildReaderWithPartitionValues()` violates this assumption by registering a task completion listener to close its output iterator lazily. Since task completion listeners are executed in reverse order of registration, this listener always runs before other listeners. When the downstream operator contains a Python UDF and the off-heap vectorized Parquet reader is enabled, this results in a use-after-free that causes a segfault.

The fix is to close the output iterator using `FileScanRDD`'s task completion listener.
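To make the ordering concrete, here is a toy model of the execution order (illustrative listener names, not Spark's actual `TaskContext` API):

```scala
object ListenerOrderingDemo {
  // Listeners are stored by prepending, so iteration visits the most recently
  // registered listener first -- reverse order of registration.
  private var listeners: List[String] = Nil
  def register(name: String): Unit = listeners ::= name

  def main(args: Array[String]): Unit = {
    // Registered eagerly, bottom-up, at task start:
    register("FileScanRDD: close current iterator")
    register("Python runner: stop writer thread")
    // Registered lazily on the first read, i.e. last:
    register("ParquetFileFormat: free off-heap column buffers")

    // Prints the Parquet listener first: it frees buffers that a
    // later-running listener (or its thread) may still be reading.
    listeners.foreach(println)
  }
}
```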
### Why are the changes needed?

Without this PR, the Python tests introduced in #34245 are flaky (see details in thread). They intermittently fail with a segfault.
### Does this PR introduce _any_ user-facing change?

No.
### How was this patch tested?

Repeatedly ran one of the Python tests introduced in #34245 using the commands below. Previously, the test was flaky and failed after about 50 runs. With this PR, the test has not failed after 1000 runs.
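```sh
./build/sbt -Phive clean package && ./build/sbt test:compile
seq 1000 | parallel -j 8 --halt now,fail=1 'echo {#}; python/run-tests --testnames pyspark.sql.tests.test_udf'
```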