Skip to content

Commit 4746674

Browse files
committed
[SPARK-18617][SPARK-18560][TESTS] Fix flaky test: StreamingContextSuite. Receiver data should be deserialized properly
## What changes were proposed in this pull request? Avoid to create multiple threads to stop StreamingContext. Otherwise, the latch added in #16091 can be passed too early. ## How was this patch tested? Jenkins Author: Shixiong Zhu <[email protected]> Closes #16105 from zsxwing/SPARK-18617-2. (cherry picked from commit 086b0c8) Signed-off-by: Shixiong Zhu <[email protected]>
1 parent 4c673c6 commit 4746674

File tree

1 file changed

+3
-1
lines changed

1 file changed

+3
-1
lines changed

streaming/src/test/scala/org/apache/spark/streaming/StreamingContextSuite.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -814,10 +814,12 @@ class StreamingContextSuite extends SparkFunSuite with BeforeAndAfter with Timeo
814814
ssc = new StreamingContext(conf, Milliseconds(100))
815815
val input = ssc.receiverStream(new TestReceiver)
816816
val latch = new CountDownLatch(1)
817+
@volatile var stopping = false
817818
input.count().foreachRDD { rdd =>
818819
// Make sure we can read from BlockRDD
819-
if (rdd.collect().headOption.getOrElse(0L) > 0) {
820+
if (rdd.collect().headOption.getOrElse(0L) > 0 && !stopping) {
820821
// Stop StreamingContext to unblock "awaitTerminationOrTimeout"
822+
stopping = true
821823
new Thread() {
822824
setDaemon(true)
823825
override def run(): Unit = {

0 commit comments

Comments
 (0)