
Commit 119f5a3

Author: Stavros Kontopoulos

add cli submitops

1 parent 57ae251 commit 119f5a3

File tree

12 files changed: +505 additions, -42 deletions


core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala

Lines changed: 52 additions & 9 deletions
@@ -22,9 +22,10 @@ import java.lang.reflect.{InvocationTargetException, Modifier, UndeclaredThrowab
 import java.net.{URI, URL}
 import java.security.PrivilegedExceptionAction
 import java.text.ParseException
-import java.util.UUID
+import java.util.{ServiceLoader, UUID}
 
 import scala.annotation.tailrec
+import scala.collection.JavaConverters._
 import scala.collection.mutable.{ArrayBuffer, HashMap, Map}
 import scala.util.{Properties, Try}

@@ -96,20 +97,35 @@ private[spark] class SparkSubmit extends Logging {
   }
 
   /**
-   * Kill an existing submission using the REST protocol. Standalone and Mesos cluster mode only.
+   * Kill an existing submission.
    */
   private def kill(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .killSubmission(args.submissionToKill)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .killSubmission(args.submissionToKill)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .kill(args.submissionToKill, sparkConf)
+    }
   }
 
   /**
-   * Request the status of an existing submission using the REST protocol.
-   * Standalone and Mesos cluster mode only.
+   * Request the status of an existing submission.
    */
   private def requestStatus(args: SparkSubmitArguments): Unit = {
-    new RestSubmissionClient(args.master)
-      .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    if (RestSubmissionClient.supportsRestClient(args.master)) {
+      new RestSubmissionClient(args.master)
+        .requestSubmissionStatus(args.submissionToRequestStatusFor)
+    } else {
+      val sparkConf = args.toSparkConf()
+      sparkConf.set("spark.master", args.master)
+      SparkSubmitUtils
+        .getSubmitOperations(args.master)
+        .printSubmissionStatus(args.submissionToRequestStatusFor, sparkConf)
+    }
   }
 
   /** Print version information to the log. */
@@ -320,7 +336,8 @@ private[spark] class SparkSubmit extends Logging {
       }
     }
 
-    args.sparkProperties.foreach { case (k, v) => sparkConf.set(k, v) }
+    // update spark config from args
+    args.toSparkConf(Option(sparkConf))
    val hadoopConf = conf.getOrElse(SparkHadoopUtil.newConfiguration(sparkConf))
    val targetDir = Utils.createTempDir()

@@ -1348,6 +1365,23 @@ private[spark] object SparkSubmitUtils {
     }
   }
 
+  private[deploy] def getSubmitOperations(master: String): SparkSubmitOperation = {
+    val loader = Utils.getContextOrSparkClassLoader
+    val serviceLoaders =
+      ServiceLoader.load(classOf[SparkSubmitOperation], loader)
+        .asScala
+        .filter(_.supports(master))
+
+    serviceLoaders.size match {
+      case x if x > 1 =>
+        throw new SparkException(s"Multiple($x) external SparkSubmitOperations " +
+          s"clients registered for master url ${master}.")
+      case 1 => serviceLoaders.headOption.get
+      case _ =>
+        throw new IllegalArgumentException(s"No external SparkSubmitOperations " +
+          s"clients found for master url: '$master'")
+    }
+  }
 }
 
 /**
@@ -1360,3 +1394,12 @@ private case class OptionAssigner(
     deployMode: Int,
     clOption: String = null,
     confKey: String = null)
+
+private[spark] trait SparkSubmitOperation {
+
+  def kill(submissionId: String, conf: SparkConf): Unit
+
+  def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit
+
+  def supports(master: String): Boolean
+}
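
Note on the plugin mechanism: SparkSubmitOperation implementations are discovered through Java's ServiceLoader (see getSubmitOperations above), so an external cluster-manager module can serve --kill and --status without SparkSubmit referencing it directly. A minimal sketch of such a plugin, assuming a hypothetical package, class name, and "myscheme://" master prefix (none of these appear in this commit):

    package org.example.deploy

    import org.apache.spark.SparkConf
    import org.apache.spark.deploy.SparkSubmitOperation

    class ExampleSubmitOperation extends SparkSubmitOperation {

      // Claim only master URLs this cluster manager understands;
      // getSubmitOperations throws if zero, or more than one, loaded
      // operation claims the given master URL.
      override def supports(master: String): Boolean =
        master.startsWith("myscheme://")

      override def kill(submissionId: String, conf: SparkConf): Unit = {
        // Ask the cluster manager to terminate the driver (stubbed here).
        println(s"Killing $submissionId on ${conf.get("spark.master")}")
      }

      override def printSubmissionStatus(submissionId: String, conf: SparkConf): Unit = {
        // Query the cluster manager and report the driver state (stubbed here).
        println(s"$submissionId: RUNNING")
      }
    }

For ServiceLoader to find the class, the module would also ship a resource file META-INF/services/org.apache.spark.deploy.SparkSubmitOperation whose single line is the implementation's fully qualified name (here, org.example.deploy.ExampleSubmitOperation).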

core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala

Lines changed: 9 additions & 8 deletions
@@ -29,7 +29,7 @@ import scala.collection.mutable.{ArrayBuffer, HashMap}
 import scala.io.Source
 import scala.util.Try
 
-import org.apache.spark.{SparkException, SparkUserAppException}
+import org.apache.spark.{SparkConf, SparkException, SparkUserAppException}
 import org.apache.spark.deploy.SparkSubmitAction._
 import org.apache.spark.internal.{config, Logging}
 import org.apache.spark.internal.config.DYN_ALLOCATION_ENABLED
@@ -305,19 +305,12 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   }
 
   private def validateKillArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error("Killing submissions is only supported in standalone or Mesos mode!")
-    }
     if (submissionToKill == null) {
       error("Please specify a submission to kill.")
     }
   }
 
   private def validateStatusRequestArguments(): Unit = {
-    if (!master.startsWith("spark://") && !master.startsWith("mesos://")) {
-      error(
-        "Requesting submission statuses is only supported in standalone or Mesos mode!")
-    }
     if (submissionToRequestStatusFor == null) {
       error("Please specify a submission to request status for.")
     }
@@ -574,6 +567,8 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
        |
        | Spark standalone or Mesos with cluster deploy mode only:
        |  --supervise                 If given, restarts the driver on failure.
+        |
+        | Spark standalone, Mesos or K8s with cluster deploy mode only:
        |  --kill SUBMISSION_ID        If given, kills the driver specified.
        |  --status SUBMISSION_ID      If given, requests the status of the driver specified.
        |
@@ -662,4 +657,10 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
 
   private def error(msg: String): Unit = throw new SparkException(msg)
 
+  private[deploy] def toSparkConf(sparkConf: Option[SparkConf] = None): SparkConf = {
+    // either use an existing config or create a new empty one
+    sparkProperties.foldLeft(sparkConf.getOrElse(new SparkConf())) {
+      case (conf, (k, v)) => conf.set(k, v)
+    }
+  }
 }
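
Note: the fold in toSparkConf replaces the ad-hoc args.sparkProperties.foreach loop that SparkSubmit used before, and the optional parameter serves both call sites: kill and requestStatus build a fresh SparkConf, while prepareSubmitEnvironment layers the parsed --conf properties onto an existing one. A small sketch of both call shapes, assuming args is an already-parsed SparkSubmitArguments carrying --conf spark.app.name=demo:

    val fresh = args.toSparkConf()              // new SparkConf holding just the parsed properties
    val base = new SparkConf().set("spark.executor.memory", "2g")
    val merged = args.toSparkConf(Option(base)) // mutates and returns base; --conf values win on key clashes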

core/src/main/scala/org/apache/spark/deploy/rest/RestSubmissionClient.scala

Lines changed: 6 additions & 3 deletions
@@ -61,8 +61,6 @@ import org.apache.spark.util.Utils
 private[spark] class RestSubmissionClient(master: String) extends Logging {
   import RestSubmissionClient._
 
-  private val supportedMasterPrefixes = Seq("spark://", "mesos://")
-
   private val masters: Array[String] = if (master.startsWith("spark://")) {
     Utils.parseStandaloneMasterUrls(master)
   } else {
@@ -409,6 +407,8 @@ private[spark] class RestSubmissionClient(master: String) extends Logging {
 
 private[spark] object RestSubmissionClient {
 
+  val supportedMasterPrefixes = Seq("spark://", "mesos://")
+
   // SPARK_HOME and SPARK_CONF_DIR are filtered out because they are usually wrong
   // on the remote machine (SPARK-12345) (SPARK-25934)
   private val BLACKLISTED_SPARK_ENV_VARS = Set("SPARK_ENV_LOADED", "SPARK_HOME", "SPARK_CONF_DIR")
@@ -424,6 +424,10 @@ private[spark] object RestSubmissionClient {
       (k.startsWith("SPARK_") && !BLACKLISTED_SPARK_ENV_VARS.contains(k)) || k.startsWith("MESOS_")
     }
   }
+
+  private[spark] def supportsRestClient(master: String): Boolean = {
+    supportedMasterPrefixes.exists(master.startsWith)
+  }
 }
 
 private[spark] class RestSubmissionClientApp extends SparkApplication {
@@ -456,5 +460,4 @@ private[spark] class RestSubmissionClientApp extends SparkApplication {
     val env = RestSubmissionClient.filterSystemEnvironment(sys.env)
     run(appResource, mainClass, appArgs, conf, env)
   }
-
 }
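
Note: with supportedMasterPrefixes promoted to the companion object, the REST-versus-plugin decision in SparkSubmit.kill and requestStatus reduces to a prefix check. For illustration (the k8s:// URL is only an example of a master the REST client does not handle):

    RestSubmissionClient.supportsRestClient("spark://host:6066")       // true  -> REST protocol
    RestSubmissionClient.supportsRestClient("mesos://host:5050")       // true  -> REST protocol
    RestSubmissionClient.supportsRestClient("k8s://https://host:6443") // false -> ServiceLoader lookup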

core/src/main/scala/org/apache/spark/util/CommandLineUtils.scala

Lines changed: 5 additions & 3 deletions
@@ -25,8 +25,12 @@ import org.apache.spark.SparkException
  * Contains basic command line parsing functionality and methods to parse some common Spark CLI
  * options.
  */
-private[spark] trait CommandLineUtils {
+private[spark] trait CommandLineUtils extends CommandLineLoggingUtils {
 
+  def main(args: Array[String]): Unit
+}
+
+trait CommandLineLoggingUtils {
   // Exposed for testing
   private[spark] var exitFn: Int => Unit = (exitCode: Int) => System.exit(exitCode)

@@ -41,6 +45,4 @@
     printMessage("Run with --help for usage help or --verbose for debug output")
     exitFn(1)
   }
-
-  def main(args: Array[String]): Unit
 }
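
Note: extracting CommandLineLoggingUtils lets code reuse exitFn and the print helpers without inheriting the abstract main method that only real CLI entry points should carry. A sketch of a possible consumer (the class is illustrative, not from this commit, and assumes printMessage moved into the new trait alongside exitFn, as the hunk above suggests):

    // Illustrative only: reports errors the way spark-submit does,
    // without being a command-line entry point itself.
    private[spark] class SubmissionReporter extends CommandLineLoggingUtils {
      def fail(reason: String): Unit = {
        printMessage(s"Error: $reason")
        exitFn(1)
      }
    }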

core/src/test/scala/org/apache/spark/deploy/SparkSubmitSuite.scala

Lines changed: 17 additions & 0 deletions
@@ -1239,6 +1239,23 @@ class SparkSubmitSuite
 
     conf.get(nonDelimSpaceFromFile._1) should be ("blah")
   }
+
+  test("get a Spark configuration from arguments") {
+    val testConf = "spark.test.hello" -> "world"
+    val masterConf = "spark.master" -> "yarn"
+    val clArgs = Seq(
+      "--conf", s"${testConf._1}=${testConf._2}",
+      "--conf", s"${masterConf._1}=${masterConf._2}",
+      "--class", "Foo",
+      "app.jar")
+    val conf = new SparkSubmitArguments(clArgs).toSparkConf()
+    Seq(
+      testConf,
+      masterConf
+    ).foreach { case (k, v) =>
+      conf.get(k) should be (v)
+    }
+  }
 }
 
 object SparkSubmitSuite extends SparkFunSuite with TimeLimits {
