Closed
Changes from all commits
117 commits
1980c72
Convert applicable udaf.sql tests into UDF integrated test base
vinodkc Jul 11, 2019
19bcce1
[SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal i…
HyukjinKwon Jul 11, 2019
8dff711
[SPARK-28213][SQL] Replace ColumnarBatchScan with equivilant from Col…
revans2 Jul 11, 2019
e83583e
[MINOR][SQL] Clean up ObjectProducerExec operators
jaceklaskowski Jul 11, 2019
e399373
Fixed review comment
vinodkc Jul 11, 2019
d1ef6be
[SPARK-26978][SQL][FOLLOWUP] Initialize date-time constants by foldab…
MaxGekk Jul 11, 2019
d47c219
[SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient c…
gaborgsomogyi Jul 11, 2019
f830005
[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor.
gaborgsomogyi Jul 11, 2019
9eca58e
[SPARK-28334][SQL][TEST] Port select.sql
wangyum Jul 11, 2019
507b745
[SPARK-28139][SQL] Add v2 ALTER TABLE implementation.
rdblue Jul 12, 2019
a5c88ec
[SPARK-28321][SQL] 0-args Java UDF should not be called only once
HyukjinKwon Jul 12, 2019
27e41d6
[SPARK-28270][TEST-MAVEN][FOLLOW-UP][SQL][PYTHON][TESTS] Avoid cast i…
HyukjinKwon Jul 12, 2019
42b80ae
[SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
WangGuangxin Jul 12, 2019
fe22faa
[SPARK-28034][SQL][TEST] Port with.sql
peter-toth Jul 12, 2019
1c29212
[SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rollingf…
dongjoon-hyun Jul 12, 2019
13ae9eb
[SPARK-28354][INFRA] Use JIRA user name instead of JIRA user key
dongjoon-hyun Jul 12, 2019
1a26126
[SPARK-28228][SQL] Fix substitution order of nested WITH clauses
peter-toth Jul 12, 2019
687dd4e
[SPARK-28260][SQL] Add CLOSED state to ExecutionState
wangyum Jul 12, 2019
aa41dce
[SPARK-28159][ML][FOLLOWUP] fix typo & (0 until v.size).toList => Lis…
zhengruifeng Jul 12, 2019
60b89cf
[SPARK-28361][SQL][TEST] Test equality of generated code with id in c…
gatorsmile Jul 12, 2019
79e2047
[SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which com…
jessecai Jul 13, 2019
b5a9baa
[SPARK-28247][SS] Fix flaky test "query without test harness" on Cont…
HeartSaVioR Jul 13, 2019
7f9da2b
[SPARK-28371][SQL] Make Parquet "StartsWith" filter null-safe
Jul 13, 2019
fab75db
[SPARK-28370][BUILD][TEST] Upgrade Mockito to 2.28.2
dongjoon-hyun Jul 13, 2019
707411f
[SPARK-28378][PYTHON] Remove usage of cgi.escape
viirya Jul 14, 2019
76079fa
[SPARK-28343][SQL][TEST] Enabling cartesian product and ansi mode for…
wangyum Jul 14, 2019
7548a88
[SPARK-28199][SS] Move Trigger implementations to Triggers.scala and …
HeartSaVioR Jul 14, 2019
591de42
[SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
viirya Jul 15, 2019
a2f71a8
[SPARK-28133][SQL] Add acosh/asinh/atanh functions to SQL
Jul 15, 2019
e238ebe
[SPARK-28387][SQL][TEST] Port select_having.sql
wangyum Jul 15, 2019
72cc853
[SPARK-28384][SQL][TEST] Port select_distinct.sql
wangyum Jul 15, 2019
a7a02a8
[SPARK-28392][SQL][TESTS] Add traits for UDF and PostgreSQL tests to …
HyukjinKwon Jul 15, 2019
f241fc7
[SPARK-28389][SQL] Use Java 8 API in add_months
MaxGekk Jul 15, 2019
8ecbb67
[SPARK-28311][SQL] Fix STS OpenSession failed return wrong origin PRO…
Jul 15, 2019
8d1e87a
[SPARK-28150][CORE][FOLLOW-UP] Don't try to log in when impersonating.
Jul 15, 2019
2f3997f
[SPARK-28306][SQL][FOLLOWUP] Fix NormalizeFloatingNumbers rule idempo…
yeshengm Jul 15, 2019
8f7ccc5
[SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousP…
gaborgsomogyi Jul 15, 2019
d8996fd
[SPARK-28152][SQL] Mapped ShortType to SMALLINT and FloatType to REAL…
shivsood Jul 15, 2019
8e26d4d
[SPARK-28408][SQL][TEST] Restrict test values for DateType, Timestamp…
MaxGekk Jul 16, 2019
b94fa97
[SPARK-28345][SQL][PYTHON] PythonUDF predicate should be able to push…
viirya Jul 16, 2019
be4a552
[SPARK-28106][SQL] When Spark SQL use "add jar" , before add to Spa…
Jul 16, 2019
6926849
[SPARK-28395][SQL] Division operator support integral division
wangyum Jul 16, 2019
9a7f01d
[SPARK-28201][SQL][TEST][FOLLOWUP] Fix Integration test suite accordi…
dongjoon-hyun Jul 16, 2019
421d9d5
[SPARK-27485] EnsureRequirements.reorder should handle duplicate expr…
hvanhovell Jul 16, 2019
d1a1376
[SPARK-28356][SQL] Do not reduce the number of partitions for reparti…
carsonwang Jul 16, 2019
f74ad3d
[SPARK-28129][SQL][TEST] Port float8.sql
wangyum Jul 16, 2019
113f62d
[SPARK-27485][FOLLOWUP] Do not reduce the number of partitions for re…
gaborgsomogyi Jul 16, 2019
282a12d
[SPARK-27944][ML] Unify the behavior of checking empty output column …
zhengruifeng Jul 16, 2019
71882f1
[SPARK-28343][FOLLOW-UP][SQL][TEST] Enable spark.sql.function.preferI…
wangyum Jul 16, 2019
43d68cd
[SPARK-27959][YARN] Change YARN resource configs to use .amount
tgravescs Jul 16, 2019
1134fae
[SPARK-18299][SQL] Allow more aggregations on KeyValueGroupedDataset
nooberfsh Jul 16, 2019
2ddeff9
[SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.
Jul 16, 2019
66179fa
[SPARK-28418][PYTHON][SQL] Wait for event process in 'test_query_exec…
HyukjinKwon Jul 17, 2019
28774cd
[SPARK-28359][SQL][PYTHON][TESTS] Make integrated UDF tests robust by…
HyukjinKwon Jul 17, 2019
eb5dc74
[SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
mojodna Jul 17, 2019
70073b1
[SPARK-27609][PYTHON] Convert values of function options to strings
MaxGekk Jul 18, 2019
971e832
[SPARK-28411][PYTHON][SQL] InsertInto with overwrite is not honored
huaxingao Jul 18, 2019
d6346ae
[SPARK-28270][SQL][FOLLOW-UP] Explicitly cast into int/long/decimal i…
HyukjinKwon Jul 11, 2019
3966812
[SPARK-28213][SQL] Replace ColumnarBatchScan with equivilant from Col…
revans2 Jul 11, 2019
dd97a8a
[MINOR][SQL] Clean up ObjectProducerExec operators
jaceklaskowski Jul 11, 2019
2b9e090
[SPARK-26978][SQL][FOLLOWUP] Initialize date-time constants by foldab…
MaxGekk Jul 11, 2019
132f1c7
[SPARK-28055][SS][DSTREAMS] Add delegation token custom AdminClient c…
gaborgsomogyi Jul 11, 2019
a384b0f
[SPARK-23472][CORE] Add defaultJavaOptions for driver and executor.
gaborgsomogyi Jul 11, 2019
7946d26
[SPARK-28334][SQL][TEST] Port select.sql
wangyum Jul 11, 2019
50add0f
[SPARK-28139][SQL] Add v2 ALTER TABLE implementation.
rdblue Jul 12, 2019
6280475
[SPARK-28321][SQL] 0-args Java UDF should not be called only once
HyukjinKwon Jul 12, 2019
308ddfb
[SPARK-28270][TEST-MAVEN][FOLLOW-UP][SQL][PYTHON][TESTS] Avoid cast i…
HyukjinKwon Jul 12, 2019
6cc4b8b
[SPARK-28257][SQL] Use ConfigEntry for hardcoded configs in SQL
WangGuangxin Jul 12, 2019
1d99657
[SPARK-28034][SQL][TEST] Port with.sql
peter-toth Jul 12, 2019
509dc18
[SPARK-28357][CORE][TEST] Fix Flaky Test - FileAppenderSuite.rollingf…
dongjoon-hyun Jul 12, 2019
46c9c4e
[SPARK-28354][INFRA] Use JIRA user name instead of JIRA user key
dongjoon-hyun Jul 12, 2019
0ea8244
[SPARK-28228][SQL] Fix substitution order of nested WITH clauses
peter-toth Jul 12, 2019
ad4090b
[SPARK-28260][SQL] Add CLOSED state to ExecutionState
wangyum Jul 12, 2019
98a5be4
[SPARK-28159][ML][FOLLOWUP] fix typo & (0 until v.size).toList => Lis…
zhengruifeng Jul 12, 2019
0553bd2
[SPARK-28361][SQL][TEST] Test equality of generated code with id in c…
gatorsmile Jul 12, 2019
18003d6
[SPARK-28355][CORE][PYTHON] Use Spark conf for threshold at which com…
jessecai Jul 13, 2019
71c9034
[SPARK-28247][SS] Fix flaky test "query without test harness" on Cont…
HeartSaVioR Jul 13, 2019
3c04e59
[SPARK-28371][SQL] Make Parquet "StartsWith" filter null-safe
Jul 13, 2019
f873d5a
[SPARK-28370][BUILD][TEST] Upgrade Mockito to 2.28.2
dongjoon-hyun Jul 13, 2019
d94ab59
[SPARK-28378][PYTHON] Remove usage of cgi.escape
viirya Jul 14, 2019
22346c6
[SPARK-28343][SQL][TEST] Enabling cartesian product and ansi mode for…
wangyum Jul 14, 2019
de21737
[SPARK-28199][SS] Move Trigger implementations to Triggers.scala and …
HeartSaVioR Jul 14, 2019
1b72367
[SPARK-28381][PYSPARK] Upgraded version of Pyrolite to 4.30
viirya Jul 15, 2019
22a959c
[SPARK-28133][SQL] Add acosh/asinh/atanh functions to SQL
Jul 15, 2019
919790f
[SPARK-28387][SQL][TEST] Port select_having.sql
wangyum Jul 15, 2019
23db571
[SPARK-28384][SQL][TEST] Port select_distinct.sql
wangyum Jul 15, 2019
3cc9759
[SPARK-28392][SQL][TESTS] Add traits for UDF and PostgreSQL tests to …
HyukjinKwon Jul 15, 2019
528a434
[SPARK-28389][SQL] Use Java 8 API in add_months
MaxGekk Jul 15, 2019
464e8fe
[SPARK-28311][SQL] Fix STS OpenSession failed return wrong origin PRO…
Jul 15, 2019
de7009d
[SPARK-28150][CORE][FOLLOW-UP] Don't try to log in when impersonating.
Jul 15, 2019
911d52d
[SPARK-28306][SQL][FOLLOWUP] Fix NormalizeFloatingNumbers rule idempo…
yeshengm Jul 15, 2019
62952c2
[SPARK-28404][SS] Fix negative timeout value in RateStreamContinuousP…
gaborgsomogyi Jul 15, 2019
cc3df92
[SPARK-28152][SQL] Mapped ShortType to SMALLINT and FloatType to REAL…
shivsood Jul 15, 2019
d581487
[SPARK-28408][SQL][TEST] Restrict test values for DateType, Timestamp…
MaxGekk Jul 16, 2019
b12a32d
[SPARK-28345][SQL][PYTHON] PythonUDF predicate should be able to push…
viirya Jul 16, 2019
a127af4
[SPARK-28106][SQL] When Spark SQL use "add jar" , before add to Spa…
Jul 16, 2019
671fe59
[SPARK-28395][SQL] Division operator support integral division
wangyum Jul 16, 2019
da8c4e0
[SPARK-28201][SQL][TEST][FOLLOWUP] Fix Integration test suite accordi…
dongjoon-hyun Jul 16, 2019
47fbe53
[SPARK-27485] EnsureRequirements.reorder should handle duplicate expr…
hvanhovell Jul 16, 2019
a89c364
[SPARK-28356][SQL] Do not reduce the number of partitions for reparti…
carsonwang Jul 16, 2019
ec5b825
[SPARK-28129][SQL][TEST] Port float8.sql
wangyum Jul 16, 2019
99c6c10
[SPARK-27485][FOLLOWUP] Do not reduce the number of partitions for re…
gaborgsomogyi Jul 16, 2019
aee96a2
[SPARK-27944][ML] Unify the behavior of checking empty output column …
zhengruifeng Jul 16, 2019
8d5c409
[SPARK-28343][FOLLOW-UP][SQL][TEST] Enable spark.sql.function.preferI…
wangyum Jul 16, 2019
88811a5
[SPARK-27959][YARN] Change YARN resource configs to use .amount
tgravescs Jul 16, 2019
c42e0a1
[SPARK-18299][SQL] Allow more aggregations on KeyValueGroupedDataset
nooberfsh Jul 16, 2019
71ef01e
[SPARK-27963][CORE] Allow dynamic allocation without a shuffle service.
Jul 16, 2019
ebab9f3
[SPARK-28418][PYTHON][SQL] Wait for event process in 'test_query_exec…
HyukjinKwon Jul 17, 2019
9dc86d2
[SPARK-28359][SQL][PYTHON][TESTS] Make integrated UDF tests robust by…
HyukjinKwon Jul 17, 2019
8c53db3
[SPARK-28097][SQL] Map ByteType to SMALLINT for PostgresDialect
mojodna Jul 17, 2019
1ad2c0a
[SPARK-27609][PYTHON] Convert values of function options to strings
MaxGekk Jul 18, 2019
0086080
[SPARK-28411][PYTHON][SQL] InsertInto with overwrite is not honored
huaxingao Jul 18, 2019
4fd6170
fix review comments
vinodkc Jul 18, 2019
c72e9bf
Convert applicable udaf.sql tests into UDF integrated test base
vinodkc Jul 11, 2019
c68b77e
Fixed review comment
vinodkc Jul 11, 2019
939e4d1
fix review comments
vinodkc Jul 18, 2019
bc9b0d2
Merge branch 'br_Fix_SPARK-27921' of https://github.com/vinodkc/spark…
vinodkc Jul 18, 2019
2 changes: 1 addition & 1 deletion core/pom.xml
@@ -378,7 +378,7 @@
<dependency>
<groupId>net.razorvine</groupId>
<artifactId>pyrolite</artifactId>
<version>4.23</version>
<version>4.30</version>
<exclusions>
<exclusion>
<groupId>net.razorvine</groupId>
@@ -94,6 +94,7 @@ private[spark] class ExecutorAllocationManager(
client: ExecutorAllocationClient,
listenerBus: LiveListenerBus,
conf: SparkConf,
cleaner: Option[ContextCleaner] = None,
clock: Clock = new SystemClock())
extends Logging {

@@ -148,7 +149,7 @@
// Listener for Spark events that impact the allocation policy
val listener = new ExecutorAllocationListener

val executorMonitor = new ExecutorMonitor(conf, client, clock)
val executorMonitor = new ExecutorMonitor(conf, client, listenerBus, clock)

// Executor that handles the scheduling task.
private val executor =
@@ -194,11 +195,13 @@
throw new SparkException(
s"s${DYN_ALLOCATION_SUSTAINED_SCHEDULER_BACKLOG_TIMEOUT.key} must be > 0!")
}
// Require external shuffle service for dynamic allocation
// Otherwise, we may lose shuffle files when killing executors
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED) && !testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
if (!conf.get(config.SHUFFLE_SERVICE_ENABLED)) {
if (conf.get(config.DYN_ALLOCATION_SHUFFLE_TRACKING)) {
logWarning("Dynamic allocation without a shuffle service is an experimental feature.")
} else if (!testing) {
throw new SparkException("Dynamic allocation of executors requires the external " +
"shuffle service. You may enable this through spark.shuffle.service.enabled.")
}
}

if (executorAllocationRatio > 1.0 || executorAllocationRatio <= 0.0) {
@@ -214,6 +217,7 @@
def start(): Unit = {
listenerBus.addToManagementQueue(listener)
listenerBus.addToManagementQueue(executorMonitor)
cleaner.foreach(_.attachListener(executorMonitor))

val scheduleTask = new Runnable() {
override def run(): Unit = {
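Note on the hunk above: it relaxes the hard requirement on the external shuffle service, so dynamic allocation can instead rely on shuffle tracking and only logs a warning rather than throwing. Below is a minimal sketch of a configuration that would take the new code path; the spark.dynamicAllocation.shuffleTracking.enabled key name is inferred from the DYN_ALLOCATION_SHUFFLE_TRACKING entry and is not shown in this diff.

// Sketch only: enable dynamic allocation without an external shuffle service.
// The shuffleTracking key name is an assumption based on DYN_ALLOCATION_SHUFFLE_TRACKING.
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.shuffle.service.enabled", "false")                  // no external shuffle service
  .set("spark.dynamicAllocation.shuffleTracking.enabled", "true") // experimental path above

val spark = SparkSession.builder().appName("dyn-alloc-sketch").config(conf).getOrCreate()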
54 changes: 39 additions & 15 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -553,14 +553,22 @@ class SparkContext(config: SparkConf) extends Logging {
None
}

// Optionally scale number of executors dynamically based on workload. Exposed for testing.
_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())

val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
cleaner = cleaner))
case _ =>
None
}
@@ -569,14 +577,6 @@
}
_executorAllocationManager.foreach(_.start())

_cleaner =
if (_conf.get(CLEANER_REFERENCE_TRACKING)) {
Some(new ContextCleaner(this))
} else {
None
}
_cleaner.foreach(_.start())

setupAndStartListenerBus()
postEnvironmentUpdate()
postApplicationStart()
@@ -1791,7 +1791,7 @@ class SparkContext(config: SparkConf) extends Logging {
* @note A path can be added only once. Subsequent additions of the same path are ignored.
*/
def addJar(path: String) {
def addJarFile(file: File): String = {
def addLocalJarFile(file: File): String = {
try {
if (!file.exists()) {
throw new FileNotFoundException(s"Jar ${file.getAbsolutePath} not found")
@@ -1808,12 +1808,36 @@
}
}

def checkRemoteJarFile(path: String): String = {
val hadoopPath = new Path(path)
val scheme = new URI(path).getScheme
if (!Array("http", "https", "ftp").contains(scheme)) {
try {
val fs = hadoopPath.getFileSystem(hadoopConfiguration)
if (!fs.exists(hadoopPath)) {
throw new FileNotFoundException(s"Jar ${path} not found")
}
if (fs.isDirectory(hadoopPath)) {
throw new IllegalArgumentException(
s"Directory ${path} is not allowed for addJar")
}
path
} catch {
case NonFatal(e) =>
logError(s"Failed to add $path to Spark environment", e)
null
}
} else {
path
}
}

if (path == null) {
logWarning("null specified as parameter to addJar")
} else {
val key = if (path.contains("\\")) {
// For local paths with backslashes on Windows, URI throws an exception
addJarFile(new File(path))
addLocalJarFile(new File(path))
} else {
val uri = new URI(path)
// SPARK-17650: Make sure this is a valid URL before adding it to the list of dependencies
Expand All @@ -1822,12 +1846,12 @@ class SparkContext(config: SparkConf) extends Logging {
// A JAR file which exists only on the driver node
case null =>
// SPARK-22585 path without schema is not url encoded
addJarFile(new File(uri.getRawPath))
addLocalJarFile(new File(uri.getRawPath))
// A JAR file which exists only on the driver node
case "file" => addJarFile(new File(uri.getPath))
case "file" => addLocalJarFile(new File(uri.getPath))
// A JAR file which exists locally on every worker node
case "local" => "file:" + uri.getPath
case _ => path
case _ => checkRemoteJarFile(path)
}
}
if (key != null) {
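The addJar change above validates remote jar paths (any scheme other than http, https, or ftp) against their Hadoop FileSystem before the path is registered. A short sketch of the resulting behavior, assuming a running SparkSession named spark and hypothetical jar paths:

// Sketch: remote jars are now checked for existence and rejected if they are
// directories; http/https/ftp URIs are still passed through unverified.
val sc = spark.sparkContext
sc.addJar("hdfs:///tmp/example-udfs.jar")              // verified via FileSystem.exists / isDirectory
sc.addJar("https://repo.example.com/example-udfs.jar") // passed through as before
sc.addJar("local:///opt/spark/jars/example.jar")       // still rewritten to a worker-local file: path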
@@ -81,4 +81,8 @@ private[spark] object PythonUtils {
def isEncryptionEnabled(sc: JavaSparkContext): Boolean = {
sc.conf.get(org.apache.spark.internal.config.IO_ENCRYPTION_ENABLED)
}

def getBroadcastThreshold(sc: JavaSparkContext): Long = {
sc.conf.get(org.apache.spark.internal.config.BROADCAST_FOR_UDF_COMPRESSION_THRESHOLD)
}
}
@@ -186,9 +186,6 @@ private[spark] object SerDeUtil extends Logging {
val unpickle = new Unpickler
iter.flatMap { row =>
val obj = unpickle.loads(row)
// `Opcodes.MEMOIZE` of Protocol 4 (Python 3.4+) will store objects in internal map
// of `Unpickler`. This map is cleared when calling `Unpickler.close()`.
unpickle.close()
if (batched) {
obj match {
case array: Array[Any] => array.toSeq
@@ -24,6 +24,7 @@ import org.apache.spark.{SPARK_VERSION => sparkVersion, SparkConf}
import org.apache.spark.deploy.{Command, DeployMessages, DriverDescription}
import org.apache.spark.deploy.ClientArguments._
import org.apache.spark.internal.config
import org.apache.spark.launcher.SparkLauncher
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.util.Utils

@@ -135,6 +136,7 @@ private[rest] class StandaloneSubmitRequestServlet(
val sparkProperties = request.sparkProperties
val driverMemory = sparkProperties.get(config.DRIVER_MEMORY.key)
val driverCores = sparkProperties.get(config.DRIVER_CORES.key)
val driverDefaultJavaOptions = sparkProperties.get(SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS)
val driverExtraJavaOptions = sparkProperties.get(config.DRIVER_JAVA_OPTIONS.key)
val driverExtraClassPath = sparkProperties.get(config.DRIVER_CLASS_PATH.key)
val driverExtraLibraryPath = sparkProperties.get(config.DRIVER_LIBRARY_PATH.key)
@@ -160,9 +162,11 @@
.set("spark.master", updatedMasters)
val extraClassPath = driverExtraClassPath.toSeq.flatMap(_.split(File.pathSeparator))
val extraLibraryPath = driverExtraLibraryPath.toSeq.flatMap(_.split(File.pathSeparator))
val defaultJavaOpts = driverDefaultJavaOptions.map(Utils.splitCommandString)
.getOrElse(Seq.empty)
val extraJavaOpts = driverExtraJavaOptions.map(Utils.splitCommandString).getOrElse(Seq.empty)
val sparkJavaOpts = Utils.sparkJavaOpts(conf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val javaOpts = sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
val command = new Command(
"org.apache.spark.deploy.worker.DriverWrapper",
Seq("{{WORKER_URL}}", "{{USER_JAR}}", mainClass) ++ appArgs, // args to the DriverWrapper
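For the defaultJavaOptions change above, a hedged sketch of how the new option composes with the existing extra options; the spark.driver.defaultJavaOptions key name is inferred from SparkLauncher.DRIVER_DEFAULT_JAVA_OPTIONS and may differ:

// Sketch: admin-provided defaults come before user extraJavaOptions, so the
// final driver command uses sparkJavaOpts ++ defaultJavaOpts ++ extraJavaOpts
// and user-supplied flags can override the defaults.
import org.apache.spark.SparkConf

val conf = new SparkConf()
  .set("spark.driver.defaultJavaOptions", "-XX:+UseG1GC -verbose:gc") // assumed key name
  .set("spark.driver.extraJavaOptions", "-Xss4m")                     // user-supplied, appended last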
@@ -263,12 +263,14 @@ private[spark] class HadoopDelegationTokenManager(
val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keytab)
logInfo("Successfully logged into KDC.")
ugi
} else {
} else if (!SparkHadoopUtil.get.isProxyUser(UserGroupInformation.getCurrentUser())) {
logInfo(s"Attempting to load user's ticket cache.")
val ccache = sparkConf.getenv("KRB5CCNAME")
val user = Option(sparkConf.getenv("KRB5PRINCIPAL")).getOrElse(
UserGroupInformation.getCurrentUser().getUserName())
UserGroupInformation.getUGIFromTicketCache(ccache, user)
} else {
UserGroupInformation.getCurrentUser()
}
}

@@ -127,8 +127,9 @@ private[spark] class TypedConfigBuilder[T](

/** Creates a [[ConfigEntry]] that does not have a default value. */
def createOptional: OptionalConfigEntry[T] = {
val entry = new OptionalConfigEntry[T](parent.key, parent._alternatives, converter,
stringConverter, parent._doc, parent._public)
val entry = new OptionalConfigEntry[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, converter, stringConverter, parent._doc,
parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -141,17 +142,19 @@
createWithDefaultString(default.asInstanceOf[String])
} else {
val transformedDefault = converter(stringConverter(default))
val entry = new ConfigEntryWithDefault[T](parent.key, parent._alternatives,
transformedDefault, converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefault[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, transformedDefault, converter,
stringConverter, parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
}

/** Creates a [[ConfigEntry]] with a function to determine the default value */
def createWithDefaultFunction(defaultFunc: () => T): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._alternatives, defaultFunc,
converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultFunction[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, defaultFunc, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_ (entry))
entry
}
@@ -161,8 +164,9 @@
* [[String]] and must be a valid value for the entry.
*/
def createWithDefaultString(default: String): ConfigEntry[T] = {
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._alternatives, default,
converter, stringConverter, parent._doc, parent._public)
val entry = new ConfigEntryWithDefaultString[T](parent.key, parent._prependedKey,
parent._prependSeparator, parent._alternatives, default, converter, stringConverter,
parent._doc, parent._public)
parent._onCreate.foreach(_(entry))
entry
}
@@ -178,6 +182,8 @@ private[spark] case class ConfigBuilder(key: String) {

import ConfigHelpers._

private[config] var _prependedKey: Option[String] = None
private[config] var _prependSeparator: String = ""
private[config] var _public = true
private[config] var _doc = ""
private[config] var _onCreate: Option[ConfigEntry[_] => Unit] = None
@@ -202,24 +208,34 @@ private[spark] case class ConfigBuilder(key: String) {
this
}

def withPrepended(key: String, separator: String = " "): ConfigBuilder = {
_prependedKey = Option(key)
_prependSeparator = separator
this
}

def withAlternative(key: String): ConfigBuilder = {
_alternatives = _alternatives :+ key
this
}

def intConf: TypedConfigBuilder[Int] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toInt, key, "int"))
}

def longConf: TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toLong, key, "long"))
}

def doubleConf: TypedConfigBuilder[Double] = {
checkPrependConfig
new TypedConfigBuilder(this, toNumber(_, _.toDouble, key, "double"))
}

def booleanConf: TypedConfigBuilder[Boolean] = {
checkPrependConfig
new TypedConfigBuilder(this, toBoolean(_, key))
}

Expand All @@ -228,20 +244,30 @@ private[spark] case class ConfigBuilder(key: String) {
}

def timeConf(unit: TimeUnit): TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, timeFromString(_, unit), timeToString(_, unit))
}

def bytesConf(unit: ByteUnit): TypedConfigBuilder[Long] = {
checkPrependConfig
new TypedConfigBuilder(this, byteFromString(_, unit), byteToString(_, unit))
}

def fallbackConf[T](fallback: ConfigEntry[T]): ConfigEntry[T] = {
val entry = new FallbackConfigEntry(key, _alternatives, _doc, _public, fallback)
val entry = new FallbackConfigEntry(key, _prependedKey, _prependSeparator, _alternatives, _doc,
_public, fallback)
_onCreate.foreach(_(entry))
entry
}

def regexConf: TypedConfigBuilder[Regex] = {
checkPrependConfig
new TypedConfigBuilder(this, regexFromString(_, this.key), _.toString)
}

private def checkPrependConfig = {
if (_prependedKey.isDefined) {
throw new IllegalArgumentException(s"$key type must be string if prepend used")
}
}
}
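The ConfigBuilder changes above thread a prepended key and separator through every entry type and restrict withPrepended to string-typed entries (checkPrependConfig throws for any other type). A minimal usage sketch as it might appear inside org.apache.spark.internal.config; the entry name and keys below are hypothetical:

// Hypothetical entry: if spark.example.defaultJavaOptions is set, its value is
// prepended, separated by a space, to the value of spark.example.javaOptions.
private[spark] val EXAMPLE_JAVA_OPTIONS = ConfigBuilder("spark.example.javaOptions")
  .withPrepended("spark.example.defaultJavaOptions", separator = " ")
  .stringConf
  .createOptional

// Non-string entries cannot use withPrepended; this would throw at definition time:
// ConfigBuilder("spark.example.threads").withPrepended("spark.example.base").intConf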