Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Mar 9, 2022

What changes were proposed in this pull request?

  1. This pr add a new logical plan visitor named DistinctKeyVisitor to find out all the distinct attributes in current logical plan. For example:

    spark.sql("CREATE TABLE t(a int, b int, c int) using parquet")
    spark.sql("SELECT a, b, a % 10, max(c), sum(b) FROM t GROUP BY a, b").queryExecution.analyzed.distinctKeys

    The output is: {a#1, b#2}.

  2. Enhance RemoveRedundantAggregates to remove the aggregation if it is groupOnly and the child can guarantee distinct. For example:

    set spark.sql.autoBroadcastJoinThreshold=-1; -- avoid PushDownLeftSemiAntiJoin
    create table t1 using parquet as select id a, id as b from range(10);
    create table t2 using parquet as select id as a, id as b from range(8);
    select t11.a, t11.b from (select distinct a, b from t1) t11 left semi join t2 on (t11.a = t2.a) group by t11.a, t11.b;

    Before this PR:

    == Optimized Logical Plan ==
    Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    +- Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
       :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
       :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
       :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
       +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
          +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
             +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
    

    After this PR:

    == Optimized Logical Plan ==
    Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
    :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
    :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
    +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
       +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
          +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
    

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test and TPC-DS benchmark test.

SQL Before this PR(Seconds) After this PR(Seconds)
q14a 206  193
q38 59 41
q87 127 113

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I still don't get why child.deterministic is required. If the child output is completely random, then it should not report any distinct keys.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking twice, it is not required.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't it just a.foldable?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. Alias is'n foldable, but the child is foldable.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should only do this filter in if (aliases.isEmpty). The distinct key can be a + b and the project list may have a + b AS c, then the result distinct key should be c.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Then do the filter again in L49. It's incorrect to do this filter this early.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why p.deterministic is required?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed it.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't p.child.distinctKeys already a Set[ExpressionSet]?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

hmm, how does this work? groupBy('a)('a, TrueLiteral) is not group only and not literal-only.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RemoveRedundantAggregates already supported before this PR.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah this works because these two aggregates are adjacent. If they are not, we have a problem.

I'm thinking that we should refine Aggregate.groupOnly: .groupBy('a)('a, TrueLiteral) is also group only.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#35795 to address this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what can go wrong if we optimize this case?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can optimize this case.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you summarize the plan changes? do we have less shuffles now?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

q14a/a14b reduces 1 Exchange and 2 HashAggregates.
q38 reduces 3 Exchange and 4 HashAggregates.
q38 reduces 3 Exchange and 4 HashAggregates.

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except for some minor comments

@cloud-fan
Copy link
Contributor

Do we see regressions in any TPCDS queries?

@wangyum
Copy link
Member Author

wangyum commented Mar 10, 2022

Do we see regressions in any TPCDS queries?

There is no regression.

if agg.groupOnly && child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
Project(agg.aggregateExpressions, child)

case agg @ Aggregate(groupingExps, aggregateExps, child)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this case? agg.groupOnly should cover it.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we do not need it.

@wangyum wangyum closed this in c16a66a Mar 14, 2022
@wangyum
Copy link
Member Author

wangyum commented Mar 14, 2022

Merged to master.

@wangyum wangyum deleted the SPARK-36194 branch March 14, 2022 13:59
wangyum pushed a commit that referenced this pull request Apr 8, 2022
### What changes were proposed in this pull request?

This PR is a followup of #35779 , to propagate distinct keys more precisely in 2 cases:
1. For `LIMIT 1`, each output attribute is a distinct key, not the entire tuple.
2. For aggregate, we can still propagate distinct keys from child.

### Why are the changes needed?

make the optimization cover more cases

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #36100 from cloud-fan/followup.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
wangyum pushed a commit that referenced this pull request Apr 8, 2022
### What changes were proposed in this pull request?

This PR is a followup of #35779 , to propagate distinct keys more precisely in 2 cases:
1. For `LIMIT 1`, each output attribute is a distinct key, not the entire tuple.
2. For aggregate, we can still propagate distinct keys from child.

### Why are the changes needed?

make the optimization cover more cases

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new tests

Closes #36100 from cloud-fan/followup.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Yuming Wang <[email protected]>
(cherry picked from commit fbe82fb)
Signed-off-by: Yuming Wang <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants