[SPARK-38959][SQL][FOLLOWUP] Optimizer batch PartitionPruning should optimize subqueries
#38557
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
```diff
@@ -17,10 +17,10 @@
 package org.apache.spark.sql.execution.dynamicpruning

-import org.apache.spark.sql.catalyst.expressions.{And, Attribute, DynamicPruningSubquery, Expression, PredicateHelper, V2ExpressionUtils}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, DynamicPruningExpression, Expression, InSubquery, ListQuery, PredicateHelper, V2ExpressionUtils}
 import org.apache.spark.sql.catalyst.expressions.Literal.TrueLiteral
 import org.apache.spark.sql.catalyst.planning.GroupBasedRowLevelOperation
-import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
+import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Filter, LogicalPlan}
 import org.apache.spark.sql.catalyst.rules.Rule
 import org.apache.spark.sql.connector.read.SupportsRuntimeV2Filtering
 import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, DataSourceV2Relation, DataSourceV2ScanRelation}
@@ -37,8 +37,7 @@ import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Implicits, Dat
  *
  * Note this rule only applies to group-based row-level operations.
  */
-case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[LogicalPlan])
-  extends Rule[LogicalPlan] with PredicateHelper {
+object RowLevelOperationRuntimeGroupFiltering extends Rule[LogicalPlan] with PredicateHelper {

   import DataSourceV2Implicits._

@@ -65,8 +64,7 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
       Filter(dynamicPruningCond, r)
     }

-    // optimize subqueries to rewrite them as joins and trigger job planning
-    replaceData.copy(query = optimizeSubqueries(newQuery))
+    replaceData.copy(query = newQuery)
   }

   private def buildMatchingRowsPlan(
@@ -89,10 +87,8 @@ case class RowLevelOperationRuntimeGroupFiltering(optimizeSubqueries: Rule[Logic
       buildKeys: Seq[Attribute],
       pruningKeys: Seq[Attribute]): Expression = {

-    val buildQuery = Project(buildKeys, matchingRowsPlan)
-    val dynamicPruningSubqueries = pruningKeys.zipWithIndex.map { case (key, index) =>
-      DynamicPruningSubquery(key, buildQuery, buildKeys, index, onlyInBroadcast = false)
-    }
-    dynamicPruningSubqueries.reduce(And)
+    val buildQuery = Aggregate(buildKeys, buildKeys, matchingRowsPlan)
```
Contributor:
Are there any downsides of rewriting? I see some special branches for exchange reuse in those rules that would not apply now.

Contributor (Author):
I don't see any downside. We can only reuse a broadcast if the DPP filter is derived from a join, which doesn't apply here.

Contributor:
Got it. I was originally worried we could miss some future optimizations, given that dynamic pruning for row-level operations would go through a different route compared to normal DPP. One alternative could be to extend

Contributor (Author):
My rationale is, what we really need here is a subquery. This is completely different from dynamic partition pruning. One limitation is that DS v2 runtime filter pushdown only applies to

Contributor:
Yeah, the DS v2 runtime filtering framework is fairly limited at this point.
```diff
+    DynamicPruningExpression(
+      InSubquery(pruningKeys, ListQuery(buildQuery, childOutputs = buildQuery.output)))
   }
 }
```
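The shape of the rewrite can be illustrated with a toy model. These are plain Scala case classes standing in for Catalyst's expression and plan nodes; the names mirror the diff, but none of this is Spark's actual API:

```scala
// Toy stand-ins for Catalyst nodes, for illustration only -- none of these
// are Spark's real classes; they only mirror the shape of the rewrite.
sealed trait Expr
sealed trait Plan
case class Attr(name: String) extends Expr
case class And(left: Expr, right: Expr) extends Expr
case class DynamicPruningSubquery(key: Expr, build: Plan, index: Int) extends Expr
case class InSubquery(keys: Seq[Expr], build: Plan) extends Expr
case class DynamicPruningExpression(child: Expr) extends Expr
case class Project(projectList: Seq[Attr], child: Plan) extends Plan
case class Aggregate(grouping: Seq[Attr], output: Seq[Attr], child: Plan) extends Plan
case object MatchingRows extends Plan

// Before this PR: one DynamicPruningSubquery per pruning key, ANDed together.
def oldCondition(buildKeys: Seq[Attr], pruningKeys: Seq[Attr]): Expr = {
  val buildQuery = Project(buildKeys, MatchingRows)
  pruningKeys.zipWithIndex
    .map { case (key, index) => DynamicPruningSubquery(key, buildQuery, index): Expr }
    .reduce(And.apply)
}

// After this PR: a single multi-column IN subquery; the Aggregate keeps the
// build side distinct on the build keys.
def newCondition(buildKeys: Seq[Attr], pruningKeys: Seq[Attr]): Expr = {
  val buildQuery = Aggregate(buildKeys, buildKeys, MatchingRows)
  DynamicPruningExpression(InSubquery(pruningKeys, buildQuery))
}

val keys = Seq(Attr("pk1"), Attr("pk2"))
println(oldCondition(keys, keys))
println(newCondition(keys, keys))
```

The point of the change is visible in the return shape: the old version emits planner-specific `DynamicPruningSubquery` expressions, while the new one emits an ordinary `InSubquery` that later optimizer rules can handle like any other subquery.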
Contributor:
This makes sense. Just wondering, is this particularly related to SPARK-38959?

Contributor (Author):
Yes, because this PR adds OptimizeSubqueries to the batch PartitionPruning, and we should not break #33664.
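For context, an optimizer batch is an ordered list of rules applied to the plan, and the exchange above is about ordering: once the group-filtering rule injects a plain IN subquery, OptimizeSubqueries must run afterwards in the same PartitionPruning batch so the injected subquery actually gets planned. A toy sketch of that dependency, in plain Scala with made-up rule bodies (not Spark's Optimizer API):

```scala
// Toy optimizer: a batch is a named, ordered list of plan -> plan rules.
// The plan is modeled as a plain String purely for illustration.
case class Batch(name: String, rules: List[String => String]) {
  def execute(plan: String): String = rules.foldLeft(plan)((p, rule) => rule(p))
}

// Made-up stand-ins: the group-filtering rule injects a raw IN subquery,
// and the subquery-optimization rule rewrites any raw subquery it finds.
val groupFiltering: String => String =
  plan => s"$plan with raw InSubquery"
val optimizeSubqueries: String => String =
  plan => plan.replace("raw InSubquery", "planned subquery join")

// With subquery optimization in the batch, the injected subquery is planned.
val partitionPruning = Batch("PartitionPruning", List(groupFiltering, optimizeSubqueries))
println(partitionPruning.execute("Scan"))

// Without it, the raw subquery would survive optimization unplanned.
val broken = Batch("PartitionPruning", List(groupFiltering))
println(broken.execute("Scan"))
```

This is why the rule no longer needs to carry an `optimizeSubqueries` constructor parameter: the batch itself now guarantees the follow-up optimization runs.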