Skip to content

Commit 8b33aa0

Browse files
vijoshiTom Graves
authored andcommitted
[SPARK-17843][WEB UI] Indicate event logs pending for processing on h…
## What changes were proposed in this pull request? Backport PR #15410 to branch-2.0 ## How was this patch tested? Existing unit tests. Screenshots for UI changes provided in PR #15410. Author: Vinayak <[email protected]> Closes #15991 from vijoshi/SAAS-608.
1 parent bdd27d1 commit 8b33aa0

File tree

5 files changed

+116
-18
lines changed

5 files changed

+116
-18
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
$(document).ready(function() {
19+
if ($('#last-updated').length) {
20+
var lastUpdatedMillis = Number($('#last-updated').text());
21+
var updatedDate = new Date(lastUpdatedMillis);
22+
$('#last-updated').text(updatedDate.toLocaleDateString()+", "+updatedDate.toLocaleTimeString())
23+
}
24+
});

core/src/main/scala/org/apache/spark/deploy/history/ApplicationHistoryProvider.scala

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,30 @@ private[history] case class LoadedAppUI(
7272

7373
private[history] abstract class ApplicationHistoryProvider {
7474

75+
/**
76+
* Returns the count of application event logs that the provider is currently still processing.
77+
* History Server UI can use this to indicate to a user that the application listing on the UI
78+
* can be expected to list additional known applications once the processing of these
79+
* application event logs completes.
80+
*
81+
* A History Provider that does not have a notion of count of event logs that may be pending
82+
* for processing need not override this method.
83+
*
84+
* @return Count of application event logs that are currently under process
85+
*/
86+
def getEventLogsUnderProcess(): Int = {
87+
return 0;
88+
}
89+
90+
/**
91+
* Returns the time the history provider last updated the application history information
92+
*
93+
* @return 0 if this is undefined or unsupported, otherwise the last updated time in millis
94+
*/
95+
def getLastUpdatedTime(): Long = {
96+
return 0;
97+
}
98+
7599
/**
76100
* Returns a list of applications available for the history server to show.
77101
*

core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala

Lines changed: 41 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ package org.apache.spark.deploy.history
1919

2020
import java.io.{FileNotFoundException, IOException, OutputStream}
2121
import java.util.UUID
22-
import java.util.concurrent.{Executors, ExecutorService, TimeUnit}
22+
import java.util.concurrent.{Executors, ExecutorService, Future, TimeUnit}
2323
import java.util.zip.{ZipEntry, ZipOutputStream}
2424

2525
import scala.collection.mutable
@@ -107,7 +107,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
107107

108108
// The modification time of the newest log detected during the last scan. Currently only
109109
// used for logging msgs (logs are re-scanned based on file size, rather than modtime)
110-
private var lastScanTime = -1L
110+
private val lastScanTime = new java.util.concurrent.atomic.AtomicLong(-1)
111111

112112
// Mapping of application IDs to their metadata, in descending end time order. Apps are inserted
113113
// into the map in order, so the LinkedHashMap maintains the correct ordering.
@@ -119,6 +119,8 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
119119
// List of application logs to be deleted by event log cleaner.
120120
private var attemptsToClean = new mutable.ListBuffer[FsApplicationAttemptInfo]
121121

122+
private val pendingReplayTasksCount = new java.util.concurrent.atomic.AtomicInteger(0)
123+
122124
/**
123125
* Return a runnable that performs the given operation on the event logs.
124126
* This operation is expected to be executed periodically.
@@ -223,6 +225,10 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
223225
applications.get(appId)
224226
}
225227

228+
override def getEventLogsUnderProcess(): Int = pendingReplayTasksCount.get()
229+
230+
override def getLastUpdatedTime(): Long = lastScanTime.get()
231+
226232
override def getAppUI(appId: String, attemptId: Option[String]): Option[LoadedAppUI] = {
227233
try {
228234
applications.get(appId).flatMap { appInfo =>
@@ -310,26 +316,43 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
310316
if (logInfos.nonEmpty) {
311317
logDebug(s"New/updated attempts found: ${logInfos.size} ${logInfos.map(_.getPath)}")
312318
}
313-
logInfos.map { file =>
314-
replayExecutor.submit(new Runnable {
319+
320+
var tasks = mutable.ListBuffer[Future[_]]()
321+
322+
try {
323+
for (file <- logInfos) {
324+
tasks += replayExecutor.submit(new Runnable {
315325
override def run(): Unit = mergeApplicationListing(file)
316326
})
317327
}
318-
.foreach { task =>
319-
try {
320-
// Wait for all tasks to finish. This makes sure that checkForLogs
321-
// is not scheduled again while some tasks are already running in
322-
// the replayExecutor.
323-
task.get()
324-
} catch {
325-
case e: InterruptedException =>
326-
throw e
327-
case e: Exception =>
328-
logError("Exception while merging application listings", e)
329-
}
328+
} catch {
329+
// let the iteration over logInfos break, since an exception on
330+
// replayExecutor.submit (..) indicates the ExecutorService is unable
331+
// to take any more submissions at this time
332+
333+
case e: Exception =>
334+
logError(s"Exception while submitting event log for replay", e)
335+
}
336+
337+
pendingReplayTasksCount.addAndGet(tasks.size)
338+
339+
tasks.foreach { task =>
340+
try {
341+
// Wait for all tasks to finish. This makes sure that checkForLogs
342+
// is not scheduled again while some tasks are already running in
343+
// the replayExecutor.
344+
task.get()
345+
} catch {
346+
case e: InterruptedException =>
347+
throw e
348+
case e: Exception =>
349+
logError("Exception while merging application listings", e)
350+
} finally {
351+
pendingReplayTasksCount.decrementAndGet()
330352
}
353+
}
331354

332-
lastScanTime = newLastScanTime
355+
lastScanTime.set(newLastScanTime)
333356
} catch {
334357
case e: Exception => logError("Exception in checking for event log updates", e)
335358
}
@@ -346,7 +369,7 @@ private[history] class FsHistoryProvider(conf: SparkConf, clock: Clock)
346369
} catch {
347370
case e: Exception =>
348371
logError("Exception encountered when attempting to update last scan time", e)
349-
lastScanTime
372+
lastScanTime.get()
350373
} finally {
351374
if (!fs.delete(path, true)) {
352375
logWarning(s"Error deleting ${path}")

core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,13 +33,30 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
3333
.filter(_.completed != requestedIncomplete)
3434
val allAppsSize = allApps.size
3535

36+
val eventLogsUnderProcessCount = parent.getEventLogsUnderProcess()
37+
val lastUpdatedTime = parent.getLastUpdatedTime()
3638
val providerConfig = parent.getProviderConfig()
3739
val content =
40+
<script src={UIUtils.prependBaseUri("/static/historypage-common.js")}></script>
3841
<div>
3942
<div class="span12">
4043
<ul class="unstyled">
4144
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
4245
</ul>
46+
{
47+
if (eventLogsUnderProcessCount > 0) {
48+
<p>There are {eventLogsUnderProcessCount} event log(s) currently being
49+
processed which may result in additional applications getting listed on this page.
50+
Refresh the page to view updates. </p>
51+
}
52+
}
53+
54+
{
55+
if (lastUpdatedTime > 0) {
56+
<p>Last updated: <span id="last-updated">{lastUpdatedTime}</span></p>
57+
}
58+
}
59+
4360
{
4461
if (allAppsSize > 0) {
4562
<script src={UIUtils.prependBaseUri("/static/dataTables.rowsGroup.js")}></script> ++
@@ -48,6 +65,8 @@ private[history] class HistoryPage(parent: HistoryServer) extends WebUIPage("")
4865
<script>setAppLimit({parent.maxApplications})</script>
4966
} else if (requestedIncomplete) {
5067
<h4>No incomplete applications found!</h4>
68+
} else if (eventLogsUnderProcessCount > 0) {
69+
<h4>No completed applications found!</h4>
5170
} else {
5271
<h4>No completed applications found!</h4> ++
5372
<p>Did you specify the correct logging directory?

core/src/main/scala/org/apache/spark/deploy/history/HistoryServer.scala

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,14 @@ class HistoryServer(
178178
provider.getListing()
179179
}
180180

181+
def getEventLogsUnderProcess(): Int = {
182+
provider.getEventLogsUnderProcess()
183+
}
184+
185+
def getLastUpdatedTime(): Long = {
186+
provider.getLastUpdatedTime()
187+
}
188+
181189
def getApplicationInfoList: Iterator[ApplicationInfo] = {
182190
getApplicationList().iterator.map(ApplicationsListResource.appHistoryInfoToPublicAppInfo)
183191
}

0 commit comments

Comments
 (0)