Skip to content

Commit 53b8e27

Browse files
committed
Address comments
1 parent 0e16591 commit 53b8e27

File tree

4 files changed

+23
-20
lines changed

4 files changed

+23
-20
lines changed

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ case class AdaptiveSparkPlanExec(
8080
// TODO add more optimization rules
8181
override protected def batches: Seq[Batch] = Seq(
8282
Batch("Demote BroadcastHashJoin", Once, DemoteBroadcastHashJoin(conf)),
83-
Batch("Optimize Join to Empty Relation", Once, OptimizeJoinToEmptyRelation)
83+
Batch("Eliminate Join to Empty Relation", Once, EliminateJoinToEmptyRelation)
8484
)
8585
}
8686

sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeJoinToEmptyRelation.scala renamed to sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateJoinToEmptyRelation.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,19 +24,18 @@ import org.apache.spark.sql.catalyst.rules.Rule
2424
import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, EmptyHashedRelationWithAllNullKeys, HashedRelation}
2525

2626
/**
27-
* This optimization rule detects and converts a `Join` to an empty `LocalRelation`:
28-
* 1. `Join` is single column NULL-aware anti join (NAAJ), and broadcasted `HashedRelation`
29-
* is `EmptyHashedRelationWithAllNullKeys`.
27+
* This optimization rule detects and converts a Join to an empty [[LocalRelation]]:
28+
* 1. Join is single column NULL-aware anti join (NAAJ), and broadcasted [[HashedRelation]]
29+
* is [[EmptyHashedRelationWithAllNullKeys]].
3030
*
31-
* 2. `Join` is inner or left semi join, and broadcasted `HashedRelation` is `EmptyHashedRelation`.
31+
* 2. Join is inner or left semi join, and broadcasted [[HashedRelation]]
32+
is [[EmptyHashedRelation]].
3233
*/
33-
object OptimizeJoinToEmptyRelation extends Rule[LogicalPlan] {
34+
object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
3435

35-
private def canEliminate(
36-
plan: LogicalPlan,
37-
expectedRelation: HashedRelation): Boolean = plan match {
36+
private def canEliminate(plan: LogicalPlan, emptyRelation: HashedRelation): Boolean = plan match {
3837
case LogicalQueryStage(_, stage: BroadcastQueryStageExec) if stage.resultOption.get().isDefined
39-
&& stage.broadcast.relationFuture.get().value == expectedRelation => true
38+
&& stage.broadcast.relationFuture.get().value == emptyRelation => true
4039
case _ => false
4140
}
4241

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -447,9 +447,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
447447
}
448448

449449
if (isEmptyHashedRelation) {
450-
s"""
451-
|// If HashedRelation is empty, hash inner join simply returns nothing.
452-
""".stripMargin
450+
"""
451+
|// If HashedRelation is empty, hash inner join simply returns nothing.
452+
""".stripMargin
453453
} else if (keyIsUnique) {
454454
s"""
455455
|// generate join key for stream side
@@ -573,9 +573,9 @@ trait HashJoin extends BaseJoinExec with CodegenSupport {
573573
val numOutput = metricTerm(ctx, "numOutputRows")
574574

575575
if (isEmptyHashedRelation) {
576-
s"""
577-
|// If HashedRelation is empty, hash semi join simply returns nothing.
578-
""".stripMargin
576+
"""
577+
|// If HashedRelation is empty, hash semi join simply returns nothing.
578+
""".stripMargin
579579
} else if (keyIsUnique) {
580580
s"""
581581
|// generate join key for stream side

sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1170,20 +1170,24 @@ class AdaptiveQueryExecSuite
11701170
}
11711171
}
11721172

1173-
test("SPARK-32573, SPARK-32649: optimize join to empty relation") {
1173+
test("SPARK-32573: Eliminate NAAJ when BuildSide is EmptyHashedRelationWithAllNullKeys") {
11741174
withSQLConf(
11751175
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
1176-
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
1177-
// Test NULL-aware anti join
1176+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> Long.MaxValue.toString) {
11781177
val (plan, adaptivePlan) = runAdaptiveAndVerifyResult(
11791178
"SELECT * FROM testData2 t1 WHERE t1.b NOT IN (SELECT b FROM testData3)")
11801179
val bhj = findTopLevelBroadcastHashJoin(plan)
11811180
assert(bhj.size == 1)
11821181
val join = findTopLevelBaseJoin(adaptivePlan)
11831182
assert(join.isEmpty)
11841183
checkNumLocalShuffleReaders(adaptivePlan)
1184+
}
1185+
}
11851186

1186-
// Test inner and left semi join
1187+
test("SPARK-32649: Eliminate inner and semi join to empty relation") {
1188+
withSQLConf(
1189+
SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true",
1190+
SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "80") {
11871191
Seq(
11881192
// inner join (small table at right side)
11891193
"SELECT * FROM testData t1 join testData3 t2 ON t1.key = t2.a WHERE t2.b = 1",

0 commit comments

Comments
 (0)