@@ -570,6 +570,21 @@ class JavaSparkContext(val sc: SparkContext) extends JavaSparkContextVarargsWorkaround
* // In a separate thread:
* sc.cancelJobGroup("some_job_to_cancel");
* }}}
*
* If interruptOnCancel is set to true for the job group, then job cancellation will result
* in Thread.interrupt() being called on the job's executor threads. This is useful to help ensure
* that the tasks are actually stopped in a timely manner, but is off by default due to HDFS-1208,
* where HDFS may respond to Thread.interrupt() by marking nodes as dead.
*/
def setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean): Unit =
sc.setJobGroup(groupId, description, interruptOnCancel)

/**
* Assigns a group ID to all the jobs started by this thread until the group ID is set to a
* different value or cleared.
*
* @see `setJobGroup(groupId: String, description: String, interruptOnCancel: Boolean)`.
*      This method sets interruptOnCancel to false.
*/
def setJobGroup(groupId: String, description: String): Unit = sc.setJobGroup(groupId, description)
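A minimal usage sketch of the new overload, assuming `sc` is an existing SparkContext; the group name, workload, and timings below are illustrative, not part of this patch. Note that the job group is a thread-local property, so `setJobGroup` must be called on the thread that submits the job (as the test further down does inside a `future`):

```scala
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.future

// Submit a job from a background thread, tagged with a group that opts in
// to Thread.interrupt() on cancellation ("myGroup" is a made-up name).
val job = future {
  sc.setJobGroup("myGroup", "a cancellable job", interruptOnCancel = true)
  sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100); i }.count()
}

// From another thread: cancel every active job in the group. Because
// interruptOnCancel = true, tasks blocked in Thread.sleep are woken by
// Thread.interrupt() instead of running to completion.
sc.cancelJobGroup("myGroup")
```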

36 changes: 34 additions & 2 deletions core/src/test/scala/org/apache/spark/JobCancellationSuite.scala
@@ -21,7 +21,7 @@ import java.util.concurrent.Semaphore

import scala.concurrent.Await
import scala.concurrent.ExecutionContext.Implicits.global
-import scala.concurrent.duration.Duration
+import scala.concurrent.duration._
import scala.concurrent.future

import org.scalatest.{BeforeAndAfter, FunSuite}
@@ -101,18 +101,50 @@ class JobCancellationSuite extends FunSuite with ShouldMatchers with BeforeAndAfter
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(10); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
Contributor Author:
The diff here is really dumb: I just put the sem.acquire(2) above the creation of jobB in this test ("job group") and then copied it to make my test ("job group with interruption"). The latter test is only different in that I sleep for an extraordinarily long amount of time, so that cancellation must interrupt the sleep or else make Patrick angry.

sem.acquire(2)

sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}


test("job group with interruption") {
sc = new SparkContext("local[2]", "test")

// Add a listener to release the semaphore once any tasks are launched.
val sem = new Semaphore(0)
sc.addSparkListener(new SparkListener {
override def onTaskStart(taskStart: SparkListenerTaskStart) {
sem.release()
}
})

// jobA is the one to be cancelled.
val jobA = future {
sc.setJobGroup("jobA", "this is a job to be cancelled", interruptOnCancel = true)
sc.parallelize(1 to 10000, 2).map { i => Thread.sleep(100000); i }.count()
}

// Block until both tasks of job A have started and cancel job A.
sem.acquire(2)

sc.clearJobGroup()
val jobB = sc.parallelize(1 to 100, 2).countAsync()
sc.cancelJobGroup("jobA")
-val e = intercept[SparkException] { Await.result(jobA, Duration.Inf) }
+val e = intercept[SparkException] { Await.result(jobA, 5.seconds) }
assert(e.getMessage contains "cancel")

// Once A is cancelled, job B should finish fairly quickly.
assert(jobB.get() === 100)
}
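For context on why the test above can use such an extreme sleep: without interruption, each task would stay blocked in `Thread.sleep(100000)` far past the 5-second `Await` timeout, so the test passes only if cancellation actually interrupts the executor threads. A standalone, plain-JVM sketch of that mechanism (no Spark APIs involved; all names here are illustrative):

```scala
// Interrupting a thread blocked in Thread.sleep makes it fail fast with
// InterruptedException instead of sleeping out its full duration.
val t = new Thread {
  override def run(): Unit = {
    try {
      Thread.sleep(100000) // would otherwise block for ~100 seconds
    } catch {
      case _: InterruptedException =>
        println("woken early; the task can now exit promptly")
    }
  }
}
t.start()
Thread.sleep(100) // give the thread time to reach its sleep
t.interrupt()     // what interruptOnCancel = true triggers on executor threads
t.join()
```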

/*
test("two jobs sharing the same stage") {
// sem1: make sure cancel is issued after some tasks are launched