
Commit 57bbb4c

[SPARK-44290][CONNECT] Session-based files and archives in Spark Connect
### Previous behaviour

Previously, we kept a `JobArtifactSet` in a thread local for each client:

1. The execution block is wrapped with `SessionHolder.withSession` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SparkConnectStreamHandler.scala#L53).
2. `SessionHolder.withContextClassLoader` is then called [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L130), which in turn calls `JobArtifactSet.withActive` [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala#L118) and sets the active set to `SessionHolder.connectJobArtifactSet`.
3. The actual `JobArtifactSet` that is used is built up [here](https://github.com/apache/spark/blob/master/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala#L157) in `SparkConnectArtifactManager.jobArtifactSet`.

Since each client's own `JobArtifactSet` is made active while an operation executes, the `TaskDescription` carries artifacts specific to that client, and so does the `IsolatedSessionState` in the Executor. This allowed us to keep the Spark Connect specific logic inside the Spark Connect module.

### Problem

This mostly worked; the problem is that we never call `SparkContext.addFile` or `SparkContext.addJar`, but instead pass the artifacts directly to the scheduler (into the `TaskDescription`). That is fine in general, but skipping `SparkContext.addFile` exposes several problems:

- `SparkContext.postEnvironmentUpdate` is not invoked, which matters, for example, for recording the events used by the History Server.
- Specifically for archives, `Utils.unpack(source, dest)` is not invoked, so archives are not untarred properly on the Driver.

We would therefore have to duplicate that logic on the Spark Connect server side, which is not ideal. In addition, we already added the isolation logic into the Executor, and the Driver and Executor form the symmetric pair (not Spark Connect Server <> Executor), so this also matters for code readability and for the expected role of each component.

### Solution in this PR

This PR proposes to support session-based files and archives in Spark Connect. It leverages the groundwork from #41701 and #41625 (for jars in the Spark Connect Scala client). The changed logic is as follows:

- Keep the session UUID, and Spark Connect Server specific information such as the REPL class path, within a thread local.
- Pass the session ID when files or archives are added. `SparkContext` keeps them in a map, `Map(session -> Map(file -> timestamp))`, so the existing logic can be reused to address the problems mentioned above.

After that, on the executor side:

- Executors create an additional directory, named by the session UUID, on top of the default directory (the current working directory, see `SparkFiles.getRootDirectory`).
- When Python workers are launched, their current working directory is set to the directory created above.
- End users access these files via the current working directory, e.g. `./blahblah.txt` in their Python UDF, so the behaviour is compatible with and without Spark Connect. See the driver-side sketch below for how the pieces fit together.
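To make the new flow concrete, here is a minimal driver-side sketch using the pieces this commit introduces (`JobArtifactState`, `JobArtifactSet.withActiveJobArtifactState`, and the `sessionUUID` parameter on `SparkContext.addFile`). The session UUID and file path are made up, and these APIs are `private[spark]`, so the snippet only illustrates what the Connect server does internally rather than something end users call directly.

```scala
import org.apache.spark.{JobArtifactSet, JobArtifactState, SparkContext}

// Illustrative only: "session-1234" stands in for a Spark Connect session UUID and
// /tmp/lookup.txt for a client-uploaded file. All of these APIs are private[spark].
val sc: SparkContext = SparkContext.getOrCreate()
val state = JobArtifactState(uuid = "session-1234", replClassDirUri = None)

JobArtifactSet.withActiveJobArtifactState(state) {
  // The file is registered under sc.addedFiles("session-1234") rather than the shared
  // map, so another session can add a file with the same name without a conflict.
  sc.addFile(
    "/tmp/lookup.txt",
    recursive = false,
    addedOnSubmit = false,
    isArchive = false,
    sessionUUID = state.uuid)

  // Jobs submitted inside this block pick up the session-scoped maps: getActiveOrDefault
  // copies only this session's files/jars/archives into the JobArtifactSet shipped with
  // the TaskDescription, and executors download them into a per-session directory.
  val artifacts = JobArtifactSet.getActiveOrDefault(sc)
  assert(artifacts.state.map(_.uuid) == Some("session-1234"))
}
```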
Note that:

- Python workers are created per session here, because the session UUID is set as an environment variable and new Python workers are created whenever the environment variables differ (see also `SparkEnv.createPythonWorker`).
- The daemon and Python workers are already killed if they are not used for a while.

### TODOs and limitations

The Executor also maintains the file list, but behind a cache so that entries can be evicted. That has a problem; the sequence is:

1. A new `IsolatedSessionState` is created.
2. A task is executed once, and `IsolatedSessionState` holds the file list.
3. Later, the `IsolatedSessionState` is evicted at https://github.com/apache/spark/pull/41625/files#diff-d7a989c491f3cb77cca02c701496a9e2a3443f70af73b0d1ab0899239f3a789dR187.
4. The Executor creates a new `IsolatedSessionState` with empty file lists.
5. The Executor then attempts to redownload and overwrite the files (see https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L1058-L1064).
6. `spark.files.overwrite` is `false` by default, so the task suddenly fails at this point.

Possible solutions are:

- Maintain the cache with a TTL and remove evicted entries.
- Use a dedicated directory per session (which this PR does) and remove that directory when the cache entry is evicted, so the overwrite never happens.

A rough sketch of this idea is included at the end of this commit message.

### Why are the changes needed?

To allow session-based artifact control and multi-tenancy.

### Does this PR introduce _any_ user-facing change?

Yes, this PR allows multiple sessions to have their own space. For example, session A and session B can each add a file with the same name, which previously was not possible.

### How was this patch tested?

Unit tests were added.

Closes #41495 from HyukjinKwon/session-base-exec-dir.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
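As referenced in the TODOs above, below is a rough, purely illustrative sketch of the TTL-eviction idea (not something this commit implements): a per-session cache on the executor evicts idle entries and deletes the session's dedicated directory in a removal listener, so a re-created `IsolatedSessionState` never hits the `spark.files.overwrite=false` failure. The names `SessionFileState` and `rootDir` are made up for this sketch.

```scala
import java.io.File
import java.util.concurrent.TimeUnit

import com.google.common.cache.{CacheBuilder, RemovalListener, RemovalNotification}
import org.apache.commons.io.FileUtils

// Hypothetical per-session state: the session UUID plus the dedicated directory
// holding that session's downloaded files.
case class SessionFileState(sessionUUID: String, rootDir: File)

// Evict states that have not been touched for a while; deleting the directory on
// removal means a later re-download starts from a clean slate instead of tripping
// over spark.files.overwrite=false.
val sessionStates = CacheBuilder.newBuilder()
  .expireAfterAccess(30, TimeUnit.MINUTES)
  .removalListener(new RemovalListener[String, SessionFileState] {
    override def onRemoval(n: RemovalNotification[String, SessionFileState]): Unit =
      FileUtils.deleteDirectory(n.getValue.rootDir)
  })
  .build[String, SessionFileState]()
```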
1 parent e4a356a commit 57bbb4c

File tree

26 files changed: +326 -249 lines

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/artifact/SparkConnectArtifactManager.scala

Lines changed: 30 additions & 25 deletions
@@ -29,7 +29,7 @@ import scala.reflect.ClassTag
 import org.apache.commons.io.FileUtils
 import org.apache.hadoop.fs.{LocalFileSystem, Path => FSPath}

-import org.apache.spark.{JobArtifactSet, SparkContext, SparkEnv}
+import org.apache.spark.{JobArtifactState, SparkContext, SparkEnv}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.connect.artifact.util.ArtifactUtils
@@ -56,15 +56,15 @@ import org.apache.spark.util.Utils
 class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging {
   import SparkConnectArtifactManager._

-  private val sessionUUID = sessionHolder.session.sessionUUID
   // The base directory/URI where all artifacts are stored for this `sessionUUID`.
   val (artifactPath, artifactURI): (Path, String) =
     getArtifactDirectoryAndUriForSession(sessionHolder)
   // The base directory/URI where all class file artifacts are stored for this `sessionUUID`.
   val (classDir, classURI): (Path, String) = getClassfileDirectoryAndUriForSession(sessionHolder)
+  val state: JobArtifactState =
+    JobArtifactState(sessionHolder.session.sessionUUID, Option(classURI))

   private val jarsList = new CopyOnWriteArrayList[Path]
-  private val jarsURI = new CopyOnWriteArrayList[String]
   private val pythonIncludeList = new CopyOnWriteArrayList[String]

   /**
@@ -132,10 +132,16 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
       }
       Files.move(serverLocalStagingPath, target)
       if (remoteRelativePath.startsWith(s"jars${File.separator}")) {
+        sessionHolder.session.sessionState.resourceLoader
+          .addJar(target.toString, state.uuid)
         jarsList.add(target)
-        jarsURI.add(artifactURI + "/" + remoteRelativePath.toString)
       } else if (remoteRelativePath.startsWith(s"pyfiles${File.separator}")) {
-        sessionHolder.session.sparkContext.addFile(target.toString)
+        sessionHolder.session.sparkContext.addFile(
+          target.toString,
+          recursive = false,
+          addedOnSubmit = false,
+          isArchive = false,
+          sessionUUID = state.uuid)
         val stringRemotePath = remoteRelativePath.toString
         if (stringRemotePath.endsWith(".zip") || stringRemotePath.endsWith(
             ".egg") || stringRemotePath.endsWith(".jar")) {
@@ -144,35 +150,28 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
       } else if (remoteRelativePath.startsWith(s"archives${File.separator}")) {
         val canonicalUri =
           fragment.map(UriBuilder.fromUri(target.toUri).fragment).getOrElse(target.toUri)
-        sessionHolder.session.sparkContext.addArchive(canonicalUri.toString)
+        sessionHolder.session.sparkContext.addFile(
+          canonicalUri.toString,
+          recursive = false,
+          addedOnSubmit = false,
+          isArchive = true,
+          sessionUUID = state.uuid)
       } else if (remoteRelativePath.startsWith(s"files${File.separator}")) {
-        sessionHolder.session.sparkContext.addFile(target.toString)
+        sessionHolder.session.sparkContext.addFile(
+          target.toString,
+          recursive = false,
+          addedOnSubmit = false,
+          isArchive = false,
+          sessionUUID = state.uuid)
       }
     }
   }

-  /**
-   * Returns a [[JobArtifactSet]] pointing towards the session-specific jars and class files.
-   */
-  def jobArtifactSet: JobArtifactSet = {
-    val builder = Map.newBuilder[String, Long]
-    jarsURI.forEach { jar =>
-      builder += jar -> 0
-    }
-
-    new JobArtifactSet(
-      uuid = Option(sessionUUID),
-      replClassDirUri = Option(classURI),
-      jars = builder.result(),
-      files = Map.empty,
-      archives = Map.empty)
-  }
-
   /**
    * Returns a [[ClassLoader]] for session-specific jar/class file resources.
    */
   def classloader: ClassLoader = {
-    val urls = jarsList.asScala.map(_.toUri.toURL) :+ classDir.toUri.toURL
+    val urls = getSparkConnectAddedJars :+ classDir.toUri.toURL
     new URLClassLoader(urls.toArray, Utils.getContextOrSparkClassLoader)
   }

@@ -183,6 +182,12 @@ class SparkConnectArtifactManager(sessionHolder: SessionHolder) extends Logging
     logDebug(
       s"Cleaning up resources for session with userId: ${sessionHolder.userId} and " +
         s"sessionId: ${sessionHolder.sessionId}")
+
+    // Clean up added files
+    sessionHolder.session.sparkContext.addedFiles.remove(state.uuid)
+    sessionHolder.session.sparkContext.addedArchives.remove(state.uuid)
+    sessionHolder.session.sparkContext.addedJars.remove(state.uuid)
+
     // Clean up cached relations
     val blockManager = sessionHolder.session.sparkContext.env.blockManager
     blockManager.removeCache(sessionHolder.userId, sessionHolder.sessionId)

connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/SessionHolder.scala

Lines changed: 1 addition & 7 deletions
@@ -88,11 +88,6 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     artifactManager.addArtifact(remoteRelativePath, serverLocalStagingPath, fragment)
   }

-  /**
-   * A [[JobArtifactSet]] for this SparkConnect session.
-   */
-  def connectJobArtifactSet: JobArtifactSet = artifactManager.jobArtifactSet
-
   /**
    * A [[ClassLoader]] for jar/class file resources specific to this SparkConnect session.
    */
@@ -114,8 +109,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
   def withContextClassLoader[T](f: => T): T = {
     // Needed for deserializing and evaluating the UDF on the driver
     Utils.withContextClassLoader(classloader) {
-      // Needed for propagating the dependencies to the executors.
-      JobArtifactSet.withActive(connectJobArtifactSet) {
+      JobArtifactSet.withActiveJobArtifactState(artifactManager.state) {
         f
       }
     }

connector/connect/server/src/test/scala/org/apache/spark/sql/connect/artifact/ArtifactManagerSuite.scala

Lines changed: 0 additions & 14 deletions
@@ -51,20 +51,6 @@ class ArtifactManagerSuite extends SharedSparkSession with ResourceHelper {
     super.afterEach()
   }

-  test("Jar artifacts are added to spark session") {
-    val copyDir = Utils.createTempDir().toPath
-    FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)
-    val stagingPath = copyDir.resolve("smallJar.jar")
-    val remotePath = Paths.get("jars/smallJar.jar")
-    artifactManager.addArtifact(remotePath, stagingPath, None)
-
-    val expectedPath = SparkConnectArtifactManager.artifactRootPath
-      .resolve(s"$sessionUUID/jars/smallJar.jar")
-    assert(expectedPath.toFile.exists())
-    val jars = artifactManager.jobArtifactSet.jars
-    assert(jars.exists(_._1.contains(remotePath.toString)))
-  }
-
   test("Class artifacts are added to the correct directory.") {
     val copyDir = Utils.createTempDir().toPath
     FileUtils.copyDirectory(artifactPath.toFile, copyDir.toFile)

core/src/main/scala/org/apache/spark/JobArtifactSet.scala

Lines changed: 59 additions & 64 deletions
@@ -20,105 +20,100 @@ package org.apache.spark
 import java.io.Serializable
 import java.util.Objects

+/**
+ * Job artifact state. For example, Spark Connect client sets the state specifically
+ * for the current client.
+ *
+ * @param uuid UUID to use in the current context of job artifact set. Usually this is from
+ *             a Spark Connect client.
+ * @param replClassDirUri The URI for the directory that stores REPL classes.
+ */
+private[spark] case class JobArtifactState(uuid: String, replClassDirUri: Option[String])
+
 /**
  * Artifact set for a job.
  * This class is used to store session (i.e `SparkSession`) specific resources/artifacts.
  *
  * When Spark Connect is used, this job-set points towards session-specific jars and class files.
  * Note that Spark Connect is not a requirement for using this class.
  *
- * @param uuid An optional UUID for this session. If unset, a default session will be used.
- * @param replClassDirUri An optional custom URI to point towards class files.
+ * @param state Job artifact state.
  * @param jars Jars belonging to this session.
  * @param files Files belonging to this session.
  * @param archives Archives belonging to this session.
  */
-class JobArtifactSet(
-    val uuid: Option[String],
-    val replClassDirUri: Option[String],
+private[spark] class JobArtifactSet(
+    val state: Option[JobArtifactState],
     val jars: Map[String, Long],
     val files: Map[String, Long],
     val archives: Map[String, Long]) extends Serializable {
-  def withActive[T](f: => T): T = JobArtifactSet.withActive(this)(f)
-
   override def hashCode(): Int = {
-    Objects.hash(uuid, replClassDirUri, jars.toSeq, files.toSeq, archives.toSeq)
+    Objects.hash(state, jars.toSeq, files.toSeq, archives.toSeq)
   }

   override def equals(obj: Any): Boolean = {
     obj match {
       case that: JobArtifactSet =>
-        this.getClass == that.getClass && this.uuid == that.uuid &&
-          this.replClassDirUri == that.replClassDirUri && this.jars.toSeq == that.jars.toSeq &&
+        this.getClass == that.getClass && this.state == that.state &&
+          this.jars.toSeq == that.jars.toSeq &&
           this.files.toSeq == that.files.toSeq && this.archives.toSeq == that.archives.toSeq
     }
   }
-
 }

-object JobArtifactSet {
-
-  private[this] val current = new ThreadLocal[Option[JobArtifactSet]] {
-    override def initialValue(): Option[JobArtifactSet] = None
-  }
-
-  /**
-   * When Spark Connect isn't used, we default back to the shared resources.
-   * @param sc The active [[SparkContext]]
-   * @return A [[JobArtifactSet]] containing a copy of the jars/files/archives from the underlying
-   *         [[SparkContext]] `sc`.
-   */
-  def apply(sc: SparkContext): JobArtifactSet = {
-    new JobArtifactSet(
-      uuid = None,
-      replClassDirUri = sc.conf.getOption("spark.repl.class.uri"),
-      jars = sc.addedJars.toMap,
-      files = sc.addedFiles.toMap,
-      archives = sc.addedArchives.toMap)
-  }

-  private lazy val emptyJobArtifactSet = new JobArtifactSet(
-    None,
-    None,
-    Map.empty,
-    Map.empty,
-    Map.empty)
+private[spark] object JobArtifactSet {
+  // For testing.
+  val emptyJobArtifactSet: JobArtifactSet = new JobArtifactSet(
+    None, Map.empty, Map.empty, Map.empty)
+  // For testing.
+  def defaultJobArtifactSet: JobArtifactSet = SparkContext.getActive.map(
+    getActiveOrDefault).getOrElse(emptyJobArtifactSet)

-  /**
-   * Empty artifact set for use in tests.
-   */
-  private[spark] def apply(): JobArtifactSet = emptyJobArtifactSet
+  private[this] val currentClientSessionState: ThreadLocal[Option[JobArtifactState]] =
+    new ThreadLocal[Option[JobArtifactState]] {
+      override def initialValue(): Option[JobArtifactState] = None
+    }

-  /**
-   * Used for testing. Returns artifacts from [[SparkContext]] if one exists or otherwise, an
-   * empty set.
-   */
-  private[spark] def defaultArtifactSet(): JobArtifactSet = {
-    SparkContext.getActive.map(sc => JobArtifactSet(sc)).getOrElse(JobArtifactSet())
-  }
+  def getCurrentClientSessionState: Option[JobArtifactState] = currentClientSessionState.get()

   /**
-   * Execute a block of code with the currently active [[JobArtifactSet]].
-   * @param active
-   * @param block
-   * @tparam T
+   * Set the Spark Connect specific information in the active client to the underlying
+   * [[JobArtifactSet]].
+   *
+   * @param state Job artifact state.
+   * @return the result from the function applied with [[JobArtifactSet]] specific to
+   *         the active client.
    */
-  def withActive[T](active: JobArtifactSet)(block: => T): T = {
-    val old = current.get()
-    current.set(Option(active))
+  def withActiveJobArtifactState[T](state: JobArtifactState)(block: => T): T = {
+    val oldState = currentClientSessionState.get()
+    currentClientSessionState.set(Option(state))
     try block finally {
-      current.set(old)
+      currentClientSessionState.set(oldState)
     }
   }

   /**
-   * Optionally returns the active [[JobArtifactSet]].
-   */
-  def active: Option[JobArtifactSet] = current.get()
-
-  /**
-   * Return the active [[JobArtifactSet]] or creates the default set using the [[SparkContext]].
-   * @param sc
+   * When Spark Connect isn't used, we default back to the shared resources.
+   *
+   * @param sc The active [[SparkContext]]
+   * @return A [[JobArtifactSet]] containing a copy of the jars/files/archives.
+   *         If there is an active client, it sets the information from them.
+   *         Otherwise, it falls back to the default in the [[SparkContext]].
    */
-  def getActiveOrDefault(sc: SparkContext): JobArtifactSet = active.getOrElse(JobArtifactSet(sc))
+  def getActiveOrDefault(sc: SparkContext): JobArtifactSet = {
+    val maybeState = currentClientSessionState.get().map(s => s.copy(
+      replClassDirUri = s.replClassDirUri.orElse(sc.conf.getOption("spark.repl.class.uri"))))
+    new JobArtifactSet(
+      state = maybeState,
+      jars = maybeState
+        .map(s => sc.addedJars.getOrElse(s.uuid, sc.allAddedJars))
+        .getOrElse(sc.allAddedJars).toMap,
+      files = maybeState
+        .map(s => sc.addedFiles.getOrElse(s.uuid, sc.allAddedFiles))
+        .getOrElse(sc.allAddedFiles).toMap,
+      archives = maybeState
+        .map(s => sc.addedArchives.getOrElse(s.uuid, sc.allAddedArchives))
+        .getOrElse(sc.allAddedArchives).toMap)
+  }
 }
