Skip to content

Conversation

@caneGuy
Copy link
Contributor

@caneGuy caneGuy commented Jan 28, 2019

What changes were proposed in this pull request?

Currently,thread number of broadcast-exchange thread pool is fixed and keepAliveSeconds is also fixed as 60s.

object BroadcastExchangeExec {
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
}

 /**
   * Create a cached thread pool whose max number of threads is `maxThreadNumber`. Thread names
   * are formatted as prefix-ID, where ID is a unique, sequentially assigned integer.
   */
  def newDaemonCachedThreadPool(
      prefix: String, maxThreadNumber: Int, keepAliveSeconds: Int = 60): ThreadPoolExecutor = {
    val threadFactory = namedThreadFactory(prefix)
    val threadPool = new ThreadPoolExecutor(
      maxThreadNumber, // corePoolSize: the max number of threads to create before queuing the tasks
      maxThreadNumber, // maximumPoolSize: because we use LinkedBlockingDeque, this one is not used
      keepAliveSeconds,
      TimeUnit.SECONDS,
      new LinkedBlockingQueue[Runnable],
      threadFactory)
    threadPool.allowCoreThreadTimeOut(true)
    threadPool
  }

But some times, if the Thead object do not GC quickly it may caused server(driver) OOM. In such case,we need to make this thread pool configurable.
A case has described in https://issues.apache.org/jira/browse/SPARK-26601

How was this patch tested?

UT

@SparkQA
Copy link

SparkQA commented Jan 28, 2019

Test build #101752 has finished for PR 23670 at commit 23ae9a0.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Jan 28, 2019

@viirya @maropu @HyukjinKwon
Sorry for faulty operation on #23519. i opened a new pr here and refined the comments except one.
And for the comment:
BroadcastExchangeExec.executionContext .getMaximumPoolSize
Actually i have searched the source code of ExecutionContext and could not find a solution to call getMaximumPoolSize since this api just declared by ThreadPoolExecutor. Any suggestion?Thanks
Unit test result on my desktop is below:
_002

.createWithDefault(1000)

val BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD =
buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

spark.sql.broadcastExchange.maxThreadThreshold

// for other test
SparkSession.getActiveSession.get.sparkContext.conf.
set(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD, previousNumber)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need this test? In the other similar prs (e.g., https://github.com/apache/spark/pull/22847/files), we added no test. If necessary, we can add tests in ExecutorSideSQLConfSuite to check if executors can reference this static value...

@SparkQA
Copy link

SparkQA commented Jan 28, 2019

Test build #101755 has finished for PR 23670 at commit ba19ccc.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Hi, @caneGuy .
Newly added test case fails. Could you fix your test case failure?

org.apache.spark.sql.execution.ExchangeSuite.SPARK-26601: Make broadcast-exchange thread pool configurable

@caneGuy
Copy link
Contributor Author

caneGuy commented Feb 15, 2019

Thanks @dongjoon-hyun @maropu i remove the suite.since i think it's no need to test that.
And actually,i test locally,it has passed.

@SparkQA
Copy link

SparkQA commented Feb 15, 2019

Test build #102387 has finished for PR 23670 at commit 2117ee2.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.


val BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD =
buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.doc("The maximum degree of parallelism to fetch and broadcast the table." +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: add a single space: ...table." ->...table. "

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

buildStaticConf("spark.sql.broadcastExchange.maxThreadThreshold")
.doc("The maximum degree of parallelism to fetch and broadcast the table." +
"If we encounter memory issue like frequently full GC or OOM when broadcast table " +
"we can decrease this number in order to reduce memory usage." +
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @maropu

@SparkQA
Copy link

SparkQA commented Feb 18, 2019

Test build #102439 has finished for PR 23670 at commit 27b0ec4.

  • This patch fails PySpark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Feb 21, 2019

Could you help retest this please? @maropu @dongjoon-hyun

@SparkQA
Copy link

SparkQA commented Feb 21, 2019

Test build #102573 has finished for PR 23670 at commit 27b0ec4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Mar 1, 2019

The failed unit tests is below which i think is not related with the pr @dongjoon-hyun @maropu
Caused by: sbt.ForkMain$ForkError: java.lang.ClassNotFoundException: javax.jdo.JDOException at java.net.URLClassLoader.findClass(URLClassLoader.java:382) at java.lang.ClassLoader.loadClass(ClassLoader.java:424) at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.doLoadClass(IsolatedClientLoader.scala:231) at org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1.loadClass(IsolatedClientLoader.scala:220) at java.lang.ClassLoader.loadClass(ClassLoader.java:357) ... 47 more

@viirya
Copy link
Member

viirya commented Mar 1, 2019

retest this please.

@SparkQA
Copy link

SparkQA commented Mar 1, 2019

Test build #102908 has finished for PR 23670 at commit 27b0ec4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Mar 7, 2019

Sorry for bothering @viirya but the failed case i think is still not related with this pr.
Any suggestion?Thanks
org.scalatest.exceptions.TestFailedException: scala.Predef.Set.apply[Int](0, 1, 2, 3).map[org.apache.spark.sql.Row, scala.collection.immutable.Set[org.apache.spark.sql.Row]](((x$3: Int) => org.apache.spark.sql.Row.apply(x$3)))(immutable.this.Set.canBuildFrom[org.apache.spark.sql.Row]).subsetOf(scala.Predef.refArrayOps[org.apache.spark.sql.Row](results).toSet[org.apache.spark.sql.Row]) was false Stacktrace sbt.ForkMain$ForkError: org.scalatest.exceptions.TestFailedException: scala.Predef.Set.apply[Int](0, 1, 2, 3).map[org.apache.spark.sql.Row, scala.collection.immutable.Set[org.apache.spark.sql.Row]](((x$3: Int) => org.apache.spark.sql.Row.apply(x$3)))(immutable.this.Set.canBuildFrom[org.apache.spark.sql.Row]).subsetOf(scala.Predef.refArrayOps[org.apache.spark.sql.Row](results).toSet[org.apache.spark.sql.Row]) was false at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:528) at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:527) at org.scalatest.FunSuite.newAssertionFailedException(FunSuite.scala:1560)

@viirya
Copy link
Member

viirya commented Mar 7, 2019

retest this please.

@SparkQA
Copy link

SparkQA commented Mar 7, 2019

Test build #103120 has finished for PR 23670 at commit 27b0ec4.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dilipbiswal
Copy link
Contributor

retest this please

@SparkQA
Copy link

SparkQA commented Mar 7, 2019

Test build #103129 has finished for PR 23670 at commit 27b0ec4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Mar 26, 2019

@dilipbiswal I have resolve the conflict,please help check this review,thanks

@SparkQA
Copy link

SparkQA commented Mar 26, 2019

Test build #103973 has finished for PR 23670 at commit 8da84e3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Apr 11, 2019

Hi, @caneGuy . Could you fix the indentation as @attilapiros mentioned? (#23670 (comment))

Also, ping @hvanhovell , @cloud-fan , @gatorsmile . How do you think about this new configuration?

done @dongjoon-hyun thanks!

@SparkQA
Copy link

SparkQA commented Apr 11, 2019

Test build #104495 has finished for PR 23670 at commit a4523e9.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented Apr 18, 2019

failed case is not caused by this case
bt.ForkMain$ForkError: java.lang.RuntimeException: [unresolved dependency: com.sun.jersey#jersey-core;1.14: configuration not found in com.sun.jersey#jersey-core;1.14: 'master(compile)'. Missing configuration: 'compile'. It was required from org.apache.hadoop#hadoop-yarn-common;2.7.4 compile] at org.apache.spark.deploy.SparkSubmitUtils$.resolveMavenCoordinates(SparkSubmit.scala:1321) at
please help retest this please thanks @dongjoon-hyun

@HyukjinKwon
Copy link
Member

retest this please

"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0, "The threshold should be positive.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this number cannot be more than 128, let's add that condition.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for checking thres <= 128, too.

@HyukjinKwon
Copy link
Member

retest this please

"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0, "The threshold should be positive.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's make it .internal() so that we can remove this away if there's anything wrong found with this configuration later.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I just want to comment like that. I also think this is advanced and should not be exposed to users in general. +1

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1.

@HyukjinKwon
Copy link
Member

LGTM otheriwse. @cloud-fan, I will get this in. Please let me know if you have other concerns.

@SparkQA
Copy link

SparkQA commented Apr 18, 2019

Test build #104686 has finished for PR 23670 at commit a4523e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA
Copy link

SparkQA commented Apr 18, 2019

Test build #104687 has finished for PR 23670 at commit a4523e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@dongjoon-hyun
Copy link
Member

Retest this please.

@SparkQA
Copy link

SparkQA commented Apr 22, 2019

Test build #104787 has finished for PR 23670 at commit a4523e9.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

ping @caneGuy

@caneGuy
Copy link
Contributor Author

caneGuy commented May 10, 2019

Sorry for late reply, i will fix today @HyukjinKwon

@caneGuy
Copy link
Contributor Author

caneGuy commented May 10, 2019

@HyukjinKwon refine!Thanks too much

@SparkQA
Copy link

SparkQA commented May 10, 2019

Test build #105309 has finished for PR 23670 at commit 46f454f.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy
Copy link
Contributor Author

caneGuy commented May 11, 2019

@HyukjinKwon ping thanks!

"cause longer waiting for other broadcasting. Also, increasing parallelism may " +
"cause memory problem.")
.intConf
.checkValue(thres => thres > 0 && thres <= 128, "The threshold should be positive.")
Copy link
Member

@viirya viirya May 11, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a maximum limit (128) for this config. The error message doesn't reflect that. Once an invalid value like 200 is set, it can't figure out what goes wrong with it, if not reading this code.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yea let's improve this one before getting this in

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @HyukjinKwon @viirya Thanks!

.intConf
.createWithDefault(1000)

val BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD =
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD -> BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh sorry for this error,i have fixed that

private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
SQLConf.get.getConf(StaticSQLConf.BROADCAST_EXCHANGE_MAX_THREAD_THREASHOLD)))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ping @caneGuy seems like the last nit

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done @HyukjinKwon @kiszk thanks!

@SparkQA
Copy link

SparkQA commented May 12, 2019

Test build #105337 has finished for PR 23670 at commit a10dac3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@HyukjinKwon
Copy link
Member

Merged to master. Related tests and build passed

object BroadcastExchangeExec {
private[execution] val executionContext = ExecutionContext.fromExecutorService(
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: the previous indentation was correct.

@SparkQA
Copy link

SparkQA commented May 13, 2019

Test build #105351 has finished for PR 23670 at commit 40c3592.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@caneGuy caneGuy deleted the zhoukang/make-broadcat-config branch June 1, 2020 02:15
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

10 participants