@sarutak (Member) commented Jul 25, 2025

What changes were proposed in this pull request?

This PR backports #51638 to branch-4.0.
This PR fixes an issue that occurs when operations are interrupted, related to SPARK-50748 and SPARK-50889.

Regarding SPARK-50889, the issue occurs if the execution thread for an operation ID cleans up the corresponding `ExecuteHolder` as a result of interruption (in `ExecuteThreadRunner`) before a response sender thread consumes a response (in `ExecuteResponseObserver`).
In that case, the cleanup eventually calls `ExecuteResponseObserver.removeAll()`, which discards all buffered responses, and the response sender thread can never escape its wait loop in `ExecuteGrpcResponseSender` because neither `gotResponse` nor `streamFinished` ever becomes true.
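The problematic interleaving can be sketched as follows. This is a hypothetical, simplified model for illustration only — `ResponseBuffer` and its methods are stand-ins, not the actual Spark Connect classes:

```scala
import scala.collection.mutable

// Hypothetical model: the execution thread produces a response and completes,
// then the interruption cleanup discards everything before the sender thread
// ever consumed it, so the sender sees no response and no end-of-stream.
object RaceSketch {
  class ResponseBuffer {
    private val responses = mutable.Map[Long, String]()
    private var completed = false
    def onNext(i: Long, r: String): Unit = responses(i) = r
    def onCompleted(): Unit = completed = true
    def removeAll(): Unit = responses.clear() // cleanup on interruption
    def get(i: Long): Option[String] = responses.get(i)
    def isCompleted: Boolean = completed
  }

  def main(args: Array[String]): Unit = {
    val buf = new ResponseBuffer
    buf.onNext(1, "partial result") // execution thread produced response 1
    buf.onCompleted()               // stream marked completed
    buf.removeAll()                 // cleanup runs before the sender consumed index 1

    // Sender side: neither condition that could end the wait loop holds.
    val gotResponse = buf.get(1).isDefined          // false: response was discarded
    val allConsumed = false                         // index 1 was never sent out
    val streamFinished = buf.isCompleted && allConsumed
    assert(!gotResponse && !streamFinished)         // the loop would spin forever
    println(s"gotResponse=$gotResponse, streamFinished=$streamFinished")
  }
}
```

With both flags stuck at false, a wait loop gated on them never terminates, which matches the hang described above.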

The fix this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that the stream is also regarded as finished when the `ExecuteResponseObserver` has been marked as completed and all of its responses have been discarded.
`ExecuteResponseObserver.removeAll` is called when the corresponding `ExecuteHolder` is closed or cleaned up after an interruption, so treating that state as end-of-stream is reasonable.
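A minimal sketch of the revised exit condition, under the same illustrative model (these names and signatures are assumptions, not the actual implementation):

```scala
import scala.collection.mutable

// Hypothetical model: the observer buffers responses by index, records the
// final produced index on completion, and removeAll() models the cleanup
// performed when the operation is interrupted.
object FixSketch {
  class ObserverModel {
    private val responses = mutable.Map[Long, String]()
    var finalIndex: Option[Long] = None
    def onNext(i: Long, r: String): Unit = responses(i) = r
    def onCompleted(last: Long): Unit = finalIndex = Some(last)
    def removeAll(): Unit = responses.clear()
    def isEmpty: Boolean = responses.isEmpty
  }

  // Old condition: finished only once every produced response was consumed.
  def streamFinishedOld(obs: ObserverModel, nextIndex: Long): Boolean =
    obs.finalIndex.exists(last => nextIndex > last)

  // Revised condition: also finished when the stream completed and the
  // buffered responses were discarded, so the sender can stop waiting.
  def streamFinishedNew(obs: ObserverModel, nextIndex: Long): Boolean =
    obs.finalIndex.isDefined &&
      (obs.finalIndex.exists(nextIndex > _) || obs.isEmpty)

  def main(args: Array[String]): Unit = {
    val obs = new ObserverModel
    obs.onNext(1, "a")
    obs.onCompleted(1)
    obs.removeAll()                     // interruption discards response 1
    assert(!streamFinishedOld(obs, 1L)) // old: sender would wait forever
    assert(streamFinishedNew(obs, 1L))  // new: sender exits the loop
  }
}
```

The guard on `finalIndex.isDefined` matters: an empty buffer alone does not mean the stream is done, since the buffer is also empty before anything is produced.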

Why are the changes needed?

To fix a race condition that can cause a response sender thread to hang forever when an operation is interrupted.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Tested manually.
You can easily reproduce this issue without this change by inserting a sleep into the test, as follows:

```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
         // cancel
         val operationId = result.operationId
         val canceledId = spark.interruptOperation(operationId)
+        Thread.sleep(1000)
         assert(canceledId == Seq(operationId))
         // and check that it got canceled
         val e = intercept[SparkException] {
```

After this change is applied, the test above no longer hangs.

Was this patch authored or co-authored using generative AI tooling?

No.

@dongjoon-hyun (Member) left a comment

Thank you, @sarutak .

@dongjoon-hyun (Member)

cc @HyukjinKwon and @peter-toth

@dongjoon-hyun (Member)

Also, cc @grundprinzip and @hvanhovell too because this is a bug fix for Connect.

dongjoon-hyun pushed a commit that referenced this pull request Jul 27, 2025
…hich happens when operations are interrupted

### What changes were proposed in this pull request?
This PR backports #51638 to `branch-4.0`.
This PR fixes an issue that occurs when operations are interrupted, related to SPARK-50748 and SPARK-50889.

Regarding SPARK-50889, the issue occurs if the execution thread for an operation ID cleans up the corresponding `ExecuteHolder` as a result of interruption [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteThreadRunner.scala#L175) before a response sender thread consumes a response [here](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteResponseObserver.scala#L183).
In that case, the cleanup eventually calls `ExecuteResponseObserver.removeAll()`, which discards all buffered responses, and the response sender thread can never escape [this loop](https://github.com/apache/spark/blob/a81d79256027708830bf714105f343d085a2f20c/sql/connect/server/src/main/scala/org/apache/spark/sql/connect/execution/ExecuteGrpcResponseSender.scala#L245) because neither `gotResponse` nor `streamFinished` ever becomes true.

The fix this PR proposes is to change the definition of `streamFinished` in `ExecuteGrpcResponseSender` so that the stream is also regarded as finished when the `ExecuteResponseObserver` has been marked as completed and all of its responses have been discarded.
`ExecuteResponseObserver.removeAll` is called when the corresponding `ExecuteHolder` is closed or cleaned up after an interruption, so treating that state as end-of-stream is reasonable.

### Why are the changes needed?
To fix a race condition that can cause a response sender thread to hang forever when an operation is interrupted.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Tested manually.
You can easily reproduce this issue without this change by inserting a sleep into the test, as follows:
```
--- a/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
+++ b/sql/connect/client/jvm/src/test/scala/org/apache/spark/sql/connect/SparkSessionE2ESuite.scala
@@ -331,6 +331,7 @@ class SparkSessionE2ESuite extends ConnectFunSuite with RemoteSparkSession {
         // cancel
         val operationId = result.operationId
         val canceledId = spark.interruptOperation(operationId)
+        Thread.sleep(1000)
         assert(canceledId == Seq(operationId))
         // and check that it got canceled
         val e = intercept[SparkException] {
```

After this change is applied, the test above no longer hangs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #51671 from sarutak/connect-race-condition.

Authored-by: Kousuke Saruta <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
@dongjoon-hyun (Member) commented Jul 27, 2025

Merged to branch-4.0. Thank you, @sarutak and @peter-toth .

zifeif2 pushed a commit to zifeif2/spark that referenced this pull request Nov 14, 2025