From d09d905180c0eb9d8b1ed9aaf5d207e2fac6547c Mon Sep 17 00:00:00 2001 From: Min Qiu Date: Tue, 1 Dec 2015 18:01:02 -0800 Subject: [PATCH 1/2] Extract the common equality conditions that can be used as a join condition --- .../sql/catalyst/optimizer/Optimizer.scala | 50 +++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f4dba67f13b5..f390751f8e8c 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -40,6 +40,8 @@ object DefaultOptimizer extends Optimizer { Batch("Aggregate", FixedPoint(100), ReplaceDistinctWithAggregate, RemoveLiteralFromGroupExpressions) :: + Batch("CNF factorization", FixedPoint(100), + ExtractEqualJoinCondition) :: Batch("Operator Optimizations", FixedPoint(100), // Operator push down SetOperationPushDown, @@ -911,3 +913,51 @@ object RemoveLiteralFromGroupExpressions extends Rule[LogicalPlan] { a.copy(groupingExpressions = newGrouping) } } + +/** + * Extracts the equal-join condition, if any, so that the query planner avoids generating a cartesian + * product, which causes out-of-memory exceptions and performance issues + */ +object ExtractEqualJoinCondition extends Rule[LogicalPlan] with PredicateHelper{ + def apply(plan: LogicalPlan): LogicalPlan = plan transform { + case f @ Join(left, right, joinType, joinCondition) => + joinCondition match { + case Some(e) if isDNF(e) => { + val disjConditions = splitDisjunctivePredicates(e) + val exprMatrix = disjConditions.map(splitConjunctivePredicates) + if(exprMatrix.length <= 1) f + else { + val pattern = exprMatrix(0) + val comExprs: Seq[Expression] = pattern.filter(p => isCommonExpr(p, exprMatrix, 1)) + val newExprMatrix = exprMatrix.map(_.diff(comExprs)) + val newJoinCond = (comExprs :+ 
newExprMatrix.map(_.reduceLeft(And)).reduceLeft(Or)) + .reduceLeftOption(And) + Join(left, right, joinType, newJoinCond) + } + } + case _ => f + } + } + + def isCommonExpr(pattern: Expression, matrix: Seq[Seq[Expression]], startIndex: Int) : Boolean = { + val duplicatedCount = matrix.drop(startIndex).count(arr => arr.contains(pattern)) + return duplicatedCount == matrix.length - startIndex + } + + def isDNF(condition: Expression) : Boolean = { + condition match { + case Or(left, right) => isDNF(left) && isDNF(right) + case And(left, right) => isCNF(left) && isCNF(right) + case _ => true + } + } + + def isCNF(condition: Expression): Boolean = { + condition match { + case And(left, right) => isCNF(left) && isCNF(right) + case Or(left, right) => false + case _ => true + } + } +} + From 08a76aefcc6036d600fa92aee037515cce22ae09 Mon Sep 17 00:00:00 2001 From: Min Qiu Date: Wed, 2 Dec 2015 17:01:41 -0800 Subject: [PATCH 2/2] Fix the ColumnPruning rule to handle the Project <- Filter <- Join case --- .../spark/sql/catalyst/optimizer/Optimizer.scala | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala index f390751f8e8c..406e8426374b 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala @@ -199,6 +199,7 @@ object SetOperationPushDown extends Rule[LogicalPlan] with PredicateHelper { * - Aggregate * - Generate * - Project <- Join + * - Project <- Filter <- Join * - LeftSemiJoin */ object ColumnPruning extends Rule[LogicalPlan] { @@ -248,6 +249,16 @@ object ColumnPruning extends Rule[LogicalPlan] { Project(projectList, Join(pruneJoinChild(left), pruneJoinChild(right), joinType, condition)) + // Eliminate unneeded attributes from either side of a Join below a Filter. 
+ case Project(projectList, Filter(predicates, Join(left, right, joinType, condition))) => + val allReferences: AttributeSet = + AttributeSet( + projectList.flatMap(_.references.iterator)) ++ + predicates.references ++ + condition.map(_.references).getOrElse(AttributeSet(Seq.empty)) + Project(projectList, Filter(predicates, Join( + prunedChild(left, allReferences), prunedChild(right, allReferences), joinType, condition))) + // Eliminate unneeded attributes from right side of a LeftSemiJoin. case Join(left, right, LeftSemi, condition) => // Collect the list of all references required to evaluate the condition.