Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ private[spark] class CoarseGrainedExecutorBackend(
driverUrl: String,
executorId: String,
hostPort: String,
cores: Int)
cores: Int,
actorSystem: ActorSystem)
extends Actor
with ExecutorBackend
with Logging {
Expand Down Expand Up @@ -94,6 +95,9 @@ private[spark] class CoarseGrainedExecutorBackend(
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}

override def akkaFrameSize() = actorSystem.settings.config.getBytes(
"akka.remote.netty.tcp.maximum-frame-size")
}

private[spark] object CoarseGrainedExecutorBackend {
Expand All @@ -113,7 +117,7 @@ private[spark] object CoarseGrainedExecutorBackend {
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
Props(classOf[CoarseGrainedExecutorBackend], driverUrl, executorId,
sparkHostPort, cores),
sparkHostPort, cores, actorSystem),
name = "Executor")
workerUrl.foreach {
url =>
Expand Down
8 changes: 3 additions & 5 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,6 @@ private[spark] class Executor(
private val urlClassLoader = createClassLoader()
private val replClassLoader = addReplClassLoaderIfNeeded(urlClassLoader)

// Akka's message frame size. If task result is bigger than this, we use the block manager
// to send the result back.
private val akkaFrameSize = AkkaUtils.maxFrameSizeBytes(conf)

// Start worker thread pool
val threadPool = Utils.newDaemonCachedThreadPool("Executor task launch worker")

Expand Down Expand Up @@ -211,8 +207,10 @@ private[spark] class Executor(
task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
logInfo("Serialized size of result for " + taskId + " is " + serializedDirectResult.limit)

val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
if (serializedDirectResult.limit >= execBackend.akkaFrameSize() -
AkkaUtils.reservedSizeBytes) {
logInfo("Storing result for " + taskId + " in local BlockManager")
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,7 @@ import org.apache.spark.TaskState.TaskState
*/
private[spark] trait ExecutorBackend {
def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)

// Exists as a work around for SPARK-1112. This only exists in branch-1.x of Spark.
def akkaFrameSize(): Long = Long.MaxValue
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Slightly confused on this -- won't this make LocalBackend and MesosExecutorBackend never use the BlockManager for results, causing them to have the undesired behavior?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The MesosExecutorBackend sends results through mesos, not akka. The LocalBackend sends a message to an actor within the same actor system... which I assumed won't go over TCP.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see. So the only real change is that in certain cases the LocalBackend will no longer use the BlockManager for returning results. Sounds fine.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So that change actually alters the expectations of the unit tests, so I went ahead and just enforced the limit in the LocalBackend anwyays.

}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ class CoarseGrainedSchedulerBackend(scheduler: TaskSchedulerImpl, actorSystem: A
for (task <- tasks.flatten) {
val ser = SparkEnv.get.closureSerializer.newInstance()
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - 1024) {
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ package org.apache.spark.scheduler.local
import java.nio.ByteBuffer

import akka.actor.{Actor, ActorRef, Props}

import org.apache.spark.{Logging, SparkEnv, TaskState}
import org.apache.spark.TaskState.TaskState
import org.apache.spark.executor.{Executor, ExecutorBackend}
import org.apache.spark.scheduler.{SchedulerBackend, TaskSchedulerImpl, WorkerOffer}
import org.apache.spark.util.AkkaUtils

private case class ReviveOffers()

Expand Down Expand Up @@ -106,4 +106,8 @@ private[spark] class LocalBackend(scheduler: TaskSchedulerImpl, val totalCores:
override def statusUpdate(taskId: Long, state: TaskState, serializedData: ByteBuffer) {
localActor ! StatusUpdate(taskId, state, serializedData)
}

// This limit is calculated only to preserve expected behavior in tests. In reality, since this
// backend sends messages over the existing actor system, there is no need to enforce a limit.
override def akkaFrameSize() = AkkaUtils.maxFrameSizeBytes(scheduler.sc.getConf)
}
3 changes: 3 additions & 0 deletions core/src/main/scala/org/apache/spark/util/AkkaUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,7 @@ private[spark] object AkkaUtils extends Logging {
def maxFrameSizeBytes(conf: SparkConf): Int = {
conf.getInt("spark.akka.frameSize", 10) * 1024 * 1024
}

/** Space reserved for extra data in an Akka message besides serialized task or task result. */
val reservedSizeBytes = 200 * 1024
}
21 changes: 12 additions & 9 deletions core/src/test/scala/org/apache/spark/MapOutputTrackerSuite.scala
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,6 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {

test("remote fetch exceeds akka frame size") {
val newConf = new SparkConf
newConf.set("spark.akka.frameSize", "1")
newConf.set("spark.akka.askTimeout", "1") // Fail fast

val masterTracker = new MapOutputTrackerMaster(conf)
Expand All @@ -191,14 +190,18 @@ class MapOutputTrackerSuite extends FunSuite with LocalSparkContext {
new MapOutputTrackerMasterActor(masterTracker, newConf))(actorSystem)
val masterActor = actorRef.underlyingActor

// Frame size should be ~1.1MB, and MapOutputTrackerMasterActor should throw exception.
// Note that the size is hand-selected here because map output statuses are compressed before
// being sent.
masterTracker.registerShuffle(20, 100)
(0 until 100).foreach { i =>
masterTracker.registerMapOutput(20, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), Array.fill[Byte](4000000)(0)))
// Frame size should be 2 * the configured frame size, and MapOutputTrackerMasterActor should
// throw exception.
val shuffleId = 20
val numMaps = 2
val data = new Array[Byte](AkkaUtils.maxFrameSizeBytes(conf))
val random = new java.util.Random(0)
random.nextBytes(data) // Make it hard to compress.
masterTracker.registerShuffle(shuffleId, numMaps)
(0 until numMaps).foreach { i =>
masterTracker.registerMapOutput(shuffleId, i, new MapStatus(
BlockManagerId("999", "mps", 1000, 0), data))
}
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(20)) }
intercept[SparkException] { masterActor.receive(GetMapOutputStatuses(shuffleId)) }
}
}