From d4c7abd550e1e073f0f971f6100a092a328038bf Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Wed, 20 Aug 2025 18:44:19 +0900 Subject: [PATCH 1/6] Prevent from interruption when an operation status is pending --- .../spark/sql/connect/execution/ExecuteThreadRunner.scala | 5 ++++- .../spark/sql/connect/service/ExecuteEventsManager.scala | 2 +- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 7c4ad7df66fc..1570574039af 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -29,7 +29,7 @@ import org.apache.spark.connect.proto import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.planner.InvalidInputErrors -import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag, SparkConnectService} +import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag, ExecuteStatus, SparkConnectService} import org.apache.spark.sql.connect.utils.ErrorUtils import org.apache.spark.util.Utils @@ -79,6 +79,9 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends * true if the thread is running and interrupted. */ private[connect] def interrupt(): Boolean = { + if (executeHolder.eventsManager.status == ExecuteStatus.Pending) { + return false + } var currentState = state.getAcquire() while (currentState == ThreadState.notStarted || currentState == ThreadState.started) { val newState = if (currentState == ThreadState.notStarted) { diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala index 61cd95621d15..718040ae1b51 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala @@ -70,7 +70,7 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { private def sessionStatus = sessionHolder.eventManager.status - private var _status: ExecuteStatus = ExecuteStatus.Pending + @volatile private var _status: ExecuteStatus = ExecuteStatus.Pending private var error = Option.empty[Boolean] From 223d6082723022fead96f01895e9c77819538f45 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Thu, 21 Aug 2025 00:28:20 +0900 Subject: [PATCH 2/6] Add test --- .../execution/ExecuteThreadRunner.scala | 5 +- .../sql/connect/service/ExecuteHolder.scala | 3 + .../SparkConnectExecuteHolderSuite.scala | 76 +++++++++++++++++++ 3 files changed, 80 insertions(+), 4 deletions(-) create mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala index 1570574039af..7c4ad7df66fc 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala @@ -29,7 +29,7 @@ import org.apache.spark.connect.proto import org.apache.spark.internal.{Logging, LogKeys} import org.apache.spark.sql.connect.common.ProtoUtils import org.apache.spark.sql.connect.planner.InvalidInputErrors -import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag, ExecuteStatus, SparkConnectService} +import org.apache.spark.sql.connect.service.{ExecuteHolder, ExecuteSessionTag, SparkConnectService} import org.apache.spark.sql.connect.utils.ErrorUtils import org.apache.spark.util.Utils @@ -79,9 +79,6 @@ private[connect] class ExecuteThreadRunner(executeHolder: ExecuteHolder) extends * true if the thread is running and interrupted. */ private[connect] def interrupt(): Boolean = { - if (executeHolder.eventsManager.status == ExecuteStatus.Pending) { - return false - } var currentState = state.getAcquire() while (currentState == ThreadState.notStarted || currentState == ThreadState.started) { val newState = if (currentState == ThreadState.notStarted) { diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 9d8603d95c65..2007de530596 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -243,6 +243,9 @@ private[connect] class ExecuteHolder( * true if it was not interrupted before, false if it was already interrupted. */ def interrupt(): Boolean = { + if (eventsManager.status == ExecuteStatus.Pending) { + return false + } runner.interrupt() } diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala new file mode 100644 index 000000000000..180a99466f1a --- /dev/null +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.connect.service + +import java.util.UUID + +import org.scalatestplus.mockito.MockitoSugar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.connect.proto +import org.apache.spark.connect.proto.{ExecutePlanRequest, Plan, UserContext} +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.util.ManualClock + +class SparkConnectExecuteHolderSuite + extends SparkFunSuite + with MockitoSugar + with SharedSparkSession { + + val DEFAULT_CLOCK = new ManualClock() + val DEFAULT_USER_ID = "1" + val DEFAULT_USER_NAME = "userName" + val DEFAULT_SESSION_ID = UUID.randomUUID.toString + val DEFAULT_QUERY_ID = UUID.randomUUID.toString + val DEFAULT_CLIENT_TYPE = "clientType" + + test("SPARK-53339: ExecuteHolder should ignore interruption when execute status is Pending") { + val executeHolder = setupExecuteHolder() + executeHolder.sessionHolder.eventManager.postStarted() + + assert(executeHolder.eventsManager.status == ExecuteStatus.Pending) + assert(!executeHolder.interrupt()) + + executeHolder.eventsManager.postStarted() + assert(executeHolder.eventsManager.status == ExecuteStatus.Started) + assert(executeHolder.interrupt()) + } + + def setupExecuteHolder(): ExecuteHolder = { + val relation = proto.Relation.newBuilder + .setLimit(proto.Limit.newBuilder.setLimit(10)) + .build() + + val executePlanRequest = ExecutePlanRequest + .newBuilder() + .setPlan(Plan.newBuilder().setRoot(relation)) + .setUserContext( + UserContext + .newBuilder() + .setUserId(DEFAULT_USER_ID) + .setUserName(DEFAULT_USER_NAME)) + .setSessionId(DEFAULT_SESSION_ID) + .setOperationId(DEFAULT_QUERY_ID) + .setClientType(DEFAULT_CLIENT_TYPE) + .build() + + val sessionHolder = SessionHolder(DEFAULT_USER_ID, DEFAULT_SESSION_ID, spark) + val executeKey = ExecuteKey(executePlanRequest, sessionHolder) + new ExecuteHolder(executeKey, executePlanRequest, sessionHolder) + } +} From bcb8a70a480e65c452c32ab0e6dd085abacf329f Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 26 Aug 2025 14:28:31 +0900 Subject: [PATCH 3/6] Modify the comment to be consistent with the change --- .../org/apache/spark/sql/connect/service/ExecuteHolder.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 2007de530596..a35b764215df 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -240,7 +240,8 @@ private[connect] class ExecuteHolder( * Interrupt the execution. Interrupts the running thread, which cancels all running Spark Jobs * and makes the execution throw an OPERATION_CANCELED error. * @return - * true if it was not interrupted before, false if it was already interrupted. + * true if the execution is interrupted, + * false if it was already interrupted or interruption was ignored. */ def interrupt(): Boolean = { if (eventsManager.status == ExecuteStatus.Pending) { From b7430bdcd716016a45486d621d854f23b69191f5 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Tue, 26 Aug 2025 16:19:46 +0900 Subject: [PATCH 4/6] Fix style --- .../org/apache/spark/sql/connect/service/ExecuteHolder.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index a35b764215df..4871a2e4ef99 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -240,8 +240,8 @@ private[connect] class ExecuteHolder( * Interrupt the execution. Interrupts the running thread, which cancels all running Spark Jobs * and makes the execution throw an OPERATION_CANCELED error. * @return - * true if the execution is interrupted, - * false if it was already interrupted or interruption was ignored. + * true if the execution is interrupted, false if it was already interrupted or interruption + * was ignored. */ def interrupt(): Boolean = { if (eventsManager.status == ExecuteStatus.Pending) { From 14ed938390b8c9b01c3601fee05826ac11de61fb Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 1 Sep 2025 13:56:09 +0900 Subject: [PATCH 5/6] Protect status transition by lock --- .../service/ExecuteEventsManager.scala | 65 +++++++++++++++---- .../sql/connect/service/ExecuteHolder.scala | 6 +- .../SparkConnectExecutionManager.scala | 5 +- .../service/ExecuteEventsManagerSuite.scala | 14 ++++ 4 files changed, 72 insertions(+), 18 deletions(-) diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala index 718040ae1b51..a7affac65a0e 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteEventsManager.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.connect.service +import java.util.concurrent.locks.ReentrantLock + import com.fasterxml.jackson.annotation.JsonIgnore import com.google.protobuf.Message @@ -78,6 +80,12 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { private var producedRowCount = Option.empty[Long] + /** + * A lock to avoid race conditions between transition from pending status and interrupt to this + * execution + */ + private val cancelLock = new ReentrantLock() + /** * @return * Last event posted by the Connect request @@ -140,6 +148,33 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { listenerBus.post(event) } + /** + * Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationStarted. This + * post fails if the status is being canceled or already canceled. + * @return + * true if this post succeeds, false otherwise. + */ + def tryPostStarted(): Boolean = { + if (cancelLock.tryLock()) { + if (status == ExecuteStatus.Pending) { + try { + postStarted() + true + } finally { + cancelLock.unlock() + } + } else { + // The status has already transitioned from pending to canceled, or transitioned from + // canceled to closed. + assert(status == ExecuteStatus.Canceled || status == ExecuteStatus.Closed) + false + } + } else { + // The status is transitioning to canceled + false + } + } + /** * Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationAnalyzed. * @@ -175,17 +210,25 @@ case class ExecuteEventsManager(executeHolder: ExecuteHolder, clock: Clock) { * Post @link org.apache.spark.sql.connect.service.SparkListenerConnectOperationCanceled. */ def postCanceled(): Unit = { - assertStatus( - List( - ExecuteStatus.Started, - ExecuteStatus.Analyzed, - ExecuteStatus.ReadyForExecution, - ExecuteStatus.Finished, - ExecuteStatus.Failed), - ExecuteStatus.Canceled) - canceled = Some(true) - listenerBus - .post(SparkListenerConnectOperationCanceled(jobTag, operationId, clock.getTimeMillis())) + // Transition to canceled status can happen on interrupt asynchronously with transition to + // started status. So those transition need to be protected by lock. + cancelLock.lock() + try { + assertStatus( + List( + ExecuteStatus.Pending, + ExecuteStatus.Started, + ExecuteStatus.Analyzed, + ExecuteStatus.ReadyForExecution, + ExecuteStatus.Finished, + ExecuteStatus.Failed), + ExecuteStatus.Canceled) + canceled = Some(true) + listenerBus + .post(SparkListenerConnectOperationCanceled(jobTag, operationId, clock.getTimeMillis())) + } finally { + cancelLock.unlock() + } } /** diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala index 4871a2e4ef99..9d8603d95c65 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala @@ -240,13 +240,9 @@ private[connect] class ExecuteHolder( * Interrupt the execution. Interrupts the running thread, which cancels all running Spark Jobs * and makes the execution throw an OPERATION_CANCELED error. * @return - * true if the execution is interrupted, false if it was already interrupted or interruption - * was ignored. + * true if it was not interrupted before, false if it was already interrupted. */ def interrupt(): Boolean = { - if (eventsManager.status == ExecuteStatus.Pending) { - return false - } runner.interrupt() } diff --git a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala index 35c4073fe93c..b4bac83d2b2c 100644 --- a/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala +++ b/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectExecutionManager.scala @@ -179,8 +179,9 @@ private[connect] class SparkConnectExecutionManager() extends Logging { responseObserver: StreamObserver[proto.ExecutePlanResponse]): ExecuteHolder = { val executeHolder = createExecuteHolder(executeKey, request, sessionHolder) try { - executeHolder.eventsManager.postStarted() - executeHolder.start() + if (executeHolder.eventsManager.tryPostStarted()) { + executeHolder.start() + } } catch { // Errors raised before the execution holder has finished spawning a thread are considered // plan execution failure, and the client should not try reattaching it afterwards. diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala index a17c76ae9528..68aebfe947a7 100644 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala +++ b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/ExecuteEventsManagerSuite.scala @@ -341,6 +341,20 @@ class ExecuteEventsManagerSuite } } + test("SPARK-53339: Try transition to started status") { + val events1 = setupEvents(ExecuteStatus.Pending) + events1.postCanceled() + assert(events1.status == ExecuteStatus.Canceled) + events1.tryPostStarted() + assert(events1.status == ExecuteStatus.Canceled) + + val events2 = setupEvents(ExecuteStatus.Pending) + events2.tryPostStarted() + assert(events2.status == ExecuteStatus.Started) + events2.postCanceled() + assert(events2.status == ExecuteStatus.Canceled) + } + def setupEvents( executeStatus: ExecuteStatus, sessionStatus: SessionStatus = SessionStatus.Started): ExecuteEventsManager = { From 029e3ef67abff22efb994217aa85e666748bb950 Mon Sep 17 00:00:00 2001 From: Kousuke Saruta Date: Mon, 1 Sep 2025 13:56:47 +0900 Subject: [PATCH 6/6] Remove SparkConnectExecuteHolderSuite --- .../SparkConnectExecuteHolderSuite.scala | 76 ------------------- 1 file changed, 76 deletions(-) delete mode 100644 sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala diff --git a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala b/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala deleted file mode 100644 index 180a99466f1a..000000000000 --- a/sql/connect/server/src/test/scala/org/apache/spark/sql/connect/service/SparkConnectExecuteHolderSuite.scala +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.connect.service - -import java.util.UUID - -import org.scalatestplus.mockito.MockitoSugar - -import org.apache.spark.SparkFunSuite -import org.apache.spark.connect.proto -import org.apache.spark.connect.proto.{ExecutePlanRequest, Plan, UserContext} -import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.util.ManualClock - -class SparkConnectExecuteHolderSuite - extends SparkFunSuite - with MockitoSugar - with SharedSparkSession { - - val DEFAULT_CLOCK = new ManualClock() - val DEFAULT_USER_ID = "1" - val DEFAULT_USER_NAME = "userName" - val DEFAULT_SESSION_ID = UUID.randomUUID.toString - val DEFAULT_QUERY_ID = UUID.randomUUID.toString - val DEFAULT_CLIENT_TYPE = "clientType" - - test("SPARK-53339: ExecuteHolder should ignore interruption when execute status is Pending") { - val executeHolder = setupExecuteHolder() - executeHolder.sessionHolder.eventManager.postStarted() - - assert(executeHolder.eventsManager.status == ExecuteStatus.Pending) - assert(!executeHolder.interrupt()) - - executeHolder.eventsManager.postStarted() - assert(executeHolder.eventsManager.status == ExecuteStatus.Started) - assert(executeHolder.interrupt()) - } - - def setupExecuteHolder(): ExecuteHolder = { - val relation = proto.Relation.newBuilder - .setLimit(proto.Limit.newBuilder.setLimit(10)) - .build() - - val executePlanRequest = ExecutePlanRequest - .newBuilder() - .setPlan(Plan.newBuilder().setRoot(relation)) - .setUserContext( - UserContext - .newBuilder() - .setUserId(DEFAULT_USER_ID) - .setUserName(DEFAULT_USER_NAME)) - .setSessionId(DEFAULT_SESSION_ID) - .setOperationId(DEFAULT_QUERY_ID) - .setClientType(DEFAULT_CLIENT_TYPE) - .build() - - val sessionHolder = SessionHolder(DEFAULT_USER_ID, DEFAULT_SESSION_ID, spark) - val executeKey = ExecuteKey(executePlanRequest, sessionHolder) - new ExecuteHolder(executeKey, executePlanRequest, sessionHolder) - } -}