@@ -257,7 +257,9 @@ private[yarn] class YarnAllocator(
257257 * This must be synchronized because variables read in this method are mutated by other methods.
258258 */
259259 def allocateResources (): Unit = synchronized {
260- updateResourceRequests()
260+ val launchContainerCount =
261+ sparkConf.getInt(" spark.yarn.launchContainer.count.simultaneously" , - 1 )
262+ updateResourceRequests(launchContainerCount)
261263
262264 val progressIndicator = 0.1f
263265 // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
@@ -292,9 +294,16 @@ private[yarn] class YarnAllocator(
292294 * Visible for testing.
293295 */
294296 def updateResourceRequests (): Unit = {
297+ updateResourceRequests(- 1 )
298+ }
299+
300+ def updateResourceRequests (maxCount : Int ): Unit = {
295301 val pendingAllocate = getPendingAllocate
296302 val numPendingAllocate = pendingAllocate.size
297- val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
303+ var missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
304+ if (maxCount > 0 ) {
305+ missing = math.min(missing, maxCount)
306+ }
298307
299308 if (missing > 0 ) {
300309 logInfo(s " Will request $missing executor container(s), each with " +
@@ -431,18 +440,33 @@ private[yarn] class YarnAllocator(
431440 remainingAfterOffRackMatches)
432441 }
433442
434- if (! remainingAfterOffRackMatches.isEmpty) {
435- logDebug(s " Releasing ${remainingAfterOffRackMatches.size} unneeded containers that were " +
443+ var launchContainerCount =
444+ sparkConf.getInt(" spark.yarn.launchContainer.count.simultaneously" , - 1 )
445+ val containerNumToLaunch = {
446+ if (launchContainerCount <= 0 ) {
447+ containersToUse.size
448+ } else {
449+ math.min(launchContainerCount, containersToUse.size)
450+ }
451+ }
452+ val filteredContainersToUse = containersToUse.drop(containersToUse.size - containerNumToLaunch)
453+ val remainingAfterFilter
454+ = containersToUse -- filteredContainersToUse ++ remainingAfterOffRackMatches
455+
456+ if (! remainingAfterFilter.isEmpty) {
457+ logDebug(s " Releasing ${remainingAfterFilter.size} unneeded containers that were " +
436458 s " allocated to us " )
437- for (container <- remainingAfterOffRackMatches ) {
459+ for (container <- remainingAfterFilter ) {
438460 internalReleaseContainer(container)
439461 }
440462 }
441463
442- runAllocatedContainers(containersToUse)
464+ if (filteredContainersToUse.length > 0 ) {
465+ runAllocatedContainers(filteredContainersToUse)
466+ }
443467
444468 logInfo(" Received %d containers from YARN, launching executors on %d of them."
445- .format(allocatedContainers.size, containersToUse .size))
469+ .format(allocatedContainers.size, filteredContainersToUse .size))
446470 }
447471
448472 /**
0 commit comments