Skip to content

Commit 78e360c

Browse files
committed
fix tests
1 parent 02c213e commit 78e360c

File tree

2 files changed

+10
-5
lines changed

2 files changed

+10
-5
lines changed

core/src/main/scala/org/apache/spark/storage/BlockManager.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -883,7 +883,7 @@ private[spark] class BlockManager(
883883
Some(block)
884884
})
885885
}
886-
logDebug(s"Read $blockId from the disk of a same host executor is " +
886+
logInfo(s"Read $blockId from the disk of a same host executor is " +
887887
(if (res.isDefined) "successful." else "failed."))
888888
res
889889
}.orElse {

core/src/main/scala/org/apache/spark/storage/BlockManagerMasterEndpoint.scala

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -520,15 +520,20 @@ class BlockManagerMasterEndpoint(
520520
if (externalShuffleServiceRddFetchEnabled && bmId.port == externalShuffleServicePort) {
521521
Option(blockStatusByShuffleService(bmId).get(blockId))
522522
} else {
523-
blockManagerInfo(bmId).getStatus(blockId)
523+
blockManagerInfo.get(bmId).flatMap(_.getStatus(blockId))
524524
}
525525
}
526526

527527
if (locations.nonEmpty && status.isDefined) {
528528
val localDirs = locations.find { loc =>
529-
loc.host == requesterHost && loc.port != externalShuffleServicePort
530-
val status = blockManagerInfo(loc).getStatus(blockId)
531-
status.isDefined && status.get.storageLevel.useDisk
529+
if (loc.port != externalShuffleServicePort && loc.host == requesterHost) {
530+
blockManagerInfo
531+
.get(loc)
532+
.flatMap(_.getStatus(blockId).map(_.storageLevel.useDisk))
533+
.getOrElse(false)
534+
} else {
535+
false
536+
}
532537
}.map(blockManagerInfo(_).localDirs)
533538
Some(BlockLocationsAndStatus(locations, status.get, localDirs))
534539
} else {

0 commit comments

Comments
 (0)