Commit 3cdae0f

[SPARK-17638][STREAMING] Stop JVM StreamingContext when the Python process is dead
## What changes were proposed in this pull request?

When the Python process is dead, the JVM StreamingContext is still running, so a flood of Py4JException errors appears before the JVM process exits. It is better to stop the JVM StreamingContext and avoid those noisy logs.

## How was this patch tested?

Jenkins

Author: Shixiong Zhu <[email protected]>

Closes #15201 from zsxwing/stop-jvm-ssc.
1 parent: 85d609c
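Before the per-file diffs, here is a minimal standalone sketch of the detection logic the patch introduces. The two message strings and the `Py4JException` type come straight from the diff below; `PythonDeathCheck` and `isPythonProcessDead` are hypothetical names used only for illustration:

```scala
import py4j.Py4JException

object PythonDeathCheck {
  // The two messages Py4J's CallbackClient produces when the Python side of
  // the callback connection is gone (see the CallbackClient.java links in
  // the diff below).
  private val deadPythonMessages = Set(
    "Cannot obtain a new communication channel",
    "Error while obtaining a new communication channel")

  // True only for the specific Py4J failure that signals a dead Python process.
  def isPythonProcessDead(e: Throwable): Boolean = e match {
    case p: Py4JException => deadPythonMessages.contains(p.getMessage)
    case _ => false
  }
}
```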

File tree

3 files changed, +35 -2 lines

streaming/src/main/scala/org/apache/spark/streaming/api/python/PythonDStream.scala

Lines changed: 31 additions & 2 deletions
```diff
@@ -24,11 +24,14 @@ import java.util.{ArrayList => JArrayList, List => JList}
 import scala.collection.JavaConverters._
 import scala.language.existentials
 
+import py4j.Py4JException
+
 import org.apache.spark.SparkException
 import org.apache.spark.api.java._
+import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.storage.StorageLevel
-import org.apache.spark.streaming.{Duration, Interval, Time}
+import org.apache.spark.streaming.{Duration, Interval, StreamingContext, Time}
 import org.apache.spark.streaming.api.java._
 import org.apache.spark.streaming.dstream._
 import org.apache.spark.util.Utils
@@ -157,7 +160,7 @@ private[python] object PythonTransformFunctionSerializer {
 /**
  * Helper functions, which are called from Python via Py4J.
  */
-private[python] object PythonDStream {
+private[streaming] object PythonDStream {
 
   /**
    * can not access PythonTransformFunctionSerializer.register() via Py4j
@@ -184,6 +187,32 @@ private[python] object PythonDStream {
     rdds.asScala.foreach(queue.add)
     queue
   }
+
+  /**
+   * Stop [[StreamingContext]] if the Python process crashes (E.g., OOM) in case the user cannot
+   * stop it in the Python side.
+   */
+  def stopStreamingContextIfPythonProcessIsDead(e: Throwable): Unit = {
+    // These two special messages are from:
+    // scalastyle:off
+    // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L218
+    // https://github.com/bartdag/py4j/blob/5cbb15a21f857e8cf334ce5f675f5543472f72eb/py4j-java/src/main/java/py4j/CallbackClient.java#L340
+    // scalastyle:on
+    if (e.isInstanceOf[Py4JException] &&
+      ("Cannot obtain a new communication channel" == e.getMessage ||
+        "Error while obtaining a new communication channel" == e.getMessage)) {
+      // Start a new thread to stop StreamingContext to avoid deadlock.
+      new Thread("Stop-StreamingContext") with Logging {
+        setDaemon(true)
+
+        override def run(): Unit = {
+          logError(
+            "Cannot connect to Python process. It's probably dead. Stopping StreamingContext.", e)
+          StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
+        }
+      }.start()
+    }
+  }
 }
 
 /**
```
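The stop is deliberately issued from a fresh daemon thread: stopping a `StreamingContext` waits for the scheduler's internal threads to terminate, so calling `stop()` directly from an error handler running on one of those threads could deadlock. Passing `stopSparkContext = false` keeps the shared SparkContext alive. A reduced sketch of that pattern, assuming some context is currently active (`stopActiveContextAsync` is an illustrative name, not part of the patch):

```scala
import org.apache.spark.streaming.StreamingContext

// Stop whichever StreamingContext is active without blocking the calling
// thread; marking the thread as a daemon lets the JVM exit regardless.
def stopActiveContextAsync(): Unit = {
  val stopper = new Thread("Stop-StreamingContext") {
    override def run(): Unit =
      StreamingContext.getActive().foreach(_.stop(stopSparkContext = false))
  }
  stopper.setDaemon(true)
  stopper.start()
}
```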

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -22,6 +22,7 @@ import scala.util.{Failure, Success, Try}
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.RDD
 import org.apache.spark.streaming.{Checkpoint, CheckpointWriter, Time}
+import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.util.RecurringTimer
 import org.apache.spark.util.{Clock, EventLoop, ManualClock, Utils}
 
@@ -252,6 +253,7 @@ class JobGenerator(jobScheduler: JobScheduler) extends Logging {
         jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
       case Failure(e) =>
         jobScheduler.reportError("Error generating jobs for time " + time, e)
+        PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
     }
     eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
   }
```
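The generator-side hook rides on the `Try` that already wraps job generation: only the `Failure` branch changes. A simplified, self-contained illustration of that control flow (`generateAndSubmit` and the `println` calls are stand-ins for `JobGenerator` internals):

```scala
import scala.util.{Failure, Success, Try}

// Simplified stand-in for JobGenerator.generateJobs: any exception thrown
// while generating jobs for a batch (including a Py4J callback failure from
// a Python transform function) surfaces here as a Failure.
def generateAndSubmit(batchTime: Long)(generate: Long => Seq[String]): Unit =
  Try(generate(batchTime)) match {
    case Success(jobs) =>
      jobs.foreach(job => println(s"submitting $job for batch $batchTime"))
    case Failure(e) =>
      println(s"Error generating jobs for time $batchTime: $e")
      // The patch adds exactly one line on this branch:
      // PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
  }
```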

streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobScheduler.scala

Lines changed: 2 additions & 0 deletions
```diff
@@ -28,6 +28,7 @@ import org.apache.spark.ExecutorAllocationClient
 import org.apache.spark.internal.Logging
 import org.apache.spark.rdd.{PairRDDFunctions, RDD}
 import org.apache.spark.streaming._
+import org.apache.spark.streaming.api.python.PythonDStream
 import org.apache.spark.streaming.ui.UIUtils
 import org.apache.spark.util.{EventLoop, ThreadUtils}
 
@@ -217,6 +218,7 @@ class JobScheduler(val ssc: StreamingContext) extends Logging {
   private def handleError(msg: String, e: Throwable) {
     logError(msg, e)
     ssc.waiter.notifyError(e)
+    PythonDStream.stopStreamingContextIfPythonProcessIsDead(e)
   }
 
   private class JobHandler(job: Job) extends Runnable with Logging {
```