diff --git a/docs/running-on-yarn.md b/docs/running-on-yarn.md
index dc93e9cea5bc..9d9b253a5c8e 100644
--- a/docs/running-on-yarn.md
+++ b/docs/running-on-yarn.md
@@ -142,20 +142,20 @@ To use a custom metrics.properties for the application master and executors, upd
- spark.yarn.am.resource.{resource-type} |
+ spark.yarn.am.resource.{resource-type}.amount |
(none) |
Amount of resource to use for the YARN Application Master in client mode.
- In cluster mode, use spark.yarn.driver.resource.<resource-type> instead.
+ In cluster mode, use spark.yarn.driver.resource.<resource-type>.amount instead.
Please note that this feature can be used only with YARN 3.0+
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
Example:
- To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu
+ To request GPU resources from YARN, use: spark.yarn.am.resource.yarn.io/gpu.amount
|
- spark.yarn.driver.resource.{resource-type} |
+ spark.yarn.driver.resource.{resource-type}.amount |
(none) |
Amount of resource to use for the YARN Application Master in cluster mode.
@@ -163,11 +163,11 @@ To use a custom metrics.properties for the application master and executors, upd
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
Example:
- To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu
+ To request GPU resources from YARN, use: spark.yarn.driver.resource.yarn.io/gpu.amount
|
- spark.yarn.executor.resource.{resource-type} |
+ spark.yarn.executor.resource.{resource-type}.amount |
(none) |
Amount of resource to use per executor process.
@@ -175,7 +175,7 @@ To use a custom metrics.properties for the application master and executors, upd
For reference, see YARN Resource Model documentation: https://hadoop.apache.org/docs/r3.0.1/hadoop-yarn/hadoop-yarn-site/ResourceModel.html
Example:
- To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu
+ To request GPU resources from YARN, use: spark.yarn.executor.resource.yarn.io/gpu.amount
|
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
index 5b361d17c01a..651e706021fc 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/Client.scala
@@ -51,7 +51,7 @@ import org.apache.spark.{SecurityManager, SparkConf, SparkException}
import org.apache.spark.api.python.PythonUtils
import org.apache.spark.deploy.{SparkApplication, SparkHadoopUtil}
import org.apache.spark.deploy.security.HadoopDelegationTokenManager
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
@@ -241,12 +241,12 @@ private[spark] class Client(
newApp: YarnClientApplication,
containerContext: ContainerLaunchContext): ApplicationSubmissionContext = {
- val yarnAMResources =
- if (isClusterMode) {
- sparkConf.getAllWithPrefix(config.YARN_DRIVER_RESOURCE_TYPES_PREFIX).toMap
- } else {
- sparkConf.getAllWithPrefix(config.YARN_AM_RESOURCE_TYPES_PREFIX).toMap
- }
+ val componentName = if (isClusterMode) {
+ config.YARN_DRIVER_RESOURCE_TYPES_PREFIX
+ } else {
+ config.YARN_AM_RESOURCE_TYPES_PREFIX
+ }
+ val yarnAMResources = getYarnResourcesAndAmounts(sparkConf, componentName)
val amResources = yarnAMResources ++
getYarnResourcesFromSparkResources(SPARK_DRIVER_PREFIX, sparkConf)
logDebug(s"AM resources: $amResources")
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
index cb0c68d1d346..522c16b3a108 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/ResourceRequestHelper.scala
@@ -26,11 +26,11 @@ import scala.util.Try
import org.apache.hadoop.yarn.api.records.Resource
import org.apache.spark.{SparkConf, SparkException}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.{AMOUNT, FPGA, GPU}
import org.apache.spark.util.{CausedBy, Utils}
/**
@@ -40,6 +40,45 @@ import org.apache.spark.util.{CausedBy, Utils}
private object ResourceRequestHelper extends Logging {
private val AMOUNT_AND_UNIT_REGEX = "([0-9]+)([A-Za-z]*)".r
private val RESOURCE_INFO_CLASS = "org.apache.hadoop.yarn.api.records.ResourceInformation"
+ val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
+ val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
+
+ private[yarn] def getYarnResourcesAndAmounts(
+ sparkConf: SparkConf,
+ componentName: String): Map[String, String] = {
+ sparkConf.getAllWithPrefix(s"$componentName").map { case (key, value) =>
+ val splitIndex = key.lastIndexOf('.')
+ if (splitIndex == -1) {
+ val errorMessage = s"Missing suffix for ${componentName}${key}, you must specify" +
+ s" a suffix - $AMOUNT is currently the only supported suffix."
+ throw new IllegalArgumentException(errorMessage.toString())
+ }
+ val resourceName = key.substring(0, splitIndex)
+ val resourceSuffix = key.substring(splitIndex + 1)
+ if (!AMOUNT.equals(resourceSuffix)) {
+ val errorMessage = s"Unsupported suffix: $resourceSuffix in: ${componentName}${key}, " +
+ s"only .$AMOUNT is supported."
+ throw new IllegalArgumentException(errorMessage.toString())
+ }
+ (resourceName, value)
+ }.toMap
+ }
+
+ /**
+ * Convert Spark resources into YARN resources.
+ * The only resources we know how to map from spark configs to yarn configs are
+ * gpus and fpgas, everything else the user has to specify them in both the
+ * spark.yarn.*.resource and the spark.*.resource configs.
+ */
+ private[yarn] def getYarnResourcesFromSparkResources(
+ confPrefix: String,
+ sparkConf: SparkConf
+ ): Map[String, String] = {
+ Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
+ case (rName, yarnName) =>
+ (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
+ }.filter { case (_, count) => count.toLong > 0 }
+ }
/**
* Validates sparkConf and throws a SparkException if any of standard resources (memory or cores)
@@ -81,8 +120,9 @@ private object ResourceRequestHelper extends Logging {
val errorMessage = new mutable.StringBuilder()
resourceDefinitions.foreach { case (sparkName, resourceRequest) =>
- if (sparkConf.contains(resourceRequest)) {
- errorMessage.append(s"Error: Do not use $resourceRequest, " +
+ val resourceRequestAmount = s"${resourceRequest}.${AMOUNT}"
+ if (sparkConf.contains(resourceRequestAmount)) {
+ errorMessage.append(s"Error: Do not use $resourceRequestAmount, " +
s"please use $sparkName instead!\n")
}
}
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
index 6e634b921fcd..8ec7bd66b250 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocator.scala
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.spark.{SecurityManager, SparkConf, SparkException}
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.Logging
@@ -142,8 +143,8 @@ private[yarn] class YarnAllocator(
protected val executorCores = sparkConf.get(EXECUTOR_CORES)
private val executorResourceRequests =
- sparkConf.getAllWithPrefix(config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX).toMap ++
- getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
+ getYarnResourcesAndAmounts(sparkConf, config.YARN_EXECUTOR_RESOURCE_TYPES_PREFIX) ++
+ getYarnResourcesFromSparkResources(SPARK_EXECUTOR_PREFIX, sparkConf)
// Resource capability requested for each executor
private[yarn] val resource: Resource = {
diff --git a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
index 6b87eec795f9..11035520ae18 100644
--- a/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
+++ b/resource-managers/yarn/src/main/scala/org/apache/spark/deploy/yarn/YarnSparkHadoopUtil.scala
@@ -41,24 +41,6 @@ object YarnSparkHadoopUtil {
val MEMORY_OVERHEAD_MIN = 384L
val ANY_HOST = "*"
- val YARN_GPU_RESOURCE_CONFIG = "yarn.io/gpu"
- val YARN_FPGA_RESOURCE_CONFIG = "yarn.io/fpga"
-
- /**
- * Convert Spark resources into YARN resources.
- * The only resources we know how to map from spark configs to yarn configs are
- * gpus and fpgas, everything else the user has to specify them in both the
- * spark.yarn.*.resource and the spark.*.resource configs.
- */
- private[yarn] def getYarnResourcesFromSparkResources(
- confPrefix: String,
- sparkConf: SparkConf
- ): Map[String, String] = {
- Map(GPU -> YARN_GPU_RESOURCE_CONFIG, FPGA -> YARN_FPGA_RESOURCE_CONFIG).map {
- case (rName, yarnName) =>
- (yarnName -> sparkConf.get(ResourceID(confPrefix, rName).amountConf, "0"))
- }.filter { case (_, count) => count.toLong > 0 }
- }
// All RM requests are issued with same priority : we do not (yet) have any distinction between
// request types (like map/reduce in hadoop for example)
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
index d5f1992a09f5..847fc3773de5 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ClientSuite.scala
@@ -38,10 +38,11 @@ import org.mockito.Mockito.{spy, verify}
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite, TestUtils}
-import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
import org.apache.spark.resource.ResourceID
+import org.apache.spark.resource.ResourceUtils.AMOUNT
import org.apache.spark.util.{SparkConfWithEnv, Utils}
class ClientSuite extends SparkFunSuite with Matchers {
@@ -372,7 +373,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, deployMode)
resources.foreach { case (name, v) =>
- conf.set(prefix + name, v.toString)
+ conf.set(s"${prefix}${name}.${AMOUNT}", v.toString)
}
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
@@ -397,7 +398,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
resources.keys.foreach { yarnName =>
- conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+ conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
}
resources.values.foreach { rName =>
conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
@@ -407,9 +408,9 @@ class ClientSuite extends SparkFunSuite with Matchers {
ResourceRequestHelper.validateResources(conf)
}.getMessage()
- assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga," +
+ assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/fpga.amount," +
" please use spark.driver.resource.fpga.amount"))
- assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu," +
+ assert(error.contains("Do not use spark.yarn.driver.resource.yarn.io/gpu.amount," +
" please use spark.driver.resource.gpu.amount"))
}
@@ -420,7 +421,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
val conf = new SparkConf().set(SUBMIT_DEPLOY_MODE, "cluster")
resources.keys.foreach { yarnName =>
- conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}", "2")
+ conf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnName}.${AMOUNT}", "2")
}
resources.values.foreach { rName =>
conf.set(ResourceID(SPARK_EXECUTOR_PREFIX, rName).amountConf, "3")
@@ -430,9 +431,9 @@ class ClientSuite extends SparkFunSuite with Matchers {
ResourceRequestHelper.validateResources(conf)
}.getMessage()
- assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga," +
+ assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/fpga.amount," +
" please use spark.executor.resource.fpga.amount"))
- assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu," +
+ assert(error.contains("Do not use spark.yarn.executor.resource.yarn.io/gpu.amount," +
" please use spark.executor.resource.gpu.amount"))
}
@@ -450,7 +451,7 @@ class ClientSuite extends SparkFunSuite with Matchers {
conf.set(ResourceID(SPARK_DRIVER_PREFIX, rName).amountConf, "3")
}
// also just set yarn one that we don't convert
- conf.set(YARN_DRIVER_RESOURCE_TYPES_PREFIX + yarnMadeupResource, "5")
+ conf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}", "5")
val appContext = Records.newRecord(classOf[ApplicationSubmissionContext])
val getNewApplicationResponse = Records.newRecord(classOf[GetNewApplicationResponse])
val containerLaunchContext = Records.newRecord(classOf[ContainerLaunchContext])
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
index 9e3cc6ec01df..f5ec531e26e0 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/ResourceRequestHelperSuite.scala
@@ -22,9 +22,11 @@ import org.apache.hadoop.yarn.util.Records
import org.scalatest.Matchers
import org.apache.spark.{SparkConf, SparkException, SparkFunSuite}
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.ResourceRequestTestHelper.ResourceInformation
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config.{DRIVER_CORES, DRIVER_MEMORY, EXECUTOR_CORES, EXECUTOR_MEMORY}
+import org.apache.spark.resource.ResourceUtils.AMOUNT
class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
@@ -32,16 +34,18 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
private val CUSTOM_RES_2 = "custom-resource-type-2"
private val MEMORY = "memory"
private val CORES = "cores"
- private val NEW_CONFIG_EXECUTOR_MEMORY = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + MEMORY
- private val NEW_CONFIG_EXECUTOR_CORES = YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + CORES
- private val NEW_CONFIG_AM_MEMORY = YARN_AM_RESOURCE_TYPES_PREFIX + MEMORY
- private val NEW_CONFIG_AM_CORES = YARN_AM_RESOURCE_TYPES_PREFIX + CORES
- private val NEW_CONFIG_DRIVER_MEMORY = YARN_DRIVER_RESOURCE_TYPES_PREFIX + MEMORY
- private val NEW_CONFIG_DRIVER_CORES = YARN_DRIVER_RESOURCE_TYPES_PREFIX + CORES
+ private val NEW_CONFIG_EXECUTOR_MEMORY =
+ s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+ private val NEW_CONFIG_EXECUTOR_CORES =
+ s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
+ private val NEW_CONFIG_AM_MEMORY = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+ private val NEW_CONFIG_AM_CORES = s"${YARN_AM_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
+ private val NEW_CONFIG_DRIVER_MEMORY = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${MEMORY}.${AMOUNT}"
+ private val NEW_CONFIG_DRIVER_CORES = s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${CORES}.${AMOUNT}"
test("empty SparkConf should be valid") {
val sparkConf = new SparkConf()
- ResourceRequestHelper.validateResources(sparkConf)
+ validateResources(sparkConf)
}
test("just normal resources are defined") {
@@ -50,7 +54,44 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
sparkConf.set(DRIVER_CORES.key, "4")
sparkConf.set(EXECUTOR_MEMORY.key, "4G")
sparkConf.set(EXECUTOR_CORES.key, "2")
- ResourceRequestHelper.validateResources(sparkConf)
+ validateResources(sparkConf)
+ }
+
+ test("get yarn resources from configs") {
+ val sparkConf = new SparkConf()
+ val resources = Map(YARN_GPU_RESOURCE_CONFIG -> "2G",
+ YARN_FPGA_RESOURCE_CONFIG -> "3G", "custom" -> "4")
+ resources.foreach { case (name, value) =>
+ sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+ sparkConf.set(s"${YARN_DRIVER_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+ sparkConf.set(s"${YARN_AM_RESOURCE_TYPES_PREFIX}${name}.${AMOUNT}", value)
+ }
+ var parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+ assert(parsedResources === resources)
+ parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_DRIVER_RESOURCE_TYPES_PREFIX)
+ assert(parsedResources === resources)
+ parsedResources = getYarnResourcesAndAmounts(sparkConf, YARN_AM_RESOURCE_TYPES_PREFIX)
+ assert(parsedResources === resources)
+ }
+
+ test("get invalid yarn resources from configs") {
+ val sparkConf = new SparkConf()
+
+ val missingAmountConfig = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}missingAmount"
+ // missing .amount
+ sparkConf.set(missingAmountConfig, "2g")
+ var thrown = intercept[IllegalArgumentException] {
+ getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+ }
+ thrown.getMessage should include("Missing suffix for")
+
+ sparkConf.remove(missingAmountConfig)
+ sparkConf.set(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}customResource.invalid", "2g")
+
+ thrown = intercept[IllegalArgumentException] {
+ getYarnResourcesAndAmounts(sparkConf, YARN_EXECUTOR_RESOURCE_TYPES_PREFIX)
+ }
+ thrown.getMessage should include("Unsupported suffix")
}
Seq(
@@ -60,14 +101,14 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
ResourceInformation(CUSTOM_RES_2, 10, "G"))
).foreach { case (name, resources) =>
test(s"valid request: $name") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ assume(isYarnResourceTypesAvailable())
val resourceDefs = resources.map { r => r.name }
val requests = resources.map { r => (r.name, r.value.toString + r.unit) }.toMap
ResourceRequestTestHelper.initializeResourceTypes(resourceDefs)
val resource = createResource()
- ResourceRequestHelper.setResourceRequests(requests, resource)
+ setResourceRequests(requests, resource)
resources.foreach { r =>
val requested = ResourceRequestTestHelper.getResourceInformationByName(resource, r.name)
@@ -82,12 +123,12 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
("invalid unit", CUSTOM_RES_1, "123ppp")
).foreach { case (name, key, value) =>
test(s"invalid request: $name") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ assume(isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(Seq(key))
val resource = createResource()
val thrown = intercept[IllegalArgumentException] {
- ResourceRequestHelper.setResourceRequests(Map(key -> value), resource)
+ setResourceRequests(Map(key -> value), resource)
}
thrown.getMessage should include (key)
}
@@ -95,20 +136,20 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
Seq(
NEW_CONFIG_EXECUTOR_MEMORY -> "30G",
- YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "memory-mb" -> "30G",
- YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "mb" -> "30G",
+ s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}memory-mb.$AMOUNT" -> "30G",
+ s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}mb.$AMOUNT" -> "30G",
NEW_CONFIG_EXECUTOR_CORES -> "5",
- YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "vcores" -> "5",
+ s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}vcores.$AMOUNT" -> "5",
NEW_CONFIG_AM_MEMORY -> "1G",
NEW_CONFIG_DRIVER_MEMORY -> "1G",
NEW_CONFIG_AM_CORES -> "3",
NEW_CONFIG_DRIVER_CORES -> "1G"
).foreach { case (key, value) =>
test(s"disallowed resource request: $key") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ assume(isYarnResourceTypesAvailable())
val conf = new SparkConf(false).set(key, value)
val thrown = intercept[SparkException] {
- ResourceRequestHelper.validateResources(conf)
+ validateResources(conf)
}
thrown.getMessage should include (key)
}
@@ -126,7 +167,7 @@ class ResourceRequestHelperSuite extends SparkFunSuite with Matchers {
sparkConf.set(NEW_CONFIG_DRIVER_MEMORY, "2G")
val thrown = intercept[SparkException] {
- ResourceRequestHelper.validateResources(sparkConf)
+ validateResources(sparkConf)
}
thrown.getMessage should (
include(NEW_CONFIG_EXECUTOR_MEMORY) and
diff --git a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
index ca89af26230f..4ac27ede6483 100644
--- a/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
+++ b/resource-managers/yarn/src/test/scala/org/apache/spark/deploy/yarn/YarnAllocatorSuite.scala
@@ -31,9 +31,11 @@ import org.scalatest.{BeforeAndAfterEach, Matchers}
import org.apache.spark.{SecurityManager, SparkConf, SparkFunSuite}
import org.apache.spark.deploy.SparkHadoopUtil
+import org.apache.spark.deploy.yarn.ResourceRequestHelper._
import org.apache.spark.deploy.yarn.YarnSparkHadoopUtil._
import org.apache.spark.deploy.yarn.config._
import org.apache.spark.internal.config._
+import org.apache.spark.resource.ResourceUtils.{AMOUNT, GPU}
import org.apache.spark.resource.TestResourceIDs._
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.SplitInfo
@@ -160,12 +162,12 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
test("custom resource requested from yarn") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ assume(isYarnResourceTypesAvailable())
ResourceRequestTestHelper.initializeResourceTypes(List("gpu"))
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
val handler = createAllocator(1, mockAmClient,
- Map(YARN_EXECUTOR_RESOURCE_TYPES_PREFIX + "gpu" -> "2G"))
+ Map(s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${GPU}.${AMOUNT}" -> "2G"))
handler.updateResourceRequests()
val container = createContainer("host1", resource = handler.resource)
@@ -174,7 +176,7 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
// get amount of memory and vcores from resource, so effectively skipping their validation
val expectedResources = Resource.newInstance(handler.resource.getMemory(),
handler.resource.getVirtualCores)
- ResourceRequestHelper.setResourceRequests(Map("gpu" -> "2G"), expectedResources)
+ setResourceRequests(Map("gpu" -> "2G"), expectedResources)
val captor = ArgumentCaptor.forClass(classOf[ContainerRequest])
verify(mockAmClient).addContainerRequest(captor.capture())
@@ -183,15 +185,16 @@ class YarnAllocatorSuite extends SparkFunSuite with Matchers with BeforeAndAfter
}
test("custom spark resource mapped to yarn resource configs") {
- assume(ResourceRequestHelper.isYarnResourceTypesAvailable())
+ assume(isYarnResourceTypesAvailable())
val yarnMadeupResource = "yarn.io/madeup"
val yarnResources = Seq(YARN_GPU_RESOURCE_CONFIG, YARN_FPGA_RESOURCE_CONFIG, yarnMadeupResource)
ResourceRequestTestHelper.initializeResourceTypes(yarnResources)
val mockAmClient = mock(classOf[AMRMClient[ContainerRequest]])
+ val madeupConfigName = s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}.${AMOUNT}"
val sparkResources =
Map(EXECUTOR_GPU_ID.amountConf -> "3",
EXECUTOR_FPGA_ID.amountConf -> "2",
- s"${YARN_EXECUTOR_RESOURCE_TYPES_PREFIX}${yarnMadeupResource}" -> "5")
+ madeupConfigName -> "5")
val handler = createAllocator(1, mockAmClient, sparkResources)
handler.updateResourceRequests()