[SPARK-34952][SQL] Aggregate (Min/Max/Count) push down for Parquet #32049
Conversation
Test build #136892 has finished for PR 32049 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Just curious - would anyone ever not want to push it down?
I'm surprised, I thought we already did this!
CC @cloud-fan
@srowen Hello Sean :)
Actually we only have filter push down for Parquet, not aggregate push down yet. I will probably change the default to true after this PR gets reviewed and fully tested.
Force-pushed e8c90af to 82b4592
Kubernetes integration test starting
Kubernetes integration test status failure
One suggestion: can we reuse PushableColumn, which is used by predicate pushdown, to capture the pushed columns?
Typo? SELECT (*)? Do you mean SELECT count(*) FROM table -> SELECT count(1) FROM table?
For the key methods added here, can you add some descriptive comments?
nit: aggResultToSparkInternalRows => createInternalRowFromAggResult?
values(i).asInstanceOf[Integer]? Or values(i).asInstanceOf[Long]? It is PrimitiveTypeName.INT64.
It should be values(i).asInstanceOf[Long]. Fixed.
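For illustration, a tiny hedged sketch of the distinction (the values array is hypothetical, standing in for the PR's aggregate-result buffer):

// Parquet INT64 statistics surface as boxed java.lang.Long values,
// so the unboxing cast must be to Long, not Integer.
val values: Array[Any] = Array(3542L)
val count = values(0).asInstanceOf[Long]   // correct for PrimitiveTypeName.INT64
// values(0).asInstanceOf[Integer] would throw a ClassCastException here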
So if we use aggregate pushdown for Parquet, we cannot use the vectorized Parquet reader, right? Can you describe this in the config doc too?
It seems that reading the aggregation result into a ColumnarBatch is supported below in buildColumnarReader. So we can still do aggregate push down with the vectorized reader enabled, right?
I think it doesn't matter whether the vectorized reader is enabled or not. Since we are reading the statistics information from the Parquet footer, we don't actually create a VectorizedReader. But if the columnar reader is enabled, we return a ColumnarBatch instead of an InternalRow.
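To make this concrete, here is a minimal hedged sketch (not the PR's code) of answering min/max/count for one INT64 column purely from footer metadata, using parquet-hadoop's ParquetFileReader; the column name c1 and single-file layout are assumptions:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile
import scala.collection.JavaConverters._

def footerMinMaxCount(path: String): (Long, Long, Long) = {
  val reader = ParquetFileReader.open(
    HadoopInputFile.fromPath(new Path(path), new Configuration()))
  try {
    var (min, max, rows) = (Long.MaxValue, Long.MinValue, 0L)
    reader.getFooter.getBlocks.asScala.foreach { block =>
      rows += block.getRowCount // per-row-group row count covers count(*)
      val col = block.getColumns.asScala
        .find(_.getPath.toDotString == "c1")
        .getOrElse(sys.error("column c1 not found"))
      val stats = col.getStatistics // footer statistics; no data pages read
      min = math.min(min, stats.genericGetMin.asInstanceOf[Long])
      max = math.max(max, stats.genericGetMax.asInstanceOf[Long])
    }
    (min, max, rows)
  } finally reader.close()
}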
Correct me if I misunderstand it.
It seems this method reads each block and then gets the aggregated result for each aggregate function, putting the results into an array.
Consider two aggregate functions, max(col1) and min(col2); the array content looks like [max(col1), min(col2)].
How does this handle the multi-block case? It seems this method appends the aggregated results sequentially, like [max(col1) for block1, min(col2) for block1, max(col1) for block2, min(col2) for block2, ...]?
Sorry, I didn't do this right. Will fix this.
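For what it's worth, a hedged sketch of the shape the fix might take: fold the per-block results into one entry per aggregate instead of appending one entry per block (the Agg types here are illustrative, not the PR's classes):

sealed trait Agg { def merge(other: Agg): Agg }
case class MaxAgg(v: Long) extends Agg {
  def merge(o: Agg): Agg = MaxAgg(math.max(v, o.asInstanceOf[MaxAgg].v))
}
case class MinAgg(v: Long) extends Agg {
  def merge(o: Agg): Agg = MinAgg(math.min(v, o.asInstanceOf[MinAgg].v))
}
case class CountAgg(v: Long) extends Agg {
  def merge(o: Agg): Agg = CountAgg(v + o.asInstanceOf[CountAgg].v)
}

// perBlock(i)(j) holds aggregate j computed from block i's footer stats;
// the result has exactly one entry per aggregate, regardless of block count.
def mergeBlocks(perBlock: Seq[Seq[Agg]]): Seq[Agg] =
  perBlock.reduce((a, b) => a.zip(b).map { case (x, y) => x.merge(y) })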
Test build #136897 has finished for PR 32049 at commit
Kubernetes integration test starting
Kubernetes integration test status failure
Kubernetes integration test starting
Kubernetes integration test status failure
Shall it be an internal config or not? Do we expect this one to be user-facing and tuned frequently?
Thanks for reviewing!
I think this should be similar to PARQUET_FILTER_PUSHDOWN_ENABLED and be a user-facing config. I guess we can default it to true in the future after we have more testing.
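For reference, a hedged sketch of what the entry might look like inside SQLConf, mirroring PARQUET_FILTER_PUSHDOWN_ENABLED; the key string follows the PR description, while the doc text and default are assumptions:

// As it would appear inside the SQLConf object; buildConf is SQLConf's
// existing builder. Doc text and default here are illustrative, not final.
val PARQUET_AGGREGATE_PUSHDOWN_ENABLED =
  buildConf("spark.sql.parquet.aggregatePushdown")
    .doc("If true, MIN/MAX/COUNT aggregates are pushed down to Parquet and " +
      "answered from footer statistics when possible.")
    .booleanConf
    .createWithDefault(false)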
Just want to double-check: will Parquet always make sure the min/max statistics are present in the footer?
Good question. I actually need to check whether Parquet returns the min/max statistics. If not, I will either throw an exception or fall back to the non-pushdown path. I think falling back is the better solution.
I can't find a good way to fall back. We aren't able to read the footer until FilePartitionReaderFactory.createReader, which is when we get a partition of the file to read. It seems too late to fall back at that point. I looked at Presto's Parquet partial-aggregation implementation, and it throws an exception. I will throw an exception for now. If anybody has a better idea, please let me know.
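For illustration, a hedged sketch of the guard described above; ColumnChunkMetaData and Statistics.hasNonNullValue are real parquet-mr APIs, while the surrounding helper and message are made up:

import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData

// Illustrative guard: refuse to answer a pushed-down MIN/MAX from a column
// chunk whose footer carries no usable statistics.
def requireMinMaxStats(chunk: ColumnChunkMetaData): Unit = {
  val stats = chunk.getStatistics
  if (stats == null || !stats.hasNonNullValue) {
    throw new UnsupportedOperationException(
      s"No min/max statistics for column ${chunk.getPath.toDotString}; " +
        "cannot use aggregate push down for this file")
  }
}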
Test build #136902 has finished for PR 32049 at commit
Test build #136904 has finished for PR 32049 at commit
@huaxingao Can you briefly introduce the new aggregate pushdown framework? How do we push down aggregates through different operators and eventually hit the scan node? Do we support both partial+final and global aggregates?
@cloud-fan I will have a SPIP for this.
Force-pushed 4ecabfb to d9dc0ba
Kubernetes integration test starting
Kubernetes integration test status failure
Test build #137188 has finished for PR 32049 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #137183 has finished for PR 32049 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Test build #137192 has finished for PR 32049 at commit
Is pushdown used?
If the children are all pushed-down Counts, I think they are all non-nullable, because Count is not nullable?
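A quick hedged check of that claim in a spark-shell (Count and Literal are Catalyst's real classes):

import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.catalyst.expressions.aggregate.Count

// COUNT never returns NULL, and Catalyst encodes that: Count.nullable is
// false, so an aggregate whose children are all pushed-down counts can be
// marked non-nullable as well.
assert(!Count(Seq(Literal(1))).nullable)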
This is different from the other pushed-down aggregate functions. Can you add a comment here explaining why Count needs to override its updateExpressions?
Thanks for your comments. Based on our offline discussion, I will rewrite pushed-down Count as Sum to minimize the code change.
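A tiny illustration of why that rewrite is needed (the numbers are made up): once each split answers COUNT locally, the final aggregate must add the partials up, which is a Sum, not a Count:

// Hypothetical per-row-group counts returned by the data source.
val partialCounts = Seq(1000L, 2500L, 42L)
// Summing yields the true row count; counting the partials would give 3
// (the number of splits), which is why pushed-down Count becomes Sum.
val totalRows = partialCounts.sum
assert(totalRows == 3542L)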
// +- RelationV2[min(c1)#21, max(c1)#22] parquet file ...
var index = 0
val output = resultExpressions.map {
  case Alias(_, name) =>
Is this correct? If the query is SELECT max(c) + min(c) AS res FROM t, what we push down is max(c) and min(c), and the expected output of the scan relation should be max(c)#id and min(c)#id, not res#id.
One idea to construct the output:
val newOutput = scan.readSchema().toAttributes
val groupAttrs = groupingExpressions.zip(newOutput).map {
  case (a: Attribute, b) => b.withExprId(a.exprId)
  case (_, b) => b
}
val output = groupAttrs ++ newOutput.drop(groupAttrs.length)
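Reusing the grouping attributes' exprIds this way matters: operators above the scan still reference the original grouping attributes, so the new scan output has to carry the same IDs to stay resolvable.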
val aggregates = resultExpressions.flatMap { expr =>
  expr.collect {
    case agg: AggregateExpression =>
      replaceAlias(agg, getAliasMap(project)).asInstanceOf[AggregateExpression]
Since project.forall(_.isInstanceOf[AttributeReference]) holds, I don't think we need to de-alias anymore.
    translatedFilters: Seq[sources.Filter],
-   handledFilters: Seq[sources.Filter]) extends Scan {
+   handledFilters: Seq[sources.Filter],
+   pushedAggregates: Aggregation) extends Scan {
Why do we put it here if we are not able to support it?
val translatedAggregates = aggregates.map(DataSourceStrategy.translateAggregate)
val translatedGroupBys = groupBy.map(columnAsString)

val agg = Aggregation(translatedAggregates.flatten, translatedGroupBys.flatten)
I think we can only apply the pushdown if all the GROUP BY columns are supported. E.g. for GROUP BY a, substring(b), c, it's wrong to push down GROUP BY a, c.
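A hedged sketch of that all-or-nothing check, reusing the names from the diff above (columnAsString returning Option[String] is an assumption about its signature):

val translatedGroupBys: Seq[Option[String]] = groupBy.map(columnAsString)
val agg: Option[Aggregation] =
  if (translatedGroupBys.forall(_.isDefined)) {
    Some(Aggregation(translatedAggregates.flatten, translatedGroupBys.flatten))
  } else {
    None // e.g. GROUP BY a, substring(b), c: pushing down only a, c is wrong
  }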
Test build #140982 has finished for PR 32049 at commit
Kubernetes integration test unable to build dist. exiting with code: 1
Kubernetes integration test starting
Kubernetes integration test status success
Test build #140984 has finished for PR 32049 at commit
Test build #141033 has finished for PR 32049 at commit
retest this please
Kubernetes integration test starting
Test build #141034 has finished for PR 32049 at commit
retest this please
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Kubernetes integration test starting
Kubernetes integration test status success
Test build #141035 has finished for PR 32049 at commit
Per offline discussion with @cloud-fan, we will split this PR into two PRs: the first will add the interfaces and APIs, and the second will add the Parquet implementation. I will close this PR for now. Thanks everyone for reviewing! Here is the first PR: #33352
What changes were proposed in this pull request?
Add interfaces and APIs to push down Aggregates to V2 Data Source.

Why are the changes needed?
Improve performance.

Does this PR introduce any user-facing change?
SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED was added. If this is set to true, Aggregates are pushed down to Data Source.

How was this patch tested?
New tests were added to test aggregates push down in #32049. The original PR is split into two PRs. This PR doesn't contain new tests.

Closes #33352 from huaxingao/aggPushDownInterface.
Authored-by: Huaxin Gao <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c561ee6)
What changes were proposed in this pull request?
Push down Min/Max/Count to Parquet.
Why are the changes needed?
Since Parquet has the statistics information for min, max, and count in its footer, we want to take advantage of this info and push Min/Max/Count down to the Parquet layer for better performance.
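As a hedged usage sketch (the key string corresponds to the SQLConf entry named below; the path and column are made up), a query like this could then be answered from footer statistics without scanning data pages:

// Assumes an existing SparkSession named spark.
spark.conf.set("spark.sql.parquet.aggregatePushdown", "true")
spark.read.parquet("/tmp/t").createOrReplaceTempView("t")
spark.sql("SELECT min(c1), max(c1), count(*) FROM t").show()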
Does this PR introduce any user-facing change?
Yes, SQLConf.PARQUET_AGGREGATE_PUSHDOWN_ENABLED was added. If this is set to true, we will push down Min/Max/Count to Parquet.
How was this patch tested?
New tests were added.