Merged
@@ -23,11 +23,11 @@ import java.util.{Date, Locale}
import java.util.concurrent.atomic.AtomicLong
import javax.servlet.http.HttpServletResponse

import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf, SparkException}
import org.apache.spark.deploy.Command
import org.apache.spark.deploy.mesos.MesosDriverDescription
import org.apache.spark.deploy.rest._
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler
import org.apache.spark.deploy.rest.{SubmitRestProtocolException, _}
import org.apache.spark.scheduler.cluster.mesos.{MesosClusterScheduler, MesosProtoUtils}
import org.apache.spark.util.Utils

/**
@@ -70,7 +70,7 @@ private[mesos] class MesosSubmitRequestServlet(
// These defaults copied from YARN
private val MEMORY_OVERHEAD_FACTOR = 0.10
private val MEMORY_OVERHEAD_MIN = 384
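For context, a minimal sketch of how these two constants are typically combined into the driver's memory overhead (an assumption based on the YARN formula the comment references; the actual expression lives outside this hunk):

// Sketch (assumed formula): overhead is the larger of 10% of the
// requested driver memory and the 384 MB floor, mirroring YARN.
private def driverMemoryOverhead(driverMemoryMb: Int): Int =
  math.max((MEMORY_OVERHEAD_FACTOR * driverMemoryMb).toInt, MEMORY_OVERHEAD_MIN)

// Example: a 1024 MB driver gets max(102, 384) = 384 MB of overhead.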

/**
* Build a driver description from the fields specified in the submit request.
*
@@ -107,6 +107,8 @@ private[mesos] class MesosSubmitRequestServlet(
val driverCores = sparkProperties.get("spark.driver.cores")
val name = request.sparkProperties.getOrElse("spark.app.name", mainClass)

validateLabelsFormat(sparkProperties)

// Construct driver description
val defaultConf = this.conf.getAllWithPrefix("spark.mesos.dispatcher.driverDefault.").toMap
val driverConf = new SparkConf(false)
@@ -129,7 +131,7 @@
val submissionId = newDriverId(submitDate)

new MesosDriverDescription(
name, appResource, actualDriverMemory + actualDriverMemoryOverhead, actualDriverCores,
actualSuperviseDriver, command, driverConf.getAll.toMap, submissionId, submitDate)
}

@@ -139,20 +141,40 @@
responseServlet: HttpServletResponse): SubmitRestProtocolResponse = {
requestMessage match {
case submitRequest: CreateSubmissionRequest =>
val driverDescription = buildDriverDescription(submitRequest)
val s = scheduler.submitDriver(driverDescription)
s.serverSparkVersion = sparkVersion
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
s.unknownFields = unknownFields
try {
val driverDescription = buildDriverDescription(submitRequest)

Review comment:

While exceptions have been used in buildDriverDescription before, this approach doesn't look quite right. When we have a method that either returns a value or throws an error, it's a natural use case for Scala's Either or Try. That said, there are no usages of Either or Try in the mesos package. On the other hand, having a try-catch block here is a good defensive approach which can save us from surprises.

Given the state of the Mesos scheduler codebase in general, a more thorough refactoring is needed, so I'd say let's keep the current code as is.
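For illustration, a rough sketch of the Try-based shape this comment alludes to (hypothetical, not what the PR does):

import scala.util.{Failure, Success, Try}

// Wrap the throwing builder so the call site matches on the result
// instead of relying on try-catch.
def tryBuildDriverDescription(
    request: CreateSubmissionRequest): Try[MesosDriverDescription] =
  Try(buildDriverDescription(request))

// Call site sketch:
// tryBuildDriverDescription(submitRequest) match {
//   case Success(desc) => scheduler.submitDriver(desc)
//   case Failure(ex: SubmitRestProtocolException) =>
//     responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
//     handleError(s"Bad request: ${ex.getMessage}")
//   case Failure(other) => throw other
// }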

val s = scheduler.submitDriver(driverDescription)
s.serverSparkVersion = sparkVersion
val unknownFields = findUnknownFields(requestMessageJson, requestMessage)
if (unknownFields.nonEmpty) {
// If there are fields that the server does not know about, warn the client
s.unknownFields = unknownFields
}
s

Review comment:

I know this isn't you but man... single char variable names are the worst!

} catch {
case ex: SubmitRestProtocolException =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError(s"Bad request: ${ex.getMessage}")
}
case unexpected =>
responseServlet.setStatus(HttpServletResponse.SC_BAD_REQUEST)
handleError(s"Received message of unexpected type ${unexpected.messageType}.")
}
}

private[mesos] def validateLabelsFormat(properties: Map[String, String]): Unit = {
List("spark.mesos.network.labels", "spark.mesos.task.labels", "spark.mesos.driver.labels")
.foreach { name =>
properties.get(name) foreach { label =>
try {
MesosProtoUtils.mesosLabels(label)
} catch {
case _ : SparkException => throw new SubmitRestProtocolException("Malformed label in " +
s"${name}: ${label}. Valid label format: ${name}=key1:value1,key2:value2")
}
}
}
}
}
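To make the accepted format concrete, a small usage sketch (assuming MesosProtoUtils.mesosLabels rejects any entry that is not a key:value pair, which is what the test below exercises):

// Two well-formed key:value pairs, so validation passes silently.
validateLabelsFormat(Map("spark.mesos.network.labels" -> "k0:v0,k1:v1"))

// "k0" has no value, so mesosLabels throws SparkException and this call
// rethrows it as SubmitRestProtocolException with the format hint.
// validateLabelsFormat(Map("spark.mesos.network.labels" -> "k0,k1:v1"))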

private[mesos] class MesosKillRequestServlet(scheduler: MesosClusterScheduler, conf: SparkConf)
@@ -172,3 +194,5 @@ private[mesos] class MesosStatusRequestServlet(scheduler: MesosClusterScheduler,
d
}
}


@@ -679,7 +679,7 @@ private[spark] class MesosClusterScheduler(
TaskID.newBuilder().setValue(submission.submissionId).build(),
SlaveID.newBuilder().setValue("").build(),
None,
null,
new Date(),
None,
getDriverFrameworkID(submission))
logError(s"Failed to launch the driver with id: ${submission.submissionId}, " +
@@ -21,7 +21,7 @@ import org.mockito.Mockito.mock

import org.apache.spark.{SparkConf, SparkFunSuite}
import org.apache.spark.deploy.TestPrematureExit
import org.apache.spark.deploy.rest.CreateSubmissionRequest
import org.apache.spark.deploy.rest.{CreateSubmissionRequest, SubmitRestProtocolException}
import org.apache.spark.scheduler.cluster.mesos.MesosClusterScheduler

class MesosSubmitRequestServletSuite extends SparkFunSuite
@@ -50,4 +50,24 @@ class MesosSubmitRequestServletSuite extends SparkFunSuite
assert("test_network" == driverConf.get("spark.mesos.network.name"))
assert("k0:v0,k1:v1" == driverConf.get("spark.mesos.network.labels"))
}

test("test a job with malformed labels is not submitted") {

Review comment:

Can you change the name to "test a job with malformed labels throws exception"?

val conf = new SparkConf(loadDefaults = false)

val submitRequestServlet = new MesosSubmitRequestServlet(
scheduler = mock(classOf[MesosClusterScheduler]),
conf
)

val request = new CreateSubmissionRequest
request.appResource = "hdfs://test.jar"
request.mainClass = "foo.Bar"
request.appArgs = Array.empty[String]
request.sparkProperties = Map("spark.mesos.network.labels" -> "k0,k1:v1") // malformed label
request.environmentVariables = Map.empty[String, String]

assertThrows[SubmitRestProtocolException] {
submitRequestServlet.buildDriverDescription(request)
}
}
}