Closed
Commits
29 commits
b988651
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
069ed8f
[SPARK-16804][SQL] Correlated subqueries containing LIMIT return inco…
nsyca Jul 29, 2016
edca333
New positive test cases
nsyca Jul 30, 2016
64184fd
Fix unit test case failure
nsyca Aug 1, 2016
29f82b0
blocking TABLESAMPLE
nsyca Aug 5, 2016
ac43ab4
Fixing code styling
nsyca Aug 5, 2016
631d396
Correcting Scala test style
nsyca Aug 7, 2016
7eb9b2d
One (last) attempt to correct the Scala style tests
nsyca Aug 8, 2016
1387cf5
Merge remote-tracking branch 'upstream/master'
nsyca Aug 12, 2016
648afac
Merge remote-tracking branch 'upstream/master'
nsyca Mar 14, 2017
dfd476d
Merge remote-tracking branch 'upstream/master'
nsyca Mar 16, 2017
9e1c18c
Merge remote-tracking branch 'upstream/master'
nsyca Mar 20, 2017
bc4fe93
Move PullupCorrelatedPredicates and RewritePredicateSubquery after Op…
nsyca Mar 22, 2017
dc3aa7e
Merge remote-tracking branch 'upstream/master'
nsyca Apr 3, 2017
380d5d7
Merge branch 'master' into phase2-1-clean
nsyca Apr 3, 2017
208f384
This commit works on 3 things:
nsyca Mar 22, 2017
a86f18b
Add LeftSemi/LeftAnti's constraints
nsyca Mar 23, 2017
fe89f35
Revert back QueryPlan.scala and fix FilterPushdownSuite
nsyca Mar 24, 2017
f078309
Clean up and add LeftSemi/Anti pushdown on empty joinCond
nsyca Mar 25, 2017
2479bcd
Fix bug in Join over Project that breaks LeftSemiOrAntiPushdownSuite …
nsyca Mar 25, 2017
bb8fad9
Update IN subquery pushdown test case
nsyca Apr 3, 2017
4aaab02
Fix merge conflict
nsyca Apr 3, 2017
0bab4fd
Fix test failure HiveCompatibilitySuite/subquery_in_having
nsyca Apr 4, 2017
2081fac
Handle Aggregate/Window/Union under LeftSemi/Anti and new test cases
nsyca Apr 4, 2017
8b36213
Merge remote-tracking branch 'upstream/master'
nsyca Apr 4, 2017
f3c7851
Merge branch 'master' into 19712-1
nsyca Apr 4, 2017
9dc8969
resolve conflict
nsyca Apr 4, 2017
be19da1
Merge branch '19712-1' of https://github.com/nsyca/spark into 19712-1
nsyca Apr 5, 2017
b923bd5
Merge branch 'master' into 19712-1
nsyca Apr 10, 2017
@@ -245,6 +245,17 @@ object ScalarSubquery {
case _ => false
}.isDefined
}

def hasScalarSubquery(e: Expression): Boolean = {
e.find {
case s: ScalarSubquery => true
case _ => false
}.isDefined
}

def hasScalarSubquery(e: Seq[Expression]): Boolean = {
e.find(hasScalarSubquery(_)).isDefined
}
}

/**
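For context, a minimal self-contained sketch of the pattern the new hasScalarSubquery helpers rely on: walk the expression tree with find and test isDefined. The toy Expr hierarchy below is a stand-in for illustration only, not Spark's catalyst types.

// Toy expression hierarchy (stand-in for catalyst's Expression; assumed for illustration).
object HasScalarSubquerySketch {
  sealed trait Expr {
    def children: Seq[Expr]
    // Pre-order search: return the first node satisfying the predicate, if any.
    def find(p: Expr => Boolean): Option[Expr] =
      if (p(this)) Some(this) else children.flatMap(_.find(p)).headOption
  }
  case class ScalarSubquery(plan: Expr) extends Expr { val children = Seq(plan) }
  case class Add(left: Expr, right: Expr) extends Expr { val children = Seq(left, right) }
  case class Literal(value: Int) extends Expr { val children = Seq.empty[Expr] }

  // Same shape as the helpers added in the diff: a node test plus a Seq overload.
  def hasScalarSubquery(e: Expr): Boolean =
    e.find { case _: ScalarSubquery => true; case _ => false }.isDefined

  def hasScalarSubquery(es: Seq[Expr]): Boolean = es.exists(e => hasScalarSubquery(e))

  def main(args: Array[String]): Unit = {
    val withSubquery = Add(Literal(1), ScalarSubquery(Literal(2)))
    println(hasScalarSubquery(withSubquery))          // true
    println(hasScalarSubquery(Seq(Literal(3): Expr))) // false
  }
}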

Large diffs are not rendered by default.

@@ -119,7 +119,11 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
* Returns whether the expression returns null or false when all inputs are nulls.
*/
private def canFilterOutNull(e: Expression): Boolean = {
if (!e.deterministic || SubqueryExpression.hasCorrelatedSubquery(e)) return false
if (!e.deterministic ||
SubqueryExpression.hasCorrelatedSubquery(e) ||
SubExprUtils.containsOuter(e)) {
return false
}
val attributes = e.references.toSeq
val emptyRow = new GenericInternalRow(attributes.length)
val boundE = BindReferences.bindReference(e, attributes)
@@ -147,9 +151,42 @@ case class EliminateOuterJoin(conf: SQLConf) extends Rule[LogicalPlan] with Pred
}
}

private def buildNewJoinType(upperJoin: Join, lowerJoin: Join, otherTableOutput: AttributeSet):
JoinType = {
val conditions = upperJoin.constraints
// Find the predicates that reference only the other table.
val localConditions = conditions.filter(_.references.subsetOf(otherTableOutput))
// Find the predicates that reference either the left table or the join predicates
// between the left table and the other table.
val leftConditions = conditions.filter(_.references.
subsetOf(lowerJoin.left.outputSet ++ otherTableOutput)).diff(localConditions)
// Find the predicates that reference either the right table or the join predicates
// between the right table and the other table.
val rightConditions = conditions.filter(_.references.
subsetOf(lowerJoin.right.outputSet ++ otherTableOutput)).diff(localConditions)

val leftHasNonNullPredicate = leftConditions.exists(canFilterOutNull)
val rightHasNonNullPredicate = rightConditions.exists(canFilterOutNull)

lowerJoin.joinType match {
case RightOuter if leftHasNonNullPredicate => Inner
case LeftOuter if rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate && rightHasNonNullPredicate => Inner
case FullOuter if leftHasNonNullPredicate => LeftOuter
case FullOuter if rightHasNonNullPredicate => RightOuter
case o => o
}
}

def apply(plan: LogicalPlan): LogicalPlan = plan transform {
case f @ Filter(condition, j @ Join(_, _, RightOuter | LeftOuter | FullOuter, _)) =>
val newJoinType = buildNewJoinType(f, j)
if (j.joinType == newJoinType) f else Filter(condition, j.copy(joinType = newJoinType))
case j @ Join(child @ Join(_, _, RightOuter | LeftOuter | FullOuter, _),
subquery, LeftSemiOrAnti(joinType), joinCond) =>
val newJoinType = buildNewJoinType(j, child, subquery.outputSet)
if (newJoinType == child.joinType) j else {
Join(child.copy(joinType = newJoinType), subquery, joinType, joinCond)
}
Contributor Author:
This is a new rewrite that converts the outer joins below a LeftSemi/LeftAnti join into an inner join (or into a left or right outer join when the original join is a full outer join). EXISTS/IN/NOT EXISTS subqueries act as null-filtering predicates when the correlated predicates inside the subquery are null-filtering.

}
}
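To make the rewrite above easier to follow, here is a distilled sketch of the join-type decision table that buildNewJoinType applies once it knows whether a null-filtering predicate exists on each side. The JoinType objects below are stand-ins for Spark's, and the two Boolean flags correspond to leftHasNonNullPredicate / rightHasNonNullPredicate in the diff.

// Stand-in join types (not Spark's catalyst classes); for illustration only.
object OuterJoinDowngradeSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftOuter extends JoinType
  case object RightOuter extends JoinType
  case object FullOuter extends JoinType

  // Mirrors the match in buildNewJoinType: a predicate that cannot hold when one side's
  // columns are all null eliminates the rows where that side would be null-padded,
  // so the outer join can be downgraded.
  def downgrade(joinType: JoinType,
                leftHasNullFilter: Boolean,
                rightHasNullFilter: Boolean): JoinType = joinType match {
    case RightOuter if leftHasNullFilter                        => Inner
    case LeftOuter  if rightHasNullFilter                       => Inner
    case FullOuter  if leftHasNullFilter && rightHasNullFilter  => Inner
    case FullOuter  if leftHasNullFilter                        => LeftOuter
    case FullOuter  if rightHasNullFilter                       => RightOuter
    case other                                                  => other
  }

  def main(args: Array[String]): Unit = {
    println(downgrade(FullOuter, leftHasNullFilter = true, rightHasNullFilter = false)) // LeftOuter
    println(downgrade(LeftOuter, leftHasNullFilter = false, rightHasNullFilter = true)) // Inner
  }
}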
@@ -112,3 +112,10 @@ object LeftExistence {
case _ => None
}
}

object LeftSemiOrAnti {
def unapply(joinType: JoinType): Option[JoinType] = joinType match {
case LeftSemi | LeftAnti => Some(joinType)
case _ => None
}
}
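A brief, self-contained sketch of how an extractor object like LeftSemiOrAnti is used: the unapply lets a single case clause match both join types while still binding the concrete value. The toy JoinType hierarchy below is a stand-in for Spark's.

// Toy join types (stand-ins, not Spark's) showing how the LeftSemiOrAnti extractor is used.
object LeftSemiOrAntiSketch {
  sealed trait JoinType
  case object Inner extends JoinType
  case object LeftSemi extends JoinType
  case object LeftAnti extends JoinType

  // Same shape as the extractor added in the diff.
  object LeftSemiOrAnti {
    def unapply(joinType: JoinType): Option[JoinType] = joinType match {
      case LeftSemi | LeftAnti => Some(joinType)
      case _ => None
    }
  }

  def describe(jt: JoinType): String = jt match {
    case LeftSemiOrAnti(t) => s"left semi/anti join: $t"   // matches LeftSemi and LeftAnti
    case other             => s"other join type: $other"
  }

  def main(args: Array[String]): Unit = {
    println(describe(LeftAnti)) // left semi/anti join: LeftAnti
    println(describe(Inner))    // other join type: Inner
  }
}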
@@ -319,7 +319,7 @@ case class Join(
left.constraints
.union(right.constraints)
.union(splitConjunctivePredicates(condition.get).toSet)
case LeftSemi if condition.isDefined =>
case LeftSemi | LeftAnti if condition.isDefined =>
left.constraints
.union(splitConjunctivePredicates(condition.get).toSet)
case j: ExistenceJoin =>
@@ -33,11 +33,14 @@ class FilterPushdownSuite extends PlanTest {
val batches =
Batch("Subqueries", Once,
EliminateSubqueryAliases) ::
Batch("Subquery", Once,
RewritePredicateSubquery) ::
Batch("Filter Pushdown", FixedPoint(10),
CombineFilters,
PushDownPredicate,
BooleanSimplification,
PushPredicateThroughJoin,
PushLeftSemiLeftAntiThroughJoin,
CollapseProject) :: Nil
}

@@ -855,8 +858,9 @@ class FilterPushdownSuite extends PlanTest {
.where(Exists(z.where("x.a".attr === "z.a".attr)))
.join(y, Inner, Option("x.a".attr === "y.a".attr))
.analyze
val optimized = Optimize.execute(Optimize.execute(query))
comparePlans(optimized, answer)
val optimized = Optimize.execute(query)
val expected = Optimize.execute(answer)
comparePlans(optimized, expected)
}

test("predicate subquery: push down complex") {
@@ -875,8 +879,9 @@
.join(x, Inner, Option("w.a".attr === "x.a".attr))
.join(y, LeftOuter, Option("x.a".attr === "y.a".attr))
.analyze
val optimized = Optimize.execute(Optimize.execute(query))
comparePlans(optimized, answer)
val optimized = Optimize.execute(query)
val expected = Optimize.execute(answer)
comparePlans(optimized, expected)
}

test("SPARK-20094: don't push predicate with IN subquery into join condition") {
@@ -890,13 +895,14 @@
("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr)))))
.analyze

val expectedPlan = x
val answer = x
.join(z, Inner, Some("x.b".attr === "z.b".attr))
.where("x.a".attr > 1 || "z.c".attr.in(ListQuery(w.select("w.d".attr))))
.analyze

val optimized = Optimize.execute(queryPlan)
comparePlans(optimized, expectedPlan)
val expected = Optimize.execute(answer)
comparePlans(optimized, expected)
}

test("Window: predicate push down -- basic") {
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

/**
* Provides helper methods for comparing plans.
@@ -71,7 +72,11 @@ abstract class PlanTest extends SparkFunSuite with PredicateHelper {
val newCondition =
splitConjunctivePredicates(condition.get).map(rewriteEqual(_)).sortBy(_.hashCode())
.reduce(And)
Join(left, right, joinType, Some(newCondition))
val maskedJoinType = if (joinType.isInstanceOf[ExistenceJoin]) {
val exists = AttributeReference("exists", BooleanType, false)(exprId = ExprId(0))
ExistenceJoin(exists)
} else joinType
Join(left, right, maskedJoinType, Some(newCondition))
}
}

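The PlanTest change above masks the generated exists attribute of an ExistenceJoin so that two otherwise-identical plans compare equal despite different generated ids. A minimal sketch of that normalization idea, using stand-in types rather than Spark's:

// Stand-in types (not Spark's) illustrating why the test harness pins a generated id:
// the "exists" attribute gets a fresh ExprId per run, so plans are normalized before comparison.
object MaskGeneratedIdSketch {
  final case class Attr(name: String, exprId: Long)
  final case class ExistenceJoinLike(exists: Attr)

  // Replace the non-deterministic id with a fixed placeholder, as PlanTest does with ExprId(0).
  def normalize(j: ExistenceJoinLike): ExistenceJoinLike =
    j.copy(exists = j.exists.copy(exprId = 0L))

  def main(args: Array[String]): Unit = {
    val fromQuery  = ExistenceJoinLike(Attr("exists", exprId = 17L))
    val fromAnswer = ExistenceJoinLike(Attr("exists", exprId = 42L))
    println(fromQuery == fromAnswer)                       // false: ids differ
    println(normalize(fromQuery) == normalize(fromAnswer)) // true: ids masked
  }
}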