
Conversation

Member

Ngone51 commented May 24, 2019

What changes were proposed in this pull request?

PythonRunner executes tasks asynchronously: elements are produced in a WriteThread but consumed in another thread. When a child operator, such as take() or first(), does not consume all of the elements produced by the WriteThread, the task finishes before the WriteThread does and releases all of its locks on blocks. The WriteThread, however, keeps pulling elements from the parent operator until it has exhausted them all, and at that point it tries to release the lock on the corresponding block and hits an AssertionError, because the task has already released that lock.

#24542 previously fixed this by catching the AssertionError, so that we don't fail the executor.

However, even without PySpark, the issue still exists when a user implements a custom RDD or task that spawns a separate child thread to consume the iterator of a cached parent RDD. Below is a demo that easily reproduces the issue.

    val rdd0 = sc.parallelize(Range(0, 10), 1).cache()
    rdd0.collect()  // materialize the cache so the next job reads the cached block
    rdd0.mapPartitions { iter =>
      // Spawn a child thread that keeps consuming the cached block's iterator
      // after the task itself has finished.
      val t = new Thread(new Runnable {
        override def run(): Unit = {
          while (iter.hasNext) {
            println(iter.next())
            Thread.sleep(1000)
          }
        }
      })
      t.setDaemon(false)
      t.start()
      Iterator(0)
    }.collect()
    Thread.sleep(100000)

So, if we prevent the separate thread from releasing the block lock when the TaskContext has already completed, we won't hit this issue again.
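Concretely, the change boils down to a guard in BlockManager.releaseLock that skips the unlock when the owning task has already completed; roughly (a simplified sketch of the diff reviewed below):

    def releaseLock(blockId: BlockId, taskContext: Option[TaskContext] = None): Unit = {
      val taskAttemptId = taskContext.map(_.taskAttemptId())
      // SPARK-27666. When a task completes, Spark automatically releases all the blocks
      // locked by that task, so a lingering child thread must not release them again.
      if (taskContext.isDefined && taskContext.get.isCompleted()) {
        logWarning(s"Task ${taskAttemptId.get} already completed, not releasing lock for $blockId")
      } else {
        blockInfoManager.unlock(blockId, taskAttemptId)
      }
    }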

How was this patch tested?

Added a new unit test in RDDSuite.

def shutdownOnTaskCompletion() {
  assert(context.isCompleted)
  this.interrupt()
  this.join()
Member

Ur..I think this change just hangs the execution.

Contributor

This just blocks the current thread and waits for the interrupt to take effect?

Member

It causes a deadlock on the lock on task context.

Member Author

Hi, @viirya. Can you explain more about the 'deadlock', please? I notice that it's possible to fall into a deadlock if the WriteThread tries to acquire the lock on the TaskContext after this.join() is invoked, but I haven't found where that actually happens.

Member

For example, when the writer thread is reading from a cached relation, it needs to add a task completion listener. At that moment, it needs to acquire the lock on the task context.
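A minimal, self-contained sketch of that deadlock shape, using toy classes rather than Spark's real TaskContextImpl (the only assumption is the one described above, that registering a completion listener needs the context's monitor):

    object DeadlockSketch {
      class ToyTaskContext {
        private var listeners = List.empty[() => Unit]
        // Listener registration takes the context's monitor.
        def addTaskCompletionListener(f: () => Unit): Unit = synchronized {
          listeners ::= f
        }
      }

      def main(args: Array[String]): Unit = {
        val ctx = new ToyTaskContext

        // Stand-in for the writer thread: at some point it needs the ctx monitor,
        // e.g. to register a completion listener while reading a cached block.
        val writer = new Thread(new Runnable {
          override def run(): Unit = ctx.addTaskCompletionListener(() => ())
        })

        // Stand-in for the task runner: it already holds the ctx monitor when it
        // interrupts and joins the writer.
        ctx.synchronized {
          writer.start()
          Thread.sleep(100)  // give the writer time to block on ctx's monitor
          writer.interrupt() // blocking on a monitor is not interruptible...
          writer.join()      // ...so this join never returns: deadlock
        }
      }
    }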

Member Author

Hmm... this.join() is called within the task runner thread while holding the lock on the task context. Once join() is called, the WriteThread throws an InterruptedException and we run into the case _: InterruptedException => branch within the WriteThread. In this branch, if we acquire the lock on the task context we'll fall into a deadlock; otherwise, the WriteThread exits normally. Right?

And if the WriteThread wants to acquire the lock on the task context (as you mentioned, to add a task completion listener when reading from a cached relation) outside of the above branch (that is, within its normal execution), it just performs a normal lock competition between the WriteThread and the task runner thread. WDYT?

Please correct me if I'm misunderstanding something.


SparkQA commented May 24, 2019

Test build #105762 has finished for PR 24699 at commit 74bbbad.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

Ngone51 commented May 25, 2019

cc @cloud-fan

@cloud-fan
Contributor

On second thought, it seems like overkill to block the main thread and wait for the Python writer thread to exit. If something bad happens we may block the main thread for a long time, or even hang.

Perhaps a better solution is to add releaseIfLocked, which does nothing if the lock is already released.
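A rough, purely hypothetical sketch of that idea against a toy lock table (ToyLocks, acquire, release and releaseIfLocked are illustrative names, not Spark's BlockInfoManager API, and not what the PR ultimately does):

    import scala.collection.concurrent.TrieMap

    object ToyLocks {
      // Toy lock table keyed by (blockId, taskAttemptId).
      private val held = TrieMap.empty[(String, Long), Unit]

      def acquire(blockId: String, taskAttemptId: Long): Unit =
        held.put((blockId, taskAttemptId), ())

      // Plain release: fails loudly if the lock was already released.
      def release(blockId: String, taskAttemptId: Long): Unit =
        assert(held.remove((blockId, taskAttemptId)).isDefined,
          s"$blockId is not locked by task $taskAttemptId")

      // The suggested variant: silently does nothing if the lock is already gone.
      def releaseIfLocked(blockId: String, taskAttemptId: Long): Unit =
        held.remove((blockId, taskAttemptId))
    }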

Ngone51 changed the title from "[SPARK-27666][CORE] Stop PythonRunner's WriteThread immediately when task finishes" to "[SPARK-27666][CORE] Do not release lock while TaskContext already completed" on May 28, 2019
Member Author

Ngone51 commented May 28, 2019

Since killing PythonRunner's WriteThread may be overkill and carries uncontrollable risk, after discussing with @cloud-fan offline we decided to use another solution, which has been updated in the PR description.

And, for simplicity, I changed the original JIRA title from "Stop PythonRunner's WriteThread immediately when task finishes" to the current one.

ping @cloud-fan @jiangxb1987 @viirya

// SPARK-27666. Child thread spawned from task thread could produce race condition
// on block lock releasing. We should prevent child thread from releasing un-locked
// block when task thread has already finished.
if (taskContext.isDefined && taskContext.map(_.isCompleted()).get) {
Contributor

nit: taskContext.isDefined && taskContext.get.isCompleted

blockInfoManager.unlock(blockId, taskAttemptId)
def releaseLock(blockId: BlockId, taskContext: Option[TaskContext] = None): Unit = {
val taskAttemptId = taskContext.map(_.taskAttemptId())
// SPARK-27666. Child thread spawned from task thread could produce race condition
Contributor

A simpler explanation: When a task completes, Spark automatically releases all the blocks locked by this task. We should not release any blocks for a task that is already completed.

t.start()
Iterator(0)
}.collect()
Thread.sleep(10 * 150)
Contributor

Can we use a more reliable way to test it? We can set up a CountDownLatch, and count it down at the end of the thread. Then we wait for the count down at the end of the test.

Member Author

CountDownLatch can't be serialized in Task, so it doesn't work.
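(For context, a latch captured by the task closure trips Spark's closure-serialization check; a rough sketch of the failure, assuming a live SparkContext `sc`:)

    import java.util.concurrent.CountDownLatch

    val latch = new CountDownLatch(1)       // CountDownLatch is not Serializable
    sc.parallelize(1 to 10, 1).foreach { _ =>
      latch.countDown()                     // the closure captures `latch`...
    }
    // ...so the job is rejected with
    // org.apache.spark.SparkException: Task not serializable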

Contributor

Why not add a TaskCompletionListener and wait for the listener to be triggered? Something like:

      eventually(timeout(10.seconds)) {
          assert(// Task completion triggered.)
      }

Member Author

IIUC, the TaskCompletionListener will be called after collect() is done, but thread t will still be running at that time.

Contributor

Makes sense. Also curious: how did the magic number 10 * 150 come about?

Member Author

Thread.sleep(10 * 150) > iter.size * Thread.sleep(100), i.e. the test sleeps a bit longer than the child thread needs to drain all elements at 100 ms each.

Contributor

We shouldn't use sleep in tests, as the test will become flaky sooner or later. If CountDownLatch doesn't work, can we use a Spark Accumulator as a signal?

Member Author

I tried an Accumulator previously: the child thread continuously adds 1 to the accumulator every 100 ms, and the test thread waits until it reaches 10. But that doesn't work either, because we want the signal to come from the child thread, while accumulator updates are only delivered when the task finishes. Unfortunately, in this case, the task finishes before the child thread does.

Member

If this kind of end-to-end test doesn't work without sleep, how about we add a unit test for the releaseLock behavior when the task context is completed?

Member Author

@Ngone51 Jun 13, 2019

How about this approach: eba8e96? I tested it with and without this PR, and it works. @cloud-fan @jiangxb1987 @viirya

Contributor

jiangxb1987 commented May 28, 2019

Have you considered changing the logic in https://github.com/apache/spark/blob/e9f3f62b2c0f521f3cc23fef381fc6754853ad4f/core/src/main/scala/org/apache/spark/storage/BlockManager.scala#L745~L747 ? Skipping the lock release if the TaskContext has completed should also resolve the issue, or is there something I missed?


SparkQA commented May 28, 2019

Test build #105875 has finished for PR 24699 at commit 26a7dd2.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

Ngone51 commented May 29, 2019

Skipping the lock release if the TaskContext has completed should also resolve the issue

Do you mean something like this, @jiangxb1987?

val ci = CompletionIterator[Any, Iterator[Any]](iter, {
  if (!taskContext.isCompleted()) {
    releaseLock(blockId, taskAttemptId)
  }
})

I was thinking about it, but for:

val ci = CompletionIterator[Any, Iterator[Any]](iterToReturn, {
  releaseLockAndDispose(blockId, diskData, taskAttemptId)
})

it seems we can't wrap an if condition around releaseLockAndDispose in the same way, since we have to dispose of the data anyway, right? So we need to pass the taskContext into releaseLockAndDispose. In releaseLockAndDispose:

def releaseLockAndDispose(
    blockId: BlockId,
    data: BlockData,
    taskAttemptId: Option[Long] = None): Unit = {
  releaseLock(blockId, taskAttemptId)
  data.dispose()
}

We could also wrap an if condition around releaseLock. But I think it's better to reduce duplicated code, so I eventually moved the logic into releaseLock itself.

@jiangxb1987
Contributor

We can go either way; both look fine to me. I would refactor releaseLockAndDispose() further to take into account whether the task has completed, but the current change should resolve the issue too. I'm just afraid it's a bit of an overkill to pass in the whole TaskContext.

* The param `taskAttemptId` should be passed in case we can't get the correct TID from
* TaskContext, for example, the input iterator of a cached RDD iterates to the end in a child
* The param `taskContext` should be passed in case we can't get the correct TaskContext
* for example, the input iterator of a cached RDD iterates to the end in a child
Member

nit: a missing , before for example.

// on block lock releasing. We should prevent child thread from releasing un-locked
// block when task thread has already finished.
if (taskContext.isDefined && taskContext.map(_.isCompleted()).get) {
logWarning(s"Task $taskAttemptId already completed, not releasing lock for $blockId")
Member

${taskAttemptId.get}

Member Author

Thanks for catching this.

}

/**
* Release a lock on the given block with explicit TID.
Member

with explicit TaskContext


SparkQA commented May 30, 2019

Test build #105938 has finished for PR 24699 at commit e6a97b3.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member

viirya commented May 30, 2019

retest this please


SparkQA commented May 30, 2019

Test build #105945 has finished for PR 24699 at commit e6a97b3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Member Author

Ngone51 commented Jun 11, 2019

ping @cloud-fan @jiangxb1987 @viirya

Any more comments?

@jiangxb1987
Contributor

LGTM


SparkQA commented Jun 13, 2019

Test build #106468 has finished for PR 24699 at commit eba8e96.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


SparkQA commented Jun 13, 2019

Test build #106469 has finished for PR 24699 at commit 4c64b03.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

}.collect()
val tmx = ManagementFactory.getThreadMXBean
var t = tmx.getThreadInfo(tid.value)
// getThreadInfo() will return null after child thread `t` died
Contributor

This reminds me of one thing: the tests run in local mode, so the driver and executor are in the same JVM. Seems CountDownLatch should work? What we really need is for the thread in the task to send a signal to the main thread.

Member Author

Seems CountDownLatch should work?

CountDownLatch doesn't work because it is not serializable.

while (t != null && t.getThreadState != Thread.State.TERMINATED) {
  t = tmx.getThreadInfo(tid.value)
}
}
Member

@viirya Jun 17, 2019

Can we use eventually with timeout instead of a while loop?

eventually(timeout(10.seconds)) {
  val t = tmx.getThreadInfo(tid.value)
  assert(t == null || t.getThreadState == Thread.State.TERMINATED)
}

Member Author

good idea!


SparkQA commented Jun 17, 2019

Test build #106584 has finished for PR 24699 at commit 7f0f360.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
  • class RDDSuite extends SparkFunSuite with SharedSparkContext with Eventually

@cloud-fan
Contributor

thanks, merging to master!

cloud-fan closed this in bb17aec on Jun 18, 2019
Member Author

Ngone51 commented Jun 18, 2019

Thanks @cloud-fan @jiangxb1987 @viirya
