Commit d25cbd4

HyukjinKwon authored and Marcelo Vanzin committed
[SPARK-28839][CORE] Avoids NPE in context cleaner when dynamic allocation and shuffle service are on
### What changes were proposed in this pull request?

This PR proposes to avoid throwing an NPE in the context cleaner when the shuffle service is on. It is a small follow-up of #24817.

It seems that `shuffleIds`, which is used for tracking, is set to `null` when the service is on. Later, `removeShuffle` tries to remove an element from `shuffleIds`, which leads to an NPE. This PR fixes it by explicitly not sending the event (`ShuffleCleanedEvent`) in this case.

See the code path below:

https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/SparkContext.scala#L584
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L125
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L190
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/ContextCleaner.scala#L220-L230
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L353-L357
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L347
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L400-L406
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L475
https://github.com/apache/spark/blob/cbad616d4cb0c58993a88df14b5e30778c7f7e85/core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala#L427

### Why are the changes needed?

This is a bug fix.

### Does this PR introduce any user-facing change?

It prevents the exception:

```
19/08/21 06:44:01 ERROR AsyncEventQueue: Listener ExecutorMonitor threw an exception
java.lang.NullPointerException
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor$Tracker.removeShuffle(ExecutorMonitor.scala:479)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2(ExecutorMonitor.scala:408)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.$anonfun$cleanupShuffle$2$adapted(ExecutorMonitor.scala:407)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.cleanupShuffle(ExecutorMonitor.scala:407)
	at org.apache.spark.scheduler.dynalloc.ExecutorMonitor.onOtherEvent(ExecutorMonitor.sc
```

### How was this patch tested?

A unit test was added.

Closes #25551 from HyukjinKwon/SPARK-28839.

Authored-by: HyukjinKwon <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
1 parent 07c4b9b commit d25cbd4

2 files changed: +19 -3 lines changed

core/src/main/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitor.scala

Lines changed: 6 additions & 3 deletions
@@ -355,9 +355,12 @@ private[spark] class ExecutorMonitor(
   override def rddCleaned(rddId: Int): Unit = { }

   override def shuffleCleaned(shuffleId: Int): Unit = {
-    // Because this is called in a completely separate thread, we post a custom event to the
-    // listener bus so that the internal state is safely updated.
-    listenerBus.post(ShuffleCleanedEvent(shuffleId))
+    // Only post the event if tracking is enabled
+    if (shuffleTrackingEnabled) {
+      // Because this is called in a completely separate thread, we post a custom event to the
+      // listener bus so that the internal state is safely updated.
+      listenerBus.post(ShuffleCleanedEvent(shuffleId))
+    }
   }

   override def broadcastCleaned(broadcastId: Long): Unit = { }
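
For readers without the Spark source at hand, the failure mode and the guard in this hunk can be reproduced in isolation. The following is only a minimal sketch: `Tracker`, `Monitor`, `ShuffleCleaned`, and both `...ShuffleCleaned` methods are hypothetical stand-ins written for this illustration, not Spark classes, and the event is handled synchronously rather than posted to a listener bus. It assumes, as the commit message suggests, that the set of shuffle IDs is `null` whenever tracking is disabled.

```scala
import scala.collection.mutable

// Hypothetical stand-ins for illustration only; these are not Spark classes.
final case class ShuffleCleaned(shuffleId: Int)

final class Tracker(trackingEnabled: Boolean) {
  // Assumed pattern: the set only exists when tracking is enabled, otherwise it is null.
  private val shuffleIds: mutable.HashSet[Int] =
    if (trackingEnabled) mutable.HashSet.empty[Int] else null

  // Dereferences shuffleIds, so it throws NullPointerException when tracking is disabled.
  def removeShuffle(id: Int): Unit = {
    shuffleIds.remove(id)
  }
}

final class Monitor(trackingEnabled: Boolean) {
  private val tracker = new Tracker(trackingEnabled)

  // Records every event that was delivered, so callers can inspect what was sent.
  val posted: mutable.ArrayBuffer[ShuffleCleaned] = mutable.ArrayBuffer.empty

  // Behaviour before the fix: the event is always delivered.
  def unguardedShuffleCleaned(id: Int): Unit = handle(ShuffleCleaned(id))

  // Behaviour after the fix: the event is only delivered when tracking is enabled.
  def guardedShuffleCleaned(id: Int): Unit =
    if (trackingEnabled) handle(ShuffleCleaned(id))

  private def handle(event: ShuffleCleaned): Unit = {
    posted += event
    tracker.removeShuffle(event.shuffleId)
  }
}
```

With tracking disabled, `unguardedShuffleCleaned` reaches `removeShuffle` and fails on the `null` set, while `guardedShuffleCleaned` returns without delivering anything. Guarding at the point where the event is sent, rather than inside the handler, also avoids putting a pointless event on the bus.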

core/src/test/scala/org/apache/spark/scheduler/dynalloc/ExecutorMonitorSuite.scala

Lines changed: 13 additions & 0 deletions
@@ -333,6 +333,19 @@ class ExecutorMonitorSuite extends SparkFunSuite {
     assert(monitor.timedOutExecutors(idleDeadline) === Seq("1"))
   }

+
+  test("SPARK-28839: Avoids NPE in context cleaner when shuffle service is on") {
+    val bus = mockListenerBus()
+    conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, true)
+    monitor = new ExecutorMonitor(conf, client, bus, clock) {
+      override def onOtherEvent(event: SparkListenerEvent): Unit = {
+        throw new IllegalStateException("No event should be sent.")
+      }
+    }
+    monitor.onExecutorAdded(SparkListenerExecutorAdded(clock.getTimeMillis(), "1", null))
+    monitor.shuffleCleaned(0)
+  }
+
   test("shuffle tracking with multiple executors and concurrent jobs") {
     val bus = mockListenerBus()
     conf.set(DYN_ALLOCATION_SHUFFLE_TRACKING, true).set(SHUFFLE_SERVICE_ENABLED, false)
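
The added test relies on a simple idea: override the event handler so that any delivered event throws, then call the method under test and expect it to return normally. Here is a rough sketch of that idea outside Spark. `EventSource`, `onEvent`, and `strictSource` are hypothetical names invented for this example, and the sketch assumes events are delivered synchronously to the handler; how `mockListenerBus()` forwards events in the real suite is not shown in this diff.

```scala
object NoEventCheck {
  // Hypothetical, simplified model of a component that may emit events.
  class EventSource(enabled: Boolean) {
    // Called for every delivered event; the default implementation ignores it.
    def onEvent(event: String): Unit = ()

    // Emits an event only when the feature is enabled.
    def shuffleCleaned(shuffleId: Int): Unit =
      if (enabled) onEvent(s"ShuffleCleaned($shuffleId)")
  }

  // A source whose handler fails loudly if any event is delivered.
  def strictSource(enabled: Boolean): EventSource = new EventSource(enabled) {
    override def onEvent(event: String): Unit =
      throw new IllegalStateException("No event should be sent.")
  }
}
```

Calling `NoEventCheck.strictSource(enabled = false).shuffleCleaned(0)` completes silently, which is the passing case; with `enabled = true` the same call throws `IllegalStateException`, showing that the check would catch a regression.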
