11 changes: 7 additions & 4 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -224,14 +224,17 @@ abstract class RDD[T: ClassTag](
    * not use `this` because RDDs are user-visible, so users might have added their own locking on
    * RDDs; sharing that could lead to a deadlock.
    *
-   * One thread might hold the lock on many of these, for a chain of RDD dependencies; but
-   * because DAGs are acyclic, and we only ever hold locks for one path in that DAG, there is no
-   * chance of deadlock.
+   * One thread might hold the lock on many of these, for a chain of RDD dependencies. Deadlocks
+   * are possible if we try to lock another resource while holding the stateLock,
+   * and the lock acquisition sequence of these locks is not guaranteed to be the same.
+   * This can lead lead to a deadlock as one thread might first acquire the stateLock,
[Review comment from a Member] nit. lead lead -> lead
+   * and then the resource,
+   * while another thread might first acquire the resource, and then the stateLock.
    *
    * Executors may reference the shared fields (though they should never mutate them,
    * that only happens on the driver).
    */
-  private val stateLock = new Serializable {}
+  private[spark] val stateLock = new Serializable {}
 
   // Our dependencies and partitions will be gotten by calling subclass's methods below, and will
   // be overwritten when we're checkpointed
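The rewritten comment above describes a classic lock-ordering inversion. As a standalone illustration (a minimal sketch, not part of this patch; the two Object monitors are hypothetical stand-ins for rdd.stateLock and whatever other resource is being locked), the following Scala program forces the two acquisition orders to collide and then detects, rather than suffers, the resulting deadlock:

import java.util.concurrent.CyclicBarrier

object LockOrderInversionSketch {
  private val stateLock = new Object // stand-in for rdd.stateLock
  private val resource = new Object  // stand-in for the other resource (e.g. a shared cache map)
  private val barrier = new CyclicBarrier(2)

  def main(args: Array[String]): Unit = {
    // Thread a takes stateLock first, then the resource.
    val a = new Thread(() => stateLock.synchronized { barrier.await(); resource.synchronized {} })
    // Thread b takes the resource first, then stateLock: the opposite order.
    val b = new Thread(() => resource.synchronized { barrier.await(); stateLock.synchronized {} })
    a.start(); b.start()
    // After the barrier each thread holds one monitor and waits forever for the other one.
    a.join(2000); b.join(2000)
    println(s"still deadlocked after 2s: ${a.isAlive && b.isAlive}") // prints true
    sys.exit(0) // the two threads will never finish on their own, so exit explicitly
  }
}

Swapping thread b's acquisition order to match thread a's makes both joins return promptly, which is exactly the discipline the DAGScheduler change below enforces.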
31 changes: 18 additions & 13 deletions core/src/main/scala/org/apache/spark/scheduler/DAGScheduler.scala
@@ -181,6 +181,9 @@ private[spark] class DAGScheduler(
    * locations where that RDD partition is cached.
    *
    * All accesses to this map should be guarded by synchronizing on it (see SPARK-4454).
+   * If you need to access any RDD while synchronizing on the cache locations,
+   * first synchronize on the RDD, and then synchronize on this map to avoid deadlocks. The RDD
+   * could try to access the cache locations after synchronizing on the RDD.
    */
   private val cacheLocs = new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
 
@@ -435,22 +438,24 @@ private[spark] class DAGScheduler(
   }
 
   private[scheduler]
-  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
-    // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
-    if (!cacheLocs.contains(rdd.id)) {
-      // Note: if the storage level is NONE, we don't need to get locations from block manager.
-      val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
-        IndexedSeq.fill(rdd.partitions.length)(Nil)
-      } else {
-        val blockIds =
-          rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
-        blockManagerMaster.getLocations(blockIds).map { bms =>
-          bms.map(bm => TaskLocation(bm.host, bm.executorId))
+  def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = rdd.stateLock.synchronized {
+    cacheLocs.synchronized {
+      // Note: this doesn't use `getOrElse()` because this method is called O(num tasks) times
+      if (!cacheLocs.contains(rdd.id)) {
+        // Note: if the storage level is NONE, we don't need to get locations from block manager.
+        val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
+          IndexedSeq.fill(rdd.partitions.length)(Nil)
+        } else {
+          val blockIds =
+            rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
+          blockManagerMaster.getLocations(blockIds).map { bms =>
+            bms.map(bm => TaskLocation(bm.host, bm.executorId))
+          }
         }
+        cacheLocs(rdd.id) = locs
       }
-      cacheLocs(rdd.id) = locs
+      cacheLocs(rdd.id)
     }
-    cacheLocs(rdd.id)
   }
 
   private def clearCacheLocs(): Unit = cacheLocs.synchronized {
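The getCacheLocs change above enforces the order that the new cacheLocs comment prescribes: take the RDD's stateLock first, then the monitor of the cache map. A thread that already holds an RDD's stateLock and reaches the cache while computing partitions then asks for the locks in the same order as a thread calling getCacheLocs directly, so neither can end up holding the lock the other needs. The sketch below restates that invariant in miniature (a simplified model, not Spark code: perRddLock, locationCache and the two methods are hypothetical stand-ins):

import scala.collection.mutable

object ConsistentOrderSketch {
  private val perRddLock = new Object                         // stand-in for rdd.stateLock
  private val locationCache = mutable.HashMap[Int, String]()  // stand-in for cacheLocs, guarded by its own monitor

  // Models the patched getCacheLocs: RDD lock first, then the map's monitor.
  def cachedLocation(rddId: Int): String = perRddLock.synchronized {
    locationCache.synchronized {
      locationCache.getOrElseUpdate(rddId, s"locations-of-$rddId")
    }
  }

  // Models a caller that already holds the RDD lock (e.g. while computing partitions)
  // and then needs the cache. Monitors are reentrant, so re-taking perRddLock is fine,
  // and the map's monitor is never held while waiting for the RDD lock.
  def computePartitions(rddId: Int): String = perRddLock.synchronized {
    cachedLocation(rddId)
  }

  def main(args: Array[String]): Unit = {
    val threads = (1 to 4).map { i =>
      new Thread(() => (1 to 1000).foreach { n =>
        if (i % 2 == 0) cachedLocation(n % 10) else computePartitions(n % 10)
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join()) // terminates because every path uses the same acquisition order
    println(s"distinct cached entries: ${locationCache.size}")
  }
}

Before the patch, getCacheLocs locked the map first and only then touched rdd.partitions (which needs the RDD's stateLock), i.e. the inverted order from the earlier sketch.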
core/src/test/scala/org/apache/spark/scheduler/DAGSchedulerSuite.scala
@@ -48,7 +48,7 @@ import org.apache.spark.scheduler.SchedulingMode.SchedulingMode
 import org.apache.spark.scheduler.local.LocalSchedulerBackend
 import org.apache.spark.shuffle.{FetchFailedException, MetadataFetchFailedException}
 import org.apache.spark.storage.{BlockId, BlockManager, BlockManagerId, BlockManagerMaster}
-import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, Utils}
+import org.apache.spark.util.{AccumulatorContext, AccumulatorV2, CallSite, Clock, LongAccumulator, SystemClock, ThreadUtils, Utils}
 import org.apache.spark.util.ArrayImplicits._
 
 class DAGSchedulerEventProcessLoopTester(dagScheduler: DAGScheduler)
@@ -612,6 +612,42 @@ class DAGSchedulerSuite extends SparkFunSuite with TempLocalSparkContext with Ti
     assertDataStructuresEmpty()
   }
 
+  // Note that this test is NOT perfectly reproducible when there is a deadlock as it uses
+  // Thread.sleep, but it should never fail / flake when there is no deadlock.
+  // If this test starts to flake, this shows that there is a deadlock!
[Review comment from a Member] Got it. Thank you for warning.
test("No Deadlock between getCacheLocs and CoalescedRDD") {
val rdd = sc.parallelize(1 to 10, numSlices = 10)
val coalescedRDD = rdd.coalesce(2)
val executionContext = ThreadUtils.newDaemonFixedThreadPool(
nThreads = 2, "test-getCacheLocs")
// Used to only make progress on getCacheLocs after we acquired the lock to the RDD.
val rddLock = new java.util.concurrent.Semaphore(0)
val partitionsFuture = executionContext.submit(new Runnable {
override def run(): Unit = {
coalescedRDD.stateLock.synchronized {
rddLock.release(1)
// Try to access the partitions of the coalescedRDD. This will cause a call to
// getCacheLocs internally.
Thread.sleep(5000)
coalescedRDD.partitions
}
}
})
val getCacheLocsFuture = executionContext.submit(new Runnable {
override def run(): Unit = {
rddLock.acquire()
// Access the cache locations.
// If the partition location cache is locked before the stateLock is locked,
// we'll run into a deadlock.
sc.dagScheduler.getCacheLocs(coalescedRDD)
}
})
// If any of the futures throw a TimeOutException, this shows that there is a deadlock between
// getCacheLocs and accessing partitions of an RDD.
getCacheLocsFuture.get(120, TimeUnit.SECONDS)
partitionsFuture.get(120, TimeUnit.SECONDS)
}

test("All shuffle files on the storage endpoint should be cleaned up when it is lost") {
conf.set(config.SHUFFLE_SERVICE_ENABLED.key, "true")
conf.set("spark.files.fetchFailure.unRegisterOutputOnHost", "true")
Expand Down
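As a usage note (not part of the diff): assuming the standard Spark sbt wrapper, the new test can typically be run on its own with something like the line below, where the ScalaTest -z flag selects tests whose names contain the given substring; the exact invocation may differ between branches and build setups.

build/sbt "core/testOnly *DAGSchedulerSuite -- -z \"No Deadlock\""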