From cf7bdba442f3d0d77fb12a3930aabe577a638074 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Mon, 12 Sep 2022 16:41:13 +0800
Subject: [PATCH 1/7] [SPARK-40407][SQL] Fix the potential data skew caused by
 df.repartition

This PR changes the starting position of RoundRobin. The default position
calculated by `new Random(partitionId).nextInt(numPartitions)` may always be
the same for different partitions, which means each map task writes its rows
to the same subset of keys during the shuffle write, and some keys may receive
no data at all in special cases. This PR fixes the data skew issue for those
cases.

No user-facing change.

Will add some tests and watch CI pass.
---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 907198ad5d23..9132d3fd5843 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -17,7 +17,6 @@
 
 package org.apache.spark.sql.execution.exchange
 
-import java.util.Random
 import java.util.function.Supplier
 
 import scala.concurrent.Future
@@ -298,8 +297,7 @@ object ShuffleExchangeExec {
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
-        // Distributes elements evenly across output partitions, starting from a random partition.
-        var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
+        var position = TaskContext.get().partitionId()
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
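The failure mode described in the commit message is easy to reproduce off-line. A minimal, hypothetical sketch (plain JVM code, not part of the patch series): `java.util.Random` barely scrambles small consecutive seeds, so for a power-of-two bound the first `nextInt` lands on nearly the same value for every partition id, meaning every map task starts its round-robin from the same output partition.

```scala
// Hypothetical demo, not part of the patch: consecutive small seeds give
// java.util.Random almost identical first draws for a power-of-two bound.
object RoundRobinStartDemo {
  def main(args: Array[String]): Unit = {
    val numPartitions = 64 // a power of two, the worst case
    val starts = (0 until 16).map(id => new java.util.Random(id.toLong).nextInt(numPartitions))
    // Expect heavy clustering on one or two values rather than a spread over 0..63.
    println(starts.mkString(", "))
  }
}
```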
From 73620b4bc1b9eaec14a3846976c3ab725a852660 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Thu, 15 Sep 2022 10:27:33 +0800
Subject: [PATCH 2/7] change Random to XORShiftRandom

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 +++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 5 +++--
 3 files changed, 12 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 9132d3fd5843..fe4a508cadcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
+import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -297,7 +298,8 @@ object ShuffleExchangeExec {
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
-        var position = TaskContext.get().partitionId()
+        // Distributes elements evenly across output partitions, starting from a random partition.
+        var position = new XORShiftRandom(TaskContext.get().partitionId()).nextInt(numPartitions)
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index d7ea766b21b6..69a9c0433b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2147,6 +2147,12 @@ class DatasetSuite extends QueryTest
       (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
       (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
   }
+
+  test("SPARK-40407: repartition should not result in severe data skew") {
+    val df = spark.range(0, 100, 1, 50).repartition(4)
+    val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
+    assert(result.mkString(",") === "25,23,25,27")
+  }
 }
 
 case class Bar(a: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ddf500b3f93b..e24aba8f0d83 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -74,6 +74,7 @@ class AdaptiveQueryExecSuite
     spark.sparkContext.addSparkListener(listener)
 
     val dfAdaptive = sql(query)
+    dfAdaptive.explain(true)
     val planBefore = dfAdaptive.queryExecution.executedPlan
     assert(planBefore.toString.startsWith("AdaptiveSparkPlan isFinalPlan=false"))
     val result = dfAdaptive.collect()
@@ -2127,8 +2128,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
       // partition size [0,258,72,72,72]
       checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-      // partition size [72,216,216,144,72]
-      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 4, 7)
+      // partition size [216,216,72,0,216]
+      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
     }
 
     // no skewed partition should be optimized

From 2c3c4b4689539ecae7ac1b1a9d809bd7d8c4181b Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Thu, 15 Sep 2022 10:27:33 +0800
Subject: [PATCH 3/7] change Random to XORShiftRandom

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 +++-
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 6 ++++++
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++--
 3 files changed, 11 insertions(+), 3 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index 9132d3fd5843..fe4a508cadcd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -38,6 +38,7 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
+import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -297,7 +298,8 @@ object ShuffleExchangeExec {
     }
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
-        var position = TaskContext.get().partitionId()
+        // Distributes elements evenly across output partitions, starting from a random partition.
+        var position = new XORShiftRandom(TaskContext.get().partitionId()).nextInt(numPartitions)
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index d7ea766b21b6..69a9c0433b1f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2147,6 +2147,12 @@ class DatasetSuite extends QueryTest
       (2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12),
       (3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13))
   }
+
+  test("SPARK-40407: repartition should not result in severe data skew") {
+    val df = spark.range(0, 100, 1, 50).repartition(4)
+    val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
+    assert(result.mkString(",") === "25,23,25,27")
+  }
 }
 
 case class Bar(a: Int)
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index ddf500b3f93b..f87281ada782 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2127,8 +2127,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
       // partition size [0,258,72,72,72]
       checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-      // partition size [72,216,216,144,72]
-      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 4, 7)
+      // partition size [216,216,72,0,216]
+      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
     }
 
     // no skewed partition should be optimized
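Why XORShiftRandom helps where java.util.Random does not: Spark's XORShiftRandom hashes its seed (via MurmurHash3) before using it, so consecutive partition ids yield well-separated starting positions. A rough comparison sketch, hypothetical and illustrative only; note that org.apache.spark.util.random.XORShiftRandom is private[spark], so code like this would have to be compiled under an org.apache.spark subpackage, as the package declaration below assumes.

```scala
// Hypothetical comparison, not part of the patch series. XORShiftRandom is
// private[spark], hence the (assumed) package declaration below.
package org.apache.spark.demo

import org.apache.spark.util.random.XORShiftRandom

object SeedSpreadDemo {
  def main(args: Array[String]): Unit = {
    val numPartitions = 64
    // java.util.Random: starting positions cluster for consecutive seeds.
    val plain = (0 until 8).map(id => new java.util.Random(id.toLong).nextInt(numPartitions))
    // XORShiftRandom hashes its seed first, so the positions spread out.
    val hashed = (0 until 8).map(id => new XORShiftRandom(id.toLong).nextInt(numPartitions))
    println(s"java.util.Random: ${plain.mkString(", ")}")
    println(s"XORShiftRandom:   ${hashed.mkString(", ")}")
  }
}
```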
From 20757ae7b02f2ace827cabd2b1c5ca87cedee75e Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Mon, 19 Sep 2022 08:04:21 +0800
Subject: [PATCH 4/7] resolve comments

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 7 +++++--
 .../src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
 .../sql/execution/adaptive/AdaptiveQueryExecSuite.scala | 4 ++--
 3 files changed, 8 insertions(+), 5 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index fe4a508cadcd..de9b10e48ba3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -17,9 +17,11 @@
 
 package org.apache.spark.sql.execution.exchange
 
+import java.util.Random
 import java.util.function.Supplier
 
 import scala.concurrent.Future
+import scala.util.hashing
 
 import org.apache.spark._
 import org.apache.spark.internal.config
@@ -38,7 +40,6 @@ import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.util.MutablePair
 import org.apache.spark.util.collection.unsafe.sort.{PrefixComparators, RecordComparator}
-import org.apache.spark.util.random.XORShiftRandom
 
 /**
  * Common trait for all shuffle exchange implementations to facilitate pattern matching.
@@ -299,7 +300,9 @@ object ShuffleExchangeExec {
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
         // Distributes elements evenly across output partitions, starting from a random partition.
-        var position = new XORShiftRandom(TaskContext.get().partitionId()).nextInt(numPartitions)
+        val partitionId = TaskContext.get().partitionId()
+        var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
+
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index 69a9c0433b1f..e5fddac9e16d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2151,7 +2151,7 @@ class DatasetSuite extends QueryTest
   test("SPARK-40407: repartition should not result in severe data skew") {
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
-    assert(result.mkString(",") === "25,23,25,27")
+    assert(result.mkString(",") === "25,31,25,19")
   }
 }
 
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
index e24aba8f0d83..fcdf2f5f6d19 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala
@@ -2128,8 +2128,8 @@ class AdaptiveQueryExecSuite
     withSQLConf(SQLConf.ADVISORY_PARTITION_SIZE_IN_BYTES.key -> "150") {
       // partition size [0,258,72,72,72]
       checkPartitionNumber("SELECT /*+ REBALANCE(c1) */ * FROM v", 2, 4)
-      // partition size [216,216,72,0,216]
-      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 6, 7)
+      // partition size [144,72,144,216,144]
+      checkPartitionNumber("SELECT /*+ REBALANCE */ * FROM v", 2, 6)
     }
 
     // no skewed partition should be optimized
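The new expected counts in the test can be sanity-checked with plain JVM code. A back-of-the-envelope sketch (hypothetical, assuming it faithfully mirrors the patched extractor: 50 input tasks of 2 rows each from `spark.range(0, 100, 1, 50)`, a per-task start of `new Random(byteswap32(taskId)).nextInt(4)`, then a round-robin step per row with the `mod` applied by HashPartitioner). Its output should reproduce the per-partition sizes asserted above.

```scala
// Hypothetical verification sketch, not part of the patch series.
import java.util.Random
import scala.util.hashing

object RepartitionSkewCheck {
  def main(args: Array[String]): Unit = {
    val numPartitions = 4
    val counts = new Array[Int](numPartitions)
    // spark.range(0, 100, 1, 50) yields 50 input tasks with 2 rows each.
    for (taskId <- 0 until 50) {
      var position = new Random(hashing.byteswap32(taskId)).nextInt(numPartitions)
      for (_ <- 0 until 2) {
        position += 1 // round-robin step per row
        counts(position % numPartitions) += 1 // the `mod` done by HashPartitioner
      }
    }
    println(counts.mkString(",")) // expected: 25,31,25,19, matching the test
  }
}
```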
From 8968d4fdc867553bfc3f1844e56de741414d4396 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Mon, 19 Sep 2022 13:34:21 +0800
Subject: [PATCH 5/7] remove redundant empty line

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 1 -
 1 file changed, 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index de9b10e48ba3..ee9a883c12b2 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -302,7 +302,6 @@ object ShuffleExchangeExec {
         // Distributes elements evenly across output partitions, starting from a random partition.
         val partitionId = TaskContext.get().partitionId()
         var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
-
         (row: InternalRow) => {
           // The HashPartitioner will handle the `mod` by the number of partitions
           position += 1

From 0f1c677f38b713a05ef8c28ccb6795edd58def4a Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Wed, 21 Sep 2022 20:43:57 +0800
Subject: [PATCH 6/7] resolve comments

---
 sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
index e5fddac9e16d..bc7775144e16 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/DatasetSuite.scala
@@ -2151,7 +2151,7 @@ class DatasetSuite extends QueryTest
   test("SPARK-40407: repartition should not result in severe data skew") {
     val df = spark.range(0, 100, 1, 50).repartition(4)
     val result = df.mapPartitions(iter => Iterator.single(iter.length)).collect()
-    assert(result.mkString(",") === "25,31,25,19")
+    assert(result.sorted.toSeq === Seq(19, 25, 25, 31))
   }
 }
 
From 57773d4168defd1d329a9acb47d1f44872515093 Mon Sep 17 00:00:00 2001
From: Bobby Wang
Date: Thu, 22 Sep 2022 17:18:14 +0800
Subject: [PATCH 7/7] add comments

---
 .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 6 ++++++
 1 file changed, 6 insertions(+)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
index ee9a883c12b2..6f287028f740 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala
@@ -300,6 +300,12 @@ object ShuffleExchangeExec {
     def getPartitionKeyExtractor(): InternalRow => Any = newPartitioning match {
       case RoundRobinPartitioning(numPartitions) =>
         // Distributes elements evenly across output partitions, starting from a random partition.
+        // The nextInt(numPartitions) implementation has a special case when the bound is a
+        // power of 2: it basically takes the several highest bits of the initial seed, with
+        // only minimal scrambling. With a deterministic seed, a generator that is used only
+        // once, and no further scrambling, the position values for power-of-two numPartitions
+        // always end up almost the same regardless of the partition index. Substantially
+        // scrambling the seed by hashing fixes this. Refer to SPARK-21782 for more details.
         val partitionId = TaskContext.get().partitionId()
         var position = new Random(hashing.byteswap32(partitionId)).nextInt(numPartitions)
         (row: InternalRow) => {
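The power-of-two special case described in the new comment is directly observable. A small hypothetical demo (not part of the patch series): for bound 4, `nextInt` keeps only the top two bits of the barely-scrambled seed, so raw consecutive seeds nearly always draw the same first value, while byteswap32-scrambled seeds spread out.

```scala
// Hypothetical demo of the power-of-2 nextInt special case (SPARK-21782).
import java.util.Random
import scala.util.hashing

object PowerOfTwoBoundDemo {
  def main(args: Array[String]): Unit = {
    val bound = 4 // power of two, as in repartition(4)
    val raw = (0 until 10).map(id => new Random(id.toLong).nextInt(bound))
    val scrambled = (0 until 10).map(id => new Random(hashing.byteswap32(id)).nextInt(bound))
    println(s"raw seeds:        ${raw.mkString(",")}")       // nearly all identical
    println(s"byteswap32 seeds: ${scrambled.mkString(",")}") // well spread
  }
}
```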