Skip to content

Commit 14762b3

Browse files
committed
[SPARK-47177][SQL] Cached SQL plan do not display final AQE plan in explain string
### What changes were proposed in this pull request? This PR adds a lock to ExplainUtils.processPlan to avoid a tag race condition. ### Why are the changes needed? To fix the issue [SPARK-47177](https://issues.apache.org/jira/browse/SPARK-47177). ### Does this PR introduce _any_ user-facing change? Yes, it affects the plan explain output. ### How was this patch tested? Added a test. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #45282 from ulysses-you/SPARK-47177. Authored-by: ulysses-you <[email protected]> Signed-off-by: youxiduo <[email protected]> (cherry picked from commit 6e62a56) Signed-off-by: youxiduo <[email protected]>
1 parent 8991530 commit 14762b3

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,18 +1030,19 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
10301030
append(str)
10311031
append("\n")
10321032

1033-
if (innerChildren.nonEmpty) {
1033+
val innerChildrenLocal = innerChildren
1034+
if (innerChildrenLocal.nonEmpty) {
10341035
lastChildren.add(children.isEmpty)
10351036
lastChildren.add(false)
1036-
innerChildren.init.foreach(_.generateTreeString(
1037+
innerChildrenLocal.init.foreach(_.generateTreeString(
10371038
depth + 2, lastChildren, append, verbose,
10381039
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent))
10391040
lastChildren.remove(lastChildren.size() - 1)
10401041
lastChildren.remove(lastChildren.size() - 1)
10411042

10421043
lastChildren.add(children.isEmpty)
10431044
lastChildren.add(true)
1044-
innerChildren.last.generateTreeString(
1045+
innerChildrenLocal.last.generateTreeString(
10451046
depth + 2, lastChildren, append, verbose,
10461047
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)
10471048
lastChildren.remove(lastChildren.size() - 1)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,12 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
7575
* Given a input physical plan, performs the following tasks.
7676
* 1. Generates the explain output for the input plan excluding the subquery plans.
7777
* 2. Generates the explain output for each subquery referenced in the plan.
78+
*
79+
* Note that, ideally this is a no-op, as different explain actions operate on different plan
80+
* instances, but a cached plan is an exception. `InMemoryRelation#innerChildren` uses a plan
81+
* instance shared across multiple queries, so this method is locked to avoid a tag race condition.
7882
*/
79-
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
83+
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = synchronized {
8084
try {
8185
// Initialize a reference-unique set of Operators to avoid accidental overwrites and to allow
8286
// intentional overwriting of IDs generated in previous AQE iteration

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -392,17 +392,7 @@ case class InMemoryRelation(
392392

393393
@volatile var statsOfPlanToCache: Statistics = null
394394

395-
396-
override lazy val innerChildren: Seq[SparkPlan] = {
397-
// The cachedPlan needs to be cloned here because it does not get cloned when SparkPlan.clone is
398-
// called. This is a problem because when the explain output is generated for
399-
// a plan it traverses the innerChildren and modifies their TreeNode.tags. If the plan is not
400-
// cloned, there is a thread safety issue in the case that two plans with a shared cache
401-
// operator have explain called at the same time. The cachedPlan cannot be cloned because
402-
// it contains stateful information so we only clone it for the purpose of generating the
403-
// explain output.
404-
Seq(cachedPlan.clone())
405-
}
395+
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
406396

407397
override def doCanonicalize(): logical.LogicalPlan =
408398
copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,42 @@
1818
package org.apache.spark.sql.execution.columnar
1919

2020
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.execution.SparkPlan
22+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
23+
import org.apache.spark.sql.functions.expr
2124
import org.apache.spark.sql.test.SharedSparkSessionBase
2225
import org.apache.spark.storage.StorageLevel
2326

24-
class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
25-
test("SPARK-43157: Clone innerChildren cached plan") {
26-
val d = spark.range(1)
27-
val relation = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
28-
val cloned = relation.clone().asInstanceOf[InMemoryRelation]
29-
30-
val relationCachedPlan = relation.innerChildren.head
31-
val clonedCachedPlan = cloned.innerChildren.head
32-
33-
// verify the plans are not the same object but are logically equivalent
34-
assert(!relationCachedPlan.eq(clonedCachedPlan))
35-
assert(relationCachedPlan === clonedCachedPlan)
36-
}
27+
class InMemoryRelationSuite extends SparkFunSuite
28+
with SharedSparkSessionBase with AdaptiveSparkPlanHelper {
3729

3830
test("SPARK-46779: InMemoryRelations with the same cached plan are semantically equivalent") {
3931
val d = spark.range(1)
4032
val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
4133
val r2 = r1.withOutput(r1.output.map(_.newInstance()))
4234
assert(r1.sameResult(r2))
4335
}
36+
37+
test("SPARK-47177: Cached SQL plan do not display final AQE plan in explain string") {
38+
def findIMRInnerChild(p: SparkPlan): SparkPlan = {
39+
val tableCache = find(p) {
40+
case _: InMemoryTableScanExec => true
41+
case _ => false
42+
}
43+
assert(tableCache.isDefined)
44+
tableCache.get.asInstanceOf[InMemoryTableScanExec].relation.innerChildren.head
45+
}
46+
47+
val d1 = spark.range(1).withColumn("key", expr("id % 100"))
48+
.groupBy("key").agg(Map("key" -> "count"))
49+
val cached_d2 = d1.cache()
50+
val df = cached_d2.withColumn("key2", expr("key % 10"))
51+
.groupBy("key2").agg(Map("key2" -> "count"))
52+
53+
assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
54+
.contains("AdaptiveSparkPlan isFinalPlan=false"))
55+
df.collect()
56+
assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
57+
.contains("AdaptiveSparkPlan isFinalPlan=true"))
58+
}
4459
}

0 commit comments

Comments
 (0)