diff --git a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
index 4188a9038aa2..f25a0171205b 100644
--- a/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
+++ b/resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocator.scala
@@ -455,7 +455,6 @@ class ExecutorPodsAllocator(
             .inNamespace(namespace)
             .resource(createdExecutorPod)
             .delete()
-          PVC_COUNTER.decrementAndGet()
           throw e
       }
     }
diff --git a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
index e4c3a853d18a..a066775f7dab 100644
--- a/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
+++ b/resource-managers/kubernetes/core/src/test/scala/org/apache/spark/scheduler/cluster/k8s/ExecutorPodsAllocatorSuite.scala
@@ -882,6 +882,54 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
     assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
   }
 
+  // When PVC creation throws, PVC_COUNTER must keep its initial value of 0 and no executor
+  // pod may remain outstanding.
+  test("SPARK-41410: An exception during PVC creation should not increase PVC counter") {
+    // On-demand PVC volume, with the driver-own, reuse and wait-to-reuse PVC options enabled.
+    val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
+    val confWithPVC = conf.clone
+      .set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
+      .set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
+      .set(EXECUTOR_INSTANCES.key, "1")
+      .set(s"$prefix.mount.path", "/spark-local-dir")
+      .set(s"$prefix.mount.readOnly", "false")
+      .set(s"$prefix.option.claimName", "OnDemand")
+      .set(s"$prefix.option.sizeLimit", "200Gi")
+      .set(s"$prefix.option.storageClass", "gp3")
+
+    // Every executor spec built by the mock carries a single PVC resource ("pvc-0").
+    when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
+        meq(kubernetesClient), any(classOf[ResourceProfile])))
+      .thenAnswer((invocation: InvocationOnMock) => {
+        val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
+        KubernetesExecutorSpec(
+          executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
+          Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
+      })
+
+    podsAllocatorUnderTest = new ExecutorPodsAllocator(
+      confWithPVC, secMgr, executorBuilder,
+      kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
+    podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)
+
+    val startTime = Instant.now.toEpochMilli
+    waitForExecutorPodsClock.setTime(startTime)
+
+    // Baseline: the counter starts at 0 before any executor is requested.
+    val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))()
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+
+    // Make the PVC create call fail; the exception must surface from the allocation request.
+    when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create"))
+    intercept[KubernetesClientException] {
+      podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
+    }
+    // The failed creation must leave both the counter and the outstanding pod count at 0.
+    assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
+    assert(podsAllocatorUnderTest.numOutstandingPods.get() === 0)
+  }
+
   private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
     (invocation: InvocationOnMock) => {
       val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)