Skip to content

Commit 2dd29c1

Browse files
committed
Try to only remove attribute-only projects in removeProjectBeforeFilter.
1 parent 0158d85 commit 2dd29c1

File tree

2 files changed

+17
-42
lines changed

2 files changed

+17
-42
lines changed

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/Optimizer.scala

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -650,7 +650,9 @@ object ColumnPruning extends Rule[LogicalPlan] {
650650
*/
651651
private def removeProjectBeforeFilter(plan: LogicalPlan): LogicalPlan = plan transformUp {
652652
case p1 @ Project(_, f @ Filter(_, p2 @ Project(_, child)))
653-
if p2.outputSet.subsetOf(child.outputSet) =>
653+
if p2.outputSet.subsetOf(child.outputSet) &&
654+
// We only remove attribute-only project.
655+
p2.projectList.forall(_.isInstanceOf[AttributeReference]) =>
654656
p1.copy(child = f.copy(child = child))
655657
}
656658
}

sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/subquery.scala

Lines changed: 14 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -517,33 +517,20 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
517517
val alwaysTrueRef = AttributeReference(ALWAYS_TRUE_COLNAME,
518518
BooleanType)(exprId = alwaysTrueExprId)
519519

520+
val aggValRef = query.output.head
521+
520522
if (havingNode.isEmpty) {
521523
// CASE 2: Subquery with no HAVING clause
522524

523-
// We replace original expression id with a new one. The added Alias column
524-
// must use expr id of original output. If we don't replace old expr id in the
525-
// query, the added Project in potential Project-Filter-Project can be removed
526-
// by removeProjectBeforeFilter in ColumnPruning.
527-
val newExprId = NamedExpression.newExprId
528-
val newQuery =
529-
query.transformExpressions(replaceOldExprId(origOutput.exprId, newExprId))
530-
531-
val result = resultWithZeroTups.get
532-
.transform(replaceOldExprId(origOutput.exprId, newExprId))
533-
534-
val newCondition =
535-
conditions.map(_.transform(replaceOldExprId(origOutput.exprId, newExprId)))
536-
537-
val newExpr = Alias(
538-
If(IsNull(alwaysTrueRef),
539-
result,
540-
newQuery.output.head), origOutput.name)(exprId = origOutput.exprId)
541-
542525
Project(
543-
currentChild.output :+ newExpr,
526+
currentChild.output :+
527+
Alias(
528+
If(IsNull(alwaysTrueRef),
529+
resultWithZeroTups.get,
530+
aggValRef), origOutput.name)(exprId = origOutput.exprId),
544531
Join(currentChild,
545-
Project(newQuery.output :+ alwaysTrueExpr, newQuery),
546-
LeftOuter, newCondition.reduceOption(And), JoinHint.NONE))
532+
Project(query.output :+ alwaysTrueExpr, query),
533+
LeftOuter, conditions.reduceOption(And), JoinHint.NONE))
547534

548535
} else {
549536
// CASE 3: Subquery with HAVING clause. Pull the HAVING clause above the join.
@@ -560,34 +547,20 @@ object RewriteCorrelatedScalarSubquery extends Rule[LogicalPlan] {
560547
case op => sys.error(s"Unexpected operator $op in corelated subquery")
561548
}
562549

563-
// We replace original expression id with a new one. The added Alias column
564-
// must use expr id of original output. If we don't replace old expr id in the
565-
// query, the added Project in potential Project-Filter-Project can be removed
566-
// by removeProjectBeforeFilter in ColumnPruning.
567-
val newExprId = NamedExpression.newExprId
568-
val newQuery =
569-
subqueryRoot.transformExpressions(replaceOldExprId(origOutput.exprId, newExprId))
570-
571-
val result = resultWithZeroTups.get
572-
.transform(replaceOldExprId(origOutput.exprId, newExprId))
573-
574-
val newCondition =
575-
conditions.map(_.transform(replaceOldExprId(origOutput.exprId, newExprId)))
576-
577550
// CASE WHEN alwaysTrue IS NULL THEN resultOnZeroTups
578551
// WHEN NOT (original HAVING clause expr) THEN CAST(null AS <type of aggVal>)
579552
// ELSE (aggregate value) END AS (original column name)
580553
val caseExpr = Alias(CaseWhen(Seq(
581-
(IsNull(alwaysTrueRef), result),
582-
(Not(havingNode.get.condition), Literal.create(null, newQuery.output.head.dataType))),
583-
newQuery.output.head),
554+
(IsNull(alwaysTrueRef), resultWithZeroTups.get),
555+
(Not(havingNode.get.condition), Literal.create(null, aggValRef.dataType))),
556+
aggValRef),
584557
origOutput.name)(exprId = origOutput.exprId)
585558

586559
Project(
587560
currentChild.output :+ caseExpr,
588561
Join(currentChild,
589-
Project(newQuery.output :+ alwaysTrueExpr, newQuery),
590-
LeftOuter, newCondition.reduceOption(And), JoinHint.NONE))
562+
Project(subqueryRoot.output :+ alwaysTrueExpr, subqueryRoot),
563+
LeftOuter, conditions.reduceOption(And), JoinHint.NONE))
591564
}
592565
}
593566
}

0 commit comments

Comments
 (0)