1717
1818package org .apache .spark .ui .jobs
1919
20- import scala .collection .mutable .{HashMap , ListBuffer }
20+ import scala .collection .mutable .{HashMap , HashSet , ListBuffer }
2121
2222import org .apache .spark ._
2323import org .apache .spark .annotation .DeveloperApi
@@ -59,7 +59,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
5959 val failedStages = ListBuffer [StageInfo ]()
6060 val stageIdToData = new HashMap [(StageId , StageAttemptId ), StageUIData ]
6161 val stageIdToInfo = new HashMap [StageId , StageInfo ]
62-
62+ val stageIdToActiveJobIds = new HashMap [StageId , HashSet [JobId ]]
63+
6364 // Number of completed and failed stages, may not actually equal to completedStages.size and
6465 // failedStages.size respectively due to completedStage and failedStages only maintain the latest
6566 // part of the stages, the earlier ones will be removed when there are too many stages for
@@ -86,6 +87,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
8687 jobGroup, JobExecutionStatus .RUNNING )
8788 jobIdToData(jobStart.jobId) = jobData
8889 activeJobs(jobStart.jobId) = jobData
90+ for (stageId <- jobStart.stageIds) {
91+ stageIdToActiveJobIds.getOrElseUpdate(stageId, new HashSet [JobId ]).add(jobStart.jobId)
92+ }
8993 }
9094
9195 override def onJobEnd (jobEnd : SparkListenerJobEnd ) = synchronized {
@@ -102,6 +106,9 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
102106 failedJobs += jobData
103107 jobData.status = JobExecutionStatus .FAILED
104108 }
109+ for (stageId <- jobData.stageIds) {
110+ stageIdToActiveJobIds.get(stageId).foreach(_.remove(jobEnd.jobId))
111+ }
105112 }
106113
107114 override def onStageCompleted (stageCompleted : SparkListenerStageCompleted ) = synchronized {
@@ -138,6 +145,7 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
138145 stages.take(toRemove).foreach { s =>
139146 stageIdToData.remove((s.stageId, s.attemptId))
140147 stageIdToInfo.remove(s.stageId)
148+ stageIdToActiveJobIds.remove(s.stageId)
141149 }
142150 stages.trimStart(toRemove)
143151 }
@@ -162,6 +170,14 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
162170
163171 val stages = poolToActiveStages.getOrElseUpdate(poolName, new HashMap [Int , StageInfo ])
164172 stages(stage.stageId) = stage
173+
174+ for (
175+ activeJobsDependentOnStage <- stageIdToActiveJobIds.get(stage.stageId);
176+ jobId <- activeJobsDependentOnStage;
177+ jobData <- jobIdToData.get(jobId)
178+ ) {
179+ jobData.numTasks += stage.numTasks
180+ }
165181 }
166182
167183 override def onTaskStart (taskStart : SparkListenerTaskStart ) = synchronized {
@@ -174,6 +190,13 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
174190 stageData.numActiveTasks += 1
175191 stageData.taskData.put(taskInfo.taskId, new TaskUIData (taskInfo))
176192 }
193+ for (
194+ activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskStart.stageId);
195+ jobId <- activeJobsDependentOnStage;
196+ jobData <- jobIdToData.get(jobId)
197+ ) {
198+ jobData.numActiveTasks += 1
199+ }
177200 }
178201
179202 override def onTaskGettingResult (taskGettingResult : SparkListenerTaskGettingResult ) {
@@ -208,6 +231,8 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
208231 execSummary.taskTime += info.duration
209232 stageData.numActiveTasks -= 1
210233
234+ val isRecomputation = stageData.completedIndices.contains(info.index)
235+
211236 val (errorMessage, metrics): (Option [String ], Option [TaskMetrics ]) =
212237 taskEnd.reason match {
213238 case org.apache.spark.Success =>
@@ -231,6 +256,22 @@ class JobProgressListener(conf: SparkConf) extends SparkListener with Logging {
231256 taskData.taskInfo = info
232257 taskData.taskMetrics = metrics
233258 taskData.errorMessage = errorMessage
259+
260+ for (
261+ activeJobsDependentOnStage <- stageIdToActiveJobIds.get(taskEnd.stageId);
262+ jobId <- activeJobsDependentOnStage;
263+ jobData <- jobIdToData.get(jobId)
264+ ) {
265+ jobData.numActiveTasks -= 1
266+ taskEnd.reason match {
267+ case Success =>
268+ if (! isRecomputation) {
269+ jobData.numCompletedTasks += 1
270+ }
271+ case _ =>
272+ jobData.numFailedTasks += 1
273+ }
274+ }
234275 }
235276 }
236277
0 commit comments