
Conversation

@peter-toth
Contributor

@peter-toth peter-toth commented Jun 20, 2023

What changes were proposed in this pull request?

This PR proposes a new way to do subexpression elimination in EquivalentExpressions. The main change is that ExpressionStats stores the expected evaluation count of a subexpression split into evalCount, which records sure evaluations, and conditionalEvalCount, which records expected conditional evaluations.

Please note that the expected conditional evaluation count is not the same as how many times a subexpression appears conditionally in EquivalentExpressions (the conditional use count). The expected conditional evaluation count better describes how likely a conditional subexpression is to be evaluated, so it is easier to use when deciding whether a subexpression should be considered for elimination (e.g. when some kind of threshold is reached).
The idea behind using the expected conditional evaluation count is that if we consider 2 cases where a non-leaf subexpression c:

  • appears only in one branch of a simple 2-branched If expression
  • or it appears only in the last branch of a 10-branched CaseWhen expression,

then the conditional use count is 1 in both cases, but the expected conditional evaluation counts are different: with random input data, c will very likely be evaluated more often in the first case. Since we don't know the exact probabilities of the branches, for the sake of simplicity all branchings are modelled with 0.5 / 0.5 probabilities in this PR.

Please find a related conversation here about the default 0 value of the config: #32987 (comment)

Here are a few example expressions and the ExpressionStats (sure + expected conditional evaluation counts) of a non-leaf c subexpression from the equivalence maps built from the expressions:

Expression         ExpressionStats of c
c                  c -> (1 + 0.0)
c + c              c -> (2 + 0.0)
If(_, c, _)        c -> (0 + 0.5)
If(_, c + c, _)    c -> (0 + 1.0)
If(_, c, c)        c -> (1 + 0.0)
If(c, c, _)        c -> (1 + 0.5)
If(c, c, c)        c -> (2 + 0.0)
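As a sanity check, the If rows above can be reproduced with a tiny model of the 0.5 / 0.5 assumption. This is only an illustrative sketch (the helper names are made up, it is not the PR's actual code); it just counts how often c is expected to be evaluated:

```scala
// Illustrative model of the 0.5 / 0.5 assumption, not the PR's implementation.
// The inputs are the number of occurrences of `c` in the predicate, the then branch and
// the else branch of an `If`.
final case class Counts(sure: Double, conditional: Double)

def ifCounts(inPredicate: Int, inThen: Int, inElse: Int): Counts = {
  // The predicate is always evaluated; of the two branches only what is common to both
  // is surely evaluated, the rest is expected with probability 0.5.
  val sure = inPredicate + math.min(inThen, inElse)
  val conditional = 0.5 * (inThen + inElse) - math.min(inThen, inElse)
  Counts(sure, conditional)
}

// Reproduces the `If` rows of the table:
// ifCounts(0, 1, 0) == Counts(0, 0.5)  // If(_, c, _)
// ifCounts(0, 2, 0) == Counts(0, 1.0)  // If(_, c + c, _)
// ifCounts(0, 1, 1) == Counts(1, 0.0)  // If(_, c, c)
// ifCounts(1, 1, 0) == Counts(1, 0.5)  // If(c, c, _)
// ifCounts(1, 1, 1) == Counts(2, 0.0)  // If(c, c, c)
```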

This PR:

  • Fixes the issue of subexpressions that are surely evaluated only once but have a certain probability of being evaluated more. These subexpressions are now considered for elimination based on the newly introduced spark.sql.subexpressionElimination.minExpectedConditionalEvaluationCount config (see the sketch after this list).
  • Fixes the issue of branch groups in CaseWhen and Coalesce expressions. Branch groups were used for calculating common subexpressions in conditional branches based on the idea that a subexpression that appears in all elements of a group is surely evaluated once. If we take the CaseWhen(w1, t1, w2, t2, w3, t3, e) example, then the previously defined (t1, t2, t3, e) group made sense, but for some reason the (w1, w2, w3) group was also defined, which didn't make sense because w1 is always evaluated anyway. Also, some other groups that would have made sense, such as (t1, w2) and (t1, t2, w3), were not defined. This PR completely removes branch groups from ConditionalExpression and uses a new way to calculate surely evaluated subexpressions.
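To make the first bullet above concrete, here is a hypothetical sketch of the decision (only the config key comes from this PR; the helper name and the exact condition are assumptions):

```scala
// Hypothetical sketch, not the actual code of this PR. A subexpression is a candidate for
// elimination if it is surely evaluated more than once, or if it is surely evaluated once
// and its expected conditional evaluation count reaches the configured threshold
// (spark.sql.subexpressionElimination.minExpectedConditionalEvaluationCount).
def isEliminationCandidate(
    evalCount: Double,
    conditionalEvalCount: Double,
    minExpectedConditionalEvalCount: Double): Boolean = {
  evalCount > 1 ||
    (evalCount == 1 && conditionalEvalCount > 0 &&
      conditionalEvalCount >= minExpectedConditionalEvalCount)
}
```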

Why are the changes needed?

Improve subexpression elimination.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Existing and new UTs (including the ones from @Kimahriman's PR: #32987).

@github-actions github-actions bot added the SQL label Jun 20, 2023
@peter-toth peter-toth force-pushed the SPARK-35564-improve-subexpression-elimination branch 2 times, most recently from e7bc27a to 7a405a9 on June 21, 2023 08:09
@peter-toth
Contributor Author

This PR is still WIP because I want to add more tests, but @Kimahriman, @cloud-fan, @rednaxelafx, @ulysses-you, @viirya, @wankunde you might be interested...

@Kimahriman
Contributor

This hurts my brain thinking about probabilistic conditional evaluations, and I feel like the subexpression elimination logic is already overly complicated. If I wanted to just create subexpressions for anything that is definitely executed once and maybe executed one other time (regardless of how nested inside a CaseWhen or Coalesce operation), what do I even set the new setting to?

@peter-toth
Contributor Author

peter-toth commented Jun 21, 2023

This hurts my brain thinking about probabilistic conditional evaluations, and I feel like the subexpression elimination logic is already overly complicated. If I wanted to just create subexpressions for anything that is definitely executed once and maybe executed one other time (regardless of how nested inside a CaseWhen or Coalesce operation), what do I even set the new setting to?

Got it, thanks for your feedback. If conditionalEvalCount is considered overkill then I can revert it to conditionalUseCount or a simple boolean flag. BTW, I think with this PR the ExpressionStats calculation logic becomes much simpler than it was before (especially if we revert to conditionalUseCount or a boolean flag); the getCommonSubexpressions() method is what became a bit more complicated.

Currently the new config is used as conditionalEvalCount >= <config value>, so you could use a very small value to get the same behaviour as conditionalUseCount > 0. Or we can change the config semantics to > and use a 0 config value for the same effect.
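For example (illustrative only: the key is the config introduced by this PR, the value is arbitrary, and it assumes the config is a regular runtime SQL conf and a SparkSession named spark, as in spark-shell):

```scala
// With the `>=` semantics above, any non-zero expected conditional evaluation count
// would then qualify, which mimics the old `conditionalUseCount > 0` behaviour.
spark.conf.set(
  "spark.sql.subexpressionElimination.minExpectedConditionalEvaluationCount", "0.001")
```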

@Kimahriman
Contributor

Kimahriman commented Jun 21, 2023

If you just have conditionalUseCount then that's basically what I have in my PR I think (except true/false currently in mine instead of the number of conditional uses). Removing the "branch groups" definitely simplifies things though if we're just willing to make those be conditional uses instead, as that is the bulk of the complexity right now. Working within that was the hardest part of getting the conditional usage working.

@peter-toth
Contributor Author

peter-toth commented Jun 21, 2023

Removing the "branch groups" definitely simplifies things though if we're just willing to make those be conditional uses instead

No, I mean although I removed branchGroups (and alwaysEvaluatedInputs) from ConditionalExpression, this PR maintains the count of sure evaluations correctly in ExpressionStats.evalCount as before. See the If(_, c, c) => c -> (1 + 0.0) example in the description. And actually it is better than it was before in the case of CaseWhen, e.g. CaseWhen(_, c, c, c, _) => c -> (1 + 0.25).
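For reference, here is how the (1 + 0.25) works out under the 0.5 / 0.5 model, reading the shorthand as branches (_, c), (c, c) with an else of _ (i.e. c appears in t1, w2 and t2): on every path at least one c is evaluated after w1, either t1 (when w1 is true) or w2 (when w1 is false), so the sure count is 1. The only extra evaluation is t2, which is reached only when w1 is false and w2 is true, i.e. with probability 0.5 * 0.5 = 0.25.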

// But we can continue the previous logic further because if `w2` is evaluated, then based
// on the result of `w2` either `t2` or `w3` is also evaluated.
// So eventually the local equivalence map can be calculated as
// `W1 | (T1 & W2 | T2 & (W3 | T3 & ... & (Wn | Tn & E)))`.
Contributor

Is there a missing parenthesis here? I'm trying to understand the order of operations once you get to T2.
Is it W1 | T1 & (W2 | T2 & (W3 | ...
assuming normal higher precedence of &

Contributor Author

Good point, I fixed the comment in bebfa21 to avoid confusion.

Contributor Author

Also, I've changed the new config in 38a0996, and from now on the default 0 value has the same effect as spark.sql.subexpressionElimination.conditionals.enabled in your PR.

@peter-toth
Contributor Author

The failure in [Run / Linters, licenses, dependencies and documentation generation] seems unrelated.

@peter-toth peter-toth changed the title from [WIP][SPARK-35564][SQL] Improve subexpression elimination to [SPARK-35564][SQL] Improve subexpression elimination on Jul 6, 2023
@peter-toth
Contributor Author

I've added a few more test cases and this PR is now ready for review.

@peter-toth peter-toth force-pushed the SPARK-35564-improve-subexpression-elimination branch from 9d88d8e to c1576f1 on October 16, 2023 10:17
.filter(_ >= 0d),
allowLeafExpressions: Boolean = false) {

// The subexpressions are stored by height to speed up certain calculations.
Contributor

sorted by height?

Contributor Author

maps is an array buffer and each element stores a map that contains expressions of a certain height. The i-th element contains a map of expressions with height i+1.
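Roughly it is shaped like this (only a sketch of the description above with simplified types, not the actual field declaration; ExpressionEquals and ExpressionStats are the wrapper and stats classes of EquivalentExpressions):

```scala
import scala.collection.mutable

// Index i of the buffer holds the equivalence map for expressions of height i + 1.
val maps: mutable.ArrayBuffer[mutable.Map[ExpressionEquals, ExpressionStats]] =
  mutable.ArrayBuffer.empty
```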

*
* Please note that `EquivalentExpressions` is mainly used in subexpression elimination where common
* non-leaf expression subtrees are calculated, but there is one special use case in
* `PhysicalAggregation` where `EquivalentExpressions` is used as a mutable set of non-deterministic
Contributor

hmm, how can EquivalentExpressions handle non-deterministic expressions?

Contributor Author

Yeah, this is not right. I should have written "mutable set of deterministic expressions".

* Adds each expression to this data structure and returns true if there was already a matching
* expression.
*/
def addExpr(expr: Expression): Boolean = {
Contributor

it looks like the only difference between this and addExprTree is that addExprTree allows non-deterministic expressions. Shall we name these two methods better?

* A wrapper in place of using Seq[Expression] to record a group of equivalent expressions.
* This class stores the expected evaluation count of expressions split into `evalCount` +
* `realEvalCount` that records sure evaluations and `condEvalCount` + `realCondEvalCount` that
* records conditional evaluations. The `real...` fields are filled up during `inflate()`.
Contributor

I don't quite get the meaning of the real... fields by reading the comment here.

Contributor

it seems like "direct eval count" and "transitive eval count". e.g. if we addExpr(a + 1), then the "direct eval count" of a + 1 is 1, and the "transitive eval count" of a is 1.

Contributor Author

I agree, direct and transitive are better prefixes.

Contributor Author
@peter-toth peter-toth Nov 7, 2023

Btw transitive (currently called real) is actually the sum of the direct additions and the transitive additions from parent expressions.
If we addExprTree(a + b) and addExpr((a + b) + 1) then the direct of a + b is 1 and the transitive of a + b is 2.

We wouldn't need the transitive fields if we did recurse during addExprTree().
But you know, the previous version of EquivalentExpressions used useCount, and with useCount, when the same or overlapping expressions were added to the data structure, the second addExprTree() didn't fully recurse but stopped when the first common subexpression was found. Now with the new structure, we just record the direct additions during addExprTree() and fill in the transitives during inflate() to be on par with the old version.

Contributor Author

Unfortunately the intersect and union operations are not possible with the old "compressed" useCount and that's why we need evalCounts.

case _: LambdaVariable => true
private def inflateExprState(exprStats: ExpressionStats): Unit = {
val expr = exprStats.expr
if (!expr.isInstanceOf[LeafExpression] || allowLeafExpressions) {
Contributor

what will go wrong if we always allow leaf expressions?

Contributor Author

We don't need to store leaf expressions when we use EquivalentExpressions for CSE as it doesn't make sense to evaluate leaves in advance. addExprTree() didn't recurse to leaves in the previous version, but now that addExprTree() doesn't recurse, we need this flag.

} else {
(otherValue.realEvalCount, value.realEvalCount)
}
value.realCondEvalCount += otherValue.realCondEvalCount + max - min
Contributor

shouldn't this be value.realCondEvalCount = (value.realCondEvalCount + otherValue.realCondEvalCount) / 2?

Contributor Author
@peter-toth peter-toth Nov 7, 2023

value.realCondEvalCount = (value.realCondEvalCount + otherValue.realCondEvalCount + max - min) / 2 is the full calculation, but the value.realCondEvalCount /= 2 part is extracted a bit below.

The (max - min) / 2 part is also needed. E.g. if we have If(_, a + b, (a + b) + (a + b)) then during the intersect of the then and else branches we have a + b -> 1 + 0 in the then branch and a + b -> 2 + 0 in the else branch. The result should be a + b -> 1 + 0.5 (sure + conditional).
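Plugging that example into the full calculation: min = 1 (then branch), max = 2 (else branch) and both conditional counts are 0, so the new conditional count is (0 + 0 + 2 - 1) / 2 = 0.5, which together with the sure count min(1, 2) = 1 gives the a + b -> 1 + 0.5 above.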

# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/conditionalExpressions.scala
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/nullExpressions.scala
# Conflicts:
#	sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/EquivalentExpressions.scala
@github-actions

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

@github-actions github-actions bot added the Stale label Mar 23, 2024
@peter-toth
Contributor Author

@cloud-fan, @Kimahriman please let me know if we should take this PR further, otherwise I'll let this PR be closed by the automation.

@github-actions github-actions bot closed this Mar 24, 2024