
Commit a2f1bc1

address comments
1 parent 639e9b6 commit a2f1bc1

6 files changed: 11 additions & 12 deletions


sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/physical/partitioning.scala

Lines changed: 3 additions & 3 deletions
@@ -92,14 +92,14 @@ case class ClusteredDistribution(
 }
 
 /**
- * Represents data where tuples have been partitioned according to the hash of the given
+ * Represents data where tuples have been clustered according to the hash of the given
  * `expressions`. The hash function is defined as `HashPartitioning.partitionIdExpression`, so only
  * [[HashPartitioning]] can satisfy this distribution.
  *
  * This is a strictly stronger guarantee than [[ClusteredDistribution]]. Given a tuple and the
  * number of partitions, this distribution strictly requires which partition the tuple should be in.
  */
-case class HashPartitionedDistribution(expressions: Seq[Expression]) extends Distribution {
+case class HashClusteredDistribution(expressions: Seq[Expression]) extends Distribution {
   require(
     expressions != Nil,
     "The expressions for hash of a HashPartitionedDistribution should not be Nil. " +
@@ -208,7 +208,7 @@ case class HashPartitioning(expressions: Seq[Expression], numPartitions: Int)
   override def satisfies(required: Distribution): Boolean = {
     super.satisfies(required) || {
       required match {
-        case h: HashPartitionedDistribution =>
+        case h: HashClusteredDistribution =>
          expressions.length == h.expressions.length && expressions.zip(h.expressions).forall {
            case (l, r) => l.semanticEquals(r)
          }
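For intuition, here is a minimal self-contained sketch of the pairwise check in the hunk above. It is not the Catalyst code: Expr and satisfiesHashClustered are illustrative stand-ins, and semantic equality is reduced to plain name comparison.

object SatisfiesSketch {
  // Stand-in for a Catalyst expression; semanticEquals is simplified to name equality.
  final case class Expr(name: String) {
    def semanticEquals(other: Expr): Boolean = name == other.name
  }

  // Mirrors the check above: same length, same expressions, in the same order.
  def satisfiesHashClustered(partitioning: Seq[Expr], required: Seq[Expr]): Boolean =
    partitioning.length == required.length &&
      partitioning.zip(required).forall { case (l, r) => l.semanticEquals(r) }

  def main(args: Array[String]): Unit = {
    val exprs = Seq(Expr("a"), Expr("b"))
    println(satisfiesHashClustered(exprs, Seq(Expr("a"), Expr("b"))))  // true
    println(satisfiesHashClustered(exprs, Seq(Expr("b"), Expr("a"))))  // false: order matters
    println(satisfiesHashClustered(exprs, Seq(Expr("a"))))             // false: length differs
  }
}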

sql/core/src/main/scala/org/apache/spark/sql/execution/SparkPlan.scala

Lines changed: 4 additions & 5 deletions
@@ -104,11 +104,10 @@ abstract class SparkPlan extends QueryPlan[SparkPlan] with Logging with Serializ
    * guarantees that the outputs of these children will have same number of partitions, so that the
    * operator can safely zip partitions of these children's result RDDs. Some operators can leverage
    * this guarantee to satisfy some interesting requirement, e.g., non-broadcast joins can specify
-   * HashPartitionedDistribution(a,b) for its left child, and specify
-   * HashPartitionedDistribution(c,d) for its right child, then it's guaranteed that left and right
-   * child are co-partitioned by a,b/c,d, which means tuples of same value are in the
-   * partitions of same index, e.g., (a=1,b=2) and (c=1,d=2) are both in the second partition of
-   * left and right child.
+   * HashClusteredDistribution(a,b) for its left child, and specify HashClusteredDistribution(c,d)
+   * for its right child, then it's guaranteed that left and right child are co-partitioned by
+   * a,b/c,d, which means tuples of same value are in the partitions of same index, e.g.,
+   * (a=1,b=2) and (c=1,d=2) are both in the second partition of left and right child.
    */
   def requiredChildDistribution: Seq[Distribution] =
     Seq.fill(children.size)(UnspecifiedDistribution)
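To illustrate the co-partitioning guarantee described in the updated comment, here is a small self-contained sketch. It uses a plain hashCode-based partitioner as a stand-in for Spark's actual HashPartitioning.partitionIdExpression (which is Murmur3-based), and the object and method names are made up for the example.

object CoPartitionSketch {
  val numPartitions = 4

  // Stand-in hash: enough to show that equal key values get equal partition indices.
  def partitionId(key: (Int, Int)): Int = Math.floorMod(key.hashCode, numPartitions)

  def main(args: Array[String]): Unit = {
    val leftRow  = (1, 2)  // (a = 1, b = 2) from the left child
    val rightRow = (1, 2)  // (c = 1, d = 2) from the right child
    // Same key values, same partitioner, same partition count => same partition index.
    assert(partitionId(leftRow) == partitionId(rightRow))
    println(s"both rows fall into partition ${partitionId(leftRow)}")
  }
}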

sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/EnsureRequirements.scala

Lines changed: 1 addition & 1 deletion
@@ -68,7 +68,7 @@ case class EnsureRequirements(conf: SQLConf) extends Rule[SparkPlan] {
       // these children may not be partitioned in the same way.
       // Please see the comment in withCoordinator for more details.
       val supportsDistribution = requiredChildDistributions.forall { dist =>
-        dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashPartitionedDistribution]
+        dist.isInstanceOf[ClusteredDistribution] || dist.isInstanceOf[HashClusteredDistribution]
       }
       children.length > 1 && supportsDistribution
     }

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/ShuffledHashJoinExec.scala

Lines changed: 1 addition & 1 deletion
@@ -46,7 +46,7 @@ case class ShuffledHashJoinExec(
     "avgHashProbe" -> SQLMetrics.createAverageMetric(sparkContext, "avg hash probe"))
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftKeys) :: HashPartitionedDistribution(rightKeys) :: Nil
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
 
   private def buildHashedRelation(iter: Iterator[InternalRow]): HashedRelation = {
     val buildDataSize = longMetric("buildDataSize")

sql/core/src/main/scala/org/apache/spark/sql/execution/joins/SortMergeJoinExec.scala

Lines changed: 1 addition & 1 deletion
@@ -78,7 +78,7 @@ case class SortMergeJoinExec(
   }
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftKeys) :: HashPartitionedDistribution(rightKeys) :: Nil
+    HashClusteredDistribution(leftKeys) :: HashClusteredDistribution(rightKeys) :: Nil
 
   override def outputOrdering: Seq[SortOrder] = joinType match {
     // For inner join, orders of both sides keys should be kept.

sql/core/src/main/scala/org/apache/spark/sql/execution/objects.scala

Lines changed: 1 addition & 1 deletion
@@ -460,7 +460,7 @@ case class CoGroupExec(
     right: SparkPlan) extends BinaryExecNode with ObjectProducerExec {
 
   override def requiredChildDistribution: Seq[Distribution] =
-    HashPartitionedDistribution(leftGroup) :: HashPartitionedDistribution(rightGroup) :: Nil
+    HashClusteredDistribution(leftGroup) :: HashClusteredDistribution(rightGroup) :: Nil
 
   override def requiredChildOrdering: Seq[Seq[SortOrder]] =
     leftGroup.map(SortOrder(_, Ascending)) :: rightGroup.map(SortOrder(_, Ascending)) :: Nil
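As a rough illustration of why CoGroupExec can zip its children's partitions once both sides are hash-clustered by the group keys, here is a sketch on plain Scala collections (not Spark internals); hashCluster and partitionOf are hypothetical helpers for the example.

object CoGroupSketch {
  val numPartitions = 3
  def partitionOf(key: Int): Int = Math.floorMod(key, numPartitions)

  // Split rows into numPartitions buckets by key ("hash clustering" both sides the same way).
  def hashCluster[V](rows: Seq[(Int, V)]): IndexedSeq[Seq[(Int, V)]] =
    (0 until numPartitions).map(i => rows.filter { case (k, _) => partitionOf(k) == i })

  def main(args: Array[String]): Unit = {
    val left  = hashCluster(Seq(1 -> "l1", 2 -> "l2", 4 -> "l4"))
    val right = hashCluster(Seq(1 -> "r1", 2 -> "r2", 5 -> "r5"))
    // Same clustering function and partition count on both sides, so partitions can be
    // zipped by index and co-grouped locally without any further shuffle.
    val cogrouped = left.zip(right).flatMap { case (lp, rp) =>
      (lp.map(_._1) ++ rp.map(_._1)).distinct.map { k =>
        k -> (lp.collect { case (`k`, v) => v }, rp.collect { case (`k`, v) => v })
      }
    }
    cogrouped.sortBy(_._1).foreach(println)
  }
}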
