
Conversation

@dongjoon-hyun (Member) commented Dec 6, 2022

What changes were proposed in this pull request?

This PR aims to support PVC-oriented executor pod allocation, which means the Spark driver will create a fixed number of PVCs (= spark.executor.instances or spark.dynamicAllocation.maxExecutors) and hold off on creating new executor pods once the number of created PVCs reaches that limit.

Why are the changes needed?

This will allow Spark to hand over the existing PVCs from dead executors to new executors. Previously, Spark created new executors without waiting for the dead executors to release their PVCs.

Does this PR introduce any user-facing change?

No, this is a new feature which is disabled by default.

How was this patch tested?

Pass the CIs with the newly added test case.
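
As a rough illustration of the cap described above, here is a minimal standalone sketch of how the maximum PVC count could be derived from the two settings named in the description. It is an illustration only, not the PR's actual code, and the function and parameter names are hypothetical.

// Hedged sketch: derive the PVC cap from the user-facing settings named above.
// Parameter names are illustrative, not Spark's internal config constants.
def maxPVCs(
    dynamicAllocationEnabled: Boolean,
    executorInstances: Int,           // spark.executor.instances
    dynAllocationMaxExecutors: Int    // spark.dynamicAllocation.maxExecutors
  ): Int =
  if (dynamicAllocationEnabled) dynAllocationMaxExecutors else executorInstances

The driver then holds off on creating new executor pods whenever this cap has been reached and no existing PVC can be reused.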

@dongjoon-hyun (Member Author) left a comment

Could you review this, @viirya?

// Check reusable PVCs for this executor allocation batch
val reusablePVCs = getReusablePVCs(applicationId, pvcsInUse)
for ( _ <- 0 until numExecutorsToAllocate) {
if (reusablePVCs.isEmpty && reusePVC && maxPVCs <= PVC_COUNTER.get()) {
@viirya (Member) commented Dec 6, 2022

Is reusablePVCs always less than or equal to maxPVCs?

@dongjoon-hyun (Member Author):

Thank you for the review.

Theoretically, reusablePVCs contains all driver-owned PVCs that are not in use and were created more than podAllocationDelay milliseconds ago. So, it can be bigger than maxPVCs if there is other PVC creation logic (for example, a Spark driver plugin).

private def getReusablePVCs(applicationId: String, pvcsInUse: Seq[String]) = {
  if (conf.get(KUBERNETES_DRIVER_OWN_PVC) && conf.get(KUBERNETES_DRIVER_REUSE_PVC) &&
      driverPod.nonEmpty) {
    try {
      // All PVCs labeled as belonging to this application.
      val createdPVCs = kubernetesClient
        .persistentVolumeClaims
        .inNamespace(namespace)
        .withLabel("spark-app-selector", applicationId)
        .list()
        .getItems
        .asScala
      val now = Instant.now().toEpochMilli
      // Reusable = not in use by an existing executor and older than podAllocationDelay.
      val reusablePVCs = createdPVCs
        .filterNot(pvc => pvcsInUse.contains(pvc.getMetadata.getName))
        .filter(pvc => now - Instant.parse(pvc.getMetadata.getCreationTimestamp).toEpochMilli
          > podAllocationDelay)
      logInfo(s"Found ${reusablePVCs.size} reusable PVCs from ${createdPVCs.size} PVCs")
      reusablePVCs

Also, previously Spark created new pods and PVCs as soon as some executors died. In that case, a few extra PVCs could be created.

@viirya (Member):

So even when maxPVCs <= PVC_COUNTER.get(), if reusablePVCs is not empty, the driver will continue executor pod allocation?

@dongjoon-hyun (Member Author):

Yes, correct! When we have reusablePVCs, PVC-oriented executor pod allocation doesn't need to be blocked. We halt executor allocation only when there are no available PVCs and PVC_COUNTER has reached or exceeded the maximum.
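
Put differently, here is a minimal hedged sketch of the gate being described, mirroring the condition quoted earlier (reusablePVCs.isEmpty && reusePVC && maxPVCs <= PVC_COUNTER.get()); the function and parameter names are illustrative, not the PR's exact ones.

// Hedged sketch of the allocation gate: hold off creating a new executor pod only
// when PVC-oriented allocation is enabled, no PVC can be reused, and the counter
// has already reached the cap.
def shouldHoldAllocation(
    pvcOrientedAllocationEnabled: Boolean,
    reusablePVCCount: Int,
    createdPVCCount: Int,
    maxPVCs: Int): Boolean =
  pvcOrientedAllocationEnabled && reusablePVCCount == 0 && createdPVCCount >= maxPVCs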

Comment on lines +104 to +109
ConfigBuilder("spark.kubernetes.driver.waitToReusePersistentVolumeClaims")
.doc("If true, driver pod counts the number of created on-demand persistent volume claims " +
s"and wait if the number is greater than or equal to the maximum which is " +
s"${EXECUTOR_INSTANCES.key} or ${DYN_ALLOCATION_MAX_EXECUTORS.key}. " +
s"This config requires both ${KUBERNETES_DRIVER_OWN_PVC.key}=true and " +
s"${KUBERNETES_DRIVER_REUSE_PVC.key}=true.")
@viirya (Member):

Why not mention PVC-oriented executor pod allocation in the config description? I think it would make it clearer what this feature is.

@dongjoon-hyun (Member Author):

Yes, initially I tried to use it as the config name, but PVC-oriented executor pod allocation is achieved by the combination of three configurations:

  • spark.kubernetes.driver.waitToReusePersistentVolumeClaims
  • spark.kubernetes.driver.ownPersistentVolumeClaims
  • spark.kubernetes.driver.reusePersistentVolumeClaims

I'll add a K8s document section with that name.
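
For reference, a minimal sketch of what enabling the combination might look like on the application side, using the standard SparkConf API; the values are illustrative and assume all three configs listed above exist in the build being used.

import org.apache.spark.SparkConf

// Hedged sketch: turn on the three configs that together enable
// PVC-oriented executor pod allocation.
val conf = new SparkConf()
  .set("spark.kubernetes.driver.ownPersistentVolumeClaims", "true")
  .set("spark.kubernetes.driver.reusePersistentVolumeClaims", "true")
  .set("spark.kubernetes.driver.waitToReusePersistentVolumeClaims", "true")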

Comment on lines 59 to 60
private val reusePVC = conf.get(KUBERNETES_DRIVER_OWN_PVC) &&
  conf.get(KUBERNETES_DRIVER_REUSE_PVC) && conf.get(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC)
@viirya (Member):

Hmm, if we don't use KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC, can't we still reuse PVCs?

@viirya (Member):

I think the name reusePVC is a bit confusing. Isn't it the combination of the three configs you mentioned (https://github.com/apache/spark/pull/38943/files#r1041347366)?

Maybe podAllocOnPVC?

@dongjoon-hyun (Member Author):

You are right. Let me rename~

@viirya (Member) left a comment

Thanks for clarifying. I understand the proposed logic now. Looks good to me. The remaining comment is about variable naming (https://github.com/apache/spark/pull/38943/files#r1041414732).

We also need to update the K8s documentation (https://github.com/apache/spark/pull/38943/files#r1041347366), which can be done in this PR or a follow-up.

@dongjoon-hyun (Member Author):

Thank you, @viirya. All tests passed. Merged to master for Apache Spark 3.4.0.
I'll proceed with the documentation in the next PR.

@dongjoon-hyun dongjoon-hyun deleted the SPARK-41410 branch December 6, 2022 22:24
@tedyu (Contributor) commented Dec 7, 2022

I think the PVC_COUNTER should only be decremented when the pod deletion happens (in response to error).
@dongjoon-hyun What do you think of the following change?

diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 4188a9038a..63fc29ea80 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -433,6 +433,7 @@ class ExecutorPodsAllocator(
         podWithAttachedContainer, resolvedExecutorSpec.executorKubernetesResources, reusablePVCs)
       val createdExecutorPod =
         kubernetesClient.pods().inNamespace(namespace).resource(podWithAttachedContainer).create()
+      var success = 0
       try {
         addOwnerReference(createdExecutorPod, resources)
         resources
@@ -445,6 +446,7 @@ class ExecutorPodsAllocator(
             logInfo(s"Trying to create PersistentVolumeClaim ${pvc.getMetadata.getName} with " +
               s"StorageClass ${pvc.getSpec.getStorageClassName}")
             kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
+            success += 1
             PVC_COUNTER.incrementAndGet()
           }
         newlyCreatedExecutors(newExecutorId) = (resourceProfileId, clock.getTimeMillis())
@@ -455,8 +457,12 @@ class ExecutorPodsAllocator(
             .inNamespace(namespace)
             .resource(createdExecutorPod)
             .delete()
-          PVC_COUNTER.decrementAndGet()
+          if (success == 1) {
+            success += 1
+          }
           throw e
+      } finally {
+          if (success == 2) PVC_COUNTER.decrementAndGet()
       }
     }
   }

If the PVC isn't created successfully, it seems the counter shouldn't be decremented.

@dongjoon-hyun (Member Author) commented Dec 7, 2022

Thank you for the review, @tedyu.
Could you make a PR with a valid test case for your claim?

@dongjoon-hyun (Member Author):

Do you mean that .delete() can fail, @tedyu?

@tedyu (Contributor) commented Dec 7, 2022

Yeah, the delete in the catch block may fail.
There could also be other errors, say, prior to the creation of the PVC.

@dongjoon-hyun (Member Author) commented Dec 7, 2022

In case of creation failure, PVC_COUNTER.incrementAndGet() is not invoked.

kubernetesClient.persistentVolumeClaims().inNamespace(namespace).resource(pvc).create()
PVC_COUNTER.incrementAndGet()

In case of deletion failure, PVC_COUNTER.decrementAndGet() is not invoked.

kubernetesClient.pods()
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
PVC_COUNTER.decrementAndGet()

So, we had better remove PVC_COUNTER.decrementAndGet(), right? If you agree, I will make a follow-up to delete PVC_COUNTER.decrementAndGet() and address your comments.

@tedyu (Contributor) commented Dec 7, 2022

The catch block handles errors beyond PVC creation failure.

        case NonFatal(e) =>

Execution may not reach the resource(pvc).create() call when the error happens.
So we would only know the cause by introducing an additional flag, success, as shown above.

The decrementAndGet call should be executed in the scenario shown in #38948
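
The bookkeeping concern in this thread can be illustrated with a hedged, standalone sketch (it is neither the diff proposed above nor the eventual fix): count only the PVCs whose create call actually succeeded, and roll back exactly that many on failure. All helper names here are hypothetical stand-ins for the Kubernetes client calls.

import java.util.concurrent.atomic.AtomicInteger
import scala.util.control.NonFatal

// Sketch of the counting discipline under discussion, not Spark's allocator code.
def createPVCsForExecutor(
    pvcNames: Seq[String],
    createPVC: String => Unit,   // may throw on API errors
    deletePod: () => Unit,       // best-effort cleanup of the just-created pod
    pvcCounter: AtomicInteger): Unit = {
  var created = 0
  try {
    pvcNames.foreach { name =>
      createPVC(name)            // if this throws, the counter was not incremented
      created += 1
      pvcCounter.incrementAndGet()
    }
  } catch {
    case NonFatal(e) =>
      // Roll back only the increments that are known to have happened.
      (0 until created).foreach(_ => pvcCounter.decrementAndGet())
      // Pod cleanup may itself fail; keep it best-effort so the original error propagates.
      try deletePod() catch { case NonFatal(_) => () }
      throw e
  }
}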

@dongjoon-hyun (Member Author):

I commented on your PR.

@dongjoon-hyun (Member Author):

Here is a PR including a test case to address the comment.

beliefer pushed a commit to beliefer/spark that referenced this pull request Dec 18, 2022

Closes apache#38943 from dongjoon-hyun/SPARK-41410.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
