
Conversation

@vinooganesh
Contributor

@vinooganesh commented Jun 5, 2019

What changes were proposed in this pull request?

This change modifies the current behavior, in which stopping a SparkSession always stops the underlying SparkContext, so that the SparkContext is stopped only when no other SparkSessions remain.
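
For illustration, a hedged sketch of the intended user-visible semantics (not taken from this patch; it assumes sessions created via newSession() are counted as well):

import org.apache.spark.sql.SparkSession

val spark1 = SparkSession.builder().master("local[*]").getOrCreate()
val spark2 = spark1.newSession()  // shares spark1's SparkContext

spark1.stop()
// Proposed behavior: the shared context stays up while another session exists.
assert(!spark2.sparkContext.isStopped)

spark2.stop()
// Stopping the last remaining session stops the SparkContext, as today.
assert(spark2.sparkContext.isStopped)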

How was this patch tested?

  • Unit test included as a part of the PR

Please review https://spark.apache.org/contributing.html before opening a pull request.

@vinooganesh changed the title from "[WIP] [SPARK-27958] Stopping a SparkSession should not always stop Spark Context" to "[SPARK-27958] Stopping a SparkSession should not always stop Spark Context" on Jun 5, 2019
*/
def stop(): Unit = {
sparkContext.stop()
if (SparkSession.numActiveSessions.get() == 0) {
Contributor

Shouldn't this remove itself from the active sessions before checking that there are 0 remaining?

Contributor Author

ah, good catch
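
Roughly, the ordering being suggested (an illustrative sketch reusing the numActiveSessions counter from this diff, not the final code):

def stop(): Unit = {
  // Deregister this session first, then stop the shared context only if it
  // was the last remaining session.
  if (SparkSession.numActiveSessions.decrementAndGet() == 0) {
    sparkContext.stop()
  }
}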

@mccheah
Contributor

mccheah commented Jun 6, 2019

Ok to test

*
* @since 2.0.0
*/
def stop(): Unit = {
Member

Hey, I think it was a design decision that stopping a session stops the SparkContext too. Why not just avoid calling stop() on the session, since stopping the context is essentially all it does? The behaviour seems to be properly documented as well.

Contributor

The idea is that in a multi-tenant Spark process where each user gets their own SparkSession, you want to be able to close down the resources for one session (e.g. JDBC connections, perhaps) without stopping the entire SparkContext, thus keeping the context alive for the other users.
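
For example, a minimal sketch of that multi-tenant pattern (names here are illustrative; the per-user cleanup step is exactly what is missing today):

import scala.collection.concurrent.TrieMap
import org.apache.spark.sql.SparkSession

// One shared SparkContext, one session per tenant.
val root = SparkSession.builder().master("local[*]").appName("multi-tenant").getOrCreate()
val sessionsByUser = TrieMap.empty[String, SparkSession]

def sessionFor(user: String): SparkSession =
  sessionsByUser.getOrElseUpdate(user, root.newSession())

def releaseUser(user: String): Unit = {
  // What we want: drop this user's session state (temp views, listeners,
  // JDBC connections) without touching the shared SparkContext.
  sessionsByUser.remove(user)
}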


@dongjoon-hyun changed the title from "[SPARK-27958] Stopping a SparkSession should not always stop Spark Context" to "[SPARK-27958][SQL] Stopping a SparkSession should not always stop Spark Context" on Jun 9, 2019
@dongjoon-hyun
Member

ok to test

@dongjoon-hyun
Member

Welcome and thank you for your first contribution, @vinooganesh. BTW, according to the PR description, it seems that we are waiting for more tests, right?

[WIP] PR right now for community feedback, tests incoming.


/**
* Stop the underlying `SparkContext`.
* Stop the underlying `SparkContext` if there are are no active sessions remaining.
Member

are are -> are.

} else {
  logWarning("Calling clearDefaultSession() on a SparkSession " +
    "without a default session is a no-op.")
}
Member

@dongjoon-hyun Jun 9, 2019

For me, the existing behavior of this function is a quiet no-op and looks better.

* Test cases for the lifecycle of a [[SparkSession]].
*/
class SparkSessionLifecycleSuite extends SparkFunSuite {
test("test SparkContext stopped when last SparkSession is stopped ") {
Member

nit. test("test -> test(".

.config("some-config", "b")
.getOrCreate()

session1.stop()
Member

Shall we add an assert below this line?

assert(!session1.sparkContext.isStopped)

.getOrCreate()

session1.stop()
assert(!session1.sparkContext.isStopped)
Member

This test looks almost like a duplicate of the first one.
Also, this test case seems to be dangerous because it doesn't clean up the SparkContext.
Let's remove this.

Contributor Author

Yeah, I was torn about this one, but agreed, will remove

Member

@dongjoon-hyun left a comment

Please use SparkSessionBuilderSuite instead of creating a new one, SparkSessionLifecycleSuite.

@SparkQA

SparkQA commented Jun 9, 2019

Test build #106312 has finished for PR 24807 at commit 8fc95e9.

  • This patch fails due to an unknown error code, -9.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2019

Test build #106352 has finished for PR 24807 at commit 0c9c426.

  • This patch fails Scala style tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jun 10, 2019

Test build #106356 has finished for PR 24807 at commit 843491f.

  • This patch fails from timeout after a configured wait of 400m.
  • This patch merges cleanly.
  • This patch adds no public classes.

@gatorsmile
Member

cc @jiangxb1987

Member

@srowen left a comment

I agree with the attempt here, but reference counting seems error-prone. There are a number of race conditions in these checks that could leave us never closing the context, or closing it prematurely, if the count is off.

The underlying issue is that there's no way to clean up the resources of one SparkSession, because many can exist. We can make SparkSession never close the context, and make users close it independently, but that would be a behavior change.

Is there an actual problem here though... does an app create a bunch of sessions and run out of resources in any reasonable usage? I was never clear on that from the dev@ mail.

def setActiveSession(session: SparkSession): Unit = {
  activeThreadSession.set(session)
  if (getActiveSession.isEmpty
    || (session != getActiveSession.get && getActiveSession.isDefined)) {
Member

.isDefined is checked in the wrong order (it should come before the .get call), but you've effectively already checked it with .isEmpty.

*
* @since 2.0.0
*/
def stop(): Unit = {

@vinooganesh
Contributor Author

Hey @srowen - I'm not super thrilled with using reference counting here either, but I can't think of an easier way to fix the inconsistency. Definitely open to ideas.

There is a memory leak here: even after the destruction of a SparkSession, the listener the session adds to the context is never cleaned up. I believe this can be remedied by https://github.com/apache/spark/pull/24807/files#diff-d91c284798f1c98bf03a31855e26d71cR967.

The main thing I'm trying to address is what seems like a confusing relationship between a SparkSession and a SparkContext. Intuitively, and per the initial blog post about SparkSession, I would expect a session to be like a networking session - a short-lived engagement that is used and then destroyed. In the current model, the destruction of the session destroys the context as well, which seems like incorrect behavior.

A more concrete example of this occurs when using temp tables. Let's say a user in session A creates a temp table, and a user in session B is working on other things. There are 2 guarantees that I believe should be made - 1) The user in session B cannot read the temp table in session A, and 2) the destruction of session A should not harm the user in session B. It seems that in this case, the latter invariant is being broken.
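
A hedged sketch of that scenario with temp views (illustrative only; the second guarantee is exactly what today's stop() breaks):

import org.apache.spark.sql.SparkSession

val sessionA = SparkSession.builder().master("local[*]").getOrCreate()
val sessionB = sessionA.newSession()

sessionA.range(10).createOrReplaceTempView("session_a_view")

// Guarantee 1: session B cannot see session A's temp view.
assert(!sessionB.catalog.tableExists("session_a_view"))

// Guarantee 2 (currently broken): stopping session A should not harm session B,
// but today stop() also stops the SparkContext that session B depends on.
sessionA.stop()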

Does that help clear things up? Apologies if this was unclear from the thread.

@srowen
Member

srowen commented Jun 11, 2019

I think the SparkSession.close() behavior is on purpose, and that's a coherent behavior (i.e. just don't shut anything down until you're done, and then everything shuts down). What's not consistent with that is maintaining some state in the session that can't be cleared.

I think the ways forward are probably:

  • A new lifecycle method like clear()? more user burden but at least provides some means of doing cleanup without changing close()
  • Figure out how to automatically dispose of those resources or not hold them
  • Just change the behavior of session's close() to not shut down the context. Behavior change, yes, but perhaps less surprising than anything.

Eh, do people like @cloud-fan or @gatorsmile or @HyukjinKwon or @dongjoon-hyun have thoughts on this? I feel like reference counting is going to end in tears here eventually, but it's not crazy.

@vinooganesh
Contributor Author

If we can revisit certain fundamentals, I'd love to explore something like open() and close() to stay close to session terminology. My rationale is that while Spark is in the rare 3.x api-breaks-permitted world, fixing it now would make things easier in the long run.

@SparkQA

SparkQA commented Jun 14, 2019

Test build #106527 has finished for PR 24807 at commit 92c7b22.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@jiangxb1987 left a comment

The proposed new behavior makes more sense to me, though I would go with another approach to implement it (as suggested in the comment below).
Also, it is worth noting that we must make sure Spark will automatically recycle the SparkContext, so users don't need to remember to call SparkContext.stop() themselves.

*/
def stop(): Unit = {
sparkContext.stop()
SparkSession.clearActiveSession()
Contributor

Should we also clear the defaultSession()? Otherwise, the getOrCreate() method still returns the previously created SparkSession.

Contributor Author

I thought about this as well and here were my thoughts:

  1. The default session should simply be a "default" set of config that is applied when creating a new SparkSession, unless overrides to that config are specified. The only notion of SparkSessions that we care about is the set of active spark sessions. For that reason, I think clearing the default session may not be necessary here.
  2. We could completely get rid of the notion of default sessions (which I wouldn't be opposed to), and instead require that at session creation time each user specify the full list of settings they want for their session. How would people feel about this? Alternatively, we could say that the default session is completely hidden from the user and that we grab configs from the first created active SparkSession (via getOrCreate()) to populate the config. That seems like confusing behavior to me, though.

Contributor

The problem is that we now update both activeSession and defaultSession when we create a new SparkSession. If we don't clear the defaultSession here, then we will still have the defaultSession returned every time we call getOrCreate(). Whether to keep the notion of a default session is worth another mail thread or PR; the quickest way to get this PR in would be to clear the defaultSession here, IIUC.
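
For illustration, a minimal sketch of clearing both references in stop(), per the suggestion above (not the actual patch):

def stop(): Unit = {
  // Drop both the thread-local active session and the global default session,
  // so a later builder.getOrCreate() does not hand back this stopped session.
  SparkSession.clearActiveSession()
  SparkSession.clearDefaultSession()
  sparkContext.stop()
}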

@Stable
object SparkSession extends Logging {

  private[spark] val numActiveSessions: AtomicInteger = new AtomicInteger(0)
Contributor

This is error-prone and also not easy to debug; we may need to keep track of each active SparkSession in the SparkContext. We don't need to pass in the SparkSession instance - maybe only compute the System.identityHashCode() of the current SparkSession object and pass that to the SparkContext. On SparkSession.stop() we only need to drop the corresponding identity from the SparkContext, and when the set is empty it means we can safely stop the SparkContext, too.
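
A hypothetical sketch of that bookkeeping inside SparkContext (registerSession/unregisterSession do not exist today; the names are purely illustrative):

import java.util.concurrent.ConcurrentHashMap

private[spark] val activeSessionIds = ConcurrentHashMap.newKeySet[Int]()

private[spark] def registerSession(session: AnyRef): Unit =
  activeSessionIds.add(System.identityHashCode(session))

private[spark] def unregisterSession(session: AnyRef): Unit = {
  activeSessionIds.remove(System.identityHashCode(session))
  // Once the last registered session is gone, it is safe to stop the context.
  if (activeSessionIds.isEmpty) {
    stop()
  }
}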

Contributor Author

Yeah, I think the identity hash code could be an interesting solution here. I'll experiment with that

override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
  defaultSession.set(null)
  // Should remove listener after this event fires
  sparkContext.removeSparkListener(this)
Contributor

I think this should be called in the SparkSession.stop() method?

Contributor Author

Yeah, that would be ideal - the problem is that we lose the handle to this listener outside of this method. I could create a global var in the SparkSession to hold a reference to the listener, but that also seems kind of strange. Thoughts?

Contributor

It should be fine to have a global var of SparkListener inside SparkSession.
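
A rough sketch of what that might look like inside SparkSession (the field name is illustrative, not taken from the patch):

import org.apache.spark.SparkContext
import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}

// Keep a handle to the listener so it can be detached from the context later.
@transient private var applicationEndListener: SparkListener = _

private def registerContextListener(sparkContext: SparkContext): Unit = {
  applicationEndListener = new SparkListener {
    override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
      SparkSession.clearDefaultSession()
    }
  }
  sparkContext.addSparkListener(applicationEndListener)
}

def stop(): Unit = {
  // Detach the per-session listener so the context no longer holds a
  // reference to this session once it is stopped.
  sparkContext.removeSparkListener(applicationEndListener)
  sparkContext.stop()
}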

Contributor Author

Okay, will add this in!

@vinooganesh
Contributor Author

Quick update - apologies for the delay here, will have an updated PR by early next week

@AmplabJenkins

Can one of the admins verify this patch?

@github-actions

We're closing this PR because it hasn't been updated in a while.
This isn't a judgement on the merit of the PR in any way. It's just
a way of keeping the PR queue manageable.

If you'd like to revive this PR, please reopen it!

github-actions bot added the Stale label on Dec 28, 2019
github-actions bot closed this on Dec 29, 2019
@britishbadger

Did this get merged in? We are using newSession() to ensure that temporary views are isolated. We are calling a Spark application from a REST service, and newSession() works really nicely for our workloads, but when profiling the Java process I can see the SparkSession objects are never released or garbage collected. I then found this PR, and I'd agree that not having a method to kill a session when you are done seems to be a leak. stop() closes the context, not the session.

@vinooganesh
Contributor Author

Hi @britishbadger - is this something you're still interested in? I actually just made the change in our fork (a slightly separate way). I can re-propose the change and reinvestigate a fix if so.

@rdblue
Contributor

rdblue commented Mar 2, 2020

@vinooganesh, I'd like to see this fixed if you don't mind submitting what you're using.

def setActiveSession(session: SparkSession): Unit = {
  activeThreadSession.set(session)
  if (session != getActiveSession.get && getActiveSession.isDefined) {
    numActiveSessions.getAndIncrement
Contributor

The problem I see here is: if I have 2 sessions, set one as active, then set the other, and keep alternating, the count here will be wrong.

I don't have a good idea for tracking the last alive session. Even if we could, users may want to create more sessions later and not stop the SparkContext.

The SparkSession should be lightweight - what's the memory leak you observed before?
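
To make the miscount concrete (an illustration of the scenario described above, using the setActiveSession shown in this diff):

val s1 = SparkSession.builder().master("local[*]").getOrCreate()
val s2 = s1.newSession()

// Application code alternates the active session per request:
SparkSession.setActiveSession(s2)  // counted: differs from the current active session
SparkSession.setActiveSession(s1)  // counted again, although s1 already exists
SparkSession.setActiveSession(s2)  // ...and so on
// With the check above, numActiveSessions drifts upward even though only two
// sessions ever existed, so the count never returns to zero and the
// SparkContext would never be stopped by the last stop().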

Member

Another problem is that the current behaviour is clearly documented. It doesn't look particularly wrong either:

Stop the underlying SparkContext

We're trying to make a behaviour change just based on the new design choice.
Shall we at least keep compatibility via a switch?

Contributor

Yeah, the right way to "free" a session is to leave it and wait for it to be GCed. If there is something that can't be GCed, it's a memory leak and we should fix it.

Contributor Author

@cloud-fan @HyukjinKwon - thanks for the thoughts.

@cloud-fan - The memory leak is detailed here #24807 (comment).

@HyukjinKwon - I actually think that despite the fact that the current behavior is clearly documented, it doesn't make sense and is error-prone. I detailed an example here #24807 (comment) of how this could bite us.

Is there a world where a user should be able to call .stop() on a session and forcibly invalidate every other potential session (by killing the context)?

Member

If the leak is the problem, we should fix it rather than changing the behaviour. It is documented, and users are relying on this behaviour.
We can understand .stop() as being like .stopContext(), no?

I don't think we should just change this without guarding. All other projects related to Spark, such as Zeppelin, would need to revisit how they stop, and it would make it difficult for them to support multiple Spark versions, for instance.

@vinooganesh
Contributor Author

@rdblue - sure, I think it'll require a bit of work to actually make the fix in an extensive way, but I'll continue pushing forward on this PR.

@britishbadger

Hi @britishbadger - is this something you're still interested in? I actually just made the change in our fork (a slightly separate way). I can re-propose the change and reinvestigate a fix if so.

Very much so and thanks for progressing the PR

@vinooganesh
Contributor Author

Hi All - Apologies for the delay here (I actually switched companies since I initially worked on this and realized I didn't have access to the work I previously did anymore). I did some brushing up on this PR (it's been a while since I've touched it), and re-found a few things.

  1. @HyukjinKwon - to your point about fixing the leak without changing the contract between a SparkContext and a SparkSession, it's actually not a trivial fix. The leak comes from the fact that the SparkSession attaches a listener to the singleton SparkContext (which the context will hold onto forever). Logically, at the end of the lifecycle of the SparkSession instance, I should be able to drop the listener from the context. We can do this on a per-instance basis for a Spark session, but that's where the weirdness comes in (see point 2).

  2. The weirdness comes from the fact that lifecycle operations are permitted both on the singleton SparkSession object and on the session instance created by SparkSession.getOrCreate(...). Specifically, I can call stop() and kill the SparkContext from any instance of a Spark session. That seems wrong, especially given an expected operating model where the active session and default session can be different.

  3. The concrete problem here is that there isn't a way to clean up an instance of a spark session without killing the context as a whole.

Here's my proposal:

  1. I'll need to introduce a way to "end" an instance of a SparkSession (i.e. mark it ready to be GCed) on a per-instance basis (the class, not the singleton) to fix the memory leak. I propose adding a new lifecycle method (maybe end()? see the sketch after this list) to mark an instance of a Spark session as ready to be removed. In this method, I'll also clean up the listener leak.
  2. The singleton (the SparkSession object) methods clearActiveSession() and clearDefaultSession() operate in a kind of strange way. The latter drops the singleton's reference to the default session, but doesn't actually clean up the listener state associated with the context. We've been able to get by thus far simply because there isn't truly a way to stop a Spark session - stopping the session just stops the context. If these sessions are meant to be lightweight, then we need a way to spin them up and tear them down easily, without affecting the underlying context. That is, in a regular operating mode I could mark the instance of my Spark session for GC (i.e. spark.end()) and have the garbage collector clean it up (unless the SparkSession object still has a reference to it - which is expected behavior).
  3. The point is valid that folks rely on the - albeit strange - behavior of stopping a session stopping the global context. We can largely leave the current spark.stop() method unaffected (though I think we should rename / proxy it to a new spark.stopContext() method).

The plan above fixes the leak and changes the operating model without affecting the underlying functionality people have come to rely on. It still leaves it to the user to stop the context manually once their last SparkSession is done, but I think it's an improvement over what we have now.
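
A hypothetical sketch of that split (end() and stopContext() do not exist in Spark; this is only to make the proposal concrete, and it reuses the illustrative listener field from the earlier sketch):

// Inside SparkSession (illustrative only):

// Release per-session state (e.g. the context listener) without touching the context.
def end(): Unit = {
  sparkContext.removeSparkListener(applicationEndListener)
  SparkSession.clearActiveSession()
}

// Explicitly stop the shared SparkContext for everyone.
def stopContext(): Unit = sparkContext.stop()

// Unchanged for compatibility: stopping a session still stops the context.
def stop(): Unit = stopContext()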

Thoughts?

cc @cloud-fan @jiangxb1987 @srowen @rdblue

@vinooganesh
Contributor Author

Also, I realized I can't push to the palantir remote anymore. I have a branch that implements a variant of the changes described above, but it's off of my personal GitHub: https://github.com/vinooganesh/spark/tree/vinooganesh/SPARK-27958. I also reached out to GitHub support to see if they can do this, but does anyone here have permissions to change the base branch of this PR?

@vinooganesh
Contributor Author

friendly ping @cloud-fan @jiangxb1987 @srowen @rdblue

@srowen
Member

srowen commented Apr 3, 2020

I don't see that the PR has been changed?

@vinooganesh
Contributor Author

@srowen yeah, I was wondering if you guys had the power to change the base branch of PRs: #24807 (comment). Is that something you can do? Otherwise, I'll just make a new PR.

@srowen
Member

srowen commented Apr 3, 2020

I don't think that's possible on GitHub.

@vinooganesh
Contributor Author

Got it - I haven't heard back from GitHub yet, so I did the following:

  1. Filed https://issues.apache.org/jira/browse/SPARK-31354 as the actual ask in this ticket is now different than "auto-stopping" the SparkContext assuming there are no more SparkSessions available.
  2. Created a follow-up PR here: [SPARK-31354] SparkContext only register one SparkSession ApplicationEnd listener #28128, to address the issue filed above using the solution detailed in my comment above.

Since I can't push more changes to this branch, let's continue discussion on the new linked PR.
