Skip to content

Commit 62103fd

Browse files
committed
Update as per review
1 parent 0c07d1f commit 62103fd

File tree

6 files changed

+14
-12
lines changed

6 files changed

+14
-12
lines changed

core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1060,7 +1060,7 @@ class DAGScheduler(
10601060
if (runningStages.contains(failedStage)) {
10611061
logInfo(s"Marking $failedStage (${failedStage.name}) as failed " +
10621062
s"due to a fetch failure from $mapStage (${mapStage.name})")
1063-
markStageAsFinished(failedStage, Some(failureMessage))
1063+
markStageAsFinished(failedStage, Some("Fetch failure: " + failureMessage))
10641064
runningStages -= failedStage
10651065
}
10661066

core/src/main/scala/org/apache/spark/shuffle/FetchFailedException.scala

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -35,11 +35,14 @@ private[spark] class FetchFailedException(
3535
message: String)
3636
extends Exception {
3737

38+
def this(bmAddress: BlockManagerId, shuffleId: Int, mapId: Int, reduceId: Int, e: Throwable) {
39+
this(bmAddress, shuffleId, mapId, reduceId, Utils.exceptionString(e))
40+
}
41+
3842
override def getMessage: String =
3943
"Fetch failed: %s %d %d %d".format(bmAddress, shuffleId, mapId, reduceId)
4044

41-
def toTaskEndReason: TaskEndReason =
42-
FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
45+
def toTaskEndReason: TaskEndReason = FetchFailed(bmAddress, shuffleId, mapId, reduceId, message)
4346
}
4447

4548
/**

core/src/main/scala/org/apache/spark/shuffle/hash/BlockStoreShuffleFetcher.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ import org.apache.spark._
2525
import org.apache.spark.serializer.Serializer
2626
import org.apache.spark.shuffle.FetchFailedException
2727
import org.apache.spark.storage.{BlockId, BlockManagerId, ShuffleBlockFetcherIterator, ShuffleBlockId}
28-
import org.apache.spark.util.{Utils, CompletionIterator}
28+
import org.apache.spark.util.CompletionIterator
2929

3030
private[hash] object BlockStoreShuffleFetcher extends Logging {
3131
def fetch[T](
@@ -64,8 +64,7 @@ private[hash] object BlockStoreShuffleFetcher extends Logging {
6464
blockId match {
6565
case ShuffleBlockId(shufId, mapId, _) =>
6666
val address = statuses(mapId.toInt)._1
67-
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId,
68-
Utils.exceptionString(e))
67+
throw new FetchFailedException(address, shufId.toInt, mapId.toInt, reduceId, e)
6968
case _ =>
7069
throw new SparkException(
7170
"Failed to get block " + blockId + ", which is not a shuffle block")

core/src/main/scala/org/apache/spark/storage/ShuffleBlockFetcherIterator.scala

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -321,7 +321,7 @@ object ShuffleBlockFetcherIterator {
321321
/**
322322
* Result of a fetch from a remote block.
323323
*/
324-
trait FetchResult {
324+
sealed trait FetchResult {
325325
val blockId: BlockId
326326
}
327327

@@ -332,7 +332,7 @@ object ShuffleBlockFetcherIterator {
332332
* Note that this is NOT the exact bytes.
333333
* @param buf [[ManagedBuffer]] for the content.
334334
*/
335-
case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
335+
sealed case class SuccessFetchResult(blockId: BlockId, size: Long, buf: ManagedBuffer)
336336
extends FetchResult {
337337
require(buf != null)
338338
require(size >= 0)
@@ -343,6 +343,5 @@ object ShuffleBlockFetcherIterator {
343343
* @param blockId block id
344344
* @param e the failure exception
345345
*/
346-
case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult {
347-
}
346+
sealed case class FailureFetchResult(blockId: BlockId, e: Throwable) extends FetchResult
348347
}

core/src/main/scala/org/apache/spark/util/JsonProtocol.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -630,7 +630,7 @@ private[spark] object JsonProtocol {
630630
val reduceId = (json \ "Reduce ID").extract[Int]
631631
val message = Utils.jsonOption(json \ "Message").map(_.extract[String])
632632
new FetchFailed(blockManagerAddress, shuffleId, mapId, reduceId,
633-
message.getOrElse("Unknown"))
633+
message.getOrElse("Unknown reason"))
634634
case `exceptionFailure` =>
635635
val className = (json \ "Class Name").extract[String]
636636
val description = (json \ "Description").extract[String]

core/src/test/scala/org/apache/spark/util/JsonProtocolSuite.scala

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,8 @@ class JsonProtocolSuite extends FunSuite {
107107
testJobResult(jobFailed)
108108

109109
// TaskEndReason
110-
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19, "Some exception")
110+
val fetchFailed = FetchFailed(BlockManagerId("With or", "without you", 15), 17, 18, 19,
111+
"Some exception")
111112
val exceptionFailure = ExceptionFailure("To be", "or not to be", stackTrace, None)
112113
testTaskEndReason(Success)
113114
testTaskEndReason(Resubmitted)

0 commit comments

Comments
 (0)