Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,6 @@ class ExecutorPodsAllocator(
.inNamespace(namespace)
.resource(createdExecutorPod)
.delete()
PVC_COUNTER.decrementAndGet()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is for case 1, right? For case 2, we don't need to do anything as PVC_COUNTER was not changed for that case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, right for case 1 and 2. So, this is a complete and minimal patch, @viirya .

Copy link
Member Author

@dongjoon-hyun dongjoon-hyun Dec 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is used for driver-owned PVCs, the deletion of pods is irrelevant to the number of PVC. That was my logical bug at the initial patch.

throw e
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -882,6 +882,47 @@ class ExecutorPodsAllocatorSuite extends SparkFunSuite with BeforeAndAfter {
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 1)
}

test("SPARK-41410: An exception during PVC creation should not increase PVC counter") {
val prefix = "spark.kubernetes.executor.volumes.persistentVolumeClaim.spark-local-dir-1"
val confWithPVC = conf.clone
.set(KUBERNETES_DRIVER_OWN_PVC.key, "true")
.set(KUBERNETES_DRIVER_REUSE_PVC.key, "true")
.set(KUBERNETES_DRIVER_WAIT_TO_REUSE_PVC.key, "true")
.set(EXECUTOR_INSTANCES.key, "1")
.set(s"$prefix.mount.path", "/spark-local-dir")
.set(s"$prefix.mount.readOnly", "false")
.set(s"$prefix.option.claimName", "OnDemand")
.set(s"$prefix.option.sizeLimit", "200Gi")
.set(s"$prefix.option.storageClass", "gp3")

when(executorBuilder.buildFromFeatures(any(classOf[KubernetesExecutorConf]), meq(secMgr),
meq(kubernetesClient), any(classOf[ResourceProfile])))
.thenAnswer((invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
KubernetesExecutorSpec(
executorPodWithIdAndVolume(k8sConf.executorId.toInt, k8sConf.resourceProfileId),
Seq(persistentVolumeClaim("pvc-0", "gp3", "200Gi")))
})

podsAllocatorUnderTest = new ExecutorPodsAllocator(
confWithPVC, secMgr, executorBuilder,
kubernetesClient, snapshotsStore, waitForExecutorPodsClock)
podsAllocatorUnderTest.start(TEST_SPARK_APP_ID, schedulerBackend)

val startTime = Instant.now.toEpochMilli
waitForExecutorPodsClock.setTime(startTime)

val counter = PrivateMethod[AtomicInteger](Symbol("PVC_COUNTER"))()
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)

when(pvcResource.create()).thenThrow(new KubernetesClientException("PVC fails to create"))
intercept[KubernetesClientException] {
podsAllocatorUnderTest.setTotalExpectedExecutors(Map(defaultProfile -> 1))
}
assert(podsAllocatorUnderTest.invokePrivate(counter).get() === 0)
assert(podsAllocatorUnderTest.numOutstandingPods.get() == 0)
}

private def executorPodAnswer(): Answer[KubernetesExecutorSpec] =
(invocation: InvocationOnMock) => {
val k8sConf: KubernetesExecutorConf = invocation.getArgument(0)
Expand Down