Skip to content

Conversation

@wangyum
Copy link
Member

@wangyum wangyum commented Nov 2, 2020

What changes were proposed in this pull request?

This pr simplify CaseWhen with EqualTo if all values are Literal, this is a real case from production:

create table t1 using parquet as select * from range(100);
create table t2 using parquet as select * from range(200);

create temp view v1 as                                                             
select 'a' as event_type, * from t1                                                
union all                                                                          
select CASE WHEN id = 1 THEN 'b' WHEN id = 3 THEN 'c' end as event_type, * from t2 

explain select * from v1 where event_type = 'a';

Before this PR:

== Physical Plan ==
Union
:- *(1) Project [a AS event_type#30533, id#30535L]
:  +- *(1) ColumnarToRow
:     +- FileScan parquet default.t1[id#30535L] Batched: true, DataFilters: [], Format: Parquet
+- *(2) Project [CASE WHEN (id#30536L = 1) THEN b WHEN (id#30536L = 3) THEN c END AS event_type#30534, id#30536L]
   +- *(2) Filter (CASE WHEN (id#30536L = 1) THEN b WHEN (id#30536L = 3) THEN c END = a)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t2[id#30536L] Batched: true, DataFilters: [(CASE WHEN (id#30536L = 1) THEN b WHEN (id#30536L = 3) THEN c END = a)], Format: Parquet

After this PR:

== Physical Plan ==
*(1) Project [a AS event_type#8, id#4L]
+- *(1) ColumnarToRow
   +- FileScan parquet default.t1[id#4L] Batched: true, DataFilters: [], Format: Parquet

Why are the changes needed?

Improve query performance.

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Unit test.

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35118/

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Test build #130518 has finished for PR 30222 at commit 3a1cd10.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35118/

@wangyum
Copy link
Member Author

wangyum commented Nov 2, 2020

retest this please.

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35121/

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35121/

@SparkQA
Copy link

SparkQA commented Nov 2, 2020

Test build #130521 has finished for PR 30222 at commit 3a1cd10.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Also, cc @cloud-fan and @sunchao

@SparkQA
Copy link

SparkQA commented Nov 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35141/

@SparkQA
Copy link

SparkQA commented Nov 3, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35141/

@SparkQA
Copy link

SparkQA commented Nov 3, 2020

Test build #130541 has finished for PR 30222 at commit 593678c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Nov 4, 2020

Hive optimized it to predicate: CASE WHEN ((a = 100)) THEN (false) WHEN ((b > 1000)) THEN (true) WHEN (c is not null) THEN (false) ELSE (null) END (type: boolean). But this condition can not push down. We can optimized it to b > 1000 and push down it.

hive> explain SELECT *
    > FROM   (SELECT CASE
    >                  WHEN a = 100 THEN 1
    >                  WHEN b > 1000 THEN 2
    >                  WHEN c IS NOT NULL THEN 3
    >                END AS x
    >         FROM   t) tmp
    > WHERE  x = 2;
OK
STAGE DEPENDENCIES:
  Stage-0 is a root stage

STAGE PLANS:
  Stage: Stage-0
    Fetch Operator
      limit: -1
      Processor Tree:
        TableScan
          alias: t
          Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
          Filter Operator
            predicate: CASE WHEN ((a = 100)) THEN (false) WHEN ((b > 1000)) THEN (true) WHEN (c is not null) THEN (false) ELSE (null) END (type: boolean)
            Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
            Select Operator
              expressions: CASE WHEN ((a = 100)) THEN (1) WHEN ((b > 1000)) THEN (2) WHEN (c is not null) THEN (3) ELSE (null) END (type: int)
              outputColumnNames: _col0
              Statistics: Num rows: 1 Data size: 0 Basic stats: PARTIAL Column stats: NONE
              ListSink

}

case EqualTo(CaseWhen(branches, _), right)
if branches.count(_._2.semanticEquals(right)) == 1 =>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if there are more than one matches, shall we combine the conditions with Or?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes.

e.copy(branches = branches.take(i).map(branch => (branch._1, elseValue)))
}

case EqualTo(CaseWhen(branches, _), right)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit worried about dropping other branches in CASE WHEN. a.semanticEquals(b) means a is always equal to b. But !a.semanticEquals(b) doesn't mean that a will never be equal to b.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As an example (CASE WHEN a=1 THEN 1 ELSE b) = 1 can be true if a=1 or b=1.

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Test build #130629 has finished for PR 30222 at commit ee5e6dd.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35235/

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35235/

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Test build #130630 has finished for PR 30222 at commit b611659.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35237/

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35237/

Copy link
Contributor

@cloud-fan cloud-fan left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM except one minor comment.

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35276/

@SparkQA
Copy link

SparkQA commented Nov 5, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35276/

@wangyum
Copy link
Member Author

wangyum commented Nov 6, 2020

It seems it is caused by deterministic. cc @viirya

== Analyzed Logical Plan ==
label: double, features: vector, fold: int
Filter (UDF(fold#14) AND NOT (fold#14 = 2))
+- Repartition 2, true
   +- Project [label#3, features#4, fold#14]
      +- Project [label#3, features#4, random#10, CASE WHEN (random#10 < 0.33) THEN 0 WHEN (random#10 < 0.66) THEN 1 ELSE 2 END AS fold#14]
         +- Project [label#3, features#4, rand(100) AS random#10]
            +- Repartition 1, true
               +- SerializeFromObject [knownnotnull(assertnotnull(input[0, org.apache.spark.ml.feature.LabeledPoint, true])).label AS label#3, newInstance(class org.apache.spark.ml.linalg.VectorUDT).serialize AS features#4]
                  +- ExternalRDD [obj#2]

== Optimized Logical Plan ==
LocalRelation <empty>, [label#3, features#4, fold#14]

@HyukjinKwon
Copy link
Member

@wangyum, it's #21852 right? Can you file a blocker JIRA?

@cloud-fan
Copy link
Contributor

@wangyum do you know how we optimize the plan wrongly step by step?

@wangyum
Copy link
Member Author

wangyum commented Nov 6, 2020

We can reproduce it by:

spark.sql("CREATE TABLE t(a int, b int, c int) using parquet")
spark.sql(
  """
    |SELECT *
    |  FROM   (SELECT CASE
    |    WHEN rd > 1 THEN 1
    |    WHEN b > 1000 THEN 2
    |    WHEN c < 100 THEN 3
    |    ELSE 4
    |END AS x
    |FROM (SELECT *, rand(100) as rd FROM t) t1) t2
    |WHERE  x = 2
    |""".stripMargin).explain
  1. Alias.toAttribute construct AttributeReference with default deterministic, that is true:

    AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier)

  2. Therefore, deterministic is true, andSimplifyConditionals can simplify it:
    image

}

case EqualTo(c @ CaseWhen(branches, elseValue), right)
if c.deterministic &&
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

More precisely, I think we only need to make sure the skipped branches are all deterministic.

val (picked, skipped) = branches.partition(_._2.equals(right))
if (skipped.forall(_._1.determinisitc)) {
  ...
} else {
  original
}

@SparkQA
Copy link

SparkQA commented Nov 6, 2020

Test build #130694 has finished for PR 30222 at commit 5a90bfc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Nov 6, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35304/

@dongjoon-hyun
Copy link
Member

This seems to fail still.

@SparkQA
Copy link

SparkQA commented Nov 6, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/35304/

@wangyum
Copy link
Member Author

wangyum commented Nov 6, 2020

Sorry. This change has logic issue, for example:

spark.sql("CREATE TABLE t using parquet AS SELECT if(id % 2 = 7, null, id) AS a FROM range(7)")
spark.sql(
  """
    |SELECT *
    |  FROM   (SELECT CASE
    |    WHEN a > 1 THEN 1
    |    WHEN a > 3 THEN 3
    |    WHEN a > 5 THEN 5
    |    ELSE 6
    |END AS x
    |FROM t ) t1
    |WHERE x = 3
    |""".stripMargin).show

Before this pr, the result is empty, after this pr, the result is not empty.

@wangyum wangyum closed this Nov 6, 2020
@cloud-fan
Copy link
Contributor

I see, the case when conditions are not orthogonal. We can't skip any of them.

@wangyum wangyum deleted the SPARK-33315 branch November 6, 2020 09:37
@dongjoon-hyun
Copy link
Member

Thank you for your decision, @wangyum and @cloud-fan .

@wangyum wangyum restored the SPARK-33315 branch December 11, 2020 10:22
@wangyum wangyum reopened this Dec 11, 2020
@github-actions github-actions bot added the SQL label Dec 11, 2020
@SparkQA
Copy link

SparkQA commented Dec 11, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37273/

@SparkQA
Copy link

SparkQA commented Dec 11, 2020

Kubernetes integration test status success
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/37273/

@SparkQA
Copy link

SparkQA commented Dec 11, 2020

Test build #132669 has finished for PR 30222 at commit 312c613.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@wangyum
Copy link
Member Author

wangyum commented Dec 12, 2020

@cloud-fan @dongjoon-hyun We can improve the following case to reduce Union operator:

create table t1 using parquet as select * from range(100);
create table t2 using parquet as select * from range(200);

create temp view v1 as                                                             
select 'a' as event_type, * from t1                                                
union all                                                                          
select CASE WHEN id = 1 THEN 'b' WHEN id = 3 THEN 'c' end as event_type, * from t2;

explain select * from v1 where event_type = 'a';
== Physical Plan ==
Union
:- *(1) Project [a AS event_type#8, id#10L]
:  +- *(1) ColumnarToRow
:     +- FileScan parquet default.t1[id#10L] Batched: true, DataFilters: [], Format: Parquet,
+- *(2) Project [CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END AS event_type#9, id#11L]
   +- *(2) Filter (CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END = a)
      +- *(2) ColumnarToRow
         +- FileScan parquet default.t2[id#11L] Batched: true, DataFilters: [(CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END = a)], Format: Parquet


explain select * from v1 where event_type = 'b';
== Physical Plan ==
*(1) Project [CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END AS event_type#8, id#11L AS id#10L]
+- *(1) Filter (CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END = b)
   +- *(1) ColumnarToRow
      +- FileScan parquet default.t2[id#11L] Batched: true, DataFilters: [(CASE WHEN (id#11L = 1) THEN b WHEN (id#11L = 3) THEN c END = b)], Format: Parquet

if c.deterministic &&
right.isInstanceOf[Literal] && branches.forall(_._2.isInstanceOf[Literal]) &&
elseValue.forall(_.isInstanceOf[Literal]) =>
if ((branches.map(_._2) ++ elseValue).forall(!_.equals(right))) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use an EqualTo expression to compare literals? and how about the null semantic?

right.isInstanceOf[Literal] && branches.forall(_._2.isInstanceOf[Literal]) &&
elseValue.forall(_.isInstanceOf[Literal]) =>
if ((branches.map(_._2) ++ elseValue).forall(!_.equals(right))) {
FalseLiteral
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's update the JIRA/PR title, as it's a different optimization now.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@wangyum wangyum closed this Dec 16, 2020
@wangyum wangyum deleted the SPARK-33315 branch December 16, 2020 01:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants