diff --git a/core/src/main/scala/org/apache/spark/Dependency.scala b/core/src/main/scala/org/apache/spark/Dependency.scala index 573608c4327e..2751d7b3b2e0 100644 --- a/core/src/main/scala/org/apache/spark/Dependency.scala +++ b/core/src/main/scala/org/apache/spark/Dependency.scala @@ -79,13 +79,25 @@ abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] { class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, - val serializer: Serializer = SparkEnv.get.serializer, - val keyOrdering: Option[Ordering[K]] = None, - val aggregator: Option[Aggregator[K, V, C]] = None, - val mapSideCombine: Boolean = false, - val shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor) + val serializer: Serializer, + val keyOrdering: Option[Ordering[K]], + val aggregator: Option[Aggregator[K, V, C]], + val mapSideCombine: Boolean, + val shuffleWriterProcessor: ShuffleWriteProcessor, + val isInDeterministic: Boolean) extends Dependency[Product2[K, V]] with Logging { + def this ( + rdd: RDD[_ <: Product2[K, V]], + partitioner: Partitioner, + serializer: Serializer = SparkEnv.get.serializer, + keyOrdering: Option[Ordering[K]] = None, + aggregator: Option[Aggregator[K, V, C]] = None, + mapSideCombine: Boolean = false, + shuffleWriterProcessor: ShuffleWriteProcessor = new ShuffleWriteProcessor + ) = this(rdd, partitioner, serializer, keyOrdering, aggregator, mapSideCombine, + shuffleWriterProcessor, false) + if (mapSideCombine) { require(aggregator.isDefined, "Map-side combine without Aggregator specified!") } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 80db818b77e4..fb8564827b02 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -2105,6 +2105,9 @@ abstract class RDD[T: ClassTag]( val deterministicLevelCandidates = dependencies.map { // The shuffle is not really happening, treat it like narrow dependency and assume the output // deterministic level of current RDD is same as parent. + case dep: ShuffleDependency[_, _, _] if dep.isInDeterministic => + DeterministicLevel.INDETERMINATE + case dep: ShuffleDependency[_, _, _] if dep.rdd.partitioner.exists(_ == dep.partitioner) => dep.rdd.outputDeterministicLevel diff --git a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala index aee92ba928b4..a9efdd7ca1d0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala @@ -1898,25 +1898,51 @@ private[spark] class DAGScheduler( // Make sure the task's accumulators are updated before any other processing happens, so that // we can post a task end event before any jobs or stages are updated. The accumulators are // only updated in certain cases. - event.reason match { + val (readLockTaken, isIndeterministicZombie) = event.reason match { case Success => - task match { - case rt: ResultTask[_, _] => - val resultStage = stage.asInstanceOf[ResultStage] - resultStage.activeJob match { - case Some(job) => - // Only update the accumulator once for each result task. - if (!job.finished(rt.outputId)) { - updateAccumulators(event) - } - case None => // Ignore update if task's job has finished. 
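// --- Illustrative sketch (not part of the patch): how the new `isInDeterministic` flag is
// expected to be wired up. ShuffleWriteProcessor is private[spark], so a snippet like this is
// assumed to live under the org.apache.spark package (for example, in a core test); `pairs` is
// any key/value RDD.
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkEnv}
import org.apache.spark.rdd.RDD
import org.apache.spark.shuffle.ShuffleWriteProcessor

def indeterministicDep(pairs: RDD[(Int, Int)]): ShuffleDependency[Int, Int, Int] =
  new ShuffleDependency[Int, Int, Int](
    pairs, new HashPartitioner(2), SparkEnv.get.serializer,
    keyOrdering = None, aggregator = None, mapSideCombine = false,
    shuffleWriterProcessor = new ShuffleWriteProcessor, isInDeterministic = true)
// Per the RDD.scala change above, an RDD reading such a dependency reports
// DeterministicLevel.INDETERMINATE regardless of the parent RDD's own level, while callers of the
// auxiliary constructor keep the old behavior (the flag defaults to false).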
- } - case _ => - updateAccumulators(event) + stage.acquireStageReadLock() + val isZombieIndeterminate = + (task.stageAttemptId < stage.latestInfo.attemptNumber() + && stage.isIndeterminate) || + stage.treatAllPartitionsMissing(task.stageAttemptId) + if (!isZombieIndeterminate) { + task match { + case rt: ResultTask[_, _] => + val resultStage = stage.asInstanceOf[ResultStage] + resultStage.activeJob match { + case Some(job) => + // Only update the accumulator once for each result task. + if (!job.finished(rt.outputId)) { + updateAccumulators(event) + } + case _ => // Ignore update if task's job has finished. + } + case _ => updateAccumulators(event) + } } - case _: ExceptionFailure | _: TaskKilled => updateAccumulators(event) - case _ => + (true, isZombieIndeterminate) + + case _: ExceptionFailure | _: TaskKilled => + updateAccumulators(event) + (false, false) + + case _ => (false, false) + } + + try { + handleTaskCompletionInOptionalReadLock(event, task, stageId, stage, isIndeterministicZombie) + } finally { + if (readLockTaken) { + stage.releaseStageReadLock() + } } + } + + private def handleTaskCompletionInOptionalReadLock( + event: CompletionEvent, + task: Task[_], stageId: Int, + stage: Stage, + isIndeterministicZombie: Boolean): Unit = { if (trackingCacheVisibility) { // Update rdd blocks' visibility status. blockManagerMaster.updateRDDBlockVisibility( @@ -1936,7 +1962,7 @@ private[spark] class DAGScheduler( } task match { - case rt: ResultTask[_, _] => + case rt: ResultTask[_, _] if !isIndeterministicZombie => // Cast to ResultStage here because it's part of the ResultTask // TODO Refactor this out to a function that accepts a ResultStage val resultStage = stage.asInstanceOf[ResultStage] @@ -1984,7 +2010,7 @@ private[spark] class DAGScheduler( logInfo(log"Ignoring result from ${MDC(RESULT, rt)} because its job has finished") } - case smt: ShuffleMapTask => + case smt: ShuffleMapTask if !isIndeterministicZombie => val shuffleStage = stage.asInstanceOf[ShuffleMapStage] // Ignore task completion for old attempt of indeterminate stage val ignoreIndeterminate = stage.isIndeterminate && @@ -2017,6 +2043,8 @@ private[spark] class DAGScheduler( processShuffleMapStageCompletion(shuffleStage) } } + + case _ => // ignore } case FetchFailed(bmAddress, shuffleId, _, mapIndex, reduceId, failureMessage) => @@ -2121,92 +2149,111 @@ private[spark] class DAGScheduler( failedStages += failedStage failedStages += mapStage if (noResubmitEnqueued) { - // If the map stage is INDETERMINATE, which means the map tasks may return - // different result when re-try, we need to re-try all the tasks of the failed - // stage and its succeeding stages, because the input data will be changed after the - // map tasks are re-tried. - // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is - // guaranteed to be determinate, so the input data of the reducers will not change - // even if the map tasks are re-tried. - if (mapStage.isIndeterminate) { - // It's a little tricky to find all the succeeding stages of `mapStage`, because - // each stage only know its parents not children. Here we traverse the stages from - // the leaf nodes (the result stages of active jobs), and rollback all the stages - // in the stage chains that connect to the `mapStage`. To speed up the stage - // traversing, we collect the stages to rollback first. If a stage needs to - // rollback, all its succeeding stages need to rollback to. 
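// --- Illustrative sketch (not part of the patch) of the locking protocol the code above relies
// on: successful-task handling runs under the stage's read lock, while the indeterminate-rollback
// path takes the write lock and marks the current attempt so every partition is recomputed.
import java.util.concurrent.locks.ReentrantReadWriteLock

class StageLockSketch {
  private val lock = new ReentrantReadWriteLock()
  @volatile private var attemptIdAllPartitionsMissing: Int = -1

  // Task-completion side: shared (read) lock, so many completions can proceed concurrently.
  def withTaskCompletion[T](body: => T): T = {
    lock.readLock().lockInterruptibly()
    try body finally lock.readLock().unlock()
  }

  // Rollback side: exclusive (write) lock; also records the attempt whose outputs must be ignored.
  def withRollback[T](attemptId: Int)(body: => T): T = {
    lock.writeLock().lockInterruptibly()
    try { attemptIdAllPartitionsMissing = attemptId; body }
    finally lock.writeLock().unlock()
  }

  def treatAllPartitionsMissing(attemptId: Int): Boolean =
    attemptIdAllPartitionsMissing == attemptId
}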
- val stagesToRollback = HashSet[Stage](mapStage) - - def collectStagesToRollback(stageChain: List[Stage]): Unit = { - if (stagesToRollback.contains(stageChain.head)) { - stageChain.drop(1).foreach(s => stagesToRollback += s) - } else { - stageChain.head.parents.foreach { s => - collectStagesToRollback(s :: stageChain) + val writeLockedStages = mutable.Buffer.empty[Stage] + try { + // If the map stage is INDETERMINATE, which means the map tasks may return + // different result when re-try, we need to re-try all the tasks of the failed + // stage and its succeeding stages, because the input data will be changed after the + // map tasks are re-tried. + // Note that, if map stage is UNORDERED, we are fine. The shuffle partitioner is + // guaranteed to be determinate, so the input data of the reducers will not change + // even if the map tasks are re-tried. + if (mapStage.isIndeterminate) { + // It's a little tricky to find all the succeeding stages of `mapStage`, because + // each stage only know its parents not children. Here we traverse the stages from + // the leaf nodes (the result stages of active jobs), and rollback all the stages + // in the stage chains that connect to the `mapStage`. To speed up the stage + // traversing, we collect the stages to rollback first. If a stage needs to + // rollback, all its succeeding stages need to rollback to. + val stagesToRollback = HashSet[Stage](mapStage) + + def collectStagesToRollback(stageChain: List[Stage]): Unit = { + if (stagesToRollback.contains(stageChain.head)) { + stageChain.drop(1).foreach(s => stagesToRollback += s) + } else { + stageChain.head.parents.foreach { s => + collectStagesToRollback(s :: stageChain) + } } } - } - def generateErrorMessage(stage: Stage): String = { - "A shuffle map stage with indeterminate output was failed and retried. " + - s"However, Spark cannot rollback the $stage to re-process the input data, " + - "and has to fail this job. Please eliminate the indeterminacy by " + - "checkpointing the RDD before repartition and try again." - } + def generateErrorMessage(stage: Stage): String = { + "A shuffle map stage with indeterminate output was failed and retried. " + + s"However, Spark cannot rollback the $stage to re-process the input data, " + + "and has to fail this job. Please eliminate the indeterminacy by " + + "checkpointing the RDD before repartition and try again." + } + + activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) - activeJobs.foreach(job => collectStagesToRollback(job.finalStage :: Nil)) - - // The stages will be rolled back after checking - val rollingBackStages = HashSet[Stage](mapStage) - stagesToRollback.foreach { - case mapStage: ShuffleMapStage => - val numMissingPartitions = mapStage.findMissingPartitions().length - if (numMissingPartitions < mapStage.numTasks) { - if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { - val reason = "A shuffle map stage with indeterminate output was failed " + - "and retried. However, Spark can only do this while using the new " + - "shuffle block fetching protocol. Please check the config " + - "'spark.shuffle.useOldFetchProtocol', see more detail in " + - "SPARK-27665 and SPARK-25341." 
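// --- Illustrative sketch (not part of the patch) of the rollback traversal above, on a toy DAG:
// starting from each active job's final stage and walking parent chains, every stage that sits
// downstream of the indeterminate map stage ends up in the rollback set.
import scala.collection.mutable

case class StageNode(id: Int, parents: Seq[StageNode])

def stagesToRollbackSketch(finalStages: Seq[StageNode], indeterminateMapStage: StageNode): Set[Int] = {
  val toRollback = mutable.HashSet[Int](indeterminateMapStage.id)
  def collect(chain: List[StageNode]): Unit = {
    if (toRollback.contains(chain.head.id)) {
      // Everything downstream of a stage being rolled back must be rolled back too.
      chain.drop(1).foreach(s => toRollback += s.id)
    } else {
      chain.head.parents.foreach(p => collect(p :: chain))
    }
  }
  finalStages.foreach(finalStage => collect(finalStage :: Nil))
  toRollback.toSet
}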
- abortStage(mapStage, reason, None) + // The stages will be rolled back after checking + val rollingBackStages = HashSet[Stage](mapStage) + stagesToRollback.foreach { + case mapStage: ShuffleMapStage => + if (mapStage.acquireStageWriteLock()) { + writeLockedStages += mapStage + } + val numMissingPartitions = mapStage.findMissingPartitions().length + if (numMissingPartitions < mapStage.numTasks) { + if (sc.conf.get(config.SHUFFLE_USE_OLD_FETCH_PROTOCOL)) { + val reason = "A shuffle map stage with indeterminate output was failed " + + "and retried. However, Spark can only do this while using the new " + + "shuffle block fetching protocol. Please check the config " + + "'spark.shuffle.useOldFetchProtocol', see more detail in " + + "SPARK-27665 and SPARK-25341." + abortStage(mapStage, reason, None) + } else { + rollingBackStages += mapStage + mapStage.markAttemptIdForAllPartitionsMissing( + mapStage.latestInfo.attemptNumber()) + } } else { - rollingBackStages += mapStage + mapStage.markAttemptIdForAllPartitionsMissing( + mapStage.latestInfo.attemptNumber()) } - } - case resultStage: ResultStage if resultStage.activeJob.isDefined => - val numMissingPartitions = resultStage.findMissingPartitions().length - if (numMissingPartitions < resultStage.numTasks) { - // TODO: support to rollback result tasks. - abortStage(resultStage, generateErrorMessage(resultStage), None) - } + case resultStage: ResultStage if resultStage.activeJob.isDefined => + if (resultStage.acquireStageWriteLock()) { + writeLockedStages += resultStage + } + val numMissingPartitions = resultStage.findMissingPartitions().length + if (numMissingPartitions < resultStage.numTasks) { + // TODO: support to rollback result tasks. + abortStage(resultStage, generateErrorMessage(resultStage), None) + } else { + resultStage.markAttemptIdForAllPartitionsMissing( + resultStage.latestInfo.attemptNumber()) + } - case _ => + case _ => + } + logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + + log"we will roll back and rerun below stages which include itself and all its " + + log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") } - logInfo(log"The shuffle map stage ${MDC(SHUFFLE_ID, mapStage)} with indeterminate output was failed, " + - log"we will roll back and rerun below stages which include itself and all its " + - log"indeterminate child stages: ${MDC(STAGES, rollingBackStages)}") - } - // We expect one executor failure to trigger many FetchFailures in rapid succession, - // but all of those task failures can typically be handled by a single resubmission of - // the failed stage. We avoid flooding the scheduler's event queue with resubmit - // messages by checking whether a resubmit is already in the event queue for the - // failed stage. If there is already a resubmit enqueued for a different failed - // stage, that event would also be sufficient to handle the current failed stage, but - // producing a resubmit for each failed stage makes debugging and logging a little - // simpler while not producing an overwhelming number of scheduler events. 
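// --- Illustrative sketch (not part of the patch): how the attempt marker set by the rollback
// above is consumed by the findMissingPartitions overrides further down (ResultStage and
// ShuffleMapStage). Once the current attempt is marked, every partition is reported missing,
// even if some task outputs had already been recorded.
def findMissingSketch(
    markedAttemptId: Int,
    currentAttemptId: Int,
    numPartitions: Int,
    recordedOutputs: Set[Int]): Seq[Int] = {
  if (markedAttemptId == currentAttemptId) {
    0 until numPartitions
  } else {
    (0 until numPartitions).filterNot(recordedOutputs.contains)
  }
}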
- logInfo( - log"Resubmitting ${MDC(STAGE, mapStage)} " + - log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " + - log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure") - messageScheduler.schedule( - new Runnable { - override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) - }, - DAGScheduler.RESUBMIT_TIMEOUT, - TimeUnit.MILLISECONDS - ) + // We expect one executor failure to trigger many FetchFailures in rapid succession, + // but all of those task failures can typically be handled by a single resubmission of + // the failed stage. We avoid flooding the scheduler's event queue with resubmit + // messages by checking whether a resubmit is already in the event queue for the + // failed stage. If there is already a resubmit enqueued for a different failed + // stage, that event would also be sufficient to handle the current failed stage, but + // producing a resubmit for each failed stage makes debugging and logging a little + // simpler while not producing an overwhelming number of scheduler events. + logInfo( + log"Resubmitting ${MDC(STAGE, mapStage)} " + + log"(${MDC(STAGE_NAME, mapStage.name)}) and ${MDC(FAILED_STAGE, failedStage)} " + + log"(${MDC(FAILED_STAGE_NAME, failedStage.name)}) due to fetch failure") + messageScheduler.schedule( + new Runnable { + override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages) + }, + DAGScheduler.RESUBMIT_TIMEOUT, + TimeUnit.MILLISECONDS + ) + } finally { + writeLockedStages.foreach(_.releaseStageWriteLock()) + } } } @@ -2263,8 +2310,8 @@ private[spark] class DAGScheduler( log"and there is a more recent attempt for that stage (attempt " + log"${MDC(NUM_ATTEMPT, failedStage.latestInfo.attemptNumber())}) running") } else { - logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " + - log"as failed due to a barrier task failed.") + logInfo(log"Marking ${MDC(STAGE_ID, failedStage.id)} (${MDC(STAGE_NAME, failedStage.name)}) " + + log"as failed due to a barrier task failed.") val message = s"Stage failed because barrier task $task finished unsuccessfully.\n" + failure.toErrorString try { diff --git a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala index 7fdc3186e86b..97130ddd7455 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ResultStage.scala @@ -61,7 +61,12 @@ private[spark] class ResultStage( */ override def findMissingPartitions(): Seq[Int] = { val job = activeJob.get - (0 until job.numPartitions).filter(id => !job.finished(id)) + val allPartitions = (0 until job.numPartitions) + if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) { + allPartitions + } else { + allPartitions.filter(id => !job.finished(id)) + } } override def toString: String = "ResultStage " + id diff --git a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala index db09d19d0acf..1eb76b842ba0 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ShuffleMapStage.scala @@ -90,8 +90,13 @@ private[spark] class ShuffleMapStage( /** Returns the sequence of partition ids that are missing (i.e. needs to be computed). 
*/ override def findMissingPartitions(): Seq[Int] = { - mapOutputTrackerMaster - .findMissingPartitions(shuffleDep.shuffleId) - .getOrElse(0 until numPartitions) + if (this.treatAllPartitionsMissing(this.latestInfo.attemptNumber())) { + 0 until numPartitions + } else { + mapOutputTrackerMaster + .findMissingPartitions(shuffleDep.shuffleId).getOrElse(0 until numPartitions) + } } + + override def isIndeterminate: Boolean = this.shuffleDep.isInDeterministic || super.isIndeterminate } diff --git a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala index f35beafd8748..146b34ed8285 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Stage.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Stage.scala @@ -17,6 +17,8 @@ package org.apache.spark.scheduler +import java.util.concurrent.locks.ReentrantReadWriteLock + import scala.collection.mutable.HashSet import org.apache.spark.executor.TaskMetrics @@ -63,6 +65,12 @@ private[scheduler] abstract class Stage( val resourceProfileId: Int) extends Logging { + @volatile + private var attemptIdAllPartitionsMissing: Int = -1 + + private val stageReattemptLock = new ReentrantReadWriteLock() + private val stageReadLock = stageReattemptLock.readLock() + private val stageWriteLock = stageReattemptLock.writeLock() val numPartitions = rdd.partitions.length /** Set of jobs that this stage belongs to. */ @@ -100,12 +108,21 @@ private[scheduler] abstract class Stage( def makeNewStageAttempt( numPartitionsToCompute: Int, taskLocalityPreferences: Seq[Seq[TaskLocation]] = Seq.empty): Unit = { - val metrics = new TaskMetrics - metrics.register(rdd.sparkContext) - _latestInfo = StageInfo.fromStage( - this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences, - resourceProfileId = resourceProfileId) - nextAttemptId += 1 + val writeLockTaken = this.acquireStageWriteLock() + try { + val metrics = new TaskMetrics + metrics.register(rdd.sparkContext) + _latestInfo = StageInfo.fromStage( + this, nextAttemptId, Some(numPartitionsToCompute), metrics, taskLocalityPreferences, + resourceProfileId = resourceProfileId) + nextAttemptId += 1 + // clear the entry in the allPartitionsAsMissing set + attemptIdAllPartitionsMissing = -1 + } finally { + if (writeLockTaken) { + this.releaseStageWriteLock() + } + } } /** Forward the nextAttemptId if skipped and get visited for the first time. 
 */
@@ -131,4 +148,43 @@ private[scheduler] abstract class Stage(
   def isIndeterminate: Boolean = {
     rdd.outputDeterministicLevel == DeterministicLevel.INDETERMINATE
   }
+
+  def treatAllPartitionsMissing(attemptId: Int): Boolean =
+    this.attemptIdAllPartitionsMissing == attemptId
+
+  def markAttemptIdForAllPartitionsMissing(attemptId: Int): Unit =
+    this.attemptIdAllPartitionsMissing = attemptId
+
+  def acquireStageReadLock(): Unit = {
+    this.stageReadLock.lockInterruptibly()
+    val prevSet = Stage.threadHoldingReadLock.get()
+    Stage.threadHoldingReadLock.set(prevSet + this.id)
+  }
+
+  def releaseStageReadLock(): Unit = {
+    val prevSet = Stage.threadHoldingReadLock.get()
+    Stage.threadHoldingReadLock.set(prevSet - this.id)
+    this.stageReadLock.unlock()
+  }
+
+  def acquireStageWriteLock(): Boolean = {
+    if (Stage.threadHoldingReadLock.get().contains(this.id)) {
+      false
+    } else {
+      stageWriteLock.lockInterruptibly()
+      true
+    }
+  }
+
+  def releaseStageWriteLock(): Unit = {
+    stageWriteLock.unlock()
+  }
+}
+
+object Stage {
+  private val threadHoldingReadLock = new ThreadLocal[Set[Int]] {
+    override protected def initialValue(): Set[Int] = {
+      Set.empty[Int]
+    }
+  }
 }
diff --git a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
index 3e507df706ba..111bb25ce62e 100644
--- a/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -3192,9 +3192,14 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.map(_.id) == Seq(1, 2))
     // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
+    // TODO(SPARK-51016): the original assertion below appears to be wrong; since the
+    // ShuffleMapStage is indeterminate, all the partitions need to be retried.
+    /* assert(failedStages.collect {
+      case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
+    }.head.findMissingPartitions() == Seq(0)) */
     assert(failedStages.collect {
       case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage
-    }.head.findMissingPartitions() == Seq(0))
+    }.head.findMissingPartitions() == Seq(0, 1))
     // The result stage is still waiting for its 2 tasks to complete
     assert(failedStages.collect {
       case stage: ResultStage => stage
@@ -4163,9 +4168,16 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     val failedStages = scheduler.failedStages.toSeq
     assert(failedStages.map(_.id) == Seq(1, 2))
     // Shuffle blocks of "hostC" is lost, so first task of the `shuffleMapRdd2` needs to retry.
+    // TODO(SPARK-51016): the original assertion appears to be wrong.
As the ShuffleMapStage is inDeterminate all + // the partitions need to be retried + /* assert(failedStages.collect { case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage }.head.findMissingPartitions() == Seq(0)) + */ + assert(failedStages.collect { + case stage: ShuffleMapStage if stage.shuffleDep.shuffleId == shuffleId2 => stage + }.head.findMissingPartitions() == Seq(0, 1)) // The result stage is still waiting for its 2 tasks to complete assert(failedStages.collect { case stage: ResultStage => stage diff --git a/python/pyspark/pandas/internal.py b/python/pyspark/pandas/internal.py index 3f6831b60067..bda05017e135 100644 --- a/python/pyspark/pandas/internal.py +++ b/python/pyspark/pandas/internal.py @@ -766,8 +766,9 @@ def __init__( for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - index_field.struct_field == struct_field + _drop_metadata(index_field.struct_field) == _drop_metadata(struct_field) for index_field, struct_field in zip(index_fields, struct_fields) ), (index_fields, struct_fields) @@ -794,8 +795,9 @@ def __init__( for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) else: + # TODO(SPARK-42965): For some reason, the metadata of StructField is different assert all( - data_field.struct_field == struct_field + _drop_metadata(data_field.struct_field) == _drop_metadata(struct_field) for data_field, struct_field in zip(data_fields, struct_fields) ), (data_fields, struct_fields) diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala index 7d4f8c3b2564..2a6d928f5ab2 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/Expression.scala @@ -115,6 +115,9 @@ abstract class Expression extends TreeNode[Expression] { */ lazy val deterministic: Boolean = children.forall(_.deterministic) + lazy val exprValHasIndeterministicCharacter: Boolean = !deterministic || + this.references.exists(_.exprValHasIndeterministicCharacter) + def nullable: Boolean /** diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala index 2af6a1ba84ec..421c2fd190a4 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala @@ -111,6 +111,10 @@ abstract class Attribute extends LeafExpression with NamedExpression { @transient override lazy val references: AttributeSet = AttributeSet(this) + override lazy val exprValHasIndeterministicCharacter: Boolean = + metadata.contains(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) && + metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT) + def withNullability(newNullability: Boolean): Attribute def withQualifier(newQualifier: Seq[String]): Attribute def withName(newName: String): Attribute @@ -123,6 +127,10 @@ abstract class Attribute extends LeafExpression with NamedExpression { } +object Attribute { + val KEY_HAS_INDETERMINISTIC_COMPONENT = "hasIndeterministicComponent" +} + /** * Used to assign a new name to a 
computation. * For example the SQL expression "1 + 1 AS a" could be represented as follows: @@ -194,7 +202,13 @@ case class Alias(child: Expression, name: String)( override def toAttribute: Attribute = { if (resolved) { - AttributeReference(name, child.dataType, child.nullable, metadata)(exprId, qualifier) + val mdForAttrib = if (this.exprValHasIndeterministicCharacter) { + new MetadataBuilder().withMetadata(metadata). + putBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT, true).build() + } else { + metadata + } + AttributeReference(name, child.dataType, child.nullable, mdForAttrib)(exprId, qualifier) } else { UnresolvedAttribute.quoted(name) } diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala index 184f5a2a9485..2347410d4537 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/ExpressionEvalHelper.scala @@ -76,7 +76,7 @@ trait ExpressionEvalHelper extends ScalaCheckDrivenPropertyChecks with PlanTestB case _ => expr.mapChildren(replace) } - private def prepareEvaluation(expression: Expression): Expression = { + def prepareEvaluation(expression: Expression): Expression = { val serializer = new JavaSerializer(new SparkConf()).newInstance() val resolver = ResolveTimeZone val expr = replace(resolver.resolveTimeZones(expression)) diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala index bf1c930c0bd0..983810a5fdae 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/NondeterministicSuite.scala @@ -18,6 +18,7 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.SparkFunSuite +import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, KeyGroupedPartitioning, RangePartitioning} class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("MonotonicallyIncreasingID") { @@ -31,4 +32,37 @@ class NondeterministicSuite extends SparkFunSuite with ExpressionEvalHelper { test("InputFileName") { checkEvaluation(InputFileName(), "") } + + test("SPARK-51016: has Indeterministic Component") { + def assertIndeterminancyComponent(expression: Expression): Unit = + assert(prepareEvaluation(expression).exprValHasIndeterministicCharacter) + + assertIndeterminancyComponent(MonotonicallyIncreasingID()) + val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "al1")() + assertIndeterminancyComponent(alias) + assertIndeterminancyComponent(alias.toAttribute) + assertIndeterminancyComponent(Multiply(alias.toAttribute, Literal(1000L))) + assertIndeterminancyComponent( + HashPartitioning(Seq(Multiply(MonotonicallyIncreasingID(), Literal(100L))), 5)) + assertIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5)) + assertIndeterminancyComponent( + RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5)) + assertIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5)) + } + + test("SPARK-51016: has Deterministic Component") { + def assertNoIndeterminancyComponent(expression: Expression): Unit = + 
assert(!prepareEvaluation(expression).exprValHasIndeterministicCharacter) + + assertNoIndeterminancyComponent(Literal(1000L)) + val alias = Alias(Multiply(Literal(10000L), Literal(100L)), "al1")() + assertNoIndeterminancyComponent(alias) + assertNoIndeterminancyComponent(alias.toAttribute) + assertNoIndeterminancyComponent( + HashPartitioning(Seq(Multiply(Literal(10L), Literal(100L))), 5)) + assertNoIndeterminancyComponent(HashPartitioning(Seq(alias.toAttribute), 5)) + assertNoIndeterminancyComponent( + RangePartitioning(Seq(SortOrder.apply(alias.toAttribute, Descending)), 5)) + assertNoIndeterminancyComponent(KeyGroupedPartitioning(Seq(alias.toAttribute), 5)) + } } diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 31a3f53eb719..2e6261de4040 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -30,7 +30,7 @@ import org.apache.spark.serializer.Serializer import org.apache.spark.shuffle.{ShuffleWriteMetricsReporter, ShuffleWriteProcessor} import org.apache.spark.shuffle.sort.SortShuffleManager import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.{Attribute, BoundReference, Expression, UnsafeProjection, UnsafeRow} import org.apache.spark.sql.catalyst.expressions.BindReferences.bindReferences import org.apache.spark.sql.catalyst.expressions.codegen.LazilyGeneratedOrdering import org.apache.spark.sql.catalyst.plans.logical.Statistics @@ -467,7 +467,10 @@ object ShuffleExchangeExec { }, isOrderSensitive = isOrderSensitive) } } - + val isIndeterministic = newPartitioning match { + case expr: Expression => expr.exprValHasIndeterministicCharacter + case _ => false + } // Now, we manually create a ShuffleDependency. Because pairs in rddWithPartitionIds // are in the form of (partitionId, row) and every partitionId is in the expected range // [0, part.numPartitions - 1]. The partitioner of this is a PartitionIdPassthrough. @@ -476,7 +479,11 @@ object ShuffleExchangeExec { rddWithPartitionIds, new PartitionIdPassthrough(part.numPartitions), serializer, - shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics)) + None, + None, + false, + shuffleWriterProcessor = createShuffleWriteProcessor(writeMetrics), + isIndeterministic) dependency } diff --git a/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala new file mode 100644 index 000000000000..da3f619c4a2c --- /dev/null +++ b/sql/core/src/test/scala/org/apache/spark/scheduler/ShuffleMapStageTest.scala @@ -0,0 +1,61 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.scheduler + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.expressions.Literal +import org.apache.spark.sql.functions._ +import org.apache.spark.sql.internal.SQLConf +import org.apache.spark.sql.test.SharedSparkSession +import org.apache.spark.sql.types.LongType + +class ShuffleMapStageTest extends SharedSparkSession { + + test("SPARK-51016: ShuffleMapStage using indeterministic join keys should be INDETERMINATE") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val outerDf = spark.createDataset( + Seq((1L, "aa")))( + Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkLeftt", "strleft") + + val innerDf = spark.createDataset( + Seq((1L, "11"), (2L, "22")))( + Encoders.tuple(Encoders.scalaLong, Encoders.STRING)).toDF("pkRight", "strright") + + val leftOuter = outerDf.select( + col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). + cast(LongType)). + otherwise(col("pkLeftt")).as("pkLeft")) + + val outerjoin = leftOuter.hint("shuffle_hash"). + join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + val shuffleStages: Array[ShuffleMapStage] = Array.ofDim(2) + spark.sparkContext.addSparkListener(new SparkListener() { + var i = 0 + override def onStageSubmitted(stageSubmitted: SparkListenerStageSubmitted): Unit = { + if (stageSubmitted.stageInfo.shuffleDepId.isDefined) { + shuffleStages(i) = + spark.sparkContext.dagScheduler.shuffleIdToMapStage(stageSubmitted.stageInfo.stageId) + i +=1 + } + } + }); + outerjoin.collect() + assert(shuffleStages.filter(_.isIndeterminate).size == 1) + } + } +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala index ec13d48d45f8..395c03b81055 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/ProjectedOrderingAndPartitioningSuite.scala @@ -17,14 +17,17 @@ package org.apache.spark.sql.execution -import org.apache.spark.rdd.RDD +import org.apache.spark.rdd.{DeterministicLevel, RDD} +import org.apache.spark.sql.Encoders import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference} +import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, AttributeReference, Literal} import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, PartitioningCollection, UnknownPartitioning} import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.execution.joins.ShuffledHashJoinExec +import org.apache.spark.sql.functions.{col, floor, isnull, rand, when} import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession -import org.apache.spark.sql.types.StringType +import org.apache.spark.sql.types.{LongType, StringType} class ProjectedOrderingAndPartitioningSuite extends SharedSparkSession 
with AdaptiveSparkPlanHelper { @@ -210,6 +213,37 @@ class ProjectedOrderingAndPartitioningSuite assert(outputOrdering.head.child.asInstanceOf[Attribute].name == "a") assert(outputOrdering.head.sameOrderExpressions.size == 0) } + + test("SPARK-51016: ShuffleRDD using indeterministic join keys should be INDETERMINATE") { + withSQLConf(SQLConf.ADAPTIVE_EXECUTION_ENABLED.key -> "false") { + val outerDf = spark.createDataset( + Seq((1L, "aa"), (null, "aa"), (2L, "bb"), (null, "bb"), (3L, "cc"), (null, "cc")))( + Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkLeftt", "strleft") + + val innerDf = spark.createDataset( + Seq((1L, "11"), (2L, "22"), (3L, "33")))( + Encoders.tupleEncoder(Encoders.LONG, Encoders.STRING)).toDF("pkRight", "strright") + + val leftOuter = outerDf.select( + col("strleft"), when(isnull(col("pkLeftt")), floor(rand() * Literal(10000000L)). + cast(LongType)). + otherwise(col("pkLeftt")).as("pkLeft")) + + val outerjoin = leftOuter.hint("shuffle_hash"). + join(innerDf, col("pkLeft") === col("pkRight"), "left_outer") + + outerjoin.collect() + val finalPlan = outerjoin.queryExecution.executedPlan + val shuffleHJExec = finalPlan.children(0).asInstanceOf[ShuffledHashJoinExec] + assert(shuffleHJExec.left.asInstanceOf[InputAdapter].execute().outputDeterministicLevel == + DeterministicLevel.INDETERMINATE) + + assert(shuffleHJExec.right.asInstanceOf[InputAdapter].execute().outputDeterministicLevel == + DeterministicLevel.UNORDERED) + + assert(shuffleHJExec.execute().outputDeterministicLevel == DeterministicLevel.INDETERMINATE) + } + } } private case class DummyLeafPlanExec(output: Seq[Attribute]) extends LeafExecNode {
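// --- Illustrative sketch (not part of the patch) of the Catalyst-side propagation added above:
// aliasing a non-deterministic expression tags the resulting attribute's metadata, and any
// expression referencing that attribute reports exprValHasIndeterministicCharacter, which is
// what ShuffleExchangeExec now inspects to flag the ShuffleDependency as indeterministic.
import org.apache.spark.sql.catalyst.expressions.{Alias, Attribute, Literal, MonotonicallyIncreasingID, Multiply}

val alias = Alias(Multiply(MonotonicallyIncreasingID(), Literal(100L)), "a")()
val attr = alias.toAttribute
assert(attr.metadata.getBoolean(Attribute.KEY_HAS_INDETERMINISTIC_COMPONENT))
assert(Multiply(attr, Literal(2L)).exprValHasIndeterministicCharacter)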