Skip to content

Commit f63bee3

Browse files
committed
Fix
1 parent 7217460 commit f63bee3

File tree

4 files changed

+9
-13
lines changed

4 files changed

+9
-13
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/joins.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,8 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
119119
createOrderedJoin(input, conditions)
120120
}
121121

122-
// Checks if joins were reordered. If not reordered, returns the original plan
122+
// To avoid applying this rule repeatedly, we don't change the plan in case of
123+
// the same join order between `p` and `reordered`.
123124
if (!sameJoinOrder(reordered, p)) {
124125
if (p.sameOutput(reordered)) {
125126
reordered
@@ -129,7 +130,7 @@ object ReorderJoin extends Rule[LogicalPlan] with PredicateHelper {
129130
Project(p.output, reordered)
130131
}
131132
} else {
132-
reordered
133+
p
133134
}
134135
}
135136
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -250,8 +250,8 @@ object ExtractFiltersAndInnerJoins extends PredicateHelper {
250250
val (plans, conditions) = flattenJoin(left, joinType)
251251
(plans ++ Seq((right, joinType)), conditions ++
252252
cond.toSeq.flatMap(splitConjunctivePredicates))
253-
case Filter(filterCondition, j @ Join(_, _, _: InnerLike, _, hint)) if hint == JoinHint.NONE =>
254-
val (plans, conditions) = flattenJoin(j)
253+
case Filter(filterCondition, child) =>
254+
val (plans, conditions) = flattenJoin(child)
255255
(plans, conditions ++ splitConjunctivePredicates(filterCondition))
256256
case p @ Project(_, child)
257257
// Keep flattening joins when the project has attributes only

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinOptimizationSuite.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ class JoinOptimizationSuite extends PlanTest {
137137
}
138138
}
139139

140-
test("SPARK-23172 skip projections when flattening joins") {
140+
test("Skip projections when flattening joins") {
141141
def checkExtractInnerJoins(plan: LogicalPlan): Unit = {
142142
val expectedTables = plan.collectLeaves().map { case p => (p, Inner) }
143143
val expectedConditions = plan.collect {
@@ -170,7 +170,7 @@ class JoinOptimizationSuite extends PlanTest {
170170
checkExtractInnerJoins(joined)
171171
}
172172

173-
test("SPARK-23172 reorder joins with projections") {
173+
test("Reorder joins with projections") {
174174
withSQLConf(
175175
SQLConf.STARSCHEMA_DETECTION.key -> "true",
176176
SQLConf.CBO_ENABLED.key -> "false") {

sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.dsl.expressions._
2121
import org.apache.spark.sql.catalyst.dsl.plans._
2222
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
2323
import org.apache.spark.sql.catalyst.plans.{Inner, PlanTest}
24-
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan, Project}
24+
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, LocalRelation, LogicalPlan}
2525
import org.apache.spark.sql.catalyst.rules.RuleExecutor
2626
import org.apache.spark.sql.catalyst.statsEstimation.{StatsEstimationTestBase, StatsTestPlan}
2727
import org.apache.spark.sql.internal.SQLConf._
@@ -580,12 +580,7 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
580580

581581
private def assertEqualPlans(plan1: LogicalPlan, plan2: LogicalPlan): Unit = {
582582
val analyzed = plan1.analyze
583-
val optimized = Optimize.execute(analyzed) match {
584-
// `ReorderJoin` adds `Project` to keep the same order of output attributes.
585-
// So, we drop a top `Project` for tests.
586-
case project: Project => project.child
587-
case p => p
588-
}
583+
val optimized = Optimize.execute(analyzed)
589584
val expected = plan2.analyze
590585

591586
assert(equivalentOutput(analyzed, expected)) // if this fails, the expected itself is incorrect

0 commit comments

Comments
 (0)