Skip to content
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,24 @@
*/
package org.apache.spark.sql.execution

import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning}
import org.apache.spark.sql.catalyst.expressions.{Alias, AttributeMap, AttributeReference, Expression, NamedExpression, SortOrder}
import org.apache.spark.sql.catalyst.plans.physical.Partitioning

/**
* A trait that provides functionality to handle aliases in the `outputExpressions`.
*/
trait AliasAwareOutputExpression extends UnaryExecNode {
protected def outputExpressions: Seq[NamedExpression]

protected def hasAlias: Boolean = outputExpressions.collectFirst { case _: Alias => }.isDefined
private lazy val aliasMap = AttributeMap(outputExpressions.collect {
case a @ Alias(child: AttributeReference, _) => (child, a.toAttribute)
})

protected def replaceAliases(exprs: Seq[Expression]): Seq[Expression] = {
exprs.map {
case a: AttributeReference => replaceAlias(a).getOrElse(a)
case other => other
}
}
protected def hasAlias: Boolean = aliasMap.nonEmpty

protected def replaceAlias(attr: AttributeReference): Option[Attribute] = {
outputExpressions.collectFirst {
case a @ Alias(child: AttributeReference, _) if child.semanticEquals(attr) =>
a.toAttribute
protected def normalizeExpression(exp: Expression): Expression = {
exp.transform {
case attr: AttributeReference => aliasMap.getOrElse(attr, attr)
}
}
}
Expand All @@ -50,7 +46,8 @@ trait AliasAwareOutputPartitioning extends AliasAwareOutputExpression {
final override def outputPartitioning: Partitioning = {
if (hasAlias) {
child.outputPartitioning match {
case h: HashPartitioning => h.copy(expressions = replaceAliases(h.expressions))
case e: Expression =>
normalizeExpression(e).asInstanceOf[Partitioning]
case other => other
}
} else {
Expand All @@ -68,12 +65,7 @@ trait AliasAwareOutputOrdering extends AliasAwareOutputExpression {

final override def outputOrdering: Seq[SortOrder] = {
if (hasAlias) {
orderingExpressions.map { s =>
s.child match {
case a: AttributeReference => s.copy(child = replaceAlias(a).getOrElse(a))
case _ => s
}
}
orderingExpressions.map(normalizeExpression(_).asInstanceOf[SortOrder])
} else {
orderingExpressions
}
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -1,68 +1,65 @@
WholeStageCodegen (16)
WholeStageCodegen (15)
Sort [d_week_seq1]
InputAdapter
Exchange [d_week_seq1] #1
WholeStageCodegen (15)
WholeStageCodegen (14)
Project [d_week_seq1,sun_sales1,sun_sales2,mon_sales1,mon_sales2,tue_sales1,tue_sales2,wed_sales1,wed_sales2,thu_sales1,thu_sales2,fri_sales1,fri_sales2,sat_sales1,sat_sales2]
SortMergeJoin [d_week_seq1,d_week_seq2]
InputAdapter
WholeStageCodegen (7)
WholeStageCodegen (6)
Sort [d_week_seq1]
InputAdapter
Exchange [d_week_seq1] #2
WholeStageCodegen (6)
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
BroadcastHashJoin [d_week_seq,d_week_seq]
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
InputAdapter
Exchange [d_week_seq] #3
WholeStageCodegen (4)
HashAggregate [d_week_seq,d_day_name,sales_price] [sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum]
Project [sales_price,d_week_seq,d_day_name]
BroadcastHashJoin [sold_date_sk,d_date_sk]
InputAdapter
Union
WholeStageCodegen (1)
Project [ws_sold_date_sk,ws_ext_sales_price]
Filter [ws_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.web_sales [ws_sold_date_sk,ws_ext_sales_price]
WholeStageCodegen (2)
Project [cs_sold_date_sk,cs_ext_sales_price]
Filter [cs_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.catalog_sales [cs_sold_date_sk,cs_ext_sales_price]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (3)
Filter [d_date_sk,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_week_seq,d_day_name]
InputAdapter
BroadcastExchange #5
WholeStageCodegen (5)
Project [d_week_seq]
Filter [d_year,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_week_seq,d_year]
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
BroadcastHashJoin [d_week_seq,d_week_seq]
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
InputAdapter
Exchange [d_week_seq] #2
WholeStageCodegen (4)
HashAggregate [d_week_seq,d_day_name,sales_price] [sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum,sum]
Project [sales_price,d_week_seq,d_day_name]
BroadcastHashJoin [sold_date_sk,d_date_sk]
InputAdapter
Union
WholeStageCodegen (1)
Project [ws_sold_date_sk,ws_ext_sales_price]
Filter [ws_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.web_sales [ws_sold_date_sk,ws_ext_sales_price]
WholeStageCodegen (2)
Project [cs_sold_date_sk,cs_ext_sales_price]
Filter [cs_sold_date_sk]
ColumnarToRow
InputAdapter
Scan parquet default.catalog_sales [cs_sold_date_sk,cs_ext_sales_price]
InputAdapter
BroadcastExchange #3
WholeStageCodegen (3)
Filter [d_date_sk,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_date_sk,d_week_seq,d_day_name]
InputAdapter
BroadcastExchange #4
WholeStageCodegen (5)
Project [d_week_seq]
Filter [d_year,d_week_seq]
ColumnarToRow
InputAdapter
Scan parquet default.date_dim [d_week_seq,d_year]
InputAdapter
WholeStageCodegen (14)
WholeStageCodegen (13)
Sort [d_week_seq2]
InputAdapter
Exchange [d_week_seq2] #6
WholeStageCodegen (13)
Exchange [d_week_seq2] #5
WholeStageCodegen (12)
Project [d_week_seq,sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales]
BroadcastHashJoin [d_week_seq,d_week_seq]
HashAggregate [d_week_seq,sum,sum,sum,sum,sum,sum,sum] [sum(UnscaledValue(CASE WHEN (d_day_name = Sunday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Monday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Tuesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Wednesday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Thursday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Friday) THEN sales_price ELSE null END)),sum(UnscaledValue(CASE WHEN (d_day_name = Saturday) THEN sales_price ELSE null END)),sun_sales,mon_sales,tue_sales,wed_sales,thu_sales,fri_sales,sat_sales,sum,sum,sum,sum,sum,sum,sum]
InputAdapter
ReusedExchange [d_week_seq,sum,sum,sum,sum,sum,sum,sum] #3
ReusedExchange [d_week_seq,sum,sum,sum,sum,sum,sum,sum] #2
InputAdapter
BroadcastExchange #7
WholeStageCodegen (12)
BroadcastExchange #6
WholeStageCodegen (11)
Project [d_week_seq]
Filter [d_year,d_week_seq]
ColumnarToRow
Expand Down
Loading