Skip to content

Commit 6e62a56

Browse files
committed
[SPARK-47177][SQL] Cached SQL plan do not display final AQE plan in explain string
### What changes were proposed in this pull request? This pr adds lock for ExplainUtils.processPlan to avoid tag race condition. ### Why are the changes needed? To fix the issue [SPARK-47177](https://issues.apache.org/jira/browse/SPARK-47177) ### Does this PR introduce _any_ user-facing change? yes, affect plan explain ### How was this patch tested? add test ### Was this patch authored or co-authored using generative AI tooling? no Closes #45282 from ulysses-you/SPARK-47177. Authored-by: ulysses-you <[email protected]> Signed-off-by: youxiduo <[email protected]>
1 parent 73aa144 commit 6e62a56

File tree

4 files changed

+38
-28
lines changed

4 files changed

+38
-28
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/trees/TreeNode.scala

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,18 +1009,19 @@ abstract class TreeNode[BaseType <: TreeNode[BaseType]]
10091009
append(str)
10101010
append("\n")
10111011

1012-
if (innerChildren.nonEmpty) {
1012+
val innerChildrenLocal = innerChildren
1013+
if (innerChildrenLocal.nonEmpty) {
10131014
lastChildren.add(children.isEmpty)
10141015
lastChildren.add(false)
1015-
innerChildren.init.foreach(_.generateTreeString(
1016+
innerChildrenLocal.init.foreach(_.generateTreeString(
10161017
depth + 2, lastChildren, append, verbose,
10171018
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent))
10181019
lastChildren.remove(lastChildren.size() - 1)
10191020
lastChildren.remove(lastChildren.size() - 1)
10201021

10211022
lastChildren.add(children.isEmpty)
10221023
lastChildren.add(true)
1023-
innerChildren.last.generateTreeString(
1024+
innerChildrenLocal.last.generateTreeString(
10241025
depth + 2, lastChildren, append, verbose,
10251026
addSuffix = addSuffix, maxFields = maxFields, printNodeId = printNodeId, indent = indent)
10261027
lastChildren.remove(lastChildren.size() - 1)

sql/core/src/main/scala/org/apache/spark/sql/execution/ExplainUtils.scala

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,12 @@ object ExplainUtils extends AdaptiveSparkPlanHelper {
7575
* Given a input physical plan, performs the following tasks.
7676
* 1. Generates the explain output for the input plan excluding the subquery plans.
7777
* 2. Generates the explain output for each subquery referenced in the plan.
78+
*
79+
* Note that, ideally this is a no-op as different explain actions operate on different plan,
80+
* instances but cached plan is an exception. The `InMemoryRelation#innerChildren` use a shared
81+
* plan instance across multi-queries. Add lock for this method to avoid tag race condition.
7882
*/
79-
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = {
83+
def processPlan[T <: QueryPlan[T]](plan: T, append: String => Unit): Unit = synchronized {
8084
try {
8185
// Initialize a reference-unique set of Operators to avoid accdiental overwrites and to allow
8286
// intentional overwriting of IDs generated in previous AQE iteration

sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -403,17 +403,7 @@ case class InMemoryRelation(
403403

404404
@volatile var statsOfPlanToCache: Statistics = null
405405

406-
407-
override lazy val innerChildren: Seq[SparkPlan] = {
408-
// The cachedPlan needs to be cloned here because it does not get cloned when SparkPlan.clone is
409-
// called. This is a problem because when the explain output is generated for
410-
// a plan it traverses the innerChildren and modifies their TreeNode.tags. If the plan is not
411-
// cloned, there is a thread safety issue in the case that two plans with a shared cache
412-
// operator have explain called at the same time. The cachedPlan cannot be cloned because
413-
// it contains stateful information so we only clone it for the purpose of generating the
414-
// explain output.
415-
Seq(cachedPlan.clone())
416-
}
406+
override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
417407

418408
override def doCanonicalize(): logical.LogicalPlan =
419409
copy(output = output.map(QueryPlan.normalizeExpressions(_, output)),

sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala

Lines changed: 28 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -18,27 +18,42 @@
1818
package org.apache.spark.sql.execution.columnar
1919

2020
import org.apache.spark.SparkFunSuite
21+
import org.apache.spark.sql.execution.SparkPlan
22+
import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper
23+
import org.apache.spark.sql.functions.expr
2124
import org.apache.spark.sql.test.SharedSparkSessionBase
2225
import org.apache.spark.storage.StorageLevel
2326

24-
class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase {
25-
test("SPARK-43157: Clone innerChildren cached plan") {
26-
val d = spark.range(1)
27-
val relation = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
28-
val cloned = relation.clone().asInstanceOf[InMemoryRelation]
29-
30-
val relationCachedPlan = relation.innerChildren.head
31-
val clonedCachedPlan = cloned.innerChildren.head
32-
33-
// verify the plans are not the same object but are logically equivalent
34-
assert(!relationCachedPlan.eq(clonedCachedPlan))
35-
assert(relationCachedPlan === clonedCachedPlan)
36-
}
27+
class InMemoryRelationSuite extends SparkFunSuite
28+
with SharedSparkSessionBase with AdaptiveSparkPlanHelper {
3729

3830
test("SPARK-46779: InMemoryRelations with the same cached plan are semantically equivalent") {
3931
val d = spark.range(1)
4032
val r1 = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None)
4133
val r2 = r1.withOutput(r1.output.map(_.newInstance()))
4234
assert(r1.sameResult(r2))
4335
}
36+
37+
test("SPARK-47177: Cached SQL plan do not display final AQE plan in explain string") {
38+
def findIMRInnerChild(p: SparkPlan): SparkPlan = {
39+
val tableCache = find(p) {
40+
case _: InMemoryTableScanExec => true
41+
case _ => false
42+
}
43+
assert(tableCache.isDefined)
44+
tableCache.get.asInstanceOf[InMemoryTableScanExec].relation.innerChildren.head
45+
}
46+
47+
val d1 = spark.range(1).withColumn("key", expr("id % 100"))
48+
.groupBy("key").agg(Map("key" -> "count"))
49+
val cached_d2 = d1.cache()
50+
val df = cached_d2.withColumn("key2", expr("key % 10"))
51+
.groupBy("key2").agg(Map("key2" -> "count"))
52+
53+
assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
54+
.contains("AdaptiveSparkPlan isFinalPlan=false"))
55+
df.collect()
56+
assert(findIMRInnerChild(df.queryExecution.executedPlan).treeString
57+
.contains("AdaptiveSparkPlan isFinalPlan=true"))
58+
}
4459
}

0 commit comments

Comments
 (0)