diff --git a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
index 1311cf8607d1c..56135d854453b 100644
--- a/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
+++ b/core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala
@@ -21,8 +21,9 @@ import java.io._
 import java.net.URI
 import java.nio.charset.StandardCharsets
 import java.util.Locale
+import java.util.concurrent.ConcurrentHashMap
+import java.util.function.Function
 
-import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -290,7 +291,7 @@ private[spark] object EventLoggingListener extends Logging {
   private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
 
   // A cache for compression codecs to avoid creating the same codec many times
-  private val codecMap = new mutable.HashMap[String, CompressionCodec]
+  private val codecMap = new ConcurrentHashMap[String, CompressionCodec]
 
   /**
    * Write metadata about an event log to the given stream.
@@ -357,7 +358,10 @@ private[spark] object EventLoggingListener extends Logging {
     val in = new BufferedInputStream(fs.open(log))
     try {
       val codec = codecName(log).map { c =>
-        codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
+        codecMap.computeIfAbsent(c, new Function[String, CompressionCodec] {
+          override def apply(key: String): CompressionCodec =
+            CompressionCodec.createCodec(new SparkConf, key)
+        })
       }
       codec.map(_.compressedInputStream(in)).getOrElse(in)
     } catch {
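
For context on the change: `mutable.HashMap.getOrElseUpdate` is not safe under concurrent callers, so two threads opening event logs at the same time could race on the shared `codecMap`. `ConcurrentHashMap.computeIfAbsent` applies its mapping function atomically, at most once per key, which removes both the corruption hazard and the risk of constructing duplicate codecs, without any external locking. The explicit anonymous `java.util.function.Function` (rather than a Scala lambda) presumably keeps the code compiling on Scala 2.11, where SAM conversion for Java functional interfaces is not enabled by default. Below is a minimal, self-contained sketch of the same pattern; `CodecCacheSketch`, `lookup`, and the String-valued cache are hypothetical stand-ins for the real `codecMap` and `CompressionCodec.createCodec`, not Spark code.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.function.Function

// Hypothetical stand-in for the codecMap pattern in the patch above.
object CodecCacheSketch {
  // Maps codec short names to (here, fake) codec instances.
  private val cache = new ConcurrentHashMap[String, String]

  def lookup(name: String): String =
    // computeIfAbsent applies the function atomically, at most once per key,
    // so concurrent readers of the same log cannot race to create a codec.
    cache.computeIfAbsent(name, new Function[String, String] {
      override def apply(key: String): String = {
        println(s"creating codec for $key") // runs only once per key
        s"codec-$key"                       // stands in for CompressionCodec.createCodec
      }
    })

  def main(args: Array[String]): Unit = {
    // Four threads request the same codec; only one "creating" line is printed.
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable { override def run(): Unit = lookup("lz4") })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
  }
}
```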