@@ -17,7 +17,7 @@

package org.apache.spark.sql.catalyst.planning

import scala.collection.mutable
import java.util.Locale

import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
@@ -310,3 +310,95 @@ object PhysicalWindow {
case _ => None
}
}

/**
 * Extracts partition push-down predicates from a set of filter expressions.
 * The original condition for selecting pruning predicates is
 * {
 *   !expression.references.isEmpty &&
 *   expression.references.subsetOf(partitionKeyIds)
 * }
 *
 * which can only push down simple predicates that reference partition columns alone.
 * For example, given the table:
 * CREATE TABLE DEFAULT.PARTITION_TABLE(
 *   A STRING,
 *   B STRING,
 *   C STRING)
 * PARTITIONED BY(DT STRING)
 *
 * and the SQL:
 * SELECT A, B
 * FROM DEFAULT.PARTITION_TABLE
 * WHERE DT = 20190601 OR (DT = 20190602 AND C = "TEST")
 *
 * the where condition "DT = 20190601 OR (DT = 20190602 AND C = "TEST")"
Contributor:
Let's be more clear about the approach here: we try to weaken a predicate so that it only refers to partition columns, by removing some branches in an AND. Let's also mention the corner case where there is no partition column attribute in the predicate (we should return Nil in that case?).

Contributor Author:
@cloud-fan
Here it returns (DT = 20190601 OR DT = 20190602), but the whole original condition is still applied as a filter afterwards. The goal is purely to avoid reading unnecessary partitions: in this case we read only the partitions dt=20190601 and dt=20190602, whereas without the pushdown we would read all of the data.

In the condition (DT = 20190602 AND C = "TEST"), DT = 20190602 is a precondition of C = "TEST".

If the whole condition is DT = 20190601 OR (DT = 20190602 OR C = "TEST"), we should return nothing, since DT = 20190602 is not a constraint on C = "TEST".

Contributor Author:
@cloud-fan
In my code, the incoming set of predicates has an implicit AND between its elements, so each expression is constrained by the other expressions at the same level, and:

  • For a combination with AND, each side can act as a constraint on the other, so if one side is usable on its own it can be returned as a valid condition.
  • For a combination with OR, if one side is out of our control (for example, it carries no condition on the partition columns), the whole OR expression must return None. Only when both children of the OR are usable can a combined OR be returned.
  • For a multi-layer expression nested with BinaryOperators, the lowest level is visited first; if any level's OR expression is found to be unusable, the whole expression is discarded and null is returned.

 * cannot be pushed down, because its references are not a subset of the partition columns.
 * [[ExtractPartitionPredicates]] extracts the partition-only predicate hidden inside the Or
 * expression: it returns Or(DT = 20190601, DT = 20190602) for partition pruning.
 *
 * For an Or condition such as:
 * SELECT A, B
 * FROM DEFAULT.PARTITION_TABLE
 * WHERE DT = 20190601 OR (DT = 20190602 OR C = "TEST")
 *
 * the predicate is not treated as a valid push-down condition and an empty expression set
 * is returned, because one branch of the inner Or carries no constraint on the partition columns.
 */
object ExtractPartitionPredicates extends Logging {

// Keep a leaf predicate only when all of its references are partition columns;
// otherwise return null so the caller can discard or weaken the enclosing expression.
private def resolvePredicatesExpression(expr: Expression,
    partitionKeyIds: AttributeSet): Expression = {
if (!expr.references.isEmpty && expr.references.subsetOf(partitionKeyIds)) {
expr
} else {
null
}
}

private def constructBinaryOperators(left: Expression,
right: Expression,
op_type: String): Expression = {
op_type.toUpperCase(Locale.ROOT) match {
// Construct an 'Or' predicate only when both of its children are valid;
// if not, return null.
case "OR" if left != null && right != null => Or(left, right)
// For an 'And' expression, the left and right sides constrain each other,
// so it is safe to return either side alone, or both.
case "AND" if left != null || right != null =>
if (left == null) {
right
} else if (right == null) {
left
} else {
And(left, right)
}
case _ => null
}
}

// Recursively weaken `expr` so that it only references partition columns,
// returning null when no safe partition-only predicate can be extracted.
private def resolveExpression(expr: Expression, partitionKeyIds: AttributeSet): Expression = {
expr match {
case And(left, right) =>
constructBinaryOperators(
resolveExpression(left, partitionKeyIds),
resolveExpression(right, partitionKeyIds),
"and")
case or @ Or(left, right)
// An 'Or' can be chosen only when both of its children reference partition keys.
// An invalid 'Or' expression is handled by [[resolvePredicatesExpression]], which
// returns null and thereby invalidates the enclosing 'Or' expression as well.
if or.children.forall(_.references.exists(ref => partitionKeyIds.contains(ref))) =>
constructBinaryOperators(
resolveExpression(left, partitionKeyIds),
resolveExpression(right, partitionKeyIds),
"or")
case _ => resolvePredicatesExpression(expr, partitionKeyIds)
}
}

def apply(predicates: Seq[Expression],
partitionKeyIds: AttributeSet): Seq[Expression] = {
predicates.map(resolveExpression(_, partitionKeyIds))
.filter(_ != null)
}
}
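To make the extraction concrete, here is a minimal usage sketch (not part of the patch) built directly against the catalyst expression API; it assumes dt is the only partition column and c is an ordinary data column, and the attribute and literal values are illustrative only:

import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.ExtractPartitionPredicates
import org.apache.spark.sql.types.StringType

// Illustrative attributes: `dt` is the partition column, `c` is a data column.
val dt = AttributeReference("dt", StringType)()
val c = AttributeReference("c", StringType)()
val partitionKeys = AttributeSet(dt :: Nil)

// dt = '20190601' OR (dt = '20190602' AND c = 'TEST')
val mixedOr = Or(
  EqualTo(dt, Literal("20190601")),
  And(EqualTo(dt, Literal("20190602")), EqualTo(c, Literal("TEST"))))

// The And branch is weakened to its partition-only side, so this should return
// Seq(Or(dt = '20190601', dt = '20190602')); only two partitions are read, and the
// full original predicate is still evaluated later as a regular Filter.
ExtractPartitionPredicates(Seq(mixedOr), partitionKeys)

// dt = '20190601' OR (dt = '20190602' OR c = 'TEST')
val looseOr = Or(
  EqualTo(dt, Literal("20190601")),
  Or(EqualTo(dt, Literal("20190602")), EqualTo(c, Literal("TEST"))))

// One branch of the inner Or carries no partition column, so nothing can be pruned
// safely and this should return Seq() (all partitions are read).
ExtractPartitionPredicates(Seq(looseOr), partitionKeys)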
@@ -22,7 +22,7 @@ import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.catalog.BucketSpec
import org.apache.spark.sql.catalyst.expressions
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.planning.{ExtractPartitionPredicates, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.{FileSourceScanExec, SparkPlan}
import org.apache.spark.util.collection.BitSet
@@ -154,8 +154,7 @@ object FileSourceStrategy extends Strategy with Logging {
fsRelation.partitionSchema, fsRelation.sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters
.filter(_.references.subsetOf(partitionSet)))
ExpressionSet(ExtractPartitionPredicates(normalizedFilters, partitionSet))

logInfo(s"Pruning directories with: ${partitionKeyFilters.mkString(",")}")

@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources

import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.planning.{ExtractPartitionPredicates, PhysicalOperation}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

@@ -48,8 +48,7 @@ private[sql] object PruneFileSourcePartitions extends Rule[LogicalPlan] {
partitionSchema, sparkSession.sessionState.analyzer.resolver)
val partitionSet = AttributeSet(partitionColumns)
val partitionKeyFilters =
ExpressionSet(normalizedFilters
.filter(_.references.subsetOf(partitionSet)))
ExpressionSet(ExtractPartitionPredicates(normalizedFilters, partitionSet))

if (partitionKeyFilters.nonEmpty) {
val prunedFileIndex = catalogFileIndex.filterPartitions(partitionKeyFilters.toSeq)
@@ -27,7 +27,7 @@ import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning._
import org.apache.spark.sql.catalyst.plans.logical.{InsertIntoDir, InsertIntoTable, LogicalPlan,
ScriptTransformation}
ScriptTransformation}
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution._
import org.apache.spark.sql.execution.command.{CreateTableCommand, DDLUtils}
@@ -237,21 +237,24 @@ private[hive] trait HiveStrategies {
* applied.
*/
object HiveTableScans extends Strategy {

def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
case PhysicalOperation(projectList, predicates, relation: HiveTableRelation) =>
// Filter out all predicates that only deal with partition keys, these are given to the
// hive table scan operator to be used for partition pruning.
val partitionKeyIds = AttributeSet(relation.partitionCols)
val (pruningPredicates, otherPredicates) = predicates.partition { predicate =>
val (_, otherPredicates) = predicates.partition { predicate => {
!predicate.references.isEmpty &&
predicate.references.subsetOf(partitionKeyIds)
predicate.references.subsetOf(partitionKeyIds)
}
}
val extractedPruningPredicates = ExtractPartitionPredicates(predicates, partitionKeyIds)

pruneFilterProject(
projectList,
otherPredicates,
identity[Seq[Expression]],
HiveTableScanExec(_, relation, pruningPredicates)(sparkSession)) :: Nil
HiveTableScanExec(_, relation, extractedPruningPredicates)(sparkSession)) :: Nil
case _ =>
Nil
}
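
As an illustrative end-to-end check (not part of the patch; it assumes the example table from the extractor's scaladoc exists as a partitioned Hive table and that spark is a Hive-enabled SparkSession), the extended physical plan should show a HiveTableScanExec that prunes to the partitions dt=20190601 and dt=20190602, with the full original predicate still applied by a Filter above the scan:

// Rows from dt=20190602 with c <> 'TEST' are still removed by the Filter,
// so the weakened pruning predicate does not change query results.
spark.sql(
  """SELECT a, b
    |FROM default.partition_table
    |WHERE dt = '20190601' OR (dt = '20190602' AND c = 'TEST')
  """.stripMargin).explain(true)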