Skip to content

Commit 076e28e

Browse files
committed
Make EventLoggingListener.codecMap thread-safe
1 parent 63e93a5 commit 076e28e

File tree

1 file changed

+7
-3
lines changed

1 file changed

+7
-3
lines changed

core/src/main/scala/org/apache/spark/scheduler/EventLoggingListener.scala

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@ import java.io._
2121
import java.net.URI
2222
import java.nio.charset.StandardCharsets
2323
import java.util.Locale
24+
import java.util.concurrent.ConcurrentHashMap
25+
import java.util.function.Function
2426

25-
import scala.collection.mutable
2627
import scala.collection.mutable.ArrayBuffer
2728

2829
import org.apache.hadoop.conf.Configuration
@@ -290,7 +291,7 @@ private[spark] object EventLoggingListener extends Logging {
290291
private val LOG_FILE_PERMISSIONS = new FsPermission(Integer.parseInt("770", 8).toShort)
291292

292293
// A cache for compression codecs to avoid creating the same codec many times
293-
private val codecMap = new mutable.HashMap[String, CompressionCodec]
294+
private val codecMap = new ConcurrentHashMap[String, CompressionCodec]
294295

295296
/**
296297
* Write metadata about an event log to the given stream.
@@ -357,7 +358,10 @@ private[spark] object EventLoggingListener extends Logging {
357358
val in = new BufferedInputStream(fs.open(log))
358359
try {
359360
val codec = codecName(log).map { c =>
360-
codecMap.getOrElseUpdate(c, CompressionCodec.createCodec(new SparkConf, c))
361+
codecMap.computeIfAbsent(c, new Function[String, CompressionCodec] {
362+
override def apply(key: String): CompressionCodec =
363+
CompressionCodec.createCodec(new SparkConf, key)
364+
})
361365
}
362366
codec.map(_.compressedInputStream(in)).getOrElse(in)
363367
} catch {

0 commit comments

Comments
 (0)