diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala index 9f65897d5158e..b48493e4ff027 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala @@ -389,7 +389,17 @@ case class InMemoryRelation( @volatile var statsOfPlanToCache: Statistics = null - override def innerChildren: Seq[SparkPlan] = Seq(cachedPlan) + + override lazy val innerChildren: Seq[SparkPlan] = { + // The cachedPlan needs to be cloned here because it does not get cloned when SparkPlan.clone is + // called. This is a problem because when the explain output is generated for + // a plan it traverses the innerChildren and modifies their TreeNode.tags. If the plan is not + // cloned, there is a thread safety issue in the case that two plans with a shared cache + // operator have explain called at the same time. The cachedPlan cannot be cloned because + // it contains stateful information so we only clone it for the purpose of generating the + // explain output. + Seq(cachedPlan.clone()) + } override def doCanonicalize(): logical.LogicalPlan = copy(output = output.map(QueryPlan.normalizeExpressions(_, cachedPlan.output)), diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala new file mode 100644 index 0000000000000..72b3a4bc1095a --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryRelationSuite.scala @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.columnar + +import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.test.SharedSparkSessionBase +import org.apache.spark.storage.StorageLevel + +class InMemoryRelationSuite extends SparkFunSuite with SharedSparkSessionBase { + test("SPARK-43157: Clone innerChildren cached plan") { + val d = spark.range(1) + val relation = InMemoryRelation(StorageLevel.MEMORY_ONLY, d.queryExecution, None) + val cloned = relation.clone().asInstanceOf[InMemoryRelation] + + val relationCachedPlan = relation.innerChildren.head + val clonedCachedPlan = cloned.innerChildren.head + + // verify the plans are not the same object but are logically equivalent + assert(!relationCachedPlan.eq(clonedCachedPlan)) + assert(relationCachedPlan === clonedCachedPlan) + } +}