@@ -1569,24 +1569,44 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
+  /**
+   * In this test, we run a map stage where one of the executors fails but we still receive a
+   * "zombie" complete message from a task that ran on that executor. We want to make sure the
+   * stage is resubmitted so that the task that ran on the failed executor is re-executed, and
+   * that the stage is only marked as finished once that task completes.
+   */
   test("run trivial shuffle with out-of-band failure and retry") {
     val shuffleMapRdd = new MyRDD(sc, 2, Nil)
     val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
     val shuffleId = shuffleDep.shuffleId
     val reduceRdd = new MyRDD(sc, 1, List(shuffleDep), tracker = mapOutputTracker)
     submit(reduceRdd, Array(0))
-    // blockManagerMaster.removeExecutor("exec-hostA")
-    // pretend we were told hostA went away
+    // Tell the DAGScheduler that hostA was lost.
     runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
-    // DAGScheduler will immediately resubmit the stage after it appears to have no pending tasks
-    // rather than marking it is as failed and waiting.
     complete(taskSets(0), Seq(
       (Success, makeMapStatus("hostA", 1)),
       (Success, makeMapStatus("hostB", 1))))
+
+    // At this point, no more tasks are running for the stage (and the TaskSetManager considers the
+    // stage complete), but the tasks that ran on HostA need to be re-run, so the DAGScheduler
+    // should re-submit the stage.
+    assert(taskSets.size === 2)
+
+    // Make sure that the stage that was re-submitted was the ShuffleMapStage (not the reduce
+    // stage, which shouldn't be run until all of the tasks in the ShuffleMapStage complete on
+    // alive executors).
+    assert(taskSets(1).tasks(0).isInstanceOf[ShuffleMapTask])
+
     // have hostC complete the resubmitted task
     complete(taskSets(1), Seq((Success, makeMapStatus("hostC", 1))))
     assert(mapOutputTracker.getMapSizesByExecutorId(shuffleId, 0).map(_._1).toSet ===
       HashSet(makeBlockManagerId("hostC"), makeBlockManagerId("hostB")))
+
+    // Make sure that the reduce stage was now submitted.
+    assert(taskSets.size === 3)
+    assert(taskSets(2).tasks(0).isInstanceOf[ResultTask[_, _]])
+
+    // Complete the reduce stage.
     complete(taskSets(2), Seq((Success, 42)))
     assert(results === Map(0 -> 42))
     assertDataStructuresEmpty()
@@ -2027,54 +2047,6 @@ class DAGSchedulerSuite extends SparkFunSuite with LocalSparkContext with Timeou
     assertDataStructuresEmpty()
   }
 
-  /**
-   * In this test, we run a map stage where one of the executors fails but we still receive a
-   * "zombie" complete message from that executor. We want to make sure the stage is not reported
-   * as done until all tasks have completed.
-   */
-  test("map stage submission with executor failure late map task completions") {
-    val shuffleMapRdd = new MyRDD(sc, 3, Nil)
-    val shuffleDep = new ShuffleDependency(shuffleMapRdd, new HashPartitioner(2))
-
-    submitMapStage(shuffleDep)
-
-    val oldTaskSet = taskSets(0)
-    runEvent(makeCompletionEvent(oldTaskSet.tasks(0), Success, makeMapStatus("hostA", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
-
-    // Pretend host A was lost
-    val oldEpoch = mapOutputTracker.getEpoch
-    runEvent(ExecutorLost("exec-hostA", ExecutorKilled))
-    val newEpoch = mapOutputTracker.getEpoch
-    assert(newEpoch > oldEpoch)
-
-    // Suppose we also get a completed event from task 1 on the same host; this should be ignored
-    runEvent(makeCompletionEvent(oldTaskSet.tasks(1), Success, makeMapStatus("hostA", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
-
-    // A completion from another task should work because it's a non-failed host
-    runEvent(makeCompletionEvent(oldTaskSet.tasks(2), Success, makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
-
-    // Now complete tasks in the second task set
-    val newTaskSet = taskSets(1)
-    assert(newTaskSet.tasks.size === 2)    // Both tasks 0 and 1 were on hostA
-    runEvent(makeCompletionEvent(newTaskSet.tasks(0), Success, makeMapStatus("hostB", 2)))
-    assert(results.size === 0)    // Map stage job should not be complete yet
-    runEvent(makeCompletionEvent(newTaskSet.tasks(1), Success, makeMapStatus("hostB", 2)))
-    assert(results.size === 1)    // Map stage job should now finally be complete
-    assertDataStructuresEmpty()
-
-    // Also test that a reduce stage using this shuffled data can immediately run
-    val reduceRDD = new MyRDD(sc, 2, List(shuffleDep), tracker = mapOutputTracker)
-    results.clear()
-    submit(reduceRDD, Array(0, 1))
-    complete(taskSets(2), Seq((Success, 42), (Success, 43)))
-    assert(results === Map(0 -> 42, 1 -> 43))
-    results.clear()
-    assertDataStructuresEmpty()
-  }
-
   /**
    * Checks the DAGScheduler's internal logic for traversing an RDD DAG by making sure that
    * getShuffleDependencies correctly returns the direct shuffle dependencies of a particular