From 83686af9a91fa1f5a998f639a653aac577c3e499 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Fri, 25 Dec 2020 16:58:25 +0800
Subject: [PATCH 1/7] [SPARK-33912][SQL] Refactor DependencyUtils ivy property parameter

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 11 +++++
 .../spark/deploy/SparkSubmitArguments.scala   | 24 +++++------
 .../spark/deploy/worker/DriverWrapper.scala   |  2 +-
 .../apache/spark/util/DependencyUtils.scala   | 40 +++++++++----------
 4 files changed, 43 insertions(+), 34 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index ad95b18ecaeb..57374b861ed4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -1483,6 +1483,17 @@ private case class OptionAssigner(
     confKey: String = null,
     mergeFn: Option[(String, String) => String] = None)

+private object OptionAssigner {
+  def apply(
+      value: Option[String],
+      clusterManager: Int,
+      deployMode: Int,
+      clOption: String = null,
+      confKey: String = null,
+      mergeFn: Option[(String, String) => String] = None): OptionAssigner =
+    new OptionAssigner(value.get, clusterManager, deployMode, clOption, confKey, mergeFn)
+}
+
 private[spark] trait SparkSubmitOperation {

   def kill(submissionId: String, conf: SparkConf): Unit
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 9da1a73bba69..0753360e27c1 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -60,11 +60,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var name: String = null
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
-  var packages: String = null
-  var repositories: String = null
-  var ivyRepoPath: String = null
+  var packages: Option[String] = None
+  var repositories: Option[String] = None
+  var ivyRepoPath: Option[String] = None
   var ivySettingsPath: Option[String] = None
-  var packagesExclusions: String = null
+  var packagesExclusions: Option[String] = None
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -185,13 +185,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     files = Option(files).orElse(sparkProperties.get(config.FILES.key)).orNull
     archives = Option(archives).orElse(sparkProperties.get(config.ARCHIVES.key)).orNull
     pyFiles = Option(pyFiles).orElse(sparkProperties.get(config.SUBMIT_PYTHON_FILES.key)).orNull
-    ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
+    ivyRepoPath = sparkProperties.get("spark.jars.ivy")
     ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
-    packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
-    packagesExclusions = Option(packagesExclusions)
-      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
-    repositories = Option(repositories)
-      .orElse(sparkProperties.get("spark.jars.repositories")).orNull
+    packages = packages.orElse(sparkProperties.get("spark.jars.packages"))
+    packagesExclusions = packagesExclusions.orElse(sparkProperties.get("spark.jars.excludes"))
+    repositories = repositories.orElse(sparkProperties.get("spark.jars.repositories"))
     deployMode = Option(deployMode)
       .orElse(sparkProperties.get(config.SUBMIT_DEPLOY_MODE.key))
       .orElse(env.get("DEPLOY_MODE"))
@@ -409,13 +407,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         jars = Utils.resolveURIs(value)

       case PACKAGES =>
-        packages = value
+        packages = Some(value)

       case PACKAGES_EXCLUDE =>
-        packagesExclusions = value
+        packagesExclusions = Some(value)

       case REPOSITORIES =>
-        repositories = value
+        repositories = Some(value)

       case CONF =>
         val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)
diff --git a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
index c1288d64c53f..99d032e881c5 100644
--- a/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/worker/DriverWrapper.scala
@@ -83,7 +83,7 @@ object DriverWrapper extends Logging {

     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(true,
       ivyProperties.packagesExclusions, ivyProperties.packages, ivyProperties.repositories,
-      ivyProperties.ivyRepoPath, Option(ivyProperties.ivySettingsPath))
+      ivyProperties.ivyRepoPath, ivyProperties.ivySettingsPath)
     val jars = {
       val jarsProp = sys.props.get(config.JARS.key).orNull
       if (!StringUtils.isBlank(resolvedMavenCoordinates)) {
diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
index 9956ccedf584..6c243bc97bdb 100644
--- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -29,11 +29,11 @@ import org.apache.spark.deploy.SparkSubmitUtils
 import org.apache.spark.internal.Logging

 case class IvyProperties(
-    packagesExclusions: String,
-    packages: String,
-    repositories: String,
-    ivyRepoPath: String,
-    ivySettingsPath: String)
+    packagesExclusions: Option[String],
+    packages: Option[String],
+    repositories: Option[String],
+    ivyRepoPath: Option[String],
+    ivySettingsPath: Option[String])

 private[spark] object DependencyUtils extends Logging {

@@ -44,7 +44,7 @@ private[spark] object DependencyUtils extends Logging {
       "spark.jars.repositories",
       "spark.jars.ivy",
       "spark.jars.ivySettings"
-    ).map(sys.props.get(_).orNull)
+    ).map(sys.props.get(_))
     IvyProperties(packagesExclusions, packages, repositories, ivyRepoPath, ivySettingsPath)
   }

@@ -69,10 +69,10 @@ private[spark] object DependencyUtils extends Logging {
   * Example: Input: excludeorg.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http
   *          Output: [org.mortbay.jetty:jetty,org.eclipse.jetty:jetty-http]
   */
-  private def parseQueryParams(uri: URI): (Boolean, String) = {
+  private def parseQueryParams(uri: URI): (Boolean, Option[String]) = {
     val uriQuery = uri.getQuery
     if (uriQuery == null) {
-      (false, "")
+      (false, None)
     } else {
       val mapTokens = uriQuery.split("&").map(_.split("="))
       if (mapTokens.exists(isInvalidQueryString)) {
@@ -103,7 +103,7 @@ private[spark] object DependencyUtils extends Logging {
             }
             excludes
           }.mkString(",")
-      }.getOrElse("")
+      }

       val validParams = Set("transitive", "exclude")
       val invalidParams = groupedParams.keys.filterNot(validParams.contains).toSeq
@@ -150,36 +150,36 @@ private[spark] object DependencyUtils extends Logging {
       resolveMavenDependencies(
         transitive,
         exclusionList,
-        authority,
+        Some(authority),
         ivyProperties.repositories,
         ivyProperties.ivyRepoPath,
-        Option(ivyProperties.ivySettingsPath)
+        ivyProperties.ivySettingsPath
       ).split(",")
   }

   def resolveMavenDependencies(
       packagesTransitive: Boolean,
-      packagesExclusions: String,
-      packages: String,
-      repositories: String,
-      ivyRepoPath: String,
+      packagesExclusions: Option[String],
+      packages: Option[String],
+      repositories: Option[String],
+      ivyRepoPath: Option[String],
       ivySettingsPath: Option[String]): String = {
     val exclusions: Seq[String] =
-      if (!StringUtils.isBlank(packagesExclusions)) {
-        packagesExclusions.split(",")
+      if (packagesExclusions.nonEmpty) {
+        packagesExclusions.map(_.split(",")).get
       } else {
         Nil
       }
     // Create the IvySettings, either load from file or build defaults
     val ivySettings = ivySettingsPath match {
       case Some(path) =>
-        SparkSubmitUtils.loadIvySettings(path, Option(repositories), Option(ivyRepoPath))
+        SparkSubmitUtils.loadIvySettings(path, repositories, ivyRepoPath)
       case None =>
-        SparkSubmitUtils.buildIvySettings(Option(repositories), Option(ivyRepoPath))
+        SparkSubmitUtils.buildIvySettings(repositories, ivyRepoPath)
     }

-    SparkSubmitUtils.resolveMavenCoordinates(packages, ivySettings,
+    SparkSubmitUtils.resolveMavenCoordinates(packages.get, ivySettings,
       transitive = packagesTransitive, exclusions = exclusions)
   }

From 978e9948b4301575f1f197e9133ac3cf391c8655 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Sun, 27 Dec 2020 13:42:21 +0800
Subject: [PATCH 2/7] Update SparkSubmit.scala

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 57374b861ed4..0194a6c05a63 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -588,7 +588,7 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
+      OptionAssignerWrapper(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -604,13 +604,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
+      OptionAssignerWrapper(args.packages, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
+      OptionAssignerWrapper(args.repositories, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+      OptionAssignerWrapper(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
+      OptionAssignerWrapper(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),

       // Yarn only
@@ -646,7 +646,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = DRIVER_CORES.key),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         confKey = DRIVER_SUPERVISE.key),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssignerWrapper(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
@@ -1483,7 +1483,7 @@ private case class OptionAssigner(
     confKey: String = null,
     mergeFn: Option[(String, String) => String] = None)

-private object OptionAssigner {
+private object OptionAssignerWrapper {
   def apply(
       value: Option[String],
       clusterManager: Int,

From 23d44512d84cc5824a0b0b3e56040e01c39cf8ca Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 28 Dec 2020 09:28:16 +0800
Subject: [PATCH 3/7] Revert "Update SparkSubmit.scala"

This reverts commit 978e9948b4301575f1f197e9133ac3cf391c8655.

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 14 +++++++-------
 1 file changed, 7 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 0194a6c05a63..57374b861ed4 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -588,7 +588,7 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
-      OptionAssignerWrapper(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -604,13 +604,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

       // Propagate attributes for dependency resolution at the driver side
-      OptionAssignerWrapper(args.packages, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssignerWrapper(args.repositories, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.repositories"),
-      OptionAssignerWrapper(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssignerWrapper(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),

       // Yarn only
@@ -646,7 +646,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = DRIVER_CORES.key),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         confKey = DRIVER_SUPERVISE.key),
-      OptionAssignerWrapper(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
@@ -1483,7 +1483,7 @@ private case class OptionAssigner(
     confKey: String = null,
     mergeFn: Option[(String, String) => String] = None)

-private object OptionAssignerWrapper {
+private object OptionAssigner {
   def apply(
       value: Option[String],
       clusterManager: Int,

From 7da53f26fd623184cf66d8ef5b695e104971026e Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Mon, 28 Dec 2020 09:29:04 +0800
Subject: [PATCH 4/7] Update SparkSubmit.scala

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 23 +++++--------------
 1 file changed, 6 insertions(+), 17 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 57374b861ed4..e14750b388c7 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -588,7 +588,7 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath.orNull, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -604,13 +604,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packages.orNull, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.repositories.orNull, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.ivyRepoPath.orNull, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packagesExclusions.orNull, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),

       // Yarn only
@@ -646,7 +646,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = DRIVER_CORES.key),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         confKey = DRIVER_SUPERVISE.key),
-      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath.orNull, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
@@ -1483,17 +1483,6 @@ private case class OptionAssigner(
     confKey: String = null,
     mergeFn: Option[(String, String) => String] = None)

-private object OptionAssigner {
-  def apply(
-      value: Option[String],
-      clusterManager: Int,
-      deployMode: Int,
-      clOption: String = null,
-      confKey: String = null,
-      mergeFn: Option[(String, String) => String] = None): OptionAssigner =
-    new OptionAssigner(value.get, clusterManager, deployMode, clOption, confKey, mergeFn)
-}
-
 private[spark] trait SparkSubmitOperation {

   def kill(submissionId: String, conf: SparkConf): Unit

From c7d1aee8d9ef3bced1c1c476efbb9f14fe22d6bb Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Wed, 30 Dec 2020 09:05:17 +0800
Subject: [PATCH 5/7] follow comment

---
 .../org/apache/spark/deploy/SparkSubmit.scala | 16 ++++++-------
 .../spark/deploy/SparkSubmitArguments.scala   | 24 ++++++++++---------
 2 files changed, 21 insertions(+), 19 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
index 9858c2b9b978..7c75eea99012 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
@@ -304,8 +304,8 @@ private[spark] class SparkSubmit extends Logging {
     // Resolve maven dependencies if there are any and add classpath to jars. Add them to py-files
     // too for packages that include Python code
     val resolvedMavenCoordinates = DependencyUtils.resolveMavenDependencies(
-      packagesTransitive = true, args.packagesExclusions, args.packages,
-      args.repositories, args.ivyRepoPath, args.ivySettingsPath)
+      packagesTransitive = true, Option(args.packagesExclusions), Option(args.packages),
+      Option(args.repositories), Option(args.ivyRepoPath), args.ivySettingsPath)

     if (resolvedMavenCoordinates.nonEmpty) {
       // In K8s client mode, when in the driver, add resolved jars early as we might need
@@ -589,7 +589,7 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.deployMode, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
         confKey = SUBMIT_DEPLOY_MODE.key),
       OptionAssigner(args.name, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES, confKey = "spark.app.name"),
-      OptionAssigner(args.ivyRepoPath.orNull, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath, ALL_CLUSTER_MGRS, CLIENT, confKey = "spark.jars.ivy"),
       OptionAssigner(args.driverMemory, ALL_CLUSTER_MGRS, CLIENT,
         confKey = DRIVER_MEMORY.key),
       OptionAssigner(args.driverExtraClassPath, ALL_CLUSTER_MGRS, ALL_DEPLOY_MODES,
@@ -605,13 +605,13 @@ private[spark] class SparkSubmit extends Logging {
       OptionAssigner(args.pyFiles, ALL_CLUSTER_MGRS, CLUSTER, confKey = SUBMIT_PYTHON_FILES.key),

       // Propagate attributes for dependency resolution at the driver side
-      OptionAssigner(args.packages.orNull, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packages, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.packages"),
-      OptionAssigner(args.repositories.orNull, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.repositories, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.repositories"),
-      OptionAssigner(args.ivyRepoPath.orNull, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.ivyRepoPath, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.ivy"),
-      OptionAssigner(args.packagesExclusions.orNull, STANDALONE | MESOS | KUBERNETES,
+      OptionAssigner(args.packagesExclusions, STANDALONE | MESOS | KUBERNETES,
         CLUSTER, confKey = "spark.jars.excludes"),

       // Yarn only
@@ -647,7 +647,7 @@ private[spark] class SparkSubmit extends Logging {
         confKey = DRIVER_CORES.key),
       OptionAssigner(args.supervise.toString, STANDALONE | MESOS, CLUSTER,
         confKey = DRIVER_SUPERVISE.key),
-      OptionAssigner(args.ivyRepoPath.orNull, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),
+      OptionAssigner(args.ivyRepoPath, STANDALONE, CLUSTER, confKey = "spark.jars.ivy"),

       // An internal option used only for spark-shell to add user jars to repl's classloader,
       // previously it uses "spark.jars" or "spark.yarn.dist.jars" which now may be pointed to
diff --git a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
index 0753360e27c1..9da1a73bba69 100644
--- a/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
+++ b/core/src/main/scala/org/apache/spark/deploy/SparkSubmitArguments.scala
@@ -60,11 +60,11 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
   var name: String = null
   var childArgs: ArrayBuffer[String] = new ArrayBuffer[String]()
   var jars: String = null
-  var packages: Option[String] = None
-  var repositories: Option[String] = None
-  var ivyRepoPath: Option[String] = None
+  var packages: String = null
+  var repositories: String = null
+  var ivyRepoPath: String = null
   var ivySettingsPath: Option[String] = None
-  var packagesExclusions: Option[String] = None
+  var packagesExclusions: String = null
   var verbose: Boolean = false
   var isPython: Boolean = false
   var pyFiles: String = null
@@ -185,11 +185,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
     files = Option(files).orElse(sparkProperties.get(config.FILES.key)).orNull
     archives = Option(archives).orElse(sparkProperties.get(config.ARCHIVES.key)).orNull
     pyFiles = Option(pyFiles).orElse(sparkProperties.get(config.SUBMIT_PYTHON_FILES.key)).orNull
-    ivyRepoPath = sparkProperties.get("spark.jars.ivy")
+    ivyRepoPath = sparkProperties.get("spark.jars.ivy").orNull
     ivySettingsPath = sparkProperties.get("spark.jars.ivySettings")
-    packages = packages.orElse(sparkProperties.get("spark.jars.packages"))
-    packagesExclusions = packagesExclusions.orElse(sparkProperties.get("spark.jars.excludes"))
-    repositories = repositories.orElse(sparkProperties.get("spark.jars.repositories"))
+    packages = Option(packages).orElse(sparkProperties.get("spark.jars.packages")).orNull
+    packagesExclusions = Option(packagesExclusions)
+      .orElse(sparkProperties.get("spark.jars.excludes")).orNull
+    repositories = Option(repositories)
+      .orElse(sparkProperties.get("spark.jars.repositories")).orNull
     deployMode = Option(deployMode)
       .orElse(sparkProperties.get(config.SUBMIT_DEPLOY_MODE.key))
       .orElse(env.get("DEPLOY_MODE"))
@@ -407,13 +409,13 @@ private[deploy] class SparkSubmitArguments(args: Seq[String], env: Map[String, S
         jars = Utils.resolveURIs(value)

       case PACKAGES =>
-        packages = Some(value)
+        packages = value

       case PACKAGES_EXCLUDE =>
-        packagesExclusions = Some(value)
+        packagesExclusions = value

       case REPOSITORIES =>
-        repositories = Some(value)
+        repositories = value

       case CONF =>
         val (confName, confValue) = SparkSubmitUtils.parseSparkConfProperty(value)

From de0eee16db6c5934ee66c37faba19ab959b2f09e Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 5 Jan 2021 03:49:46 +0800
Subject: [PATCH 6/7] Update DependencyUtils.scala

---
 .../scala/org/apache/spark/util/DependencyUtils.scala | 8 ++------
 1 file changed, 2 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
index 8d0273550e13..95917a565406 100644
--- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -165,11 +165,7 @@ private[spark] object DependencyUtils extends Logging {
       ivyRepoPath: Option[String],
       ivySettingsPath: Option[String]): Seq[String] = {
     val exclusions: Seq[String] =
-      if (packagesExclusions.nonEmpty) {
-        packagesExclusions.map(_.split(",")).get
-      } else {
-        Nil
-      }
+      packagesExclusions.map(_.split(",")).getOrElse[Seq[String]](Nil)
     // Create the IvySettings, either load from file or build defaults
     val ivySettings = ivySettingsPath match {
       case Some(path) =>
@@ -179,7 +175,7 @@ private[spark] object DependencyUtils extends Logging {
         SparkSubmitUtils.buildIvySettings(repositories, ivyRepoPath)
     }

-    SparkSubmitUtils.resolveMavenCoordinates(packages.get, ivySettings,
+    SparkSubmitUtils.resolveMavenCoordinates(packages.orNull, ivySettings,
       transitive = packagesTransitive, exclusions = exclusions)
   }

From de1225d6701774385f32d297b11e75d6f2642361 Mon Sep 17 00:00:00 2001
From: angerszhu
Date: Tue, 5 Jan 2021 09:38:34 +0800
Subject: [PATCH 7/7] Update DependencyUtils.scala

---
 core/src/main/scala/org/apache/spark/util/DependencyUtils.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
index 95917a565406..a029342e1104 100644
--- a/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/DependencyUtils.scala
@@ -165,7 +165,7 @@ private[spark] object DependencyUtils extends Logging {
       ivyRepoPath: Option[String],
       ivySettingsPath: Option[String]): Seq[String] = {
     val exclusions: Seq[String] =
-      packagesExclusions.map(_.split(",")).getOrElse[Seq[String]](Nil)
+      packagesExclusions.map(_.split(",").toSeq).getOrElse(Nil)
     // Create the IvySettings, either load from file or build defaults
     val ivySettings = ivySettingsPath match {
       case Some(path) =>
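
As a minimal sketch of the end state of this series (illustration only, not part of any commit: the package, the IvyResolveExample object and the Maven coordinates below are assumptions), a caller of the refactored, Option-based API would look roughly like this; DependencyUtils is private[spark], so such a caller has to live under the org.apache.spark package.

package org.apache.spark.example

import org.apache.spark.util.{DependencyUtils, IvyProperties}

// Hypothetical example object; names and coordinate values are assumed for illustration.
object IvyResolveExample {
  def main(args: Array[String]): Unit = {
    // After the refactor, "not set" is expressed as None rather than null or "".
    val packages: Option[String] = Some("org.apache.commons:commons-lang3:3.11")
    val exclusions: Option[String] = Some("org.slf4j:slf4j-api")
    val repositories: Option[String] = None    // fall back to the default resolvers
    val ivyRepoPath: Option[String] = None     // use the default ivy cache location
    val ivySettingsPath: Option[String] = None // build the default IvySettings

    // IvyProperties now carries Option[String] fields end to end.
    val props = IvyProperties(exclusions, packages, repositories, ivyRepoPath, ivySettingsPath)

    // Final signature in this series: every optional input is an Option[String].
    val resolvedJars = DependencyUtils.resolveMavenDependencies(
      packagesTransitive = true,
      packagesExclusions = props.packagesExclusions,
      packages = props.packages,
      repositories = props.repositories,
      ivyRepoPath = props.ivyRepoPath,
      ivySettingsPath = props.ivySettingsPath)

    println(s"resolved dependencies: $resolvedJars")
  }
}

As the later commits show, SparkSubmitArguments keeps its nullable String vars so that OptionAssigner stays untouched, SparkSubmit wraps them with Option(...) at the call site, and DriverWrapper together with DependencyUtils pass Option[String] values all the way down.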