4 changes: 3 additions & 1 deletion core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -348,6 +348,7 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
taskScheduler.start()

val applicationId: String = taskScheduler.applicationId()
val applicationAttemptId : String = taskScheduler.applicationAttemptId()
Contributor comment: no space before :
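That is, the declaration would read:

  val applicationAttemptId: String = taskScheduler.applicationAttemptId()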

conf.set("spark.app.id", applicationId)

env.blockManager.initialize(applicationId)
@@ -364,7 +365,8 @@ class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient
private[spark] val eventLogger: Option[EventLoggingListener] = {
if (isEventLogEnabled) {
val logger =
new EventLoggingListener(applicationId, eventLogDir.get, conf, hadoopConfiguration)
new EventLoggingListener(applicationId, applicationAttemptId,
eventLogDir.get, conf, hadoopConfiguration)
Contributor comment: prefer this style:

val logger = new EventLoggingListener(
  applicationId, applicationAttemptId, eventLogDir.get, conf, hadoopConfiguration)

logger.start()
listenerBus.addListener(logger)
Some(logger)
core/src/main/scala/org/apache/spark/deploy/history/FsHistoryProvider.scala
@@ -28,6 +28,7 @@ import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.Utils
import scala.util.control.Breaks._

/**
* A class that provides application history from event logs stored in the file system.
@@ -166,21 +167,65 @@ private[history] class FsHistoryProvider(conf: SparkConf) extends ApplicationHistoryProvider
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
modTime > lastModifiedTime
} else {
false
val appLogStatus = fs.listStatus(new Path(dir.getPath().toUri()))
val appAttemptsDirs = if (appLogStatus != null) appLogStatus.filter(_.isDir).toSeq
else Seq[FileStatus]()
Contributor comment (suggested rewrite):
val appAttemptDirs =
  if (appLogStatus != null) {
    appLogStatus.filter(_.isDir).toSeq
  } else {
    Seq[FileStatus]()
  }

var isValidApplicationLogToUpdate = false
breakable {
for (appAttemptDir <- appAttemptsDirs){
// There are multiple attempts inside this application logs
if (fs.isFile(new Path(appAttemptDir.getPath(),
EventLoggingListener.APPLICATION_COMPLETE))) {
val modTime = getModificationTime(dir)
newLastModifiedTime = math.max(newLastModifiedTime, modTime)
isValidApplicationLogToUpdate = modTime > lastModifiedTime
break
}
}
}
Contributor comment: please do not use break here. While it's unnatural that Scala does not provide native support for breaks, we try to avoid breaks elsewhere in the code base because it requires us to wrap the code in a breakable block. Here you can just use a while loop with a boolean var instead.
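A minimal sketch of that suggestion, reusing the names from the diff above (appAttemptsDirs, fs, dir, lastModifiedTime, newLastModifiedTime); a separate found flag preserves the break's early exit at the first completed attempt:

  // Scan attempt directories until the first one with a completion marker,
  // without scala.util.control.Breaks.
  var isValidApplicationLogToUpdate = false
  var found = false
  var i = 0
  while (!found && i < appAttemptsDirs.length) {
    val appAttemptDir = appAttemptsDirs(i)
    if (fs.isFile(new Path(appAttemptDir.getPath(),
        EventLoggingListener.APPLICATION_COMPLETE))) {
      val modTime = getModificationTime(dir)
      newLastModifiedTime = math.max(newLastModifiedTime, modTime)
      isValidApplicationLogToUpdate = modTime > lastModifiedTime
      found = true
    }
    i += 1
  }
  isValidApplicationLogToUpdate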

isValidApplicationLogToUpdate
}
}
.flatMap { dir =>
try {
val (replayBus, appListener) = createReplayBus(dir)
replayBus.replay()
Some(new FsApplicationHistoryInfo(
dir.getPath().getName(),
appListener.appId.getOrElse(dir.getPath().getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED)))
}
.flatMap { dir =>
val appAttemptsApplicationHistoryInfo =
new scala.collection.mutable.ArrayBuffer[FsApplicationHistoryInfo]()
Contributor comment: we should just import scala.collection.mutable.ArrayBuffer at the top
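With that import in place at the top of the file, the allocation shrinks to:

  import scala.collection.mutable.ArrayBuffer  // with the other imports

  val appAttemptsApplicationHistoryInfo = new ArrayBuffer[FsApplicationHistoryInfo]()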

try {
if (!fs.isFile(new Path(dir.getPath(), EventLoggingListener.APPLICATION_COMPLETE))) {
// There are multiple attempts inside this application logs
val appLogStatus = fs.listStatus(new Path(dir.getPath().toUri()))
val appAttemptSubDirs:Seq[FileStatus] =
if (appLogStatus != null) appLogStatus.filter(_.isDir).toSeq else Seq[FileStatus]()

for (appAttemptDir <- appAttemptSubDirs){
// There are multiple attempts inside this application logs
if (fs.isFile(new Path(appAttemptDir.getPath(),
EventLoggingListener.APPLICATION_COMPLETE))) {
val (replayBus, appListener) = createReplayBus(appAttemptDir)
replayBus.replay()
appAttemptsApplicationHistoryInfo += new FsApplicationHistoryInfo(
dir.getPath().getName() + "/" + appAttemptDir.getPath().getName(),
dir.getPath().getName() + "_attemptid_" + appAttemptDir.getPath().getName(),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED))
}
}

} else {
val (replayBus, appListener) = createReplayBus(dir)
replayBus.replay()
appAttemptsApplicationHistoryInfo += new FsApplicationHistoryInfo(
dir.getPath().getName(),
appListener.appId.getOrElse(dir.getPath().getName()),
appListener.appName.getOrElse(NOT_STARTED),
appListener.startTime.getOrElse(-1L),
appListener.endTime.getOrElse(-1L),
getModificationTime(dir),
appListener.sparkUser.getOrElse(NOT_STARTED))
Contributor comment: this is indented too much. Please use 2 spaces instead

}
appAttemptsApplicationHistoryInfo.toSeq
} catch {
case e: Exception =>
logInfo(s"Failed to load application log data from $dir.", e)
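For reference, the on-disk layout this provider now has to handle for a multi-attempt application looks roughly like the following (directory names illustrative; APPLICATION_COMPLETE is the marker checked above):

  <logBaseDir>/
    app-0001/                   # one directory per application (sanitized app id)
      1/                        # one sub-directory per attempt id
        ...                     # event log files for attempt 1
        APPLICATION_COMPLETE    # per-attempt completion marker
      2/
        ...

Single-attempt applications keep the old flat layout, with APPLICATION_COMPLETE directly under the application directory.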
129 changes: 120 additions & 9 deletions core/src/main/scala/org/apache/spark/deploy/history/HistoryPage.scala
@@ -23,6 +23,8 @@ import scala.xml.Node

import org.apache.spark.ui.{WebUIPage, UIUtils}

import scala.collection.immutable.ListMap

private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {

private val pageSize = 20
@@ -31,15 +33,25 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
val requestedPage = Option(request.getParameter("page")).getOrElse("1").toInt
val requestedFirst = (requestedPage - 1) * pageSize

val allApps = parent.getApplicationList()
val actualFirst = if (requestedFirst < allApps.size) requestedFirst else 0
val apps = allApps.slice(actualFirst, Math.min(actualFirst + pageSize, allApps.size))

val applicationNattemptsList = parent.getApplicationList()
val (hasAttemptInfo, appToAttemptMap) = getApplicationLevelList(applicationNattemptsList)
val allAppsSize = if(hasAttemptInfo) appToAttemptMap.size else applicationNattemptsList.size
val actualFirst = if (requestedFirst < allAppsSize) requestedFirst else 0
val apps = applicationNattemptsList.slice(actualFirst,
Math.min(actualFirst + pageSize,
allAppsSize))
val appWithAttemptsDisplayList = appToAttemptMap.slice(actualFirst,
Math.min(actualFirst + pageSize,
allAppsSize))
val actualPage = (actualFirst / pageSize) + 1
val last = Math.min(actualFirst + pageSize, allApps.size) - 1
val pageCount = allApps.size / pageSize + (if (allApps.size % pageSize > 0) 1 else 0)
val last = Math.min(actualFirst + pageSize, allAppsSize) - 1
val pageCount = allAppsSize / pageSize + (if (allAppsSize % pageSize > 0) 1 else 0)

val appTable = if(hasAttemptInfo ) UIUtils.listingTable(appWithAttemptHeader,
Contributor comment: style: space after if, and no space in the parentheses

val appTable =
  if (hasAttemptInfo) {
    UIUtils.listingTable(...)
  } else {
    UIUtils.listingTable(...)
  }

appWithAttemptRow,
appWithAttemptsDisplayList)
else UIUtils.listingTable(appHeader, appRow, apps)

val appTable = UIUtils.listingTable(appHeader, appRow, apps)
val providerConfig = parent.getProviderConfig()
val content =
<div class="row-fluid">
@@ -48,9 +60,9 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
{providerConfig.map { case (k, v) => <li><strong>{k}:</strong> {v}</li> }}
</ul>
{
if (allApps.size > 0) {
if (allAppsSize > 0) {
<h4>
Showing {actualFirst + 1}-{last + 1} of {allApps.size}
Showing {actualFirst + 1}-{last + 1} of {allAppsSize}
<span style="float: right">
{if (actualPage > 1) <a href={"/?page=" + (actualPage - 1)}>&lt;</a>}
{if (actualPage < pageCount) <a href={"/?page=" + (actualPage + 1)}>&gt;</a>}
@@ -72,6 +84,38 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
UIUtils.basicSparkPage(content, "History Server")
}

private def getApplicationLevelList (appNattemptList: Iterable[ApplicationHistoryInfo]) ={
// Create HashMap as per the multiple attempts for one application.
// If there is no attempt specific stuff, then
// do return false, to indicate the same, so that previous UI gets displayed.
var hasAttemptInfo = false
val appToAttemptInfo = new scala.collection.mutable.HashMap[String,
scala.collection.mutable.ArrayBuffer[ApplicationHistoryInfo]]
for( appAttempt <- appNattemptList) {
if(appAttempt.id.contains("_attemptid_")){
hasAttemptInfo = true
val applicationId = appAttempt.id.substring(0, appAttempt.id.indexOf("_attemptid_"))
val attemptId = getAttemptId(appAttempt.id)._1
if(appToAttemptInfo.contains(applicationId)){
val currentAttempts = appToAttemptInfo.get(applicationId).get
currentAttempts += appAttempt
appToAttemptInfo.put( applicationId, currentAttempts)
} else {
val currentAttempts = new scala.collection.mutable.ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put( applicationId, currentAttempts )
}
}else {
val currentAttempts = new scala.collection.mutable.ArrayBuffer[ApplicationHistoryInfo]()
currentAttempts += appAttempt
appToAttemptInfo.put(appAttempt.id, currentAttempts)
}
}
val sortedMap = ListMap(appToAttemptInfo.toSeq.sortWith(_._1 > _._1):_*)
(hasAttemptInfo, sortedMap)
}
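A worked example of what this produces (ids illustrative): given entries with ids app-0001_attemptid_1, app-0001_attemptid_2 and app-0002, it returns hasAttemptInfo = true and, after the descending sort on application id, a map along the lines of:

  "app-0002" -> ArrayBuffer(<info for app-0002>)
  "app-0001" -> ArrayBuffer(<info for attempt 1>, <info for attempt 2>)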


private val appHeader = Seq(
"App ID",
"App Name",
@@ -97,4 +141,71 @@ private[spark] class HistoryPage(parent: HistoryServer) extends WebUIPage("") {
<td sorttable_customkey={info.lastUpdated.toString}>{lastUpdated}</td>
</tr>
}


private val appWithAttemptHeader = Seq(
"App ID",
"App Name",
"Attempt ID",
"Started",
"Completed",
"Duration",
"Spark User",
"Last Updated")


private def firstAttemptRow(attemptInfo : ApplicationHistoryInfo) = {
val uiAddress = HistoryServer.UI_PATH_PREFIX + s"/${attemptInfo.id}"
val startTime = UIUtils.formatDate(attemptInfo.startTime)
val endTime = UIUtils.formatDate(attemptInfo.endTime)
val duration = UIUtils.formatDuration(attemptInfo.endTime - attemptInfo.startTime)
val lastUpdated = UIUtils.formatDate(attemptInfo.lastUpdated)
val attemptId = getAttemptId(attemptInfo.id)._1
<td><a href={uiAddress}>{attemptId}</a></td>
<td sorttable_customkey={attemptInfo.startTime.toString}>{startTime}</td>
<td sorttable_customkey={attemptInfo.endTime.toString}>{endTime}</td>
<td sorttable_customkey={(attemptInfo.endTime - attemptInfo.startTime).toString}>
{duration}</td>
<td>{attemptInfo.sparkUser}</td>
<td sorttable_customkey={attemptInfo.lastUpdated.toString}>{lastUpdated}</td>

}
private def attemptRow(attemptInfo : ApplicationHistoryInfo) = {
<tr>
{firstAttemptRow(attemptInfo)}
</tr>
}


private def getAttemptId(value : String) = {
if(value.contains("_attemptid_")) {
(value.substring(value.indexOf("_attemptid_") + "_attemptid_".length), true )
} else {
// Instead of showing NA, show the application itself, in case of no attempt specific logging.
// One can check the second value, to see if it has any attempt specific value or not
(value, false )
}
}
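For example (ids illustrative):

  getAttemptId("app-0001_attemptid_2")  // returns ("2", true)
  getAttemptId("app-0001")              // returns ("app-0001", false)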

private def appWithAttemptRow(appAttemptsInfo: (String,
scala.collection.mutable.ArrayBuffer[ApplicationHistoryInfo])): Seq[Node] = {
val applicationId = appAttemptsInfo._1
val info = appAttemptsInfo._2
val rowSpan = info.length
val rowSpanString = rowSpan.toString
val applicatioName = info(0).name
val ttAttempts = info.slice(1, rowSpan -1)
val x = new xml.NodeBuffer
x +=
<tr>
<td rowspan={rowSpanString}>{applicationId}</td>
<td rowspan={rowSpanString}>{applicatioName}</td>
{ firstAttemptRow(info(0)) }
</tr>;
for( i <- 1 until rowSpan ){
x += attemptRow(info(i))
}
x
}
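For an application with two attempts, the rowspan bookkeeping above renders markup along these lines (cell contents illustrative, trailing columns elided):

  <tr>
    <td rowspan="2">app-0001</td>
    <td rowspan="2">MyApp</td>
    <td><a href="/history/app-0001_attemptid_1">1</a></td>
    ...
  </tr>
  <tr>
    <td><a href="/history/app-0001_attemptid_2">2</a></td>
    ...
  </tr>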

}
core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -44,21 +44,25 @@ import org.apache.spark.util.{FileLogger, JsonProtocol, Utils}
*/
private[spark] class EventLoggingListener(
appId: String,
appAttemptId : String,
logBaseDir: String,
sparkConf: SparkConf,
hadoopConf: Configuration)
extends SparkListener with Logging {

import EventLoggingListener._

def this(appId: String, logBaseDir: String, sparkConf: SparkConf, hadoopConf: Configuration) =
this(appId, "", logBaseDir, sparkConf, hadoopConf)

def this(appId: String, logBaseDir: String, sparkConf: SparkConf) =
this(appId, logBaseDir, sparkConf, SparkHadoopUtil.get.newConfiguration(sparkConf))

private val shouldCompress = sparkConf.getBoolean("spark.eventLog.compress", false)
private val shouldOverwrite = sparkConf.getBoolean("spark.eventLog.overwrite", false)
private val testing = sparkConf.getBoolean("spark.eventLog.testing", false)
private val outputBufferSize = sparkConf.getInt("spark.eventLog.buffer.kb", 100) * 1024
val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId)
val logDir = EventLoggingListener.getLogDirPath(logBaseDir, appId, appAttemptId)
val logDirName: String = logDir.split("/").last
protected val logger = new FileLogger(logDir, sparkConf, hadoopConf, outputBufferSize,
shouldCompress, shouldOverwrite, Some(LOG_FILE_PERMISSIONS))
@@ -183,8 +187,26 @@ private[spark] object EventLoggingListener extends Logging {
* @return A path which consists of file-system-safe characters.
*/
def getLogDirPath(logBaseDir: String, appId: String): String = {
getLogDirPath(logBaseDir, appId, "")
}

/**
* Return a file-system-safe path to the log directory for the given application.
*
* @param logBaseDir A base directory for the path to the log directory for given application.
* @param appId A unique app ID.
* @param appAttemptId A unique attempt id of appId.
* @return A path which consists of file-system-safe characters.
*/

def getLogDirPath(logBaseDir: String, appId: String, appAttemptId : String): String = {
val name = appId.replaceAll("[ :/]", "-").replaceAll("[${}'\"]", "_").toLowerCase
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")

if (appAttemptId.equals("")) {
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/")
} else {
Utils.resolveURI(logBaseDir) + "/" + name.stripSuffix("/") + "/" + appAttemptId
}
}
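A quick worked example, assuming Utils.resolveURI maps a bare local path to a file: URI:

  getLogDirPath("/tmp/spark-events", "App 1")       // "file:/tmp/spark-events/app-1"
  getLogDirPath("/tmp/spark-events", "App 1", "2")  // "file:/tmp/spark-events/app-1/2"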

/**
core/src/main/scala/org/apache/spark/scheduler/SchedulerBackend.scala
@@ -41,4 +41,11 @@ private[spark] trait SchedulerBackend {
*/
def applicationId(): String = appId

/**
* Get the application attempt ID associated with the job.
*
* @return An application attempt id
*/
def applicationAttemptId(): String = ""

}
core/src/main/scala/org/apache/spark/scheduler/TaskScheduler.scala
@@ -73,5 +73,11 @@ private[spark] trait TaskScheduler {
* @return An application ID
*/
def applicationId(): String = appId
/**
* Get an application's attempt Id associated with the job.
*
* @return An application's Attempt ID
*/
def applicationAttemptId(): String = ""

}
core/src/main/scala/org/apache/spark/scheduler/TaskSchedulerImpl.scala
@@ -490,6 +490,8 @@ private[spark] class TaskSchedulerImpl(
}

override def applicationId(): String = backend.applicationId()

override def applicationAttemptId() : String = backend.applicationAttemptId()

}

yarn/common/src/main/scala/org/apache/spark/deploy/yarn/ApplicationMaster.scala
@@ -88,6 +88,11 @@ private[spark] class ApplicationMaster(args: ApplicationMasterArguments,

// Propagate the application ID so that YarnClusterSchedulerBackend can pick it up.
System.setProperty("spark.yarn.app.id", appAttemptId.getApplicationId().toString())

// Propagate the attempt id, so that in case of event logging,
// different attempt's logs gets created in different directory
System.setProperty("spark.yarn.app.attemptid", appAttemptId.getAttemptId().toString())

}

logInfo("ApplicationAttemptId: " + appAttemptId)
yarn/common/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala
@@ -46,5 +46,10 @@ private[spark] class YarnClusterSchedulerBackend(
logError("Application ID is not set.")
super.applicationId
}


override def applicationAttemptId(): String =
sc.getConf.getOption("spark.yarn.app.attemptid").getOrElse {
logError("Application attempt ID is not set.")
super.applicationAttemptId
}
}
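Taken together, a sketch of how the attempt id flows through this patch (driver side):

  // ApplicationMaster (YARN):    System.setProperty("spark.yarn.app.attemptid", ...)
  // YarnClusterSchedulerBackend: applicationAttemptId() reads that property
  // TaskSchedulerImpl:           applicationAttemptId() = backend.applicationAttemptId()
  // SparkContext:                new EventLoggingListener(applicationId, applicationAttemptId, ...)
  //                              -> events logged under <logBaseDir>/<app>/<attempt>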