
Conversation


@wangyum wangyum commented Feb 24, 2022

What changes were proposed in this pull request?

  1. This PR adds a new logical plan visitor, DistinctKeyVisitor, to find all the distinct attributes in the current logical plan. For example:

    spark.sql("CREATE TABLE t(a int, b int, c int) using parquet")
    spark.sql("SELECT a, b, a % 10, a AS aliased_a, max(c), sum(b) FROM t GROUP BY a, b").queryExecution.analyzed.distinctKeys

    The output is: {a#1, b#2}, {b#2, aliased_a#0}.

  2. Enhance RemoveRedundantAggregates to remove a group-only aggregation when the child already guarantees the grouping keys are distinct (a minimal sketch of the new rule case follows the plans below). For example:

    set spark.sql.autoBroadcastJoinThreshold=-1; -- avoid PushDownLeftSemiAntiJoin
    create table t1 using parquet as select id a, id as b from range(10);
    create table t2 using parquet as select id as a, id as b from range(8);
    select t11.a, t11.b from (select distinct a, b from t1) t11 left semi join t2 on (t11.a = t2.a) group by t11.a, t11.b;

    Before this PR:

    == Optimized Logical Plan ==
    Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    +- Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
       :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
       :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
       :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
       +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
          +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
             +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
    

    After this PR:

    == Optimized Logical Plan ==
    Join LeftSemi, (a#6L = a#8L), Statistics(sizeInBytes=1492.0 B)
    :- Aggregate [a#6L, b#7L], [a#6L, b#7L], Statistics(sizeInBytes=1492.0 B)
    :  +- Filter isnotnull(a#6L), Statistics(sizeInBytes=1492.0 B)
    :     +- Relation default.t1[a#6L,b#7L] parquet, Statistics(sizeInBytes=1492.0 B)
    +- Project [a#8L], Statistics(sizeInBytes=984.0 B)
       +- Filter isnotnull(a#8L), Statistics(sizeInBytes=1476.0 B)
          +- Relation default.t2[a#8L,b#9L] parquet, Statistics(sizeInBytes=1476.0 B)
    
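A minimal sketch of the new RemoveRedundantAggregates case (names follow the diffs discussed below; the exact final shape may differ):

    // A group-only Aggregate is redundant when some distinct key of the child
    // is a subset of the grouping expressions: the child's rows are already
    // distinct on those keys, so only a Project is needed.
    case agg @ Aggregate(groupingExps, _, child)
        if agg.groupOnly && child.deterministic &&
          child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
      Project(agg.aggregateExpressions, child)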

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit tests and a TPC-DS benchmark:

| SQL  | Before this PR (seconds) | After this PR (seconds) |
|------|--------------------------|-------------------------|
| q14a | 206                      | 193                     |
| q38  | 59                       | 41                      |
| q87  | 127                      | 113                     |

@github-actions github-actions bot added the SQL label Feb 24, 2022
@wangyum wangyum changed the title from "[SPARK-36194][SQL] Remove the aggregation from left semi/anti join if the same aggregation has already been done on left side" to "[SPARK-36194][SQL] Add A logical plan visitor to propagate the distinct attributes" Feb 24, 2022
* }}}
*/
trait LogicalPlanDistinctKeys { self: LogicalPlan =>
lazy val distinctKeys: Set[AttributeSet] = DistinctKeyVisitor.visit(self)
Contributor

can we add a config for this feature? If the config is off, here we just return Set.empty

Contributor

e.g. spark.sql.optimizer.propagateDistinctKeys.enabled
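For reference, a sketch of what such a definition could look like in SQLConf (the key name comes from this thread; .internal(), the doc text, and the default are assumptions):

    val PROPAGATE_DISTINCT_KEYS_ENABLED =
      buildConf("spark.sql.optimizer.propagateDistinctKeys.enabled")
        .internal()
        .doc("When true, the query optimizer will propagate a set of distinct " +
          "attributes from the current node and use it to optimize queries.")
        .booleanConf
        .createWithDefault(true)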

Member Author

OK
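With such a config, the trait shown above could gate the visitor like this (a sketch; the final code may differ):

    trait LogicalPlanDistinctKeys { self: LogicalPlan =>
      lazy val distinctKeys: Set[AttributeSet] = {
        if (conf.getConf(SQLConf.PROPAGATE_DISTINCT_KEYS_ENABLED)) {
          DistinctKeyVisitor.visit(self)
        } else {
          Set.empty
        }
      }
    }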

case ne => ne
}
keys.exists(_.equals(ExpressionSet(references)))
}.map(s => AttributeSet(s.map(_.toAttribute))).toSet
Contributor

how about

    val outputSet = ExpressionSet(projectList.map(_.toAttribute))
    val aliases = projectList.filter(_.isInstanceOf[Alias])
    if (aliases.isEmpty) return keys.filter(_.subsetOf(outputSet))

    val aliasedDistinctKeys = keys.map { expressionSet =>
      expressionSet.map { expression =>
        expression transform {
          case expr: Expression =>
            aliases
              .collectFirst { case a: Alias if a.child.semanticEquals(expr) => a.toAttribute }
              .getOrElse(expr)
        }
      }
    }
    aliasedDistinctKeys.collect {
      case es: ExpressionSet if es.subsetOf(outputSet) => ExpressionSet(es)
    }

Contributor

If one expression has multiple aliases, we need to further expand the distinct keys set. We can do it later as it's rather a corner case.
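An illustration of the corner case (hypothetical query, reusing table t from the description):

    // The child is distinct on {a}, and both aliases of a are distinct keys of
    // the projection's output, so {x} and {y} should both be propagated; a
    // single-alias substitution would only find one of them.
    spark.sql("SELECT a AS x, a AS y FROM (SELECT DISTINCT a FROM t) s")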

Member Author

OK


override def visitGenerate(p: Generate): Set[AttributeSet] = default(p)

override def visitGlobalLimit(p: GlobalLimit): Set[AttributeSet] = p.child.distinctKeys
Contributor

if the limit value is 1 or 0, all output columns are distinct.

Member Author

+1
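A sketch of that suggestion (hedged; the IntegerLiteral extraction is an assumption):

    override def visitGlobalLimit(p: GlobalLimit): Set[AttributeSet] = p.limitExpr match {
      // At most one row can be produced, so every single output attribute is
      // a distinct key on its own.
      case IntegerLiteral(value) if value <= 1 =>
        p.output.map(attr => AttributeSet(attr)).toSet
      case _ => p.child.distinctKeys
    }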


override def visitJoin(p: Join): Set[AttributeSet] = {
  p.joinType match {
    case LeftExistence(_) => p.left.distinctKeys
Contributor

shall we exclude ExistenceJoin?

Member Author

OK
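A sketch of the agreed narrowing: match LeftSemi/LeftAnti explicitly instead of the broader LeftExistence extractor, since ExistenceJoin appends an extra boolean attribute to the left output.

    case LeftSemi | LeftAnti => p.left.distinctKeys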

override def visitRepartitionByExpr(p: RepartitionByExpression): Set[AttributeSet] =
p.child.distinctKeys

override def visitSample(p: Sample): Set[AttributeSet] = default(p)
Contributor

For Sample without replacement, we can propagate the distinct keys from child.

Member Author

+1
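A sketch of the suggested Sample handling (hedged):

    override def visitSample(p: Sample): Set[AttributeSet] = {
      // Sampling without replacement only drops rows, so the child's distinct
      // keys remain valid; with replacement a row may be emitted repeatedly.
      if (!p.withReplacement) p.child.distinctKeys else default(p)
    }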

override def visitRebalancePartitions(p: RebalancePartitions): Set[AttributeSet] =
p.child.distinctKeys

override def visitWithCTE(p: WithCTE): Set[AttributeSet] = default(p)
Contributor

CTE can also propagate distinct keys from child.

Member Author

+1
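A sketch of the suggested WithCTE handling (hedged; assumes the main query is WithCTE's plan field):

    // WithCTE only wraps the main query plan, so distinct keys pass through.
    override def visitWithCTE(p: WithCTE): Set[AttributeSet] = p.plan.distinctKeys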

@cloud-fan
Contributor

Due to the significant plan changes caused by this PR, can we verify the TPC-DS performance?

@wangyum
Member Author

wangyum commented Feb 25, 2022

Due to the significant plan changes caused by this PR, can we verify the TPC-DS performance?

OK

@wangyum wangyum changed the title from "[SPARK-36194][SQL] Add A logical plan visitor to propagate the distinct attributes" to "[SPARK-36194][SQL] Add a logical plan visitor to propagate the distinct attributes" Feb 25, 2022
newAggregate
}

case agg @ Aggregate(groupingExps, _, child) if agg.groupOnly && child.deterministic &&
Contributor

does child.deterministic matter here?

Member Author

We had a test like this before:

test("Remove redundant aggregate with non-deterministic upper") {
val query = relation
.groupBy('a)('a)
.groupBy('a)('a, rand(0) as 'c)
.analyze
val expected = relation
.groupBy('a)('a, rand(0) as 'c)
.analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, expected)
}

Contributor

which means child.deterministic doesn't matter? The test you posted did optimize out one aggregate.

Member Author

Sorry. This test:

test("Keep non-redundant aggregate - upper references non-deterministic non-grouping") {
val query = relation
.groupBy('a)('a, ('a + rand(0)) as 'c)
.groupBy('a, 'c)('a, 'c)
.analyze
val optimized = Optimize.execute(query)
comparePlans(optimized, query)
}

case Inner =>
  p match {
    case ExtractEquiJoinKeys(_, leftKeys, rightKeys, _, _, _, _, _)
      if p.left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) &&
Contributor

@cloud-fan cloud-fan Feb 28, 2022

I think we should use || here. If only one side has valid distinct keys, we should still propagate that side.

Member Author

spark.sql("create table t1(a int, b int) using parquet")
spark.sql("create table t2(x int, y int) using parquet")

spark.sql("insert into t1 values(1, 1), (2, 2)")
spark.sql("insert into t2 values(1, 1), (1, 1), (2, 2), (2, 2)")

spark.sql("select * from (select distinct * from t1 )t1 join t2 on t1.a = t2.x and t1.b = t2.y").show

The output is:

+---+---+---+---+
|  a|  b|  x|  y|
+---+---+---+---+
|  1|  1|  1|  1|
|  1|  1|  1|  1|
|  2|  2|  2|  2|
|  2|  2|  2|  2|
+---+---+---+---+

The output is not distinct on (a, b) even though the left side is, so we can't simply propagate the distinct side's own keys.

      Set(ExpressionSet(leftKeys), ExpressionSet(rightKeys))
    case _ => default(p)
  }
case _ => default(p)
Contributor

for left outer, we can propagate from right side.

Member Author

spark.sql("create table t1(a int, b int) using parquet")
spark.sql("create table t2(x int, y int) using parquet")

spark.sql("insert into t1 values(1, 1), (2, 2)")
spark.sql("insert into t2 values(3, 3), (4, 4)")

spark.sql("select * from t1 left join (select distinct * from t2)t2 on t1.a = t2.x and t1.b = t2.y").show

The output is:

+---+---+----+----+
|  a|  b|   x|   y|
+---+---+----+----+
|  2|  2|null|null|
|  1|  1|null|null|
+---+---+----+----+

The (x, y) columns are (null, null) in both rows, so the right side's distinct keys are not distinct in the output of a left outer join.

Contributor

Sorry, my fault. We can propagate the left side's distinct keys if p.right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)).

Member Author

+1
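A sketch of the corrected LeftOuter condition (hedged; rightJoinKeySet is assumed to be ExpressionSet(rightKeys) from the surrounding equi-join extraction):

    // If the right side is unique on its join keys, each left row appears at
    // most once in the output, so the left child's distinct keys still hold.
    case LeftOuter if p.right.distinctKeys.exists(_.subsetOf(rightJoinKeySet)) =>
      p.left.distinctKeys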


def constraintPropagationEnabled: Boolean = getConf(CONSTRAINT_PROPAGATION_ENABLED)

def propagateDistinctKeysEnabled: Boolean = getConf(PROPAGATE_DISTINCT_KEYS_ENABLED)
Contributor

it's only called once; we can inline it.

Member Author

OK

comparePlans(optimized, correctAnswer)
}

test("SPARK-36194: Negative case: The grouping expressions not same") {
Contributor

we don't require them to be the same. We need the child distinct keys to be a subset of the required grouping keys.

Member Author

Yes, updated the test name.

Seq(LeftSemi, LeftAnti).foreach { joinType =>
  val originalQuery = x.groupBy('a, 'b)('a, 'b)
    .join(y, joinType, Some("x.a".attr === "y.a".attr && "x.b".attr === "y.b".attr))
    .groupBy("x.a".attr, "x.b".attr)(TrueLiteral)
Contributor

hmmm why can't we optimize this query?

Member Author

Yes, updated RemoveRedundantAggregates to support this case:

case agg @ Aggregate(groupingExps, aggregateExps, child)
    if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) &&
      child.deterministic &&
      child.distinctKeys.exists(_.subsetOf(ExpressionSet(groupingExps))) =>
  Project(agg.aggregateExpressions, child)

}
}

test("SPARK-36194: Negative case: The aggregate expressions not same") {
Contributor

@cloud-fan cloud-fan Feb 28, 2022

the test name is a bit misleading. I don't think the aggregate expressions matter (as long as it's a group-only aggregate); the grouping expressions matter.

Member Author

Fixed the test name.

Project(agg.aggregateExpressions, child)

case agg @ Aggregate(groupingExps, aggregateExps, child)
if aggregateExps.forall(a => a.isInstanceOf[Alias] && a.children.forall(_.foldable)) &&
Contributor

how about a mix? e.g. SELECT a, 1, b FROM ... GROUP BY a, b, c

Member Author

case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, _, _, _, _, _)
    if p.left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))) &&
      p.right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys))) =>
  Set(ExpressionSet(leftKeys), ExpressionSet(rightKeys))
Contributor

if p.left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys))), we can propagate the right side distinct keys, right?

Member Author

+1
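Putting the thread together, a sketch of where the Inner case could land (hedged; the exact shape is an assumption based on this discussion, using the ExpressionSet-based variant seen in the later diffs):

    case ExtractEquiJoinKeys(Inner, leftKeys, rightKeys, _, _, _, _, _) =>
      val leftUnique = p.left.distinctKeys.exists(_.subsetOf(ExpressionSet(leftKeys)))
      val rightUnique = p.right.distinctKeys.exists(_.subsetOf(ExpressionSet(rightKeys)))
      if (leftUnique && rightUnique) {
        // One-to-one join: both sides' join keys stay distinct in the output.
        Set(ExpressionSet(leftKeys), ExpressionSet(rightKeys))
      } else if (leftUnique) {
        // Each right row matches at most one left row, so the right side's
        // distinct keys remain distinct.
        p.right.distinctKeys
      } else if (rightUnique) {
        p.left.distinctKeys
      } else {
        default(p)
      }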

@wangyum
Member Author

wangyum commented Mar 9, 2022

New PR: #35779

This pull request was closed.