[SPARK-26601][SQL] Make broadcast-exchange thread pool configurable #23519
Conversation
Test build #101080 has finished for PR 23519 at commit
## What changes were proposed in this pull request?
Per discussion in apache#23391 (comment) this proposes to just remove the old pre-Spark-3 time parsing behavior. This is a rebase of apache#23411.

## How was this patch tested?
Existing tests.

Closes apache#23495 from srowen/SPARK-26503.2.

Authored-by: Sean Owen <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
…gories

## What changes were proposed in this pull request?
The PR makes hardcoded configs below use `ConfigEntry`.
* spark.ui
* spark.ssl
* spark.authenticate
* spark.master.rest
* spark.master.ui
* spark.metrics
* spark.admin
* spark.modify.acl

This patch doesn't change configs which are not relevant to SparkConf (e.g. system properties).

## How was this patch tested?
Existing tests.

Closes apache#23423 from HeartSaVioR/SPARK-26466.

Authored-by: Jungtaek Lim (HeartSaVioR) <[email protected]>
Signed-off-by: Marcelo Vanzin <[email protected]>
…x field and having is not null predicate on another one
## What changes were proposed in this pull request?
Schema pruning errors out when selecting one complex field and having an `is not null` predicate on another one:
```scala
val query = sql("select * from contacts")
.where("name.middle is not null")
.select(
"id",
"name.first",
"name.middle",
"name.last"
)
.where("last = 'Jones'")
.select(count("id"))
```
```
java.lang.IllegalArgumentException: middle does not exist. Available: last
[info] at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:303)
[info] at scala.collection.immutable.Map$Map1.getOrElse(Map.scala:119)
[info] at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:302)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.$anonfun$getProjection$6(ProjectionOverSchema.scala:58)
[info] at scala.Option.map(Option.scala:163)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:56)
[info] at org.apache.spark.sql.execution.ProjectionOverSchema.unapply(ProjectionOverSchema.scala:32)
[info] at org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaPruning$$anonfun$$nestedInanonfun$buildNewProjection$1$1.applyOrElse(ParquetSchemaPruning.scala:153)
```
## How was this patch tested?
Added tests.
Closes apache#23474 from viirya/SPARK-26551.
Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: DB Tsai <[email protected]>
…unexpected confs

## What changes were proposed in this pull request?
Fix race condition where streams can have unexpected conf values. New streaming queries should run with isolated SparkSessions so that they aren't affected by conf updates after they are started. In StreamExecution, the parent SparkSession is cloned and used to run each batch, but this cloning happens in a separate thread and may happen after DataStreamWriter.start() returns. If a stream is started and a conf key is set immediately after, the stream is likely to have the new value.

## How was this patch tested?
New unit test that fails prior to the production change and passes with it.

Please review http://spark.apache.org/contributing.html before opening a pull request.

Closes apache#23513 from mukulmurthy/26586.

Authored-by: Mukul Murthy <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
## What changes were proposed in this pull request?
`ChunkFetchIntegrationSuite.fetchBothChunks` fails frequently due to timeout in Apache Spark Jenkins environments.
```scala
org.apache.spark.network.ChunkFetchIntegrationSuite
[ERROR] fetchBothChunks(org.apache.spark.network.ChunkFetchIntegrationSuite) Time elapsed: 5.015 s <<< FAILURE!
java.lang.AssertionError: Timeout getting response from the server
	at org.apache.spark.network.ChunkFetchIntegrationSuite.fetchChunks(ChunkFetchIntegrationSuite.java:176)
	at org.apache.spark.network.ChunkFetchIntegrationSuite.fetchBothChunks(ChunkFetchIntegrationSuite.java:210)
```
The following are the recent failures on `amp-jenkins-worker-05`. The timeout seems to be too sensitive on low-end machines. This PR increases the timeout from 5 seconds to 60 seconds in order to be more robust.
- [master 5856](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5856/)
- [master 5837](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5837/testReport)
- [master 5835](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5835/testReport)
- [master 5829](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5829/testReport)
- [master 5828](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5828/testReport)
- [master 5822](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5822/testReport)
- [master 5814](https://amplab.cs.berkeley.edu/jenkins/view/Spark%20QA%20Test%20(Dashboard)/job/spark-master-test-maven-hadoop-2.7/5814/testReport)
- [SparkPullRequestBuilder 100784](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100784/consoleFull)
- [SparkPullRequestBuilder 100785](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100785/consoleFull)
- [SparkPullRequestBuilder 100787](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100787/consoleFull)
- [SparkPullRequestBuilder 100788](https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/100788/consoleFull)

## How was this patch tested?
N/A (Monitor the Jenkins on `amp-jenkins-worker-05` machine)

Closes apache#23522 from dongjoon-hyun/SPARK-25692.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
## What changes were proposed in this pull request?
This fixes the K8S integration test compilation failure introduced by apache#23423.
```scala
$ build/sbt -Pkubernetes-integration-tests test:package
...
[error] /Users/dongjoon/APACHE/spark/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala:71: type mismatch;
[error]  found   : org.apache.spark.internal.config.OptionalConfigEntry[Boolean]
[error]  required: String
[error]     .set(IS_TESTING, false)
[error]          ^
[error] /Users/dongjoon/APACHE/spark/resource-managers/kubernetes/integration-tests/src/test/scala/org/apache/spark/deploy/k8s/integrationtest/KubernetesTestComponents.scala:71: type mismatch;
[error]  found   : Boolean(false)
[error]  required: String
[error]     .set(IS_TESTING, false)
[error]                      ^
[error] two errors found
```

## How was this patch tested?
Pass the K8S integration test.

Closes apache#23527 from dongjoon-hyun/SPARK-26482.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…CatalogVersionsSuite

## What changes were proposed in this pull request?
The vote for the final release of `branch-2.2` passed and the branch goes EOL. This PR removes Spark 2.2.x from the testing coverage.

## How was this patch tested?
Pass the Jenkins.

Closes apache#23526 from dongjoon-hyun/SPARK-26607.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…stgres numeric array

## What changes were proposed in this pull request?
When determining CatalystType for postgres columns with type `numeric[]`, set the type of the array element to `DecimalType(38, 18)` instead of `DecimalType(0,0)`.

## How was this patch tested?
Tested with modified `org.apache.spark.sql.jdbc.JDBCSuite`. Ran the `PostgresIntegrationSuite` manually.

Closes apache#23456 from a-shkarupin/postgres_numeric_array.

Lead-authored-by: Oleksii Shkarupin <[email protected]>
Co-authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…hecking

## What changes were proposed in this pull request?
If users set equivalent values to spark.network.timeout and spark.executor.heartbeatInterval, they get the following message:
```
java.lang.IllegalArgumentException: requirement failed: The value of spark.network.timeout=120s must be no less than the value of spark.executor.heartbeatInterval=120s.
```
But it's misleading since it can be read as if they could be equal. So this PR replaces "no less than" with "greater than". Also, it fixes similar inconsistencies found in MLlib and SQL components.

## How was this patch tested?
Ran Spark with equivalent values for them manually and confirmed that the revised message was displayed.

Closes apache#23488 from sekikn/SPARK-26564.

Authored-by: Kengo Seki <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
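In code terms, the revised check reads roughly like the following. This is a hedged sketch, not Spark's exact source; the variable names and the 120s values are illustrative:

```scala
// Sketch of the revised requirement: the check is a strict inequality, so the
// message now says "greater than" and can no longer be read as allowing equality.
val networkTimeoutS = 120L      // e.g. spark.network.timeout, in seconds
val heartbeatIntervalS = 120L   // e.g. spark.executor.heartbeatInterval, in seconds
require(networkTimeoutS > heartbeatIntervalS,
  s"The value of spark.network.timeout=${networkTimeoutS}s must be greater than " +
    s"the value of spark.executor.heartbeatInterval=${heartbeatIntervalS}s.")
```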
…rser.enabled

## What changes were proposed in this pull request?
The SQL config `spark.sql.legacy.timeParser.enabled` was removed by apache#23495. The PR cleans up the SQL migration guide and the comment for `UnixTimestamp`.

Closes apache#23529 from MaxGekk/get-rid-off-legacy-parser-followup.

Authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
```diff
  private[execution] val executionContext = ExecutionContext.fromExecutorService(
-   ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", 128))
+   ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
+     new SparkConf().get(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER)))
```
SQLConf.get
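That is, something along these lines. A minimal sketch of the suggestion, assuming the new entry sits in `StaticSQLConf` as in the diff above; the `val` is a fragment from `BroadcastExchangeExec`'s companion object:

```scala
import scala.concurrent.ExecutionContext
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
import org.apache.spark.util.ThreadUtils

// Read the cap through SQLConf.get instead of constructing a fresh SparkConf.
private[execution] val executionContext = ExecutionContext.fromExecutorService(
  ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange",
    SQLConf.get.getConf(StaticSQLConf.MAX_BROADCAST_EXCHANGE_THREADNUMBER)))
```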
```scala
val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("MAX number of threads can hold by BroadcastExchangeExec.")
```
Can you elaborate in the doc on what this number controls? For instance, it controls the parallelism of fetching and broadcasting the table.
Also, how was this patch tested? It looks like the UTs don't cover the current change.
So it sounds like you want to lower the number of threads? By doing this, don't you also decrease parallelism and cause longer waits for other broadcasts?
@viirya thanks. Experience with some long-running Thrift servers shows that we can lower this number to keep the server highly available. If we use the default number 128, we may encounter an OOM error, and currently the only fix is to turn off broadcast joins entirely. Making this number configurable gives users a more useful option.
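To make the intent concrete, capping the pool would look roughly like this. A hypothetical usage sketch: 32 is an example value, not a recommendation, and since the proposed entry is a static conf it must be set before the session starts:

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical usage: cap the broadcast-exchange pool at session startup.
val spark = SparkSession.builder()
  .config("spark.sql.broadcastExchange.maxThreadNumber", "32")
  .getOrCreate()
```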
@HyukjinKwon thanks very much. I will add a UT for this case.
## What changes were proposed in this pull request?
Make it possible for the master to enable TCP keep alive on the RPC connections with clients.

## How was this patch tested?
Manually tested. Added the following:
```
spark.rpc.io.enableTcpKeepAlive true
```
to spark-defaults.conf. Observed the following on the Spark master:
```
$ netstat -town | grep 7077
tcp6       0      0 10.240.3.134:7077       10.240.1.25:42851       ESTABLISHED keepalive (6736.50/0/0)
tcp6       0      0 10.240.3.134:44911      10.240.3.134:7077       ESTABLISHED keepalive (4098.68/0/0)
tcp6       0      0 10.240.3.134:7077       10.240.3.134:44911      ESTABLISHED keepalive (4098.68/0/0)
```
Which proves that the keep alive setting is taking effect.

It's currently possible to enable TCP keep alive on the worker / executor, but it is not possible to configure it on other RPC connections. It's unclear to me why this is the case. Keep alive is more important for the master to protect it against suddenly departing workers / executors, thus I think it's very important to have it. Particularly this makes the master resilient in case of using preemptible worker VMs in GCE. GCE has the concept of shutdown scripts, which it doesn't guarantee to execute. So workers often don't get shut down gracefully, and the TCP connections on the master linger as there's nothing to close them. Thus the need for enabling keep alive.

This enables keep-alive on connections besides the master's connections, but that shouldn't cause harm.

Closes apache#20512 from peshopetrov/master.

Authored-by: Petar Petrov <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
… projection

## What changes were proposed in this pull request?
When creating some unsafe projections, Spark rebuilds the map of schema attributes once for each expression in the projection. Some file format readers create one unsafe projection per input file, others create one per task. ProjectExec also creates one unsafe projection per task. As a result, for wide queries on wide tables, Spark might build the map of schema attributes hundreds of thousands of times.

This PR changes two functions to reuse the same AttributeSeq instance when creating BoundReference objects for each expression in the projection. This avoids the repeated rebuilding of the map of schema attributes.

### Benchmarks
The time saved by this PR depends on the size of the schema, size of the projection, number of input files (or number of file splits), number of tasks, and file format. I chose a couple of example cases. In the following tests, I ran the query
```sql
select * from table where id1 = 1
```
Matching rows are about 0.2% of the table.

#### Orc table 6000 columns, 500K rows, 34 input files
baseline | pr | improvement
----|----|----
1.772306 min | 1.487267 min | 16.082943%

#### Orc table 6000 columns, 500K rows, *17* input files
baseline | pr | improvement
----|----|----
1.656400 min | 1.423550 min | 14.057595%

#### Orc table 60 columns, 50M rows, 34 input files
baseline | pr | improvement
----|----|----
0.299878 min | 0.290339 min | 3.180926%

#### Parquet table 6000 columns, 500K rows, 34 input files
baseline | pr | improvement
----|----|----
1.478306 min | 1.373728 min | 7.074165%

Note: The parquet reader does not create an unsafe projection. However, the filter operation in the query causes the planner to add a ProjectExec, which does create an unsafe projection for each task. So these results have nothing to do with Parquet itself.

#### Parquet table 60 columns, 50M rows, 34 input files
baseline | pr | improvement
----|----|----
0.245006 min | 0.242200 min | 1.145099%

#### CSV table 6000 columns, 500K rows, 34 input files
baseline | pr | improvement
----|----|----
2.390117 min | 2.182778 min | 8.674844%

#### CSV table 60 columns, 50M rows, 34 input files
baseline | pr | improvement
----|----|----
1.520911 min | 1.510211 min | 0.703526%

## How was this patch tested?
SQL unit tests. Python core and SQL tests.

Closes apache#23392 from bersprockets/norebuild.

Authored-by: Bruce Robbins <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
## What changes were proposed in this pull request?
This is to fix a bug in apache#23036 that would cause a join hint to be applied to a node it is not supposed to after join reordering. For example,
```scala
val join = df.join(df, "id")
val broadcasted = join.hint("broadcast")
val join2 = join.join(broadcasted, "id").join(broadcasted, "id")
```
There should only be 2 broadcast hints on `join2`, but after join reordering there would be 4. It is because the hint application in join reordering compares the attribute set for testing relation equivalency. Moreover, it could still be problematic even if the child relations were used in testing relation equivalency, due to the potential exprId conflict in nested self-joins.

As a result, this PR simply reverts the join reorder hint behavior change introduced in apache#23036, which means if a join hint is present, the join node itself will not participate in the join reordering, while the sub-joins within its children still can.

## How was this patch tested?
Added new tests.

Closes apache#23524 from maryannxue/query-hint-followup-2.

Authored-by: maryannxue <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
## What changes were proposed in this pull request?
Make sure the broadcast hint is applied to partitioned tables.

## How was this patch tested?
- A new unit test in PruneFileSourcePartitionsSuite
- Unit test suites touched by SPARK-14581: JoinOptimizationSuite, FilterPushdownSuite, ColumnPruningSuite, and PruneFiltersSuite

Closes apache#23507 from jzhuge/SPARK-26576.
Closes apache#23530 from jzhuge/SPARK-26576-master.

Authored-by: John Zhuge <[email protected]>
Signed-off-by: gatorsmile <[email protected]>
…matter

## What changes were proposed in this pull request?
In the PR, I propose to switch on `TimestampFormatter`/`DateFormatter` in casting dates/timestamps to strings. The changes should make the date/timestamp casting consistent with JSON/CSV datasources and time-related functions like `to_date`, `to_unix_timestamp`/`from_unixtime`.

Local formatters are moved out from `DateTimeUtils` to where they are actually used. This avoids re-creating a new formatter instance on each call. Another reason is to have a separate parser for `PartitioningUtils`, because the default parsing pattern cannot be used (expected optional section `[.S]`).

## How was this patch tested?
It was tested by `DateTimeUtilsSuite`, `CastSuite` and `JDBC*Suite`.

Closes apache#23391 from MaxGekk/thread-local-date-format.

Lead-authored-by: Maxim Gekk <[email protected]>
Co-authored-by: Maxim Gekk <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
Test build #101185 has finished for PR 23519 at commit

Test build #101184 has finished for PR 23519 at commit
## What changes were proposed in this pull request?
This PR allows the user to override `kafka.group.id` for better monitoring or security. The user needs to make sure there are not multiple queries or sources using the same group id. It also fixes a bug where the `groupIdPrefix` option cannot be retrieved.

## How was this patch tested?
The newly added unit tests.

Closes apache#23301 from zsxwing/SPARK-26350.

Authored-by: Shixiong Zhu <[email protected]>
Signed-off-by: Shixiong Zhu <[email protected]>
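For illustration, overriding the group id on a Kafka source might look like the sketch below. The broker address, topic, and group id are placeholders, and per the PR description the user must ensure no other query or source shares the same group id:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Hypothetical stream reading from Kafka with an explicit consumer group id.
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "host1:9092")
  .option("subscribe", "topic1")
  .option("kafka.group.id", "my-monitored-group") // the override this PR enables
  .load()
```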
… for branch-2.4+ only

## What changes were proposed in this pull request?
To skip some steps that remove binary license/notice files in a source release for branch-2.3 (these files only exist in master/branch-2.4 now), this PR checks the Spark release version in `dev/create-release/release-build.sh`.

## How was this patch tested?
Manually checked.

Closes apache#23538 from maropu/FixReleaseScript.

Authored-by: Takeshi Yamamuro <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
Test build #101461 has finished for PR 23519 at commit
@viirya Sorry for the bother!
```scala
      Thread.sleep(5*1000)
    } (BroadcastExchangeExec.executionContext)
```

```scala
val f = Future {} (BroadcastExchangeExec.executionContext)
```
You don't have to test Java's thread executors. Can you just check whether `BroadcastExchangeExec.executionContext.getMaximumPoolSize` is as configured?
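A minimal sketch of that style of test, exercising `ThreadUtils` (which backs the pool) directly instead of scheduling Futures and sleeping; the cap of 16 is an arbitrary example value:

```scala
import java.util.concurrent.ThreadPoolExecutor
import org.apache.spark.util.ThreadUtils

// Build a pool with the configured cap and assert the cap directly.
val maxThreadNumber = 16
val pool: ThreadPoolExecutor =
  ThreadUtils.newDaemonCachedThreadPool("broadcast-exchange", maxThreadNumber)
assert(pool.getMaximumPoolSize == maxThreadNumber)
pool.shutdown()
```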
```scala
val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " +
```
extra space -> `table. If`
Sorry for this style @HyukjinKwon. Actually, I found some other code has the same problem; can I open a PR to fix that?
If it's around this code and it's the only one, yea, let's do that. If there are multiple across files, let's not include them.
```scala
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " +
      "encounter memory issue when broadcast table we can decrease this number." +
      "Notice the number should be carefully chosen since decrease parallelism will " +
```
decrease -> decreasing
will -> might
```scala
    .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " +
      "encounter memory issue when broadcast table we can decrease this number." +
      "Notice the number should be carefully chosen since decrease parallelism will " +
      "cause longer waiting for other broadcasting.And increase parallelism may " +
```
broadcasting.And -> broadcasting. Also, increasing
```scala
val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " +
      "encounter memory issue when broadcast table we can decrease this number." +
```
memory issue: can you elaborate on which memory issue here?
```scala
val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("The maximum degree of parallelism to fetch and broadcast the table.If we " +
      "encounter memory issue when broadcast table we can decrease this number." +
```
this number in order to ...
retest this please.
```scala
    .intConf
    .createWithDefault(1000)

val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
```
How about `BROADCAST_EXCHANGE_MAX_THREAD_THRESHOLD`?
Ok @maropu Thanks
| "Notice the number should be carefully chosen since decrease parallelism will " + | ||
| "cause longer waiting for other broadcasting.And increase parallelism may " + | ||
| "cause memory problem.") | ||
| .intConf |
Please check `.checkValue(thres => thres > 0, ....`
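That is, roughly the following. A sketch of the validated entry: the error message wording is illustrative, and the default of 128 mirrors the previously hardcoded value:

```scala
// Reject non-positive values when the conf is defined.
val MAX_BROADCAST_EXCHANGE_THREADNUMBER =
  buildStaticConf("spark.sql.broadcastExchange.maxThreadNumber")
    .doc("The maximum degree of parallelism to fetch and broadcast the table.")
    .intConf
    .checkValue(thres => thres > 0, "The maximum must be a positive integer.")
    .createWithDefault(128)
```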
Test build #101463 has finished for PR 23519 at commit
@caneGuy Retested and the failed test passes now.
Test build #101751 has finished for PR 23519 at commit
## What changes were proposed in this pull request?
Currently, the thread number of the broadcast-exchange thread pool is fixed, and keepAliveSeconds is also fixed at 60s.
But sometimes, if the Thread objects are not GCed quickly, this may cause a server (driver) OOM. In such a case, we need to make this thread pool configurable.
A case is described in https://issues.apache.org/jira/browse/SPARK-26601.
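For background, a bounded daemon "cached" pool with a keep-alive can be built roughly as follows. This is an illustrative sketch, not Spark's exact `ThreadUtils` code; it shows why a large fixed cap plus a 60s keep-alive can hold on to driver memory after a burst of broadcasts:

```scala
import java.util.concurrent.{LinkedBlockingQueue, ThreadPoolExecutor, TimeUnit}

// Illustrative: core and max sizes are both the cap; idle threads (including
// core ones) terminate after keepAliveSeconds, so after a burst of broadcasts
// up to `maxThreadNumber` threads can linger for the keep-alive window.
def boundedCachedPool(maxThreadNumber: Int,
    keepAliveSeconds: Long = 60L): ThreadPoolExecutor = {
  val pool = new ThreadPoolExecutor(
    maxThreadNumber, maxThreadNumber,
    keepAliveSeconds, TimeUnit.SECONDS,
    new LinkedBlockingQueue[Runnable]())
  pool.allowCoreThreadTimeOut(true) // let idle core threads terminate too
  pool
}
```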
## How was this patch tested?
UT