Skip to content

Commit d62ea7b

Browse files
committed
Add failing Selenium test for stage overcounting issue.
1 parent 1145c60 commit d62ea7b

File tree

1 file changed

+32
-2
lines changed

1 file changed

+32
-2
lines changed

core/src/test/scala/org/apache/spark/ui/UISeleniumSuite.scala

Lines changed: 32 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,10 @@ import org.scalatest.selenium.WebBrowser
2525
import org.scalatest.time.SpanSugar._
2626

2727
import org.apache.spark.api.java.StorageLevels
28-
import org.apache.spark.{SparkException, SparkConf, SparkContext}
28+
import org.apache.spark._
2929
import org.apache.spark.SparkContext._
3030
import org.apache.spark.LocalSparkContext._
31+
import org.apache.spark.shuffle.FetchFailedException
3132

3233
/**
3334
* Selenium tests for the Spark Web UI. These tests are not run by default
@@ -145,7 +146,6 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers {
145146

146147
test("jobs page should not display job group name unless some job was submitted in a job group") {
147148
withSpark(newSparkContext()) { sc =>
148-
val ui = sc.ui.get
149149
// If no job has been run in a job group, then "(Job Group)" should not appear in the header
150150
sc.parallelize(Seq(1, 2, 3)).count()
151151
eventually(timeout(5 seconds), interval(50 milliseconds)) {
@@ -163,4 +163,34 @@ class UISeleniumSuite extends FunSuite with WebBrowser with Matchers {
163163
}
164164
}
165165
}
166+
167+
// Runs a shuffle job in which a task of the post-shuffle map throws a simulated
// FetchFailedException, then checks the stage counters rendered on the /jobs page:
// the test expects "2" completed, "2" total and "(1 failed)".
test("stage failures / recomputations should not cause stages to be overcounted on job page") {
168+
withSpark(newSparkContext()) { sc =>
169+
// groupBy gives `data` a shuffle dependency, which is read back below to obtain the shuffle id.
val data = sc.parallelize(Seq(1, 2, 3)).map(identity).groupBy(identity)
170+
val shuffleHandle =
171+
data.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleHandle
172+
// Simulate fetch failures:
173+
val mappedData = data.map { x =>
174+
val taskContext = TaskContext.get
175+
// NOTE(review): only the task whose attemptId() equals 1 throws. Whether attemptId() is a
// per-task retry counter or an application-wide unique task attempt id is not visible
// here -- confirm against the TaskContext API of the Spark version in use.
if (taskContext.attemptId() == 1) { // Cause this stage to fail on its first attempt.
176+
val env = SparkEnv.get
177+
// The failure is reported against this executor's own block manager.
val bmAddress = env.blockManager.blockManagerId
178+
val shuffleId = shuffleHandle.shuffleId
179+
// The map output id is hard-coded to 0; the reduce id is this task's partition.
val mapId = 0
180+
val reduceId = taskContext.partitionId()
181+
val message = "Simulated fetch failure"
182+
throw new FetchFailedException(bmAddress, shuffleId, mapId, reduceId, message)
183+
} else {
184+
x
185+
}
186+
}
187+
// Run the job; the result is discarded, only the UI state afterwards matters.
mappedData.count()
188+
// Re-check the jobs page every 50 ms for up to 5 seconds until all three assertions hold.
eventually(timeout(5 seconds), interval(50 milliseconds)) {
189+
go to (sc.ui.get.appUIAddress.stripSuffix("/") + "/jobs")
190+
find(cssSelector(".stage-progress-cell .completed-stages")).get.text should be ("2")
191+
find(cssSelector(".stage-progress-cell .total-stages")).get.text should be ("2")
192+
find(cssSelector(".stage-progress-cell .failed-stages")).get.text should be ("(1 failed)")
193+
}
194+
}
195+
}
166196
}

0 commit comments

Comments
 (0)