@@ -162,32 +162,67 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   /** Length of time to wait while draining listener events. */
   val WAIT_TIMEOUT_MILLIS = 10000
-  val sparkListener = new SparkListener() {
-    val submittedStageInfos = new HashSet[StageInfo]
-    val successfulStages = new HashSet[Int]
-    val failedStages = new ArrayBuffer[Int]
-    val stageByOrderOfExecution = new ArrayBuffer[Int]
-    val endedTasks = new HashSet[Long]
+
+  /**
+   * A listener which records scheduler events so tests can verify them. Each getter method in
+   * this class first waits until the listener bus is empty, then returns an immutable copy of
+   * the recorded values, so a test never observes a partially updated result caused by a race
+   * with the listener thread.
+   */
+  class EventInfoRecordingListener extends SparkListener {
+    private val _submittedStageInfos = new HashSet[StageInfo]
+    private val _successfulStages = new HashSet[Int]
+    private val _failedStages = new ArrayBuffer[Int]
+    private val _stageByOrderOfExecution = new ArrayBuffer[Int]
+    private val _endedTasks = new HashSet[Long]
 
     override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted) {
-      submittedStageInfos += stageSubmitted.stageInfo
+      _submittedStageInfos += stageSubmitted.stageInfo
     }
 
     override def onStageCompleted(stageCompleted: SparkListenerStageCompleted) {
       val stageInfo = stageCompleted.stageInfo
-      stageByOrderOfExecution += stageInfo.stageId
+      _stageByOrderOfExecution += stageInfo.stageId
       if (stageInfo.failureReason.isEmpty) {
-        successfulStages += stageInfo.stageId
+        _successfulStages += stageInfo.stageId
       } else {
-        failedStages += stageInfo.stageId
+        _failedStages += stageInfo.stageId
       }
     }
 
     override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
-      endedTasks += taskEnd.taskInfo.taskId
+      _endedTasks += taskEnd.taskInfo.taskId
+    }
+
+    def submittedStageInfos: Set[StageInfo] = {
+      waitForListeners()
+      _submittedStageInfos.toSet
+    }
+
+    def successfulStages: Set[Int] = {
+      waitForListeners()
+      _successfulStages.toSet
+    }
+
+    def failedStages: List[Int] = {
+      waitForListeners()
+      _failedStages.toList
+    }
+
+    def stageByOrderOfExecution: List[Int] = {
+      waitForListeners()
+      _stageByOrderOfExecution.toList
+    }
+
+    def endedTasks: Set[Long] = {
+      waitForListeners()
+      _endedTasks.toSet
     }
+
+    private def waitForListeners(): Unit = sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
   }
 
+  var sparkListener: EventInfoRecordingListener = null
+
   var mapOutputTracker: MapOutputTrackerMaster = null
   var broadcastManager: BroadcastManager = null
   var securityMgr: SecurityManager = null
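
The getters above are the heart of this change: each one drains the listener bus before returning an immutable snapshot, so individual tests no longer need their own `sc.listenerBus.waitUntilEmpty(...)` calls. Below is a minimal, self-contained sketch of the same discipline; `ToyBus` and `RecordingListener` are illustrative stand-ins, not Spark APIs (in the suite itself, Spark's `LiveListenerBus.waitUntilEmpty` plays the drain role):

```scala
import java.util.concurrent.{Executors, TimeUnit}
import scala.collection.mutable.ArrayBuffer

// Toy single-threaded event bus. Posting a no-op task and blocking on its
// Future guarantees every previously posted event has been processed.
class ToyBus {
  private val executor = Executors.newSingleThreadExecutor()
  def post(event: Runnable): Unit = executor.execute(event)
  def waitUntilEmpty(timeoutMillis: Long): Unit =
    executor.submit(new Runnable { override def run(): Unit = () })
      .get(timeoutMillis, TimeUnit.MILLISECONDS)
  def stop(): Unit = executor.shutdown()
}

// Same getter discipline as EventInfoRecordingListener: mutate private state
// only on the bus thread, and expose drained, immutable snapshots to tests.
class RecordingListener(bus: ToyBus) {
  private val _failedStages = new ArrayBuffer[Int]

  def stageFailed(stageId: Int): Unit =
    bus.post(new Runnable { override def run(): Unit = _failedStages += stageId })

  def failedStages: List[Int] = {
    bus.waitUntilEmpty(10000)  // drain first, so no event is still in flight
    _failedStages.toList       // immutable copy; safe to assert against
  }
}

object Demo extends App {
  val bus = new ToyBus
  val listener = new RecordingListener(bus)
  listener.stageFailed(0)
  assert(listener.failedStages == List(0)) // no explicit waitUntilEmpty in the test
  bus.stop()
}
```

Routing the no-op drain task through the same single-threaded executor is what makes it a happens-before barrier: once its `Future` completes, every earlier mutation of `_failedStages` is visible to the test thread.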
@@ -236,10 +271,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
   private def init(testConf: SparkConf): Unit = {
     sc = new SparkContext("local[2]", "DAGSchedulerSuite", testConf)
-    sparkListener.submittedStageInfos.clear()
-    sparkListener.successfulStages.clear()
-    sparkListener.failedStages.clear()
-    sparkListener.endedTasks.clear()
+    sparkListener = new EventInfoRecordingListener
     failure = null
     sc.addSparkListener(sparkListener)
     taskSets.clear()
@@ -361,11 +393,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
   }
 
   test("[SPARK-3353] parent stage should have lower stage id") {
-    sparkListener.stageByOrderOfExecution.clear()
     sc.parallelize(1 to 10).map(x => (x, x)).reduceByKey(_ + _, 4).count()
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.stageByOrderOfExecution.length === 2)
-    assert(sparkListener.stageByOrderOfExecution(0) < sparkListener.stageByOrderOfExecution(1))
+    val stageByOrderOfExecution = sparkListener.stageByOrderOfExecution
+    assert(stageByOrderOfExecution.length === 2)
+    assert(stageByOrderOfExecution(0) < stageByOrderOfExecution(1))
   }
 
   /**
@@ -606,19 +637,15 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     submit(unserializableRdd, Array(0))
     assert(failure.getMessage.startsWith(
       "Job aborted due to stage failure: Task not serializable:"))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
   test("trivial job failure") {
     submit(new MyRDD(sc, 1, Nil), Array(0))
     failed(taskSets(0), "some failure")
     assert(failure.getMessage === "Job aborted due to stage failure: some failure")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
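
A small side benefit, visible in this hunk and the one that follows: because `failedStages` now returns an immutable `List`, the old `contains` + `size` pair collapses into a single structural equality, which also fails with a more informative message. A sketch (plain `assert` with `==` here; the suite's `===` is ScalaTest's matcher and behaves the same for this comparison):

```scala
object AssertionDemo extends App {
  // What the drained getter would return if exactly stage 0 failed.
  val failedStages: List[Int] = List(0)

  // Before: two assertions were needed to pin down "exactly stage 0 failed".
  assert(failedStages.contains(0))
  assert(failedStages.size == 1)

  // After: one structural equality on the snapshot. List(0) == Seq(0) holds
  // because Scala sequence equality is element-wise, not class-based.
  assert(failedStages == Seq(0))
}
```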
@@ -627,9 +654,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     val jobId = submit(rdd, Array(0))
     cancel(jobId)
     assert(failure.getMessage === s"Job $jobId cancelled ")
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.contains(0))
-    assert(sparkListener.failedStages.size === 1)
+    assert(sparkListener.failedStages === Seq(0))
     assertDataStructuresEmpty()
   }
 
@@ -683,7 +708,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
 
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.isEmpty)
     assert(sparkListener.successfulStages.contains(0))
   }
@@ -1068,7 +1092,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(1))
 
     // The second ResultTask fails, with a fetch failure for the output from the second mapper.
@@ -1077,8 +1100,7 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 1, 1, "ignored"),
       null))
     // The SparkListener should not receive redundant failure events.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.failedStages.size == 1)
+    assert(sparkListener.failedStages.size === 1)
   }
 
   test("Retry all the tasks on a resubmitted attempt of a barrier stage caused by FetchFailure") {
@@ -1183,7 +1205,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     complete(taskSets(0), Seq(
@@ -1200,12 +1221,10 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       taskSets(1).tasks(0),
       FetchFailed(makeBlockManagerId("hostA"), shuffleId, 0, 0, "ignored"),
       null))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.contains(1))
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -1222,7 +1241,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     // shouldn't effect anything -- our calling it just makes *SURE* it gets called between the
     // desired event and our check.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
 
   }
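
Worth noting why the deleted waits around these counts are safe: `countSubmittedMapStageAttempts()` is a per-test helper (its definition sits outside the hunks shown here) that counts entries in `sparkListener.submittedStageInfos`, and that getter now drains the bus itself before returning. A self-contained sketch of the idea, with a toy `StageInfo` standing in for Spark's class and the helper body assumed rather than quoted:

```scala
object CountDemo extends App {
  // Stand-in for the StageInfo records the listener would have captured:
  // two submissions of the map stage (id 0), i.e. an original and a retry.
  case class StageInfo(stageId: Int, attemptNumber: Int)
  val submittedStageInfos: Set[StageInfo] =
    Set(StageInfo(stageId = 0, attemptNumber = 0), StageInfo(stageId = 0, attemptNumber = 1))

  // Assumed shape of the tests' helper; the real body lies outside this diff.
  def countSubmittedMapStageAttempts(): Int =
    submittedStageInfos.count(_.stageId == 0)

  assert(countSubmittedMapStageAttempts() == 2)
}
```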
@@ -1247,7 +1265,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // Complete the map stage.
@@ -1256,7 +1273,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
       (Success, makeMapStatus("hostB", 2))))
 
     // The reduce stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedReduceStageAttempts() === 1)
 
     // The first result task fails, with a fetch failure for the output from the first mapper.
@@ -1271,7 +1287,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // Because the map stage finished, another attempt for the reduce stage should have been
     // submitted, resulting in 2 total attempts for each the map and the reduce stage.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
     assert(countSubmittedReduceStageAttempts() === 2)
 
@@ -1301,20 +1316,17 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(1), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(1)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     // verify stage exists
     assert(scheduler.stageIdToStage.contains(0))
-    assert(sparkListener.endedTasks.size == 2)
-
+    assert(sparkListener.endedTasks.size === 2)
     // finish other 2 tasks
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(2), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(2)))
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(3)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
-    assert(sparkListener.endedTasks.size == 4)
+    assert(sparkListener.endedTasks.size === 4)
 
     // verify the stage is done
     assert(!scheduler.stageIdToStage.contains(0))
@@ -1324,14 +1336,12 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), Success, 42,
       Seq.empty, createFakeTaskInfoWithId(5)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 5)
 
     // make sure non successful tasks also send out event
     runEvent(makeCompletionEvent(
       taskSets(0).tasks(3), UnknownReason, 42,
       Seq.empty, createFakeTaskInfoWithId(6)))
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.endedTasks.size == 6)
   }
 
@@ -1405,7 +1415,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // Listener bus should get told about the map stage failing, but not the reduce stage
     // (since the reduce stage hasn't been started yet).
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.failedStages.toSet === Set(0))
 
     assertDataStructuresEmpty()
@@ -1649,7 +1658,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     assert(cancelledStages.toSet === Set(0, 2))
 
     // Make sure the listeners got told about both failed stages.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(sparkListener.successfulStages.isEmpty)
     assert(sparkListener.failedStages.toSet === Set(0, 2))
 
@@ -2607,7 +2615,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // The first map task fails with TaskKilled.
@@ -2625,7 +2632,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2644,7 +2650,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
     }
 
     // The map stage should have been submitted.
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 1)
 
     // The first map task fails with TaskKilled.
@@ -2656,7 +2661,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // Trigger resubmission of the failed map stage.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
 
     // Another attempt for the map stage should have been submitted, resulting in 2 total attempts.
     assert(countSubmittedMapStageAttempts() === 2)
@@ -2669,7 +2673,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with TimeLi
 
     // The second map task failure doesn't trigger stage retry.
     runEvent(ResubmitFailedStages)
-    sc.listenerBus.waitUntilEmpty(WAIT_TIMEOUT_MILLIS)
     assert(countSubmittedMapStageAttempts() === 2)
   }
 