-
Notifications
You must be signed in to change notification settings - Fork 29k
[SPARK-32573][SQL] Anti Join Improvement with EmptyHashedRelation and EmptyHashedRelationWithAllNullKeys #29389
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
In [SPARK-32290](https://issues.apache.org/jira/browse/SPARK-32290), we introduced several new types of HashedRelation. * EmptyHashedRelation * EmptyHashedRelationWithAllNullKeys They were all limited to used only in NAAJ scenario. But as for a improvement, EmptyHashedRelation could also be used in Normal AntiJoin for fast stop, and as for in AQE, we can even eliminate anti join when we knew that buildSide is empty. This Patch including two changes. * In Non-AQE, using EmptyHashedRelation to do fast stop for common anti join as well * In AQE, eliminate anti join if buildSide is a EmptyHashedRelation of ShuffleWriteRecord is 0 LeftAntiJoin could apply `fast stop` when BuildSide is Empty, While within AQE, we can even eliminate the anti join. This should be a performance improvement in AntiJoin. No. * added case in AdaptiveQueryExecSuite. * added case in HashedRelationSuite. * Make sure SubquerySuite JoinSuite SQLQueryTestSuite passed. Change-Id: I718227725e319a54b99c91e6a0d1b9f022343b16
Change-Id: I47ebe0cc2e751b018a6ee02fc3018601805c4060
|
@cloud-fan could you please have a look at this followup? |
|
ok to test |
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/BroadcastExchangeExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Outdated
Show resolved
Hide resolved
Change-Id: Icfe57ed02de8dd1b414f4d0c81a51353d49a6ead
|
@maropu updated. thanks for doing this in weekend. ^_^ |
|
You, too, @leanken ;) thanks for the work, anyway. |
|
Test build #127231 has finished for PR 29389 at commit
|
|
Test build #127233 has finished for PR 29389 at commit
|
sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/EliminateAntiJoin.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Outdated
Show resolved
Hide resolved
Change-Id: If5d776916fc12674068c486d605ff9a1fc28526f
|
@dongjoon-hyun updated, thanks |
| "buildTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to build"), | ||
| "broadcastTime" -> SQLMetrics.createTimingMetric(sparkContext, "time to broadcast")) | ||
|
|
||
| private var knownRowCount: Option[BigInt] = None |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can we be consistent and use metrics to hold rowCount? You can follow how we track dataSize.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sure will do the update.
|
|
||
| val dataSize = relation match { | ||
| case EmptyHashedRelation => | ||
| knownRowCount = Some(0L) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
See L118. We already know the row count and we even use it to check numRows >= MAX_BROADCAST_TABLE_ROWS
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
Show resolved
Hide resolved
|
|
||
| /** | ||
| * A special HashedRelation indicates it built from a empty input:Iterator[InternalRow]. | ||
| * get & getValue will return null just like |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
BTW where do we call these methods for EmptyHashedRelation?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. It will be called. EmptyHashedRelation now is also applied at all joinType. I confirmed it while running UT.
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/BroadcastHashJoinExec.scala
Outdated
Show resolved
Hide resolved
| def apply(plan: LogicalPlan): LogicalPlan = plan.transformDown { | ||
| // If the right side is empty, LeftAntiJoin simply returns the left side. | ||
| // Eliminate Join with left LogicalPlan instead. | ||
| case Join(left, right, LeftAnti, _, _) if canEliminate(right) => |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not very useful, as the anti join is already very fast with the empty right side.
https://github.com/apache/spark/pull/29389/files#r467683621 is more useful, as for now we still need to consume all the left side data and return nothing. It will be a big improvement if we can directly convert the join node to an empty relation with AQE.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not very useful, as the anti join is already very fast with the empty right side.
https://github.com/apache/spark/pull/29389/files#r467683621 is more useful, as for now we still need to consume all the left side data and return nothing. It will be a big improvement if we can directly convert the join node to an empty relation with AQE.
In fact, we need to consume all the left side data and return all the rows when it's an EmptyHashedRelation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In fact, we need to consume all the left side data and return all the rows when it's an EmptyHashedRelation.
Yes, but this rule doesn't change it. By returning the left side plan, we still need to consume all the data of the left side.
eliminate naaj which buildSide is EmptyHashedRelationWithAllNullKeys while AQE is on, should gain more Performance improvement. Change-Id: I13c667628220da923fc59718281642460c80f71d
Change-Id: I0d0a96da107145b9fc681e15f2533abd165f344a
Change-Id: I3039efd9e472431131b52bec0d0619b84e2c6ee5
|
Test build #127249 has finished for PR 29389 at commit
|
|
@cloud-fan updated. |
|
|
@leanken please also update the PR description. |
|
Update PR title => Anti Join Improvement with EmptyHashedRelation and EmptyHashedRelationWithAllNullKeys |
|
Update RP desc and Jira desc accordingly, @cloud-fan |
|
Test build #127270 has finished for PR 29389 at commit
|
| protected override def codegenAnti(ctx: CodegenContext, input: Seq[ExprCode]): String = { | ||
| if (isNullAwareAntiJoin) { | ||
| val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) | ||
| val (broadcastRelation, relationTerm) = prepareBroadcast(ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After a second look, I think this is incorrect. We may go to the else branch, which calls super.codegenAnti and call prepareBroadcast again.
I think we should keep this method untouched, and update HashJoin.codegenAnti to fast stop with EmptyHashedRelation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I thought about this problem too.
HashJoin.scala does not have the API prepareBroadcast, only the following prepareRelation available, which will not return HashedRelation.
protected def prepareRelation(ctx: CodegenContext): (String, Boolean)
I was thinking that call prepareBroadcast twice is OK since broadcast.Broadcast[HashedRelation] is a future call, and the collect and build action will only happen once, but the relationTerm will be generated twice
is it ok to just change prepareRelation to return a (String, Boolean, HashedRelation) ??
tell me when you decided which way to go. @cloud-fan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was also consider doing it inside BHJ because in ShuffledHashJoinExec
the hashedRelation is built among generated-code, it might be hard to just return hashedRelation in prepareRelation call.
protected override def prepareRelation(ctx: CodegenContext): (String, Boolean) = {
val thisPlan = ctx.addReferenceObj("plan", this)
val clsName = classOf[HashedRelation].getName
// Inline mutable state since not many join operations in a task
val relationTerm = ctx.addMutableState(clsName, "relation",
v => s"$v = $thisPlan.buildHashedRelation(inputs[1]);", forceInline = true)
(relationTerm, false)
}
Change-Id: Ieb631235d4bc8f1446387eb22f0c1a1cc1d3ed27
|
@cloud-fan updated. refine prepareRelation to return Code change int BHJ reverted, and do fast stop in HashJoin.scala codegenAnti using isEmptyHashedRelation |
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
Outdated
Show resolved
Hide resolved
sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala
Outdated
Show resolved
Hide resolved
cloud-fan
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM except a few nits.
|
Test build #127286 has finished for PR 29389 at commit
|
|
Test build #127296 has finished for PR 29389 at commit
|
|
@cloud-fan Test passed, ready to merge. |
|
thanks, merging to master! |
What changes were proposed in this pull request?
In SPARK-32290, we introduced several new types of HashedRelation.
They were all limited to used only in NAAJ scenario. These new HashedRelation could be applied to other scenario for performance improvements.
This Patch including two changes.
Why are the changes needed?
LeftAntiJoin could apply
fast stopwhen BuildSide is EmptyHashedRelation, While within AQE with EmptyHashedRelationWithAllNullKeys, we can eliminate the NAAJ. This should be a performance improvement in AntiJoin.Does this PR introduce any user-facing change?
No.
How was this patch tested?