Commit fec4fe5

Merge branch 'master' into SPARK-33938
2 parents b6ba902 + 559f411

191 files changed: +1938 −503 lines changed


bin/pyspark

Lines changed: 1 addition & 1 deletion

@@ -50,7 +50,7 @@ export PYSPARK_DRIVER_PYTHON_OPTS
 
 # Add the PySpark classes to the Python path:
 export PYTHONPATH="${SPARK_HOME}/python/:$PYTHONPATH"
-export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH"
+export PYTHONPATH="${SPARK_HOME}/python/lib/py4j-0.10.9.1-src.zip:$PYTHONPATH"
 
 # Load the PySpark shell.py script when ./pyspark is used interactively:
 export OLD_PYTHONSTARTUP="$PYTHONSTARTUP"

bin/pyspark2.cmd

Lines changed: 1 addition & 1 deletion

@@ -30,7 +30,7 @@ if "x%PYSPARK_DRIVER_PYTHON%"=="x" (
 )
 
 set PYTHONPATH=%SPARK_HOME%\python;%PYTHONPATH%
-set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9-src.zip;%PYTHONPATH%
+set PYTHONPATH=%SPARK_HOME%\python\lib\py4j-0.10.9.1-src.zip;%PYTHONPATH%
 
 set OLD_PYTHONSTARTUP=%PYTHONSTARTUP%
 set PYTHONSTARTUP=%SPARK_HOME%\python\pyspark\shell.py

core/pom.xml

Lines changed: 1 addition & 1 deletion

@@ -414,7 +414,7 @@
     <dependency>
       <groupId>net.sf.py4j</groupId>
       <artifactId>py4j</artifactId>
-      <version>0.10.9</version>
+      <version>0.10.9.1</version>
     </dependency>
     <dependency>
       <groupId>org.apache.spark</groupId>

core/src/main/resources/org/apache/spark/ui/static/executorspage.js

Lines changed: 56 additions & 20 deletions

@@ -414,38 +414,74 @@ $(document).ready(function () {
     },
     {
       data: function (row, type) {
-        if (type !== 'display')
-          return row.peakMemoryMetrics.JVMHeapMemory;
-        else
-          return (formatBytes(row.peakMemoryMetrics.JVMHeapMemory, type) + ' / ' +
-            formatBytes(row.peakMemoryMetrics.JVMOffHeapMemory, type));
+        var peakMemoryMetrics = row.peakMemoryMetrics;
+        if (typeof peakMemoryMetrics !== 'undefined') {
+          if (type !== 'display')
+            return peakMemoryMetrics.JVMHeapMemory;
+          else
+            return (formatBytes(peakMemoryMetrics.JVMHeapMemory, type) + ' / ' +
+              formatBytes(peakMemoryMetrics.JVMOffHeapMemory, type));
+        } else {
+          if (type !== 'display') {
+            return 0;
+          } else {
+            return '0.0 B / 0.0 B';
+          }
+        }
       }
     },
     {
       data: function (row, type) {
-        if (type !== 'display')
-          return row.peakMemoryMetrics.OnHeapExecutionMemory;
-        else
-          return (formatBytes(row.peakMemoryMetrics.OnHeapExecutionMemory, type) + ' / ' +
-            formatBytes(row.peakMemoryMetrics.OffHeapExecutionMemory, type));
+        var peakMemoryMetrics = row.peakMemoryMetrics;
+        if (typeof peakMemoryMetrics !== 'undefined') {
+          if (type !== 'display')
+            return peakMemoryMetrics.OnHeapExecutionMemory;
+          else
+            return (formatBytes(peakMemoryMetrics.OnHeapExecutionMemory, type) + ' / ' +
+              formatBytes(peakMemoryMetrics.OffHeapExecutionMemory, type));
+        } else {
+          if (type !== 'display') {
+            return 0;
+          } else {
+            return '0.0 B / 0.0 B';
+          }
+        }
       }
     },
     {
       data: function (row, type) {
-        if (type !== 'display')
-          return row.peakMemoryMetrics.OnHeapStorageMemory;
-        else
-          return (formatBytes(row.peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' +
-            formatBytes(row.peakMemoryMetrics.OffHeapStorageMemory, type));
+        var peakMemoryMetrics = row.peakMemoryMetrics;
+        if (typeof peakMemoryMetrics !== 'undefined') {
+          if (type !== 'display')
+            return peakMemoryMetrics.OnHeapStorageMemory;
+          else
+            return (formatBytes(peakMemoryMetrics.OnHeapStorageMemory, type) + ' / ' +
+              formatBytes(peakMemoryMetrics.OffHeapStorageMemory, type));
+        } else {
+          if (type !== 'display') {
+            return 0;
+          } else {
+            return '0.0 B / 0.0 B';
+          }
+        }
       }
     },
     {
       data: function (row, type) {
-        if (type !== 'display')
-          return row.peakMemoryMetrics.DirectPoolMemory;
-        else
-          return (formatBytes(row.peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
-            formatBytes(row.peakMemoryMetrics.MappedPoolMemory, type));
+        var peakMemoryMetrics = row.peakMemoryMetrics;
+        if (typeof peakMemoryMetrics !== 'undefined') {
+          if (type !== 'display')
+            return peakMemoryMetrics.DirectPoolMemory;
+          else
+            return (formatBytes(peakMemoryMetrics.DirectPoolMemory, type) + ' / ' +
+              formatBytes(peakMemoryMetrics.MappedPoolMemory, type));
+        } else {
+          if (type !== 'display') {
+            return 0;
+          } else {
+            return '0.0 B / 0.0 B';
+          }
+        }
       }
     },
     {data: 'diskUsed', render: formatBytes},
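
The guard above exists because peakMemoryMetrics can be absent from an executor's row when the executor has not yet reported any peak metrics, which previously made the Executors page throw on undefined. A minimal Scala sketch of the same fallback logic, assuming a hypothetical Option-based model (MetricSnapshot, renderPeakCell, and the Map shape are illustrative, not Spark's actual API):

// Illustrative only: the same absent-metrics fallback as the JS above,
// expressed with Option. MetricSnapshot and renderPeakCell are hypothetical.
object MetricSnapshot {
  def renderPeakCell(peak: Option[Map[String, Long]], display: Boolean): String =
    peak match {
      case Some(m) if display =>
        s"${m.getOrElse("JVMHeapMemory", 0L)} / ${m.getOrElse("JVMOffHeapMemory", 0L)}"
      case Some(m) =>
        m.getOrElse("JVMHeapMemory", 0L).toString // raw number used for sorting
      case None if display =>
        "0.0 B / 0.0 B" // same placeholder the patched page renders
      case None =>
        "0"
    }
}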

core/src/main/scala/org/apache/spark/api/python/PythonUtils.scala

Lines changed: 1 addition & 1 deletion

@@ -27,7 +27,7 @@ import org.apache.spark.SparkContext
 import org.apache.spark.api.java.{JavaRDD, JavaSparkContext}
 
 private[spark] object PythonUtils {
-  val PY4J_ZIP_NAME = "py4j-0.10.9-src.zip"
+  val PY4J_ZIP_NAME = "py4j-0.10.9.1-src.zip"
 
   /** Get the PYTHONPATH for PySpark, either from SPARK_HOME, if it is set, or from our JAR */
   def sparkPythonPath: String = {
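
This constant is the fourth place the py4j version appears in this commit; bin/pyspark, bin/pyspark2.cmd, core/pom.xml, and PY4J_ZIP_NAME all have to agree, or PySpark cannot put py4j on the PYTHONPATH. A small sketch, assuming SPARK_HOME is set (Py4jZipCheck is a hypothetical helper, not part of the commit), that verifies the bundled zip exists:

import java.nio.file.{Files, Paths}

object Py4jZipCheck extends App {
  // Keep in sync with PythonUtils.PY4J_ZIP_NAME.
  val zipName = "py4j-0.10.9.1-src.zip"
  sys.env.get("SPARK_HOME") match {
    case Some(home) =>
      // Mirrors the $SPARK_HOME/python/lib layout used by sparkPythonPath.
      val zip = Paths.get(home, "python", "lib", zipName)
      println(s"$zip exists: ${Files.exists(zip)}")
    case None =>
      println("SPARK_HOME is not set; cannot locate the bundled py4j zip.")
  }
}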

core/src/main/scala/org/apache/spark/deploy/DeployMessage.scala

Lines changed: 2 additions & 2 deletions

@@ -77,10 +77,10 @@ private[deploy] object DeployMessages {
   object DecommissionWorker extends DeployMessage
 
   /**
-   * A message that sent by the Worker to itself when it receives PWR signal,
+   * A message sent by the Worker to itself when it receives a signal,
    * indicating the Worker starts to decommission.
    */
-  object WorkerSigPWRReceived extends DeployMessage
+  object WorkerDecommissionSigReceived extends DeployMessage
 
   /**
    * A message sent from Worker to Master to tell Master that the Worker has started

core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 3 additions & 3 deletions

@@ -1194,11 +1194,11 @@ private[spark] object SparkSubmitUtils {
   }
 
   /**
-   * Output a comma-delimited list of paths for the downloaded jars to be added to the classpath
+   * Output a list of paths for the downloaded jars to be added to the classpath
    * (will append to jars in SparkSubmit).
    * @param artifacts Sequence of dependencies that were resolved and retrieved
-   * @param cacheDirectory directory where jars are cached
-   * @return a comma-delimited list of paths for the dependencies
+   * @param cacheDirectory Directory where jars are cached
+   * @return List of paths for the dependencies
    */
   def resolveDependencyPaths(
       artifacts: Array[AnyRef],

core/src/main/scala/org/apache/spark/deploy/worker/Worker.scala

Lines changed: 7 additions & 6 deletions

@@ -66,16 +66,17 @@ private[deploy] class Worker(
   Utils.checkHost(host)
   assert (port > 0)
 
-  // If worker decommissioning is enabled register a handler on PWR to shutdown.
+  // If worker decommissioning is enabled register a handler on the configured signal to shutdown.
   if (conf.get(config.DECOMMISSION_ENABLED)) {
-    logInfo("Registering SIGPWR handler to trigger decommissioning.")
-    SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
+    val signal = conf.get(config.Worker.WORKER_DECOMMISSION_SIGNAL)
+    logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+    SignalUtils.register(signal, s"Failed to register SIG$signal handler - " +
       "disabling worker decommission feature.") {
-      self.send(WorkerSigPWRReceived)
+      self.send(WorkerDecommissionSigReceived)
       true
     }
   } else {
-    logInfo("Worker decommissioning not enabled, SIGPWR will result in exiting.")
+    logInfo("Worker decommissioning not enabled.")
   }
 
   // A scheduled executor used to send messages at the specified time.

@@ -682,7 +683,7 @@ private[deploy] class Worker(
     case DecommissionWorker =>
       decommissionSelf()
 
-    case WorkerSigPWRReceived =>
+    case WorkerDecommissionSigReceived =>
       decommissionSelf()
       // Tell the Master that we are starting decommissioning
       // so it stops trying to launch executor/driver on us
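
SignalUtils.register hides the actual JVM signal plumbing. A stripped-down sketch of that mechanism using sun.misc.Signal directly (the install helper is hypothetical; SignalUtils additionally chains handlers and logs registration failures, and SIGPWR itself only exists on Linux):

import sun.misc.{Signal, SignalHandler}

object DecommissionSignalSketch {
  // Install `action` as the handler for the named signal ("PWR", "TERM", ...).
  def install(signalName: String)(action: => Unit): Unit =
    Signal.handle(new Signal(signalName), new SignalHandler {
      override def handle(sig: Signal): Unit = action
    })

  def main(args: Array[String]): Unit = {
    install("PWR") { println("decommission requested") }
    Thread.sleep(60000) // keep the JVM alive; try `kill -PWR <pid>` from a shell
  }
}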

core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala

Lines changed: 5 additions & 4 deletions

@@ -82,9 +82,10 @@ private[spark] class CoarseGrainedExecutorBackend(
 
   override def onStart(): Unit = {
     if (env.conf.get(DECOMMISSION_ENABLED)) {
-      logInfo("Registering PWR handler to trigger decommissioning.")
-      SignalUtils.register("PWR", "Failed to register SIGPWR handler - " +
-        "disabling executor decommission feature.") (self.askSync[Boolean](ExecutorSigPWRReceived))
+      val signal = env.conf.get(EXECUTOR_DECOMMISSION_SIGNAL)
+      logInfo(s"Registering SIG$signal handler to trigger decommissioning.")
+      SignalUtils.register(signal, s"Failed to register SIG$signal handler - disabling" +
+        s" executor decommission feature.") (self.askSync[Boolean](ExecutorDecommissionSigReceived))
     }
 
     logInfo("Connecting to driver: " + driverUrl)

@@ -208,7 +209,7 @@ private[spark] class CoarseGrainedExecutorBackend(
   }
 
   override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
-    case ExecutorSigPWRReceived =>
+    case ExecutorDecommissionSigReceived =>
       var driverNotified = false
       try {
         driver.foreach { driverRef =>

core/src/main/scala/org/apache/spark/internal/config/Worker.scala

Lines changed: 7 additions & 0 deletions

@@ -82,4 +82,11 @@ private[spark] object Worker {
       .version("2.0.2")
       .intConf
       .createWithDefault(100)
+
+  val WORKER_DECOMMISSION_SIGNAL =
+    ConfigBuilder("spark.worker.decommission.signal")
+      .doc("The signal used to trigger the worker to start decommissioning.")
+      .version("3.2.0")
+      .stringConf
+      .createWithDefaultString("PWR")
 }
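
A hypothetical usage sketch for the new config: on a platform without SIGPWR, such as macOS, a deployment could point the worker at a different signal, for example in a spark-shell session or a config-building script:

import org.apache.spark.SparkConf

// "PWR" is the default (see createWithDefaultString above); any signal name
// that SignalUtils can register is acceptable here.
val conf = new SparkConf()
  .set("spark.worker.decommission.signal", "TERM")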
