Skip to content

Conversation

@nsyca
Copy link
Contributor

@nsyca nsyca commented Apr 3, 2017

What changes were proposed in this pull request?

This commit moves two rules right next to the rule OptimizeSubqueries.

  1. PullupCorrelatedPredicates: the rewrite of [Not] Exists and [Not] In (ListQuery) to PredicateSubquery
  2. RewritePredicateSubquery: the rewrite of PredicateSubquery to LeftSemi/LeftAnti

With this change, [Not] Exists/In subquery is now rewritten to LeftSemi/LeftAnti at the beginning of Optimizer.
One Todo is to merge the two-stage rewrite in rule PullupCorrelatedPredicates and rule RewritePredicateSubquery into a single stage rewrite rule.

How was this patch tested?

Unit tests with test cases in SQLQueryTestSuite under the directory ./sql/core/src/test/resources/sql-tests/inputs/subquery.

nsyca added 15 commits July 29, 2016 17:43
…rrect results

## What changes were proposed in this pull request?

This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase.

## How was this patch tested?
./dev/run-tests
a new unit test on the problematic pattern.
…rrect results

## What changes were proposed in this pull request?

This patch fixes the incorrect results in the rule ResolveSubquery in Catalyst's Analysis phase.

## How was this patch tested?
./dev/run-tests
a new unit test on the problematic pattern.
…timizeSubqueries

This commit moves two rules right next to the rule OptimizeSubqueries.
 1. PullupCorrelatedPredicates:
    the rewrite of [Not] Exists and [Not] In (ListQuery) to PredicateSubquery
 2. RewritePredicateSubquery:
    the rewrite of PredicateSubquery to LeftSemi/LeftAnti

With this change, [Not] Exists/In subquery is now rewritten to LeftSemi/LeftAnti
at the beginning of Optimizer.

By moving rule PullupCorrelatedPredicates after rule OptimizerSubqueries, all
the rules from the nested call to the entire Optimizer on the plans in subqueries
will need to deal with (1). the correlated columns wrapped with OuterReference,
and (2) the SubqueryExpression.

We will block any push down of both types of expressions for the following reasons:

1. We do not want to push any correlated expressions further down the plan tree.
   Deep correlation is not yet supported in Spark, and, even when supported,
   deep correlation is more difficult to be unnested to a join.
2. We do not want to push any correlated subquery down because the correlated
   columns' ExprIds in the subquery may need to remap to different ExprIds from
   the plan below the current Filter that hosts the subquery.

Another side effect is we used to push down Exists/In subquery as if it is a
predicate in rule PushDownPredicate and rule PushPredicateThroughJoin. Now
Exists/In subquery is rewritten to LeftSemi/LeftAnti, we need to handle
the push down of LeftSemi/LeftAnti instead. This will be done in a followup
commit.

Another Todo is to merge the two-stage rewrite in rule PullupCorrelatedPredicates
and rule RewritePredicateSubquery into a single stage rewrite.
@SparkQA
Copy link

SparkQA commented Apr 3, 2017

Test build #75483 has finished for PR 17520 at commit 380d5d7.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@nsyca
Copy link
Contributor Author

nsyca commented Apr 3, 2017

Commit bc4fe93 is an initial work to demonstrate the idea of merging the 2-stage transformation of [NOT] Exists/IN subquery into LeftSemi/LeftAnti. It has the skeleton of the work but needs to fill in more details.

More explanation...

By moving rule PullupCorrelatedPredicates after rule OptimizerSubqueries, all the rules from the nested call to the entire Optimizer on the plans in subqueries will need to deal with (1) the correlated columns wrapped with OuterReference, and (2) the SubqueryExpression.

We will block any push down of both types of expressions for the following reasons:

  1. We do not want to push any correlated expressions further down the plan tree. Deep correlation is not yet supported in Spark, and, even when supported, deep correlation is more difficult to be unnested to a join.
  2. We do not want to push any correlated subquery down because the correlated columns' ExprIds in the subquery may need to remap to different ExprIds from the plan below the current Filter that hosts the subquery.

One side effect is we used to push down Exists/In subquery as if it is a predicate in rule PushDownPredicate and rule PushPredicateThroughJoin. Now Exists/In subquery is rewritten to LeftSemi/LeftAnti, we need to handle the push down of LeftSemi/LeftAnti instead. This will be done in a followup commit.

@nsyca
Copy link
Contributor Author

nsyca commented Apr 3, 2017

Commit 4aaab02 has the complete functionality and new test cases.

}
Project(projectList, Join(grandChild, rightOp, joinType, newJoinCond))
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] Should we separate this into a new rule?

join
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] Should we separate this into a new rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unlike the cases of Aggregate and Union (see the comments below), Window never reduces the cardinality of the input stream, hence pushing down the LeftSemi/LeftAnti join (which guarantees by their semantics that it never expands the cardinality of their parent table) is safer.

join
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] Should we separate this into a new rule?

This code is added to preserver the existing behaviour when subquery is in the form of a predicate. A point worth making here is pushing down EXISTS/IN subquery is not always a winner. If the processing in the subquery is expensive such as scanning a very large table and the filtering effect is marginal, performing the join below the aggregate may not save much of the aggregate. If the aggregate can reduce more because of the small number of distinct values on the group by columns, not pushing down the subquery is actually a better plan.

join
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] Should we separate this into a new rule?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Similar to the LeftSemi/LeftAnti over Aggregate, pushing down LeftSemi/LeftAnti under a Union means we need to repeat the processing of LeftSemi/LeftAnti once for each branch of the Union.

This code is added to preserve the existing behaviour.

u.withNewChildren(Seq(Join(u.child, rightOp, joinType, Option(joinCond))))
}
}

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[To reviewers] Should we separate this into a new rule?

/**
* Pushes down a subquery, in the form of [[Join LeftSemi/LeftAnti]] operator
* to the left or right side of a join below.
*/
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pushing down LeftSemi/LeftAnti through a join is better done as part of the cost-based join reordering. This code is put here to preserve the existing behaviour.

val newJoinType = buildNewJoinType(j, child, subquery.outputSet)
if (newJoinType == child.joinType) j else {
Join(child.copy(joinType = newJoinType), subquery, joinType, joinCond)
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a new rewrite to convert the outer joins below LeftSemi/LeftAnti to an inner join (or left or right outer join in the case of the original full outer join). EXISTS/IN/NOT EXISTS subqueries are null-filtering predicates if the correlated predicates in the subquery are null-filtering.

@nsyca
Copy link
Contributor Author

nsyca commented Apr 3, 2017

cc: @hvanhovell

@SparkQA
Copy link

SparkQA commented Apr 3, 2017

Test build #75493 has finished for PR 17520 at commit 4aaab02.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 4, 2017

Test build #75494 has finished for PR 17520 at commit 0bab4fd.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@viirya
Copy link
Member

viirya commented Apr 4, 2017

Actually I think the most of code changes is LeftSemi/LeftAnti pushdown. We should update the description and maybe the title to reflect it.

@SparkQA
Copy link

SparkQA commented Apr 4, 2017

Test build #75522 has finished for PR 17520 at commit f3c7851.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 5, 2017

Test build #75526 has finished for PR 17520 at commit be19da1.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 10, 2017

Test build #75668 has finished for PR 17520 at commit b923bd5.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • case class SparkListenerBlockManagerAdded(
  • class StorageStatus(
  • public final class JavaStructuredSessionization
  • public static class LineWithTimestamp implements Serializable
  • public static class Event implements Serializable
  • public static class SessionInfo implements Serializable
  • public static class SessionUpdate implements Serializable
  • case class Event(sessionId: String, timestamp: Timestamp)
  • case class SessionInfo(
  • case class SessionUpdate(
  • class Correlation(object):
  • case class UnresolvedMapObjects(
  • case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)
  • case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper
  • * Helper case class to hold (plan, rowCount) pairs.

@nsyca
Copy link
Contributor Author

nsyca commented Apr 11, 2017

@cloud-fan: would you be interested in reviewing this PR since I have not heard from @hvanhovell for a while? Note this is a WIP and I want to hear your feedback on the issues I put in the comments along with the code. The code, as it is, is to preserve the current behaviour but not necessary a desired one.

@wangyum
Copy link
Member

wangyum commented Nov 27, 2017

@nsyca Can you resolve conflicts?

@SparkQA
Copy link

SparkQA commented Aug 13, 2018

Test build #94661 has finished for PR 17520 at commit b923bd5.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • case class SparkListenerBlockManagerAdded(
  • class StorageStatus(
  • public final class JavaStructuredSessionization
  • public static class LineWithTimestamp implements Serializable
  • public static class Event implements Serializable
  • public static class SessionInfo implements Serializable
  • public static class SessionUpdate implements Serializable
  • case class Event(sessionId: String, timestamp: Timestamp)
  • case class SessionInfo(
  • case class SessionUpdate(
  • class Correlation(object):
  • case class UnresolvedMapObjects(
  • case class AssertNotNull(child: Expression, walkedTypePath: Seq[String] = Nil)
  • case class StarSchemaDetection(conf: SQLConf) extends PredicateHelper
  • * Helper case class to hold (plan, rowCount) pairs.

@dongjoon-hyun
Copy link
Member

Is this still valid, @nsyca ?

@dilipbiswal
Copy link
Contributor

@dongjoon-hyun I had asked about this to @gatorsmile some time back. He said we may revisit this after 2.4.

@dongjoon-hyun
Copy link
Member

Got it. Thanks, @dilipbiswal . :)

@SparkQA
Copy link

SparkQA commented Oct 22, 2018

Test build #97827 has started for PR 17520 at commit b923bd5.

@cloud-fan
Copy link
Contributor

is it time to revisit it?

@SparkQA
Copy link

SparkQA commented Oct 22, 2018

Test build #97835 has started for PR 17520 at commit b923bd5.

@AmplabJenkins
Copy link

Build finished. Test FAILed.

@gatorsmile
Copy link
Member

@dilipbiswal Could you take this over?

@dilipbiswal
Copy link
Contributor

dilipbiswal commented Oct 30, 2018

@gatorsmile Sure Sean.
@cloud-fan sorry, i didn't see ur comment earlier. I wasn't watching this PR :-)

@github-actions
Copy link

We're closing this PR because it hasn't been updated in a while. This isn't a judgement on the merit of the PR in any way. It's just a way of keeping the PR queue manageable.
If you'd like to revive this PR, please reopen it and ask a committer to remove the Stale tag!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

9 participants