@@ -24,6 +24,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
2424import org .json4s .jackson .JsonMethods ._
2525
2626import org .apache .spark .{Logging , SparkConf , SparkContext }
27+ import org .apache .spark .deploy .SparkHadoopUtil
2728import org .apache .spark .io .CompressionCodec
2829import org .apache .spark .util .{FileLogger , JsonProtocol }
2930
@@ -39,22 +40,25 @@ import org.apache.spark.util.{FileLogger, JsonProtocol}
3940 */
4041private [spark] class EventLoggingListener (
4142 appName : String ,
42- conf : SparkConf ,
43- hadoopConfiguration : Configuration )
43+ sparkConf : SparkConf ,
44+ hadoopConf : Configuration )
4445 extends SparkListener with Logging {
4546
4647 import EventLoggingListener ._
4748
48- private val shouldCompress = conf.getBoolean(" spark.eventLog.compress" , false )
49- private val shouldOverwrite = conf.getBoolean(" spark.eventLog.overwrite" , false )
50- private val outputBufferSize = conf.getInt(" spark.eventLog.buffer.kb" , 100 ) * 1024
51- private val logBaseDir = conf.get(" spark.eventLog.dir" , DEFAULT_LOG_DIR ).stripSuffix(" /" )
49+ def this (appName : String , sparkConf : SparkConf ) = {
50+ this (appName, sparkConf, SparkHadoopUtil .get.newConfiguration())
51+ }
52+
53+ private val shouldCompress = sparkConf.getBoolean(" spark.eventLog.compress" , false )
54+ private val shouldOverwrite = sparkConf.getBoolean(" spark.eventLog.overwrite" , false )
55+ private val outputBufferSize = sparkConf.getInt(" spark.eventLog.buffer.kb" , 100 ) * 1024
56+ private val logBaseDir = sparkConf.get(" spark.eventLog.dir" , DEFAULT_LOG_DIR ).stripSuffix(" /" )
5257 private val name = appName.replaceAll(" [ :/]" , " -" ).toLowerCase + " -" + System .currentTimeMillis
5358 val logDir = logBaseDir + " /" + name
5459
5560 private val logger =
56- new FileLogger (logDir, conf, hadoopConfiguration, outputBufferSize, shouldCompress,
57- shouldOverwrite)
61+ new FileLogger (logDir, sparkConf, hadoopConf, outputBufferSize, shouldCompress, shouldOverwrite)
5862
5963 /**
6064 * Begin logging events.
@@ -63,7 +67,7 @@ private[spark] class EventLoggingListener(
6367 def start () {
6468 logInfo(" Logging events to %s" .format(logDir))
6569 if (shouldCompress) {
66- val codec = conf .get(" spark.io.compression.codec" , CompressionCodec .DEFAULT_COMPRESSION_CODEC )
70+ val codec = sparkConf .get(" spark.io.compression.codec" , CompressionCodec .DEFAULT_COMPRESSION_CODEC )
6771 logger.newFile(COMPRESSION_CODEC_PREFIX + codec)
6872 }
6973 logger.newFile(SPARK_VERSION_PREFIX + SparkContext .SPARK_VERSION )
0 commit comments