diff --git a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala index d2169246fe7e..f229061a6d0f 100644 --- a/core/src/main/scala/org/apache/spark/MapOutputTracker.scala +++ b/core/src/main/scala/org/apache/spark/MapOutputTracker.scala @@ -343,15 +343,18 @@ private[spark] abstract class MapOutputTracker(conf: SparkConf) extends Logging /** * Called from executors to get the server URIs and output sizes for each shuffle block that * needs to be read from a given range of map output partitions (startPartition is included but - * endPartition is excluded from the range) and is produced by a specific mapper. + * endPartition is excluded from the range) and is produced by a range of mappers + * (startMapIndex is included and endMapIndex is excluded). * * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. */ - def getMapSizesByMapIndex( + def getMapSizesByRange( shuffleId: Int, - mapIndex: Int, + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] @@ -688,20 +691,25 @@ private[spark] class MapOutputTrackerMaster( } /** - * Return the location where the Mapper ran. The locations each includes both a host and an + * Return the locations where the mappers ran. Each location includes both a host and an * executor id on that host. * * @param dep shuffle dependency object - * @param mapId the map id + * @param startMapIndex the start map index + * @param endMapIndex the end map index + * @return a sequence of locations where the map tasks ran.
*/ - def getMapLocation(dep: ShuffleDependency[_, _, _], mapId: Int): Seq[String] = + def getMapLocation( + dep: ShuffleDependency[_, _, _], + startMapIndex: Int, + endMapIndex: Int): Seq[String] = { val shuffleStatus = shuffleStatuses.get(dep.shuffleId).orNull if (shuffleStatus != null) { shuffleStatus.withMapStatuses { statuses => - if (mapId >= 0 && mapId < statuses.length) { - Seq(statuses(mapId).location.host) + if (startMapIndex < endMapIndex && (startMapIndex >= 0 && endMapIndex <= statuses.length)) { + val statusesPicked = statuses.slice(startMapIndex, endMapIndex).filter(_ != null) + statusesPicked.map(_.location.host).toSeq } else { Nil } @@ -737,29 +745,26 @@ private[spark] class MapOutputTrackerMaster( case Some (shuffleStatus) => shuffleStatus.withMapStatuses { statuses => MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses) + shuffleId, startPartition, endPartition, statuses, 0, shuffleStatus.mapStatuses.length) } case None => Iterator.empty } } - override def getMapSizesByMapIndex( + override def getMapSizesByRange( shuffleId: Int, - mapIndex: Int, + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" + + logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex, " + s"partitions $startPartition-$endPartition") shuffleStatuses.get(shuffleId) match { - case Some (shuffleStatus) => + case Some(shuffleStatus) => shuffleStatus.withMapStatuses { statuses => MapOutputTracker.convertMapStatuses( - shuffleId, - startPartition, - endPartition, - statuses, - Some(mapIndex)) + shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } case None => Iterator.empty @@ -802,7 +807,7 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr val statuses = getStatuses(shuffleId, conf) try { MapOutputTracker.convertMapStatuses( - shuffleId, startPartition, endPartition, statuses) + shuffleId, startPartition, endPartition, statuses, 0, statuses.length) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -811,17 +816,18 @@ private[spark] class MapOutputTrackerWorker(conf: SparkConf) extends MapOutputTr } } - override def getMapSizesByMapIndex( + override def getMapSizesByRange( shuffleId: Int, - mapIndex: Int, + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { - logDebug(s"Fetching outputs for shuffle $shuffleId, mapIndex $mapIndex" + + logDebug(s"Fetching outputs for shuffle $shuffleId, mappers $startMapIndex-$endMapIndex, " + s"partitions $startPartition-$endPartition") val statuses = getStatuses(shuffleId, conf) try { - MapOutputTracker.convertMapStatuses(shuffleId, startPartition, endPartition, - statuses, Some(mapIndex)) + MapOutputTracker.convertMapStatuses( + shuffleId, startPartition, endPartition, statuses, startMapIndex, endMapIndex) } catch { case e: MetadataFetchFailedException => // We experienced a fetch failure so our mapStatuses cache is outdated; clear it: @@ -980,7 +986,8 @@ private[spark] object MapOutputTracker extends Logging { * @param startPartition Start of map output partition ID range (included in range) * @param endPartition End of map output partition ID range (excluded from range) * @param statuses List of map statuses, indexed by
map partition index. - * @param mapIndex When specified, only shuffle blocks from this mapper will be processed. + * @param startMapIndex Start Map index. + * @param endMapIndex End Map index. * @return A sequence of 2-item tuples, where the first item in the tuple is a BlockManagerId, * and the second item is a sequence of (shuffle block id, shuffle block size, map index) * tuples describing the shuffle blocks that are stored at that block manager. @@ -990,11 +997,12 @@ private[spark] object MapOutputTracker extends Logging { startPartition: Int, endPartition: Int, statuses: Array[MapStatus], - mapIndex : Option[Int] = None): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { + startMapIndex : Int, + endMapIndex: Int): Iterator[(BlockManagerId, Seq[(BlockId, Long, Int)])] = { assert (statuses != null) val splitsByAddress = new HashMap[BlockManagerId, ListBuffer[(BlockId, Long, Int)]] val iter = statuses.iterator.zipWithIndex - for ((status, mapIndex) <- mapIndex.map(index => iter.filter(_._2 == index)).getOrElse(iter)) { + for ((status, mapIndex) <- iter.slice(startMapIndex, endMapIndex)) { if (status == null) { val errorMessage = s"Missing an output location for shuffle $shuffleId" logError(errorMessage) diff --git a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala index 01aa43eb9763..057b0d6e0b0a 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/ShuffleManager.scala @@ -55,12 +55,14 @@ private[spark] trait ShuffleManager { metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] /** - * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) - * that are produced by one specific mapper. Called on executors by reduce tasks. + * Get a reader for a range of reduce partitions (startPartition to endPartition-1, inclusive) to + * read from map output (startMapIndex to endMapIndex - 1, inclusive). + * Called on executors by reduce tasks. 
*/ - def getReaderForOneMapper[K, C]( + def getReaderForRange[K, C]( handle: ShuffleHandle, - mapIndex: Int, + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, diff --git a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala index 3cd04de0f741..aefcb59b8bb8 100644 --- a/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala +++ b/core/src/main/scala/org/apache/spark/shuffle/sort/SortShuffleManager.scala @@ -131,15 +131,16 @@ private[spark] class SortShuffleManager(conf: SparkConf) extends ShuffleManager shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) } - override def getReaderForOneMapper[K, C]( + override def getReaderForRange[K, C]( handle: ShuffleHandle, - mapIndex: Int, + startMapIndex: Int, + endMapIndex: Int, startPartition: Int, endPartition: Int, context: TaskContext, metrics: ShuffleReadMetricsReporter): ShuffleReader[K, C] = { - val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByMapIndex( - handle.shuffleId, mapIndex, startPartition, endPartition) + val blocksByAddress = SparkEnv.get.mapOutputTracker.getMapSizesByRange( + handle.shuffleId, startMapIndex, endMapIndex, startPartition, endPartition) new BlockStoreShuffleReader( handle.asInstanceOf[BaseShuffleHandle[K, _, C]], blocksByAddress, context, metrics, shouldBatchFetch = canUseBatchFetch(startPartition, endPartition, context)) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala index 1e05b6e2f99e..c1e2e46b18b1 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala @@ -451,6 +451,36 @@ object SQLConf { .booleanConf .createWithDefault(true) + val ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED = + buildConf("spark.sql.adaptive.optimizeSkewedJoin.enabled") + .doc("When true and adaptive execution is enabled, a skewed join is automatically handled at " + + "runtime.") + .booleanConf + .createWithDefault(true) + + val ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD = + buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionSizeThreshold") + .doc("Configures the minimum size in bytes for a partition that is considered a skewed " + + "partition in adaptive skewed join.") + .bytesConf(ByteUnit.BYTE) + .createWithDefault(64 * 1024 * 1024) + + val ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR = + buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionFactor") + .doc("A partition is considered a skewed partition if its size is larger than" + + " this factor times the median partition size and also larger than " + + s"${ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key}") + .intConf + .createWithDefault(10) + + val ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS = + buildConf("spark.sql.adaptive.optimizeSkewedJoin.skewedPartitionMaxSplits") + .doc("Configures the maximum number of tasks to handle a skewed partition in adaptive skewed " + + "join.") + .intConf + .checkValue(_ >= 1, "The maximum number of splits must be at least 1") + .createWithDefault(5) + val NON_EMPTY_PARTITION_RATIO_FOR_BROADCAST_JOIN = buildConf("spark.sql.adaptive.nonEmptyPartitionRatioForBroadcastJoin") .doc("The relation with a non-empty partition ratio lower than this config will not be " + diff --git
a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala index 4c19f95796d0..efa493923ccc 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/ShuffledRowRDD.scala @@ -116,7 +116,7 @@ class CoalescedPartitioner(val parent: Partitioner, val partitionStartIndices: A class ShuffledRowRDD( var dependency: ShuffleDependency[Int, InternalRow, InternalRow], metrics: Map[String, SQLMetric], - specifiedPartitionStartIndices: Option[Array[Int]] = None) + specifiedPartitionIndices: Option[Array[(Int, Int)]] = None) extends RDD[InternalRow](dependency.rdd.context, Nil) { if (SQLConf.get.fetchShuffleBlocksInBatchEnabled) { @@ -126,8 +126,8 @@ class ShuffledRowRDD( private[this] val numPreShufflePartitions = dependency.partitioner.numPartitions - private[this] val partitionStartIndices: Array[Int] = specifiedPartitionStartIndices match { - case Some(indices) => indices + private[this] val partitionStartIndices: Array[Int] = specifiedPartitionIndices match { + case Some(indices) => indices.map(_._1) case None => // When specifiedPartitionStartIndices is not defined, every post-shuffle partition // corresponds to a pre-shuffle partition. @@ -142,16 +142,15 @@ class ShuffledRowRDD( override val partitioner: Option[Partitioner] = Some(part) override def getPartitions: Array[Partition] = { - assert(partitionStartIndices.length == part.numPartitions) - Array.tabulate[Partition](partitionStartIndices.length) { i => - val startIndex = partitionStartIndices(i) - val endIndex = - if (i < partitionStartIndices.length - 1) { - partitionStartIndices(i + 1) - } else { - numPreShufflePartitions + specifiedPartitionIndices match { + case Some(indices) => + Array.tabulate[Partition](indices.length) { i => + new ShuffledRowRDDPartition(i, indices(i)._1, indices(i)._2) + } + case None => + Array.tabulate[Partition](numPreShufflePartitions) { i => + new ShuffledRowRDDPartition(i, i, i + 1) } - new ShuffledRowRDDPartition(i, startIndex, endIndex) } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala index f5591072f696..65d0f6a6a41d 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/AdaptiveSparkPlanExec.scala @@ -87,6 +87,10 @@ case class AdaptiveSparkPlanExec( // optimizations should be stage-independent. @transient private val queryStageOptimizerRules: Seq[Rule[SparkPlan]] = Seq( ReuseAdaptiveSubquery(conf, context.subqueryCache), + // The 'OptimizeSkewedJoin' rule should be executed before 'ReduceNumShufflePartitions', + // because the skewed partitions it handles must be skipped by 'ReduceNumShufflePartitions'. + OptimizeSkewedJoin(conf), ReduceNumShufflePartitions(conf), // The rule of 'OptimizeLocalShuffleReader' need to make use of the 'partitionStartIndices' // in 'ReduceNumShufflePartitions' rule. So it must be after 'ReduceNumShufflePartitions' rule.
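The ShuffledRowRDD change above replaces specifiedPartitionStartIndices: Option[Array[Int]] (which can only describe contiguous coalesced buckets) with specifiedPartitionIndices: Option[Array[(Int, Int)]], i.e. explicit [start, end) ranges, which is what lets skewed partitions be left out of coalescing. A minimal standalone sketch of the two representations, using plain Scala types; the object and helper names are illustrative and not part of the patch:

object PartitionRangeSketch {
  // Old form: start indices over `numPartitions` pre-shuffle partitions.
  // Array(0, 3, 4) with numPartitions = 6 meant buckets [0, 3), [3, 4), [4, 6).
  def startIndicesToRanges(startIndices: Array[Int], numPartitions: Int): Array[(Int, Int)] = {
    startIndices.indices.map { i =>
      val start = startIndices(i)
      val end = if (i < startIndices.length - 1) startIndices(i + 1) else numPartitions
      (start, end)
    }.toArray
  }

  // New form: ranges can skip partitions. This mirrors what the patch's
  // PartialShuffleReaderExec#getPartitionIndexRanges does: every non-excluded
  // pre-shuffle partition becomes its own (i, i + 1) range.
  def rangesWithoutSkewed(numPartitions: Int, excluded: Set[Int]): Array[(Int, Int)] =
    (0 until numPartitions).filterNot(excluded).map(i => (i, i + 1)).toArray

  def main(args: Array[String]): Unit = {
    println(startIndicesToRanges(Array(0, 3, 4), numPartitions = 6).mkString(", "))   // (0,3), (3,4), (4,6)
    println(rangesWithoutSkewed(numPartitions = 6, excluded = Set(3)).mkString(", ")) // (0,1), (1,2), (2,3), (4,5), (5,6)
  }
}

Each (start, end) pair then maps directly onto a ShuffledRowRDDPartition(index, start, end), as in the new getPartitions above.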
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala index 6385ea67c49f..19b78f5e36c9 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/LocalShuffledRowRDD.scala @@ -82,7 +82,7 @@ class LocalShuffledRowRDD( override def getPreferredLocations(partition: Partition): Seq[String] = { val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] - tracker.getMapLocation(dependency, partition.index) + tracker.getMapLocation(dependency, partition.index, partition.index + 1) } override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { @@ -92,9 +92,11 @@ class LocalShuffledRowRDD( // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, // as well as the `tempMetrics` for basic shuffle metrics. val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) - val reader = SparkEnv.get.shuffleManager.getReaderForOneMapper( + + val reader = SparkEnv.get.shuffleManager.getReaderForRange( dependency.shuffleHandle, mapIndex, + mapIndex + 1, localRowPartition.startPartition, localRowPartition.endPartition, context, diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala index 0659a89d2f80..fd55f6c5eb4e 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeLocalShuffleReader.scala @@ -71,7 +71,7 @@ case class OptimizeLocalShuffleReader(conf: SQLConf) extends Rule[SparkPlan] { plan match { case c @ CoalescedShuffleReaderExec(s: ShuffleQueryStageExec, _) => LocalShuffleReaderExec( - s, getPartitionStartIndices(s, Some(c.partitionStartIndices.length))) + s, getPartitionStartIndices(s, Some(c.partitionIndices.length))) case s: ShuffleQueryStageExec => LocalShuffleReaderExec(s, getPartitionStartIndices(s, None)) } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala new file mode 100644 index 000000000000..75d4184a2c14 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/OptimizeSkewedJoin.scala @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.execution.adaptive + +import scala.collection.mutable +import scala.collection.mutable.ArrayBuffer + +import org.apache.spark.{MapOutputStatistics, MapOutputTrackerMaster, SparkEnv} +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.Attribute +import org.apache.spark.sql.catalyst.plans._ +import org.apache.spark.sql.catalyst.plans.physical.{Partitioning, UnknownPartitioning} +import org.apache.spark.sql.catalyst.rules.Rule +import org.apache.spark.sql.execution._ +import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec +import org.apache.spark.sql.execution.joins.SortMergeJoinExec +import org.apache.spark.sql.internal.SQLConf + +case class OptimizeSkewedJoin(conf: SQLConf) extends Rule[SparkPlan] { + + private val supportedJoinTypes = + Inner :: Cross :: LeftSemi :: LeftAnti :: LeftOuter :: RightOuter :: Nil + + /** + * A partition is considered as a skewed partition if its size is larger than the median + * partition size * spark.sql.adaptive.skewedPartitionFactor and also larger than + * spark.sql.adaptive.skewedPartitionSizeThreshold. + */ + private def isSkewed( + stats: MapOutputStatistics, + partitionId: Int, + medianSize: Long): Boolean = { + val size = stats.bytesByPartitionId(partitionId) + size > medianSize * conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_FACTOR) && + size > conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD) + } + + private def medianSize(stats: MapOutputStatistics): Long = { + val numPartitions = stats.bytesByPartitionId.length + val bytes = stats.bytesByPartitionId.sorted + if (bytes(numPartitions / 2) > 0) bytes(numPartitions / 2) else 1 + } + + /** + * Get the map size of the specific reduce shuffle Id. + */ + private def getMapSizesForReduceId(shuffleId: Int, partitionId: Int): Array[Long] = { + val mapOutputTracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + mapOutputTracker.shuffleStatuses(shuffleId).mapStatuses.map{_.getSizeForBlock(partitionId)} + } + + /** + * Split the skewed partition based on the map size and the max split number. 
+ */ + private def getMapStartIndices(stage: ShuffleQueryStageExec, partitionId: Int): Array[Int] = { + val shuffleId = stage.shuffle.shuffleDependency.shuffleHandle.shuffleId + val mapPartitionSizes = getMapSizesForReduceId(shuffleId, partitionId) + val maxSplits = math.min(conf.getConf( + SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_MAX_SPLITS), mapPartitionSizes.length) + val avgPartitionSize = mapPartitionSizes.sum / maxSplits + val advisoryPartitionSize = math.max(avgPartitionSize, + conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD)) + val partitionIndices = mapPartitionSizes.indices + val partitionStartIndices = ArrayBuffer[Int]() + var postMapPartitionSize = mapPartitionSizes(0) + partitionStartIndices += 0 + partitionIndices.drop(1).foreach { nextPartitionIndex => + val nextMapPartitionSize = mapPartitionSizes(nextPartitionIndex) + if (postMapPartitionSize + nextMapPartitionSize > advisoryPartitionSize) { + partitionStartIndices += nextPartitionIndex + postMapPartitionSize = nextMapPartitionSize + } else { + postMapPartitionSize += nextMapPartitionSize + } + } + + if (partitionStartIndices.size > maxSplits) { + partitionStartIndices.take(maxSplits).toArray + } else partitionStartIndices.toArray + } + + private def getStatistics(stage: ShuffleQueryStageExec): MapOutputStatistics = { + assert(stage.resultOption.isDefined, "ShuffleQueryStageExec should" + + " already be ready when executing OptimizeSkewedPartitions rule") + stage.resultOption.get.asInstanceOf[MapOutputStatistics] + } + + private def supportSplitOnLeftPartition(joinType: JoinType) = { + joinType == Inner || joinType == Cross || joinType == LeftSemi || + joinType == LeftAnti || joinType == LeftOuter + } + + private def supportSplitOnRightPartition(joinType: JoinType) = { + joinType == Inner || joinType == Cross || joinType == RightOuter + } + + private def getNumMappers(stage: ShuffleQueryStageExec): Int = { + stage.shuffle.shuffleDependency.rdd.partitions.length + } + + def handleSkewJoin(plan: SparkPlan): SparkPlan = plan.transformUp { + case smj @ SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, + s1 @ SortExec(_, _, left: ShuffleQueryStageExec, _), + s2 @ SortExec(_, _, right: ShuffleQueryStageExec, _)) + if supportedJoinTypes.contains(joinType) => + val leftStats = getStatistics(left) + val rightStats = getStatistics(right) + val numPartitions = leftStats.bytesByPartitionId.length + + val leftMedSize = medianSize(leftStats) + val rightMedSize = medianSize(rightStats) + val leftSizeInfo = s"median size: $leftMedSize, max size: ${leftStats.bytesByPartitionId.max}" + val rightSizeInfo = s"median size: $rightMedSize," + + s" max size: ${rightStats.bytesByPartitionId.max}" + logDebug( + s""" + |Try to optimize skewed join. 
+ |Left side partition size: $leftSizeInfo + |Right side partition size: $rightSizeInfo + """.stripMargin) + + val skewedPartitions = mutable.HashSet[Int]() + val subJoins = mutable.ArrayBuffer[SparkPlan]() + for (partitionId <- 0 until numPartitions) { + val isLeftSkew = isSkewed(leftStats, partitionId, leftMedSize) + val isRightSkew = isSkewed(rightStats, partitionId, rightMedSize) + val leftMapIdStartIndices = if (isLeftSkew && supportSplitOnLeftPartition(joinType)) { + getMapStartIndices(left, partitionId) + } else { + Array(0) + } + val rightMapIdStartIndices = if (isRightSkew && supportSplitOnRightPartition(joinType)) { + getMapStartIndices(right, partitionId) + } else { + Array(0) + } + + if (leftMapIdStartIndices.length > 1 || rightMapIdStartIndices.length > 1) { + skewedPartitions += partitionId + for (i <- 0 until leftMapIdStartIndices.length; + j <- 0 until rightMapIdStartIndices.length) { + val leftEndMapId = if (i == leftMapIdStartIndices.length - 1) { + getNumMappers(left) + } else { + leftMapIdStartIndices(i + 1) + } + val rightEndMapId = if (j == rightMapIdStartIndices.length - 1) { + getNumMappers(right) + } else { + rightMapIdStartIndices(j + 1) + } + // TODO: we may can optimize the sort merge join to broad cast join after + // obtaining the raw data size of per partition, + val leftSkewedReader = SkewedPartitionReaderExec( + left, partitionId, leftMapIdStartIndices(i), leftEndMapId) + val rightSkewedReader = SkewedPartitionReaderExec(right, partitionId, + rightMapIdStartIndices(j), rightEndMapId) + subJoins += SortMergeJoinExec(leftKeys, rightKeys, joinType, condition, + s1.copy(child = leftSkewedReader), s2.copy(child = rightSkewedReader)) + } + } + } + logDebug(s"number of skewed partitions is ${skewedPartitions.size}") + if (skewedPartitions.nonEmpty) { + val optimizedSmj = smj.transformDown { + case sort @ SortExec(_, _, shuffleStage: ShuffleQueryStageExec, _) => + sort.copy(child = PartialShuffleReaderExec(shuffleStage, skewedPartitions.toSet)) + } + subJoins += optimizedSmj + UnionExec(subJoins) + } else { + smj + } + } + + override def apply(plan: SparkPlan): SparkPlan = { + if (!conf.getConf(SQLConf.ADAPTIVE_EXECUTION_SKEWED_JOIN_ENABLED)) { + return plan + } + + def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match { + case _: LocalShuffleReaderExec => Nil + case _: CoalescedShuffleReaderExec => Nil + case stage: ShuffleQueryStageExec => Seq(stage) + case _ => plan.children.flatMap(collectShuffleStages) + } + + val shuffleStages = collectShuffleStages(plan) + + if (shuffleStages.length == 2) { + // Currently we only support handling skewed join for 2 table join. + handleSkewJoin(plan) + } else { + plan + + } + } +} + +/** + * A wrapper of shuffle query stage, which submits one reduce task to read a single + * shuffle partition 'partitionIndex' produced by the mappers in range [startMapIndex, endMapIndex). + * This is used to increase the parallelism when reading skewed partitions. + * + * @param child It's usually `ShuffleQueryStageExec`, but can be the shuffle exchange + * node during canonicalization. + * @param partitionIndex The pre shuffle partition index. + * @param startMapIndex The start map index. + * @param endMapIndex The end map index. 
+ */ +case class SkewedPartitionReaderExec( + child: QueryStageExec, + partitionIndex: Int, + startMapIndex: Int, + endMapIndex: Int) extends LeafExecNode { + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = { + UnknownPartitioning(1) + } + private var cachedSkewedShuffleRDD: SkewedShuffledRowRDD = null + + override def doExecute(): RDD[InternalRow] = { + if (cachedSkewedShuffleRDD == null) { + cachedSkewedShuffleRDD = child match { + case stage: ShuffleQueryStageExec => + stage.shuffle.createSkewedShuffleRDD(partitionIndex, startMapIndex, endMapIndex) + case _ => + throw new IllegalStateException("operating on canonicalization plan") + } + } + cachedSkewedShuffleRDD + } +} + +/** + * A wrapper of shuffle query stage, which skips some partitions when reading the shuffle blocks. + * + * @param child It's usually `ShuffleQueryStageExec`, but can be the shuffle exchange node during + * canonicalization. + * @param excludedPartitions The partitions to skip when reading. + */ +case class PartialShuffleReaderExec( + child: QueryStageExec, + excludedPartitions: Set[Int]) extends UnaryExecNode { + + override def output: Seq[Attribute] = child.output + + override def outputPartitioning: Partitioning = { + UnknownPartitioning(1) + } + + private def shuffleExchange(): ShuffleExchangeExec = child match { + case stage: ShuffleQueryStageExec => stage.shuffle + case _ => + throw new IllegalStateException("operating on canonicalization plan") + } + + private def getPartitionIndexRanges(): Array[(Int, Int)] = { + val length = shuffleExchange().shuffleDependency.partitioner.numPartitions + (0 until length).filterNot(excludedPartitions.contains).map(i => (i, i + 1)).toArray + } + + private var cachedShuffleRDD: RDD[InternalRow] = null + + override def doExecute(): RDD[InternalRow] = { + if (cachedShuffleRDD == null) { + cachedShuffleRDD = if (excludedPartitions.isEmpty) { + child.execute() + } else { + shuffleExchange().createShuffledRDD(Some(getPartitionIndexRanges())) + } + } + cachedShuffleRDD + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala index 3767e6cae46e..2c50b638b4d4 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/ReduceNumShufflePartitions.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.execution.adaptive -import scala.collection.mutable.ArrayBuffer +import scala.collection.mutable.{ArrayBuffer, HashSet} import org.apache.spark.MapOutputStatistics import org.apache.spark.rdd.RDD @@ -54,20 +54,28 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { if (!conf.reducePostShufflePartitionsEnabled) { return plan } - if (!plan.collectLeaves().forall(_.isInstanceOf[QueryStageExec])) { + // 'SkewedShufflePartitionReader' is added by us, so it's safe to ignore it when changing + // number of reducers. + val leafNodes = plan.collectLeaves().filter(!_.isInstanceOf[SkewedPartitionReaderExec]) + if (!leafNodes.forall(_.isInstanceOf[QueryStageExec])) { // If not all leaf nodes are query stages, it's not safe to reduce the number of // shuffle partitions, because we may break the assumption that all children of a spark plan // have same number of output partitions. 
return plan } - def collectShuffleStages(plan: SparkPlan): Seq[ShuffleQueryStageExec] = plan match { + def collectShuffles(plan: SparkPlan): Seq[SparkPlan] = plan match { case _: LocalShuffleReaderExec => Nil + case p: PartialShuffleReaderExec => Seq(p) case stage: ShuffleQueryStageExec => Seq(stage) - case _ => plan.children.flatMap(collectShuffleStages) + case _ => plan.children.flatMap(collectShuffles) } - val shuffleStages = collectShuffleStages(plan) + val shuffles = collectShuffles(plan) + val shuffleStages = shuffles.map { + case PartialShuffleReaderExec(s: ShuffleQueryStageExec, _) => s + case s: ShuffleQueryStageExec => s + } // ShuffleExchanges introduced by repartition do not support changing the number of partitions. // We change the number of partitions in the stage only if all the ShuffleExchanges support it. if (!shuffleStages.forall(_.shuffle.canChangeNumPartitions)) { @@ -86,15 +94,32 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { // partition) and a result of a SortMergeJoin (multiple partitions). val distinctNumPreShufflePartitions = validMetrics.map(stats => stats.bytesByPartitionId.length).distinct - if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1) { - val partitionStartIndices = estimatePartitionStartIndices(validMetrics.toArray) + val distinctExcludedPartitions = shuffles.map { + case PartialShuffleReaderExec(_, excludedPartitions) => excludedPartitions + case _: ShuffleQueryStageExec => Set.empty[Int] + }.distinct + if (validMetrics.nonEmpty && distinctNumPreShufflePartitions.length == 1 + && distinctExcludedPartitions.length == 1) { + val excludedPartitions = distinctExcludedPartitions.head + val partitionIndices = estimatePartitionStartAndEndIndices( + validMetrics.toArray, excludedPartitions) // This transformation adds new nodes, so we must use `transformUp` here. - plan.transformUp { - // even for shuffle exchange whose input RDD has 0 partition, we should still update its - // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same - // number of output partitions. - case stage: ShuffleQueryStageExec => - CoalescedShuffleReaderExec(stage, partitionStartIndices) + // Even for shuffle exchange whose input RDD has 0 partition, we should still update its + // `partitionStartIndices`, so that all the leaf shuffles in a stage have the same + // number of output partitions. + val visitedStages = HashSet.empty[Int] + plan.transformDown { + // Replace `PartialShuffleReaderExec` with `CoalescedShuffleReaderExec`, which keeps the + // "excludedPartition" requirement and also merges some partitions. + case PartialShuffleReaderExec(stage: ShuffleQueryStageExec, _) => + visitedStages.add(stage.id) + CoalescedShuffleReaderExec(stage, partitionIndices) + + // We are doing `transformDown`, so the `ShuffleQueryStageExec` may already be optimized + // and wrapped by `CoalescedShuffleReaderExec`. + case stage: ShuffleQueryStageExec if !visitedStages.contains(stage.id) => + visitedStages.add(stage.id) + CoalescedShuffleReaderExec(stage, partitionIndices) } } else { plan @@ -103,13 +128,15 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { } /** - * Estimates partition start indices for post-shuffle partitions based on - * mapOutputStatistics provided by all pre-shuffle stages. 
+ * Estimates partition start and end indices for post-shuffle partitions based on + * mapOutputStatistics provided by all pre-shuffle stages and skip the omittedPartitions + * already handled in skewed partition optimization. */ // visible for testing. - private[sql] def estimatePartitionStartIndices( - mapOutputStatistics: Array[MapOutputStatistics]): Array[Int] = { - val minNumPostShufflePartitions = conf.minNumPostShufflePartitions + private[sql] def estimatePartitionStartAndEndIndices( + mapOutputStatistics: Array[MapOutputStatistics], + excludedPartitions: Set[Int] = Set.empty): Array[(Int, Int)] = { + val minNumPostShufflePartitions = conf.minNumPostShufflePartitions - excludedPartitions.size val advisoryTargetPostShuffleInputSize = conf.targetPostShuffleInputSize // If minNumPostShufflePartitions is defined, it is possible that we need to use a // value less than advisoryTargetPostShuffleInputSize as the target input size of @@ -141,39 +168,35 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { distinctNumPreShufflePartitions.length == 1, "There should be only one distinct value of the number pre-shuffle partitions " + "among registered Exchange operator.") - val numPreShufflePartitions = distinctNumPreShufflePartitions.head val partitionStartIndices = ArrayBuffer[Int]() - // The first element of partitionStartIndices is always 0. - partitionStartIndices += 0 - - var postShuffleInputSize = 0L - - var i = 0 - while (i < numPreShufflePartitions) { - // We calculate the total size of ith pre-shuffle partitions from all pre-shuffle stages. - // Then, we add the total size to postShuffleInputSize. - var nextShuffleInputSize = 0L - var j = 0 - while (j < mapOutputStatistics.length) { - nextShuffleInputSize += mapOutputStatistics(j).bytesByPartitionId(i) - j += 1 - } - - // If including the nextShuffleInputSize would exceed the target partition size, then start a - // new partition. - if (i > 0 && postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize) { - partitionStartIndices += i - // reset postShuffleInputSize. - postShuffleInputSize = nextShuffleInputSize - } else { - postShuffleInputSize += nextShuffleInputSize - } - - i += 1 + val partitionEndIndices = ArrayBuffer[Int]() + val numPartitions = distinctNumPreShufflePartitions.head + val includedPartitions = (0 until numPartitions).filter(!excludedPartitions.contains(_)) + val firstStartIndex = includedPartitions(0) + partitionStartIndices += firstStartIndex + var postShuffleInputSize = mapOutputStatistics.map(_.bytesByPartitionId(firstStartIndex)).sum + var i = firstStartIndex + includedPartitions.drop(1).foreach { nextPartitionIndex => + val nextShuffleInputSize = + mapOutputStatistics.map(_.bytesByPartitionId(nextPartitionIndex)).sum + // If nextPartitionIndices is skewed and omitted, or including + // the nextShuffleInputSize would exceed the target partition size, + // then start a new partition. + if (nextPartitionIndex != i + 1 || + (postShuffleInputSize + nextShuffleInputSize > targetPostShuffleInputSize)) { + partitionEndIndices += i + 1 + partitionStartIndices += nextPartitionIndex + // reset postShuffleInputSize. 
+ postShuffleInputSize = nextShuffleInputSize + i = nextPartitionIndex + } else { + postShuffleInputSize += nextShuffleInputSize + i += 1 + } } - - partitionStartIndices.toArray + partitionEndIndices += i + 1 + partitionStartIndices.zip(partitionEndIndices).toArray } } @@ -186,12 +209,12 @@ case class ReduceNumShufflePartitions(conf: SQLConf) extends Rule[SparkPlan] { */ case class CoalescedShuffleReaderExec( child: SparkPlan, - partitionStartIndices: Array[Int]) extends UnaryExecNode { + partitionIndices: Array[(Int, Int)]) extends UnaryExecNode { override def output: Seq[Attribute] = child.output override def outputPartitioning: Partitioning = { - UnknownPartitioning(partitionStartIndices.length) + UnknownPartitioning(partitionIndices.length) } private var cachedShuffleRDD: ShuffledRowRDD = null @@ -200,7 +223,7 @@ case class CoalescedShuffleReaderExec( if (cachedShuffleRDD == null) { cachedShuffleRDD = child match { case stage: ShuffleQueryStageExec => - stage.shuffle.createShuffledRDD(Some(partitionStartIndices)) + stage.shuffle.createShuffledRDD(Some(partitionIndices)) case _ => throw new IllegalStateException("operating on canonicalization plan") } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewedShuffledRowRDD.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewedShuffledRowRDD.scala new file mode 100644 index 000000000000..52f793b24aa1 --- /dev/null +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/SkewedShuffledRowRDD.scala @@ -0,0 +1,78 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.execution.adaptive + +import org.apache.spark._ +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.execution.metric.{SQLMetric, SQLShuffleReadMetricsReporter} + +/** + * The [[Partition]] used by [[SkewedShuffledRowRDD]]. + */ +class SkewedShuffledRowRDDPartition(override val index: Int) extends Partition + +/** + * This is a specialized version of [[org.apache.spark.sql.execution.ShuffledRowRDD]]. This is used + * in Spark SQL adaptive execution to solve data skew issues. This RDD includes rearranged + * partitions from mappers. + * + * This RDD takes a [[ShuffleDependency]] (`dependency`), a partitionIndex + * and the range of startMapIndex to endMapIndex. 
+ */ +class SkewedShuffledRowRDD( + var dependency: ShuffleDependency[Int, InternalRow, InternalRow], + partitionIndex: Int, + startMapIndex: Int, + endMapIndex: Int, + metrics: Map[String, SQLMetric]) + extends RDD[InternalRow](dependency.rdd.context, Nil) { + + override def getDependencies: Seq[Dependency[_]] = List(dependency) + + override def getPartitions: Array[Partition] = { + Array(new SkewedShuffledRowRDDPartition(0)) + } + + override def getPreferredLocations(partition: Partition): Seq[String] = { + val tracker = SparkEnv.get.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster] + tracker.getMapLocation(dependency, startMapIndex, endMapIndex) + } + + override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = { + val tempMetrics = context.taskMetrics().createTempShuffleReadMetrics() + // `SQLShuffleReadMetricsReporter` will update its own metrics for SQL exchange operator, + // as well as the `tempMetrics` for basic shuffle metrics. + val sqlMetricsReporter = new SQLShuffleReadMetricsReporter(tempMetrics, metrics) + + val reader = SparkEnv.get.shuffleManager.getReaderForRange( + dependency.shuffleHandle, + startMapIndex, + endMapIndex, + partitionIndex, + partitionIndex + 1, + context, + sqlMetricsReporter) + reader.read().asInstanceOf[Iterator[Product2[Int, InternalRow]]].map(_._2) + } + + override def clearDependencies() { + super.clearDependencies() + dependency = null + } +} diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 4281f01e2756..ffcd6c778335 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Uns import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.physical._ import org.apache.spark.sql.execution._ -import org.apache.spark.sql.execution.adaptive.LocalShuffledRowRDD +import org.apache.spark.sql.execution.adaptive.{LocalShuffledRowRDD, SkewedShuffledRowRDD} import org.apache.spark.sql.execution.metric.{SQLMetric, SQLMetrics, SQLShuffleReadMetricsReporter, SQLShuffleWriteMetricsReporter} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.types.StructType @@ -90,8 +90,9 @@ case class ShuffleExchangeExec( writeMetrics) } - def createShuffledRDD(partitionStartIndices: Option[Array[Int]]): ShuffledRowRDD = { - new ShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndices) + def createShuffledRDD( + partitionRanges: Option[Array[(Int, Int)]]): ShuffledRowRDD = { + new ShuffledRowRDD(shuffleDependency, readMetrics, partitionRanges) } def createLocalShuffleRDD( @@ -99,6 +100,14 @@ case class ShuffleExchangeExec( new LocalShuffledRowRDD(shuffleDependency, readMetrics, partitionStartIndicesPerMapper) } + def createSkewedShuffleRDD( + partitionIndex: Int, + startMapIndex: Int, + endMapIndex: Int): SkewedShuffledRowRDD = { + new SkewedShuffledRowRDD(shuffleDependency, + partitionIndex, startMapIndex, endMapIndex, readMetrics) + } + /** * Caches the created ShuffleRowRDD so we can reuse that. 
*/ diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala index 70d5a8a615ad..04b4d4f29f85 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ReduceNumShufflePartitionsSuite.scala @@ -61,7 +61,7 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA new MapOutputStatistics(index, bytesByPartitionId) } val estimatedPartitionStartIndices = - rule.estimatePartitionStartIndices(mapOutputStatistics) + rule.estimatePartitionStartAndEndIndices(mapOutputStatistics).map(_._1) assert(estimatedPartitionStartIndices === expectedPartitionStartIndices) } @@ -133,7 +133,8 @@ class ReduceNumShufflePartitionsSuite extends SparkFunSuite with BeforeAndAfterA Array( new MapOutputStatistics(0, bytesByPartitionId1), new MapOutputStatistics(1, bytesByPartitionId2)) - intercept[AssertionError](rule.estimatePartitionStartIndices(mapOutputStatistics)) + intercept[AssertionError](rule.estimatePartitionStartAndEndIndices( + mapOutputStatistics)) } { diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala index fb24eaf2a4bf..c2daae071afc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/adaptive/AdaptiveQueryExecSuite.scala @@ -579,6 +579,153 @@ class AdaptiveQueryExecSuite } } + test("SPARK-29544: adaptive skew join with different join types") { + Seq("false", "true").foreach { reducePostShufflePartitionsEnabled => + withSQLConf( + SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true", + SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "-1", + SQLConf.ADAPTIVE_EXECUTION_SKEWED_PARTITION_SIZE_THRESHOLD.key -> "100", + SQLConf.REDUCE_POST_SHUFFLE_PARTITIONS_ENABLED.key -> reducePostShufflePartitionsEnabled, + SQLConf.SHUFFLE_TARGET_POSTSHUFFLE_INPUT_SIZE.key -> "700") { + withTempView("skewData1", "skewData2") { + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 2 as key1", "id as value1") + .createOrReplaceTempView("skewData1") + spark + .range(0, 1000, 1, 10) + .selectExpr("id % 1 as key2", "id as value2") + .createOrReplaceTempView("skewData2") + // skewed inner join optimization + val (innerPlan, innerAdaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM skewData1 join skewData2 ON key1 = key2") + val innerSmj = findTopLevelSortMergeJoin(innerPlan) + assert(innerSmj.size == 1) + // left stats: [3496, 0, 0, 0, 4014] + // right stats:[6292, 0, 0, 0, 0] + // the partition 0 in both left and right side are all skewed. + // And divide into 5 splits both in left and right (the max splits number). + // So there are 5 x 5 smjs for partition 0. + // Partition 4 in left side is skewed and is divided into 5 splits. + // The right side of partition 4 is not skewed. + // So there are 5 smjs for partition 4. + // So total (25 + 5 + 1) smjs. + // Union + // +- SortMergeJoin + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // . + // . + // . 
+ // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + + val innerSmjAfter = findTopLevelSortMergeJoin(innerAdaptivePlan) + assert(innerSmjAfter.size == 31) + + // skewed left outer join optimization + val (leftPlan, leftAdaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM skewData1 left outer join skewData2 ON key1 = key2") + val leftSmj = findTopLevelSortMergeJoin(leftPlan) + assert(leftSmj.size == 1) + // left stats: [3496, 0, 0, 0, 4014] + // right stats:[6292, 0, 0, 0, 0] + // The partition 0 in both left and right are all skewed. + // The partition 4 in left side is skewed. + // But for left outer join, we don't split the right partition even skewed. + // So the partition 0 in left side is divided into 5 splits(the max split number). + // the partition 4 in left side is divided into 5 splits(the max split number). + // So total (5 + 5 + 1) smjs. + // Union + // +- SortMergeJoin + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // . + // . + // . + // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + + val leftSmjAfter = findTopLevelSortMergeJoin(leftAdaptivePlan) + assert(leftSmjAfter.size == 11) + + // skewed right outer join optimization + val (rightPlan, rightAdaptivePlan) = runAdaptiveAndVerifyResult( + "SELECT * FROM skewData1 right outer join skewData2 ON key1 = key2") + val rightSmj = findTopLevelSortMergeJoin(rightPlan) + assert(rightSmj.size == 1) + // left stats: [3496, 0, 0, 0, 4014] + // right stats:[6292, 0, 0, 0, 0] + // The partition 0 in both left and right side are all skewed. + // And the partition 4 in left side is skewed. + // But for right outer join, we don't split the left partition even skewed. + // And divide right side into 5 splits(the max split number) + // So total 6 smjs. + // Union + // +- SortMergeJoin + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- CoalescedShuffleReader + // +- ShuffleQueryStage + // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // . + // . + // . + // +- SortMergeJoin + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + // +- Sort + // +- SkewedShuffleReader + // +- ShuffleQueryStage + + val rightSmjAfter = findTopLevelSortMergeJoin(rightAdaptivePlan) + assert(rightSmjAfter.size == 6) + } + } + } + } + test("SPARK-30291: AQE should catch the exceptions when doing materialize") { withSQLConf( SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "true") {
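The split counts asserted in the tests above come from the greedy packing in OptimizeSkewedJoin#getMapStartIndices: walk the per-mapper output sizes of the skewed reduce partition, start a new split whenever adding the next mapper would push the current split past the advisory size, and cap the result at the configured maximum number of splits. A self-contained re-statement of that loop, with illustrative names and the SQLConf lookups left out:

import scala.collection.mutable.ArrayBuffer

object SkewSplitSketch {
  // Greedily pack consecutive map outputs of one skewed reduce partition into at most
  // `maxSplits` splits, each targeting roughly `advisorySize` bytes.
  def mapStartIndices(mapSizes: Array[Long], advisorySize: Long, maxSplits: Int): Array[Int] = {
    val starts = ArrayBuffer(0)
    var currentSize = mapSizes(0)
    for (i <- 1 until mapSizes.length) {
      if (currentSize + mapSizes(i) > advisorySize) {
        starts += i                 // start a new split at mapper i
        currentSize = mapSizes(i)
      } else {
        currentSize += mapSizes(i)
      }
    }
    starts.take(maxSplits).toArray  // cap the number of splits, as the rule does
  }

  def main(args: Array[String]): Unit = {
    val sizes = Array[Long](100, 100, 100, 100, 100, 100)
    // With advisorySize = sum / maxSplits = 200, splits start at mappers 0, 2 and 4.
    println(mapStartIndices(sizes, advisorySize = 200, maxSplits = 3).mkString(", "))
  }
}

Each start index is later paired with the next start index (or the mapper count, for the last split) to form the [startMapIndex, endMapIndex) range that SkewedPartitionReaderExec and SkewedShuffledRowRDD eventually hand to getReaderForRange.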