41 changes: 39 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkContext.scala
@@ -18,7 +18,7 @@
package org.apache.spark

import java.io._
import java.net.URI
import java.net.{URI, URL}
import java.util.{Properties, UUID}
import java.util.concurrent.atomic.AtomicInteger

@@ -44,7 +44,7 @@ import org.apache.spark.scheduler.cluster.mesos.{CoarseMesosSchedulerBackend, Me
import org.apache.spark.scheduler.local.LocalBackend
import org.apache.spark.storage.{BlockManagerSource, RDDInfo, StorageStatus, StorageUtils}
import org.apache.spark.ui.SparkUI
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, Utils}
import org.apache.spark.util.{ClosureCleaner, MetadataCleaner, MetadataCleanerType, TimeStampedHashMap, SparkURLClassLoader, Utils}

/**
* Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
@@ -130,6 +130,18 @@ class SparkContext(

val isLocal = (master == "local" || master.startsWith("local["))

// Create a classLoader for use by the driver so that jars added via addJar are available to the
// driver. Do this before all other initialization so that any thread pools created for this
// SparkContext use the class loader.
// In the future it might make sense to expose this to users so they can assign it as the
// context class loader for other threads.
// Note that this is config-enabled as classloaders can introduce subtle side effects
private[spark] val classLoader = if (conf.getBoolean("spark.driver.loadAddedJars", false)) {
val loader = new SparkURLClassLoader(Array.empty[URL], this.getClass.getClassLoader)
Contributor

Maybe you should set its parent to the current thread's context class loader if one exists. Otherwise users who try to add some class loader before starting SparkContext (e.g. if they're in some server environment) will lose it.
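A minimal sketch of that suggestion, as it might appear in SparkContext.scala (it assumes the `SparkURLClassLoader(urls, parent)` constructor shown later in this diff; the fallback loader is illustrative only):

```scala
import java.net.URL
import org.apache.spark.util.SparkURLClassLoader

// Sketch only: prefer the calling thread's context class loader as the parent,
// falling back to the loader that loaded Spark itself when none is installed.
val parentLoader = Option(Thread.currentThread.getContextClassLoader)
  .getOrElse(classOf[org.apache.spark.SparkContext].getClassLoader)
val loader = new SparkURLClassLoader(Array.empty[URL], parentLoader)
```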

Contributor Author

Great catch - this is definitely something that needs to change.

Contributor

By the way, I'm pretty sure there is almost no way that Spark contexts can work properly inside a server environment simply by using thread context classloaders. The reason is that Spark spins up so many other threads. To make everything work more smoothly, I believe we should instead have a standard classloader set in SparkEnv or somewhere like that, which can inherit from the thread context of the thread that started the SparkContext but can be used everywhere else that spins up new threads.
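A rough sketch of that idea (the helper and the notion of a loader held by SparkEnv are hypothetical, not part of this PR): threads that Spark creates would explicitly adopt the shared loader rather than relying on whatever context loader they happen to inherit.

```scala
// Hypothetical helper: run a task on a new thread whose context class loader is the
// shared loader (e.g. one stored on SparkEnv), regardless of the caller's loader.
def runWithSharedLoader(sharedLoader: ClassLoader)(body: => Unit): Thread = {
  val t = new Thread(new Runnable { def run(): Unit = body })
  t.setContextClassLoader(sharedLoader) // override the inherited context loader
  t.start()
  t
}
```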

Thread.currentThread.setContextClassLoader(loader)
Contributor

Will this only work if addJars is called from the thread that created the SparkContext?

Contributor Author

This will capture a pointer to the classloader in which the SC was created. So addJar can be called from anywhere and it will always augment this class loader.

I think this means that the class will be visible to (a) the thread that created the SC and (b) any threads created by that thread. Though it would be good to verify that the context class loader is passed on to child threads, or that they delegate to that of the parent.

This does mean that a thread entirely outside of the SparkContext-creating thread and its children won't have the class loaded. I think that's actually desirable given that you may have a case where multiple SparkContexts are created in the same JVM.
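As a side note, the inheritance question is easy to check in isolation (a standalone sketch, not part of this PR): a new Thread copies its creator's context class loader when it is constructed.

```scala
object ContextLoaderInheritanceCheck extends App {
  val marker = new java.net.URLClassLoader(Array.empty[java.net.URL], getClass.getClassLoader)
  Thread.currentThread.setContextClassLoader(marker)

  val child = new Thread(new Runnable {
    // Prints true: Thread's constructor copies the creating thread's context class loader.
    def run(): Unit = println(Thread.currentThread.getContextClassLoader eq marker)
  })
  child.start()
  child.join()
}
```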

Contributor Author

I'll defer to @velvia on this one though as it's his design.

Contributor

Ah, ok, I understand now. In that case, to make things simpler, would it possibly make sense to not load the jars into the current thread and only load them for the SparkContext/executors? Classloader stuff can be confusing to deal with, and keeping it as isolated as possible could make things easier for users. This would also line up a little more with how the MR distributed cache works - jars that get added to it don't become accessible to driver code.

Contributor Author

Hey Sandy - not sure what you mean exactly by "load them for the SparkContext". The SparkContext is just a Java object. The scenario we want to handle is like this:

val sc = new SparkContext(...)
sc.addJar("jar-containing-lib-foo")
val x: Seq[Foo] = sc.textFile(...).map(...).collect()

There are two ways "Foo" can be visible for the last line. Either it can be included in the classpath when launching the JVM, or it can be added dynamically to the classloader of the calling thread. Is there another way?
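Concretely, the second option looks roughly like this (a sketch that assumes the calling thread's context class loader is the SparkURLClassLoader installed by this PR; the jar path is a placeholder):

```scala
// Sketch: augment the calling thread's class loader at runtime so Foo resolves.
val loader = Thread.currentThread.getContextClassLoader
  .asInstanceOf[org.apache.spark.util.SparkURLClassLoader]
loader.addURL(new java.io.File("/path/to/jar-containing-lib-foo.jar").toURI.toURL)
val fooClass = Class.forName("Foo", true, loader) // now visible without restarting the JVM
```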

Contributor

I had misunderstood how the original mechanism worked. I take this all back.

Some(loader)
} else None

if (master == "yarn-client") System.setProperty("SPARK_YARN_MODE", "true")

// Create the Spark execution environment (cache, map output tracker, etc)
@@ -726,6 +738,8 @@ class SparkContext(
* Adds a JAR dependency for all tasks to be executed on this SparkContext in the future.
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported
* filesystems), an HTTP, HTTPS or FTP URI, or local:/path for a file on every worker node.
* NOTE: If you enable spark.driver.loadAddedJars, then the JAR will also be made available
* to this SparkContext and child threads. local: JARs must be available on the driver node.
*/
def addJar(path: String) {
if (path == null) {
@@ -767,6 +781,20 @@ class SparkContext(
case _ =>
path
}

// Add jar to driver class loader so it is available for driver,
// even if it is not on the classpath
uri.getScheme match {
case null | "file" | "local" =>
// Assume file exists on current (driver) node as well. Unlike executors, driver
// doesn't need to download the jar since it's local.
addUrlToDriverLoader(new URL("file:" + uri.getPath))
case "http" | "https" | "ftp" =>
// Should be handled by the URLClassLoader, pass along entire URL
addUrlToDriverLoader(new URL(path))
case other =>
logWarning(s"This URI scheme for URI $path is not supported by the driver class loader")
Contributor

This is a poor way to handle this case. Why don't we create a ClassLoader from HDFS or whatever the other URI scheme is? In general if some URI scheme is supported on the workers, it should also be supported on the driver.
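For reference, a sketch of that alternative (assuming plain Hadoop FileSystem calls; the helper name and temp-file handling are illustrative only):

```scala
import java.io.File
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Sketch: copy a remote jar (hdfs://, s3n://, ...) to a local temp file so the driver's
// URL class loader can load from it, mirroring what executors do when they fetch jars.
def localCopyOf(uri: java.net.URI): java.net.URL = {
  val fs = FileSystem.get(uri, new Configuration())
  val local = File.createTempFile("driver-jar-", ".jar")
  fs.copyToLocalFile(new Path(uri), new Path(local.getAbsolutePath))
  local.toURI.toURL
}
```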

Contributor

The other option BTW would be to throw an exception, since the user explicitly enabled loading JARs in the driver. In general logging a warning and hoping the user will notice it is not great.
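In the diff above, that option would look roughly like this (a sketch of the discussed alternative, not the code currently in the PR):

```scala
case other =>
  // Fail fast: the user explicitly opted in with spark.driver.loadAddedJars, so a
  // silently ignored jar is more surprising than an error.
  throw new SparkException(
    s"URI scheme '$other' in $path is not supported by the driver class loader")
```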

Contributor

Yes true. At the minimum will replace with an exception.

}
}
if (key != null) {
addedJars(key) = System.currentTimeMillis
@@ -775,6 +803,15 @@
}
}

private def addUrlToDriverLoader(url: URL) {
classLoader.foreach { loader =>
if (!loader.getURLs.contains(url)) {
logInfo("Adding JAR " + url + " to driver class loader")
loader.addURL(url)
}
}
}
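For context, end-to-end usage of the new code path might look like this (the application name and jar path are placeholders):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Sketch of how the flag added in this PR is meant to be used.
val conf = new SparkConf()
  .setMaster("local[2]")
  .setAppName("driver-jar-loading-example")
  .set("spark.driver.loadAddedJars", "true")   // opt in to driver-side loading
val sc = new SparkContext(conf)
sc.addJar("/path/to/extra-lib.jar")            // also added to the driver's class loader
```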

/**
* Clear the job's list of JARs added by `addJar` so that they do not get downloaded to
* any new nodes.
4 changes: 2 additions & 2 deletions core/src/main/scala/org/apache/spark/SparkEnv.scala
@@ -123,6 +123,8 @@ object SparkEnv extends Logging {
isDriver: Boolean,
isLocal: Boolean): SparkEnv = {

val classLoader = Thread.currentThread.getContextClassLoader

val securityManager = new SecurityManager(conf)
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("spark", hostname, port, conf = conf,
securityManager = securityManager)
@@ -133,8 +135,6 @@
conf.set("spark.driver.port", boundPort.toString)
}

val classLoader = Thread.currentThread.getContextClassLoader

// Create an instance of the class named by the given Java system property, or by
// defaultClassName if the property is not set, and return it as a T
def instantiateClass[T](propertyName: String, defaultClassName: String): T = {
core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala
@@ -101,7 +101,7 @@ private[spark] object CoarseGrainedExecutorBackend {
// Create a new ActorSystem to run the backend, because we can't create a SparkEnv / Executor
// before getting started with all our system properties, etc
val (actorSystem, boundPort) = AkkaUtils.createActorSystem("sparkExecutor", hostname, 0,
indestructible = true, conf = conf, new SecurityManager(conf))
indestructible = true, conf = conf, securityManager = new SecurityManager(conf))
// set it
val sparkHostPort = hostname + ":" + boundPort
actorSystem.actorOf(
6 changes: 3 additions & 3 deletions core/src/main/scala/org/apache/spark/executor/Executor.scala
@@ -29,7 +29,7 @@ import org.apache.spark._
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.scheduler._
import org.apache.spark.storage.{StorageLevel, TaskResultBlockId}
import org.apache.spark.util.Utils
import org.apache.spark.util.{Utils, SparkURLClassLoader}

/**
* Spark executor used with Mesos, YARN, and the standalone scheduler.
@@ -295,15 +295,15 @@ private[spark] class Executor(
* Create a ClassLoader for use in tasks, adding any JARs specified by the user or any classes
* created by the interpreter to the search path
*/
private def createClassLoader(): ExecutorURLClassLoader = {
private def createClassLoader(): SparkURLClassLoader = {
val loader = this.getClass.getClassLoader

// For each of the jars in the jarSet, add them to the class loader.
// We assume each of the files has already been fetched.
val urls = currentJars.keySet.map { uri =>
new File(uri.split("/").last).toURI.toURL
}.toArray
new ExecutorURLClassLoader(urls, loader)
new SparkURLClassLoader(urls, loader)
}

/**
core/src/main/scala/org/apache/spark/scheduler/TaskResultGetter.scala
@@ -69,7 +69,7 @@ private[spark] class TaskResultGetter(sparkEnv: SparkEnv, scheduler: TaskSchedul
} catch {
case cnf: ClassNotFoundException =>
val loader = Thread.currentThread.getContextClassLoader
taskSetManager.abort("ClassNotFound with classloader: " + loader)
taskSetManager.abort(s"ClassNotFound [${cnf.getMessage}] with classloader: " + loader)
case ex: Throwable =>
taskSetManager.abort("Exception while deserializing and fetching task: %s".format(ex))
}
core/src/main/scala/org/apache/spark/{executor/ExecutorURLClassLoader.scala → util/SparkURLClassLoader.scala}
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.executor
package org.apache.spark.util

import java.net.{URLClassLoader, URL}

/**
* The addURL method in URLClassLoader is protected. We subclass it to make this accessible.
*/
private[spark] class ExecutorURLClassLoader(urls: Array[URL], parent: ClassLoader)
private[spark] class SparkURLClassLoader(urls: Array[URL], parent: ClassLoader)
extends URLClassLoader(urls, parent) {

override def addURL(url: URL) {
24 changes: 4 additions & 20 deletions core/src/test/scala/org/apache/spark/FileServerSuite.scala
@@ -33,11 +33,12 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
override def beforeEach() {
super.beforeEach()
resetSparkContext()
System.setProperty("spark.authenticate", "false")
}

override def beforeAll() {
super.beforeAll()
System.setProperty("spark.authenticate", "false")

val tmpDir = new File(Files.createTempDir(), "test")
tmpDir.mkdir()

@@ -47,27 +48,10 @@ class FileServerSuite extends FunSuite with LocalSparkContext {
pw.close()

val jarFile = new File(tmpDir, "test.jar")
val jarStream = new FileOutputStream(jarFile)
val jar = new JarOutputStream(jarStream, new java.util.jar.Manifest())
System.setProperty("spark.authenticate", "false")

val jarEntry = new JarEntry(textFile.getName)
jar.putNextEntry(jarEntry)

val in = new FileInputStream(textFile)
val buffer = new Array[Byte](10240)
var nRead = 0
while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jar.write(buffer, 0, nRead)
}

in.close()
jar.close()
jarStream.close()
val jarUrl = TestUtils.createJar(Seq(textFile), jarFile)

tmpFile = textFile
tmpJarUrl = jarFile.toURI.toURL.toString
tmpJarUrl = jarUrl.toString
}

test("Distributing files locally") {
52 changes: 52 additions & 0 deletions core/src/test/scala/org/apache/spark/FileSuite.scala
@@ -18,6 +18,7 @@
package org.apache.spark

import java.io.{File, FileWriter}
import java.util.concurrent.Semaphore

import scala.io.Source

@@ -29,8 +30,59 @@ import org.scalatest.FunSuite

import org.apache.spark.SparkContext._
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat
import scala.util.Try

class FileSuite extends FunSuite with LocalSparkContext {
test("adding jars to classpath at the driver") {
val tmpDir = Files.createTempDir()
val classFile = TestUtils.createCompiledClass("HelloSpark", tmpDir)
val jarFile = new File(tmpDir, "test.jar")
TestUtils.createJar(Seq(classFile), jarFile)

def canLoadClass(clazz: String) =
Try(Class.forName(clazz, true, Thread.currentThread().getContextClassLoader)).isSuccess

val loadedBefore = canLoadClass("HelloSpark")

val conf = new SparkConf().setMaster("local-cluster[1,1,512]").setAppName("test")
.set("spark.driver.loadAddedJars", "true")

var driverLoadedAfter = false
var childLoadedAfter = false

val sem = new Semaphore(1)
sem.acquire()

new Thread() {
override def run() {
val sc = new SparkContext(conf)
sc.addJar(jarFile.getAbsolutePath)
driverLoadedAfter = canLoadClass("HelloSpark")

// Test visibility in a child thread
val childSem = new Semaphore(1)
childSem.acquire()
new Thread() {
override def run() {
childLoadedAfter = canLoadClass("HelloSpark")
childSem.release()
}
}.start()

childSem.acquire()
sem.release()
}
}.start()
sem.acquire()

// Test visibility in a parent thread
val parentLoadedAfter = canLoadClass("HelloSpark")

assert(false === loadedBefore, "Class visible before being added")
assert(true === driverLoadedAfter, "Class was not visible after being added")
assert(true === childLoadedAfter, "Class was not visible to child thread after being added")
assert(false === parentLoadedAfter, "Class was visible to parent thread after being added")
}

test("text files") {
sc = new SparkContext("local", "test")
80 changes: 80 additions & 0 deletions core/src/test/scala/org/apache/spark/TestUtils.scala
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark

import scala.collection.JavaConversions._

import java.io.{FileInputStream, FileOutputStream, File}
import java.util.jar.{JarEntry, JarOutputStream}
import java.net.{URL, URI}
import javax.tools.{JavaFileObject, SimpleJavaFileObject, ToolProvider}

object TestUtils {
Contributor

Do we really need this or could we ship a .jar file around for use in tests? I believe we had one for some other tests with addJar.

Contributor Author

We can't include a compiled .jar file in the Spark repo but we could create a project in the build that builds a test jar. I thought this was strictly better since we can test more sophisticated things down the road given this mechanism.

Contributor

@pwendell this does enable testing more sophisticated things, but especially for testing Scala sources it becomes harder and harder to do this.

Contributor Author

Sure - this can only create java classes. This is just for tests that want to see visibility of code defined in different jar files.

Contributor

By the way, I'm in the middle of creating a new pull request, but it's taking a while.



/** Create a jar file that contains this set of files. All files will be located at the root
* of the jar. */
Contributor

Comment format is wrong

def createJar(files: Seq[File], jarFile: File): URL = {
val jarFileStream = new FileOutputStream(jarFile)
val jarStream = new JarOutputStream(jarFileStream, new java.util.jar.Manifest())

for (file <- files) {
val jarEntry = new JarEntry(file.getName)
jarStream.putNextEntry(jarEntry)

val in = new FileInputStream(file)
val buffer = new Array[Byte](10240)
var nRead = 0
while (nRead <= 0) {
nRead = in.read(buffer, 0, buffer.length)
jarStream.write(buffer, 0, nRead)
}
in.close()
}
jarStream.close()
jarFileStream.close()

jarFile.toURI.toURL
}

// Adapted from the JavaCompiler.java doc examples
private val SOURCE = JavaFileObject.Kind.SOURCE
private def createURI(name: String) = {
URI.create(s"string:///${name.replace(".", "/")}${SOURCE.extension}")
}
private class JavaSourceFromString(val name: String, val code: String)
extends SimpleJavaFileObject(createURI(name), SOURCE) {
override def getCharContent(ignoreEncodingErrors: Boolean) = code
}

/** Creates a compiled class with the given name. Class file will be placed in destDir. */
def createCompiledClass(className: String, destDir: File): File = {
val compiler = ToolProvider.getSystemJavaCompiler
val sourceFile = new JavaSourceFromString(className, s"public class $className {}")

// Calling this outputs a class file in pwd. It's easier to just rename the file than
// build a custom FileManager that controls the output location.
compiler.getTask(null, null, null, null, null, Seq(sourceFile)).call()

val fileName = className + ".class"
val result = new File(fileName)
if (!result.exists()) throw new Exception("Compiled file not found: " + fileName)
val out = new File(destDir, fileName)
result.renameTo(out)
out
}
}
3 changes: 2 additions & 1 deletion docs/cluster-overview.md
@@ -64,7 +64,8 @@ and `addFile`.
- **hdfs:**, **http:**, **https:**, **ftp:** - these pull down files and JARs from the URI as expected
- **local:** - a URI starting with local:/ is expected to exist as a local file on each worker node. This
means that no network IO will be incurred, and works well for large files/JARs that are pushed to each worker,
or shared via NFS, GlusterFS, etc.
or shared via NFS, GlusterFS, etc. Note that if `spark.driver.loadAddedJars` is set,
then the file must be visible to the node running the SparkContext as well.

Note that JARs and files are copied to the working directory for each SparkContext on the executor nodes.
Over time this can use up a significant amount of space and will need to be cleaned up.
12 changes: 11 additions & 1 deletion docs/configuration.md
@@ -392,6 +392,16 @@ Apart from these, the following properties are also available, and may be useful
Port for the driver to listen on.
</td>
</tr>
<tr>
<td>spark.driver.loadAddedJars</td>
<td>false</td>
<td>
If true, the SparkContext uses a class loader to make jars added via `addJar` available to
Contributor

Could the second sentence be simplified to "The default behavior is that jars added via addJar must already be on the classpath."?

Contributor Author

Good call.

the SparkContext. The default behavior is that jars added via `addJar` must already be on
the classpath. Jar contents will be visible to the thread that created the SparkContext
Contributor

Maybe add ", but this flag is useful in situations where code is added dynamically, such as a job server"

and all of its child threads.
</td>
</tr>
<tr>
<td>spark.cleaner.ttl</td>
<td>(infinite)</td>
@@ -430,7 +440,7 @@ Apart from these, the following properties are also available, and may be useful
<td>spark.broadcast.blockSize</td>
<td>4096</td>
<td>
Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Size of each piece of a block in kilobytes for <code>TorrentBroadcastFactory</code>.
Contributor

False change?

Contributor Author

Yep but this removes a trailing space so might be good to keep it.

Too large a value decreases parallelism during broadcast (makes it slower); however, if it is too small, <code>BlockManager</code> might take a performance hit.
</td>
</tr>