
Commit 5df4c53

add T2 type parameter to ReuseMap and revert logic to ArrayBuffer
1 parent eaf4ad0 commit 5df4c53

3 files changed: +16 -20 lines

sql/catalyst/src/main/scala/org/apache/spark/sql/util/ReuseMap.scala

Lines changed: 11 additions & 15 deletions
@@ -17,8 +17,7 @@
 
 package org.apache.spark.sql.util
 
-import scala.collection.mutable.Map
-import scala.language.existentials
+import scala.collection.mutable.{ArrayBuffer, Map}
 
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.types.StructType
@@ -29,11 +28,11 @@ import org.apache.spark.sql.types.StructType
  * To avoid costly canonicalization of a plan:
  * - we use its schema first to check if it can be replaced to a reused one at all
  * - we insert it into the map of canonicalized plans only when at least 2 have the same schema
+ *
+ * @tparam T the type of the node we want to reuse
  */
-class ReuseMap[T <: QueryPlan[_]] {
-  // scalastyle:off structural.type
-  private val map = Map[StructType, (T, Map[T2 forSome { type T2 >: T }, T])]()
-  // scalastyle:on structural.type
+class ReuseMap[T <: T2, T2 <: QueryPlan[T2]] {
+  private val map = Map[StructType, ArrayBuffer[T]]()
 
   /**
    * Find a matching plan with the same canonicalized form in the map or add the new plan to the
@@ -43,16 +42,12 @@ class ReuseMap[T <: QueryPlan[_]] {
    * @return the matching plan or the input plan
    */
   def lookupOrElseAdd(plan: T): T = {
-    // The first plan with a schema is not inserted to the `sameResultPlans` map immediately
-    val (firstSameSchemaPlan, sameResultPlans) = map.getOrElseUpdate(plan.schema, plan -> Map())
-    if (firstSameSchemaPlan ne plan) {
-      // Add the first plan to the `sameResultPlans` map only when the 2nd plan arrives with the
-      // same schema
-      if (sameResultPlans.isEmpty) {
-        sameResultPlans += firstSameSchemaPlan.canonicalized -> firstSameSchemaPlan
-      }
-      sameResultPlans.getOrElseUpdate(plan.canonicalized, plan)
+    val sameSchema = map.getOrElseUpdate(plan.schema, ArrayBuffer())
+    val samePlan = sameSchema.find(plan.sameResult)
+    if (samePlan.isDefined) {
+      samePlan.get
     } else {
+      sameSchema += plan
       plan
     }
   }
@@ -63,6 +58,7 @@ class ReuseMap[T <: QueryPlan[_]] {
    *
    * @param plan the input plan
    * @param f the function to apply
+   * @tparam T2 the type of the reuse node
    * @return the matching plan with `f` applied or the input plan
    */
   def reuseOrElseAdd[T2 >: T](plan: T, f: T => T2): T2 = {
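
To illustrate the two changes above outside of Spark, here is a minimal, self-contained Scala sketch of the same idea: plans are bucketed by schema, and within a bucket the reverted ArrayBuffer logic does a linear sameResult scan. The Plan trait and ToyReuseMap class below are hypothetical stand-ins, not Spark types; the real class bounds T2 by QueryPlan[T2].

import scala.collection.mutable.{ArrayBuffer, Map}

// Hypothetical stand-in for QueryPlan: equivalence is decided by a canonical form.
trait Plan {
  def schema: String
  def canonical: String
  def sameResult(other: Plan): Boolean = canonical == other.canonical
}

// Simplified analogue of ReuseMap[T <: T2, T2 <: QueryPlan[T2]].
class ToyReuseMap[T <: T2, T2 <: Plan] {
  // Plans are bucketed by schema, so the costlier canonical comparison only runs inside a bucket.
  private val map = Map[String, ArrayBuffer[T]]()

  // Return the first equivalent plan seen so far, or remember and return this one.
  def lookupOrElseAdd(plan: T): T = {
    val sameSchema = map.getOrElseUpdate(plan.schema, ArrayBuffer())
    val samePlan = sameSchema.find(plan.sameResult)
    if (samePlan.isDefined) samePlan.get else { sameSchema += plan; plan }
  }

  // Apply `f` (e.g. wrap in a reuse node) only when an equivalent plan was already registered.
  // The second type parameter lets `f` return the broader node type T2 instead of T.
  def reuseOrElseAdd(plan: T, f: T => T2): T2 = {
    val found = lookupOrElseAdd(plan)
    if (found eq plan) plan else f(found)
  }
}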

sql/catalyst/src/test/scala/org/apache/spark/sql/util/ReuseMapSuite.scala

Lines changed: 3 additions & 3 deletions
@@ -36,7 +36,7 @@ class ReuseMapSuite extends SparkFunSuite {
   private def reuse(testNode: TestNode) = TestReuseNode(testNode)
 
   test("no reuse if same instance") {
-    val reuseMap = new ReuseMap[TestNode]()
+    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
 
     reuseMap.reuseOrElseAdd(leafNode1, reuse)
     reuseMap.reuseOrElseAdd(parentNode1, reuse)
@@ -46,7 +46,7 @@
   }
 
   test("reuse if different instance with same canonicalized plan") {
-    val reuseMap = new ReuseMap[TestNode]()
+    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
     reuseMap.reuseOrElseAdd(leafNode1, reuse)
     reuseMap.reuseOrElseAdd(parentNode1, reuse)
 
@@ -57,7 +57,7 @@
   }
 
   test("no reuse if different canonicalized plan") {
-    val reuseMap = new ReuseMap[TestNode]()
+    val reuseMap = new ReuseMap[TestNode, LogicalPlan]()
     reuseMap.reuseOrElseAdd(leafNode1, reuse)
     reuseMap.reuseOrElseAdd(parentNode1, reuse)
 
sql/core/src/main/scala/org/apache/spark/sql/execution/reuse/ReuseExchangeAndSubquery.scala

Lines changed: 2 additions & 2 deletions
@@ -31,8 +31,8 @@ case class ReuseExchangeAndSubquery(conf: SQLConf) extends Rule[SparkPlan] {
 
   def apply(plan: SparkPlan): SparkPlan = {
     if (conf.exchangeReuseEnabled || conf.subqueryReuseEnabled) {
-      val exchanges = new ReuseMap[Exchange]()
-      val subqueries = new ReuseMap[BaseSubqueryExec]()
+      val exchanges = new ReuseMap[Exchange, SparkPlan]()
+      val subqueries = new ReuseMap[BaseSubqueryExec, SparkPlan]()
 
       def reuse(plan: SparkPlan): SparkPlan = plan.transformUp {
         case exchange: Exchange if conf.exchangeReuseEnabled =>
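
A hypothetical usage sketch of the pattern in this rule, building on the ToyReuseMap and Plan stand-ins after the first file above (Scan and ReusedNode are made-up node types): the second type parameter is what lets the wrapper returned by `f` be a plain plan node rather than an instance of T, mirroring how a reused Exchange is replaced by a SparkPlan wrapper.

// Hypothetical node types for the sketch; not Spark classes.
case class Scan(table: String) extends Plan {
  def schema: String = s"schema($table)"
  def canonical: String = s"Scan($table)"
}
case class ReusedNode(child: Plan) extends Plan {
  def schema: String = child.schema
  def canonical: String = child.canonical
}

object ReuseDemo extends App {
  // T = Scan, T2 = Plan: the wrapper produced on reuse is a Plan, not a Scan.
  val reuseMap = new ToyReuseMap[Scan, Plan]()
  val first = Scan("t")
  val second = Scan("t") // different instance, same canonical form

  assert(reuseMap.reuseOrElseAdd(first, ReusedNode(_)) eq first)              // first occurrence kept
  assert(reuseMap.reuseOrElseAdd(second, ReusedNode(_)) == ReusedNode(first)) // duplicate wrapped
}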
