1717
1818package org .apache .spark .sql .execution .adaptive
1919
20- import org .apache .spark .sql .catalyst .InternalRow
2120import org .apache .spark .sql .catalyst .planning .ExtractSingleColumnNullAwareAntiJoin
2221import org .apache .spark .sql .catalyst .plans .{Inner , LeftAnti , LeftSemi }
2322import org .apache .spark .sql .catalyst .plans .logical .{Join , LocalRelation , LogicalPlan }
@@ -35,8 +34,7 @@ import org.apache.spark.sql.execution.joins.{EmptyHashedRelation, HashedRelation
3534 * because sort merge join and shuffled hash join will be changed to broadcast hash join with AQE
3635 * at the first place.
3736 *
38- * 3. Join is left anti join without condition, and broadcasted join right side is not empty.
39- * This applies to broadcast nested loop join only.
37+ * 3. Join is left anti join without condition, and join right side is non-empty.
4038 */
4139object EliminateJoinToEmptyRelation extends Rule [LogicalPlan ] {
4240
@@ -59,18 +57,12 @@ object EliminateJoinToEmptyRelation extends Rule[LogicalPlan] {
5957 LocalRelation (j.output, data = Seq .empty, isStreaming = j.isStreaming)
6058
6159 case j @ Join (_, _, LeftAnti , None , _) =>
62- val isNonEmptyBroadcastedRightSide = j.right match {
63- case LogicalQueryStage (_, stage : BroadcastQueryStageExec )
64- if stage.resultOption.get().isDefined =>
65- stage.broadcast.relationFuture.get().value match {
66- // Match with Array[InternalRow] as this is the type of broadcast result
67- // in [[BroadcastNestedLoopJoinExec]].
68- case v : Array [InternalRow ] => v.nonEmpty
69- case _ => false
70- }
60+ val isNonEmptyRightSide = j.right match {
61+ case LogicalQueryStage (_, stage : QueryStageExec ) if stage.resultOption.get().isDefined =>
62+ stage.getRuntimeStatistics.rowCount.exists(_ > 0 )
7163 case _ => false
7264 }
73- if (isNonEmptyBroadcastedRightSide ) {
65+ if (isNonEmptyRightSide ) {
7466 LocalRelation (j.output, data = Seq .empty, isStreaming = j.isStreaming)
7567 } else {
7668 j
0 commit comments