14 changes: 7 additions & 7 deletions docs/running-on-yarn.md
@@ -142,40 +142,40 @@ To use a custom metrics.properties for the application master and executors, upd
</td>
</tr>
<tr>
-  <td><code>spark.yarn.am.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.am.resource.{resource-type}.amount</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use for the YARN Application Master in client mode.
-  In cluster mode, use <code>spark.yarn.driver.resource.&lt;resource-type&gt;</code> instead.
+  In cluster mode, use <code>spark.yarn.driver.resource.&lt;resource-type&gt;.amount</code> instead.
Please note that this feature can be used only with YARN 3.0+
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
-  To request GPU resources from YARN, use: <code>spark.yarn.am.resource.yarn.io/gpu</code>
+  To request GPU resources from YARN, use: <code>spark.yarn.am.resource.yarn.io/gpu.amount</code>
</td>
</tr>
<tr>
-  <td><code>spark.yarn.driver.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.driver.resource.{resource-type}.amount</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use for the YARN Application Master in cluster mode.
Please note that this feature can be used only with YARN 3.0+
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
-  To request GPU resources from YARN, use: <code>spark.yarn.driver.resource.yarn.io/gpu</code>
+  To request GPU resources from YARN, use: <code>spark.yarn.driver.resource.yarn.io/gpu.amount</code>
</td>
</tr>
<tr>
-  <td><code>spark.yarn.executor.resource.{resource-type}</code></td>
+  <td><code>spark.yarn.executor.resource.{resource-type}.amount</code></td>
<td><code>(none)</code></td>
<td>
Amount of resource to use per executor process.
Please note that this feature can be used only with YARN 3.0+
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
<p/>
Example:
-  To request GPU resources from YARN, use: <code>spark.yarn.executor.resource.yarn.io/gpu</code>
+  To request GPU resources from YARN, use: <code>spark.yarn.executor.resource.yarn.io/gpu.amount</code>
</td>
</tr>
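The three rows above all follow the same pattern: the YARN-side resource key now requires an explicit `.amount` suffix. A minimal usage sketch with hypothetical amounts, assuming a GPU request for both the driver (cluster mode) and the executors:

```scala
import org.apache.spark.SparkConf

// Hypothetical example: 1 GPU for the driver (cluster mode) and 2 GPUs per
// executor, using the new `.amount`-suffixed keys documented above.
val conf = new SparkConf()
  .set("spark.yarn.driver.resource.yarn.io/gpu.amount", "1")
  .set("spark.yarn.executor.resource.yarn.io/gpu.amount", "2")
```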
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -51,7 +51,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -241,12 +241,12 @@ private[spark] class Client(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {

-    val yarnAMResources =
-      if (isClusterMode) {
-        sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
-      } else {
-        sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
-      }
+    val componentName = if (isClusterMode) {
+      config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
+    } else {
+      config.YARN_AM_RESOURCE_TYPES_PREFIX
+    }
+    val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
val amResources = yarnAMResources ++
getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
logDebug(s"AM resources: $amResources")
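The semantic shift here is worth spelling out: the old code returned whatever key followed the prefix, while the shared helper now validates and strips the `.amount` suffix. A small sketch, assuming the client-mode prefix constant resolves to "spark.yarn.am.resource.":

```scala
import org.apache.spark.SparkConf

// Before this change (unsuffixed key):
//   conf.set("spark.yarn.am.resource.yarn.io/gpu", "1")
//   conf.getAllWithPrefix("spark.yarn.am.resource.").toMap
//   => Map("yarn.io/gpu" -> "1")

// After (suffixed key; the helper validates and strips ".amount"):
val conf = new SparkConf().set("spark.yarn.am.resource.yarn.io/gpu.amount", "1")
getYarnResourcesAndAmounts(conf, "spark.yarn.am.resource.")
// => Map("yarn.io/gpu" -> "1")
```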
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -26,11 +26,11 @@ import scala.util.Try
import org.apache.hadoop.yarn.api.records.Resource

import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
import org.apache.spark.util.{CausedBy, Utils}

/**
@@ -40,6 +40,45 @@ import org.apache.spark.util.{CausedBy, Utils}
private object ResourceRequestHelper extends Logging {
private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
Review comment (Contributor):
Does it make sense to reuse ResourceUtils.GPU and ResourceUtils.FPGA here?

Like:

  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/$GPU"
  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/$FPGA"

This would keep the mapping between the Spark config keys and the YARN resource config keys as close as possible, now and in the future.

I know your PR does not touch this part:

(ResourceID(SPARK_EXECUTOR_PREFIX, "fpga").amountConf,
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
(ResourceID(SPARK_DRIVER_PREFIX, "fpga").amountConf,
s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_FPGA_RESOURCE_CONFIG}"),
(ResourceID(SPARK_EXECUTOR_PREFIX, "gpu").amountConf,
s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"),
(ResourceID(SPARK_DRIVER_PREFIX, "gpu").amountConf,
s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${YARN_GPU_RESOURCE_CONFIG}"))

But even at that place, using the right constant ($GPU or $FPGA) instead of the string literal would be reasonable.
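For what it's worth, the suggestion is behavior-preserving. A minimal sketch, assuming the `s` interpolator is added and that ResourceUtils defines GPU = "gpu" and FPGA = "fpga":

```scala
import org.apache.spark.resource.ResourceUtils.{FPGA, GPU}

// With GPU == "gpu" and FPGA == "fpga", the interpolated constants
// evaluate to exactly the current string literals.
val YARN_GPU_RESOURCE_CONFIG = s"yarn.io/$GPU"   // "yarn.io/gpu"
val YARN_FPGA_RESOURCE_CONFIG = s"yarn.io/$FPGA" // "yarn.io/fpga"
```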


+  private[yarn] def getYarnResourcesAndAmounts(
+      sparkConf: SparkConf,
+      componentName: String): Map[String, String] = {
Review comment (Contributor):
Nit: in getYarnResourcesFromSparkResources the SparkConf is the second parameter; what about moving componentName (the prefix) to the first place here too?

+    sparkConf.getAllWithPrefix(s"$componentName").map { case (key, value) =>
+      val splitIndex = key.lastIndexOf('.')
+      if (splitIndex == -1) {
+        val errorMessage = s"Missing suffix for ${componentName}${key}, you must specify" +
+          s" a suffix - $AMOUNT is currently the only supported suffix."
+        throw new IllegalArgumentException(errorMessage.toString())
+      }
+      val resourceName = key.substring(0, splitIndex)
+      val resourceSuffix = key.substring(splitIndex + 1)
+      if (!AMOUNT.equals(resourceSuffix)) {
+        val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}, " +
+          s"only .$AMOUNT is supported."
+        throw new IllegalArgumentException(errorMessage.toString())
+      }
+      (resourceName, value)
+    }.toMap
+  }
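The lastIndexOf('.') split is deliberate: YARN resource names can themselves contain dots (yarn.io/gpu), so only the final segment is treated as the suffix. A usage sketch, assuming the executor prefix is "spark.yarn.executor.resource.":

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.executor.resource.yarn.io/gpu.amount", "2")
  .set("spark.yarn.executor.resource.custom-resource.amount", "4")

// Splitting on the last '.' keeps the dotted resource name intact:
//   "yarn.io/gpu.amount"     -> ("yarn.io/gpu", "2")
//   "custom-resource.amount" -> ("custom-resource", "4")
getYarnResourcesAndAmounts(conf, "spark.yarn.executor.resource.")
// => Map("yarn.io/gpu" -> "2", "custom-resource" -> "4")

// A key without a valid ".amount" suffix, e.g.
// "spark.yarn.executor.resource.yarn.io/gpu", now fails fast
// with an IllegalArgumentException.
```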

+  /**
+   * Convert Spark resources into YARN resources.
+   * The only resources we know how to map from spark configs to yarn configs are
+   * gpus and fpgas, everything else the user has to specify them in both the
+   * spark.yarn.*.resource and the spark.*.resource configs.
+   */
+  private[yarn] def getYarnResourcesFromSparkResources(
+      confPrefix: String,
+      sparkConf: SparkConf
+  ): Map[String, String] = {
+    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
+      case (rName, yarnName) =>
+        (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
+    }.filter { case (_, count) => count.toLong > 0 }
+  }
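A sketch of the translation this performs, relying on the fact (visible in the tests below) that ResourceID("spark.executor", "gpu").amountConf resolves to "spark.executor.resource.gpu.amount":

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.executor.resource.gpu.amount", "2")

// Only gpu and fpga are auto-translated; resources whose amount is
// absent (default "0") are filtered out.
getYarnResourcesFromSparkResources("spark.executor", conf)
// => Map("yarn.io/gpu" -> "2")   (no "yarn.io/fpga" entry)
```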

/**
* Validates sparkConf and throws a SparkException if any of standard resources (memory or cores)
@@ -81,8 +120,9 @@ private object ResourceRequestHelper extends Logging {
val errorMessage = new mutable.StringBuilder()

resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
-      if (sparkConf.contains(resourceRequest)) {
-        errorMessage.append(s"Error: Do not use $resourceRequest, " +
+      val resourceRequestAmount = s"${resourceRequest}.${AMOUNT}"
+      if (sparkConf.contains(resourceRequestAmount)) {
+        errorMessage.append(s"Error: Do not use $resourceRequestAmount, " +
s"please use $sparkName instead!\n")
}
}
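With the suffix appended, the guard still rejects the YARN-side duplicates of first-class Spark configs, now under their new names. A minimal sketch, matching the assertions in ClientSuite below:

```scala
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.yarn.driver.resource.yarn.io/gpu.amount", "2")

// Per the scaladoc above, this throws a SparkException whose message includes:
//   "Error: Do not use spark.yarn.driver.resource.yarn.io/gpu.amount,
//    please use spark.driver.resource.gpu.amount instead!"
ResourceRequestHelper.validateResources(conf)
```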
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration

import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
@@ -142,8 +143,8 @@ private[yarn] class YarnAllocator(
protected val executorCores = sparkConf.get(EXECUTOR_CORES)

private val executorResourceRequests =
-    sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
-      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
+    getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
+      getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
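Since `++` is right-biased, the translated gpu/fpga entries take precedence in the merged map; in practice the two sources stay disjoint, because validateResources rejects the YARN-side gpu/fpga keys outright. A hypothetical sketch of the resulting map:

```scala
// Hypothetical inputs:
//   spark.yarn.executor.resource.custom-resource.amount = 4
//   spark.executor.resource.gpu.amount = 2
val executorResourceRequests =
  Map("custom-resource" -> "4") ++ Map("yarn.io/gpu" -> "2")
// => Map("custom-resource" -> "4", "yarn.io/gpu" -> "2")
```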

// Resource capability requested for each executor
private[yarn] val resource: Resource = {
resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -41,24 +41,6 @@ object YarnSparkHadoopUtil {
val MEMORY_OVERHEAD_MIN = 384L

val ANY_HOST = "*"
-  val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
-  val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
-
-  /**
-   * Convert Spark resources into YARN resources.
-   * The only resources we know how to map from spark configs to yarn configs are
-   * gpus and fpgas, everything else the user has to specify them in both the
-   * spark.yarn.*.resource and the spark.*.resource configs.
-   */
-  private[yarn] def getYarnResourcesFromSparkResources(
-      confPrefix: String,
-      sparkConf: SparkConf
-  ): Map[String, String] = {
-    Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
-      case (rName, yarnName) =>
-        (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
-    }.filter { case (_, count) => count.toLong > 0 }
-  }

// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,10 +38,11 @@ import org.mockito.Mockito.{spy, verify}
import org.scalatest.Matchers

import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.AMOUNT
import org.apache.spark.util.{SparkConfWithEnv, Utils}

class ClientSuite extends SparkFunSuite with Matchers {
@@ -372,7 +373,7 @@ class ClientSuite extends SparkFunSuite with Matchers {

val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
resources.foreach { case (name, v) =>
-      conf.set(prefix + name, v.toString)
+      conf.set(s"${prefix}${name}.${AMOUNT}", v.toString)
}

val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -397,7 +398,7 @@

val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
resources.keys.foreach { yarnName =>
conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
}
resources.values.foreach { rName =>
conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
@@ -407,9 +408,9 @@
ResourceRequestHelper.validateResources(conf)
}.getMessage()

assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," +
" please use spark.driver.resource.fpga.amount"))
assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," +
" please use spark.driver.resource.gpu.amount"))
}

@@ -420,7 +421,7 @@

val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
resources.keys.foreach { yarnName =>
conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
}
resources.values.foreach { rName =>
conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
@@ -430,9 +431,9 @@
ResourceRequestHelper.validateResources(conf)
}.getMessage()

assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," +
assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," +
" please use spark.executor.resource.fpga.amount"))
assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," +
assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," +
" please use spark.executor.resource.gpu.amount"))
}

@@ -450,7 +451,7 @@
conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
}
// also just set yarn one that we don't convert
-    conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+    conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", "5")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])