Commit 2bcd5d5

[SPARK-17193][CORE] HadoopRDD NPE at DEBUG log level when getLocationInfo == null
## What changes were proposed in this pull request?

Handle null from Hadoop getLocationInfo directly instead of catching (and logging) the resulting exception.

## How was this patch tested?

Jenkins tests

Author: Sean Owen <[email protected]>

Closes #14760 from srowen/SPARK-17193.
1 parent 5f02d2e commit 2bcd5d5
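
The fix relies on Scala's `Option.apply`, which maps `null` to `None`. Below is a minimal, standalone sketch of that pattern; the object and method names (`LocationInfoSketch`, `convert`, `rawLocations`) are illustrative stand-ins, not the actual Spark code:

```scala
// Minimal sketch of the null-handling pattern behind this commit (illustrative, not Spark's code).
// `rawLocations` stands in for the array returned reflectively from Hadoop's getLocationInfo,
// which can be null for some splits / Hadoop versions.
object LocationInfoSketch {
  def convert(rawLocations: Array[String]): Option[Seq[String]] =
    // Option(null) is None, so a null array becomes None instead of throwing a
    // NullPointerException that previously only surfaced at DEBUG log level.
    Option(rawLocations).map(_.toSeq.filter(_ != "localhost"))

  def main(args: Array[String]): Unit = {
    println(convert(null))                        // None
    println(convert(Array("host1", "localhost"))) // Some(...) containing only "host1"
  }
}
```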

File tree

2 files changed: +13, -15 lines


core/src/main/scala/org/apache/spark/rdd/HadoopRDD.scala

Lines changed: 12 additions & 14 deletions
@@ -22,7 +22,6 @@ import java.text.SimpleDateFormat
 import java.util.Date
 
 import scala.collection.immutable.Map
-import scala.collection.mutable.ListBuffer
 import scala.reflect.ClassTag
 
 import org.apache.hadoop.conf.{Configurable, Configuration}

@@ -317,7 +316,7 @@ class HadoopRDD[K, V](
       try {
         val lsplit = c.inputSplitWithLocationInfo.cast(hsplit)
         val infos = c.getLocationInfo.invoke(lsplit).asInstanceOf[Array[AnyRef]]
-        Some(HadoopRDD.convertSplitLocationInfo(infos))
+        HadoopRDD.convertSplitLocationInfo(infos)
       } catch {
         case e: Exception =>
           logDebug("Failed to use InputSplitWithLocations.", e)

@@ -419,21 +418,20 @@ private[spark] object HadoopRDD extends Logging {
       None
   }
 
-  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Seq[String] = {
-    val out = ListBuffer[String]()
-    infos.foreach { loc =>
-      val locationStr = HadoopRDD.SPLIT_INFO_REFLECTIONS.get.
-        getLocation.invoke(loc).asInstanceOf[String]
+  private[spark] def convertSplitLocationInfo(infos: Array[AnyRef]): Option[Seq[String]] = {
+    Option(infos).map(_.flatMap { loc =>
+      val reflections = HadoopRDD.SPLIT_INFO_REFLECTIONS.get
+      val locationStr = reflections.getLocation.invoke(loc).asInstanceOf[String]
       if (locationStr != "localhost") {
-        if (HadoopRDD.SPLIT_INFO_REFLECTIONS.get.isInMemory.
-            invoke(loc).asInstanceOf[Boolean]) {
-          logDebug("Partition " + locationStr + " is cached by Hadoop.")
-          out += new HDFSCacheTaskLocation(locationStr).toString
+        if (reflections.isInMemory.invoke(loc).asInstanceOf[Boolean]) {
+          logDebug(s"Partition $locationStr is cached by Hadoop.")
+          Some(HDFSCacheTaskLocation(locationStr).toString)
         } else {
-          out += new HostTaskLocation(locationStr).toString
+          Some(HostTaskLocation(locationStr).toString)
        }
+      } else {
+        None
       }
-    }
-    out.seq
+    })
   }
 }
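
For readability, here is the reworked helper on its own, paraphrased so it compiles standalone. `SplitInfo`, `hdfsCacheLocation`, and `hostLocation` are hypothetical stand-ins for the reflection-based `SPLIT_INFO_REFLECTIONS` helpers and the `HDFSCacheTaskLocation`/`HostTaskLocation` strings:

```scala
// Standalone paraphrase of the new convertSplitLocationInfo (illustrative only).
object ConvertSketch {
  // Hypothetical stand-in for the values read reflectively from Hadoop's split location info.
  final case class SplitInfo(location: String, inMemory: Boolean)

  def convertSplitLocationInfo(infos: Array[SplitInfo]): Option[Seq[String]] = {
    // Hypothetical stand-ins for the TaskLocation string encodings.
    def hdfsCacheLocation(host: String): String = s"hdfs_cache_$host"
    def hostLocation(host: String): String = host

    // Option(...) absorbs a null array up front; flatMap lets the "localhost" case drop out as None.
    Option(infos).map(_.toSeq.flatMap { loc =>
      if (loc.location != "localhost") {
        if (loc.inMemory) Some(hdfsCacheLocation(loc.location)) // split cached in HDFS memory
        else Some(hostLocation(loc.location))                   // ordinary host-local split
      } else {
        None
      }
    })
  }
}
```

Compared with the old ListBuffer accumulation, this form has no mutable state, and the "skip localhost" branch is an explicit None rather than a silently skipped iteration.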

core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala

Lines changed: 1 addition & 1 deletion
@@ -255,7 +255,7 @@ class NewHadoopRDD[K, V](
       case Some(c) =>
         try {
           val infos = c.newGetLocationInfo.invoke(split).asInstanceOf[Array[AnyRef]]
-          Some(HadoopRDD.convertSplitLocationInfo(infos))
+          HadoopRDD.convertSplitLocationInfo(infos)
         } catch {
           case e : Exception =>
             logDebug("Failed to use InputSplit#getLocationInfo.", e)
