Subscription and Synchronization #1383

Closed
benjchristensen opened this issue Jun 24, 2014 · 14 comments

Comments

@benjchristensen
Member

I want to explore whether it is possible to eliminate synchronization inside a Subscription, particularly the SubscriptionList used inside Subscriber.

We have discussed this before and concluded that it is difficult to guarantee that unsubscribe is never called from another thread when subscribeOn and observeOn are used, since a user can get a reference to the Subscription and call unsubscribe from anywhere.

However, the cost of protecting against this slim chance is pretty high. The rx.PerfBaseline.observableConsumption perf test shows that removing synchronization (including the use of volatile and/or CAS) increases throughput from ~32m to ~46m ops/second.
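To make the "volatile and/or CAS" cost concrete, here is a minimal, hypothetical sketch in plain JDK Java (not RxJava's actual SubscriptionList; the class name `CasSubscription` is illustrative). The unsubscribed flag is flipped exactly once with a compare-and-set through `AtomicIntegerFieldUpdater`, and every `isUnsubscribed()` call is a volatile read.

```java
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;

/** Hypothetical subscription whose unsubscribed state is flipped once via CAS. */
final class CasSubscription {
    // AtomicIntegerFieldUpdater requires the target field to be a volatile int.
    private volatile int unsubscribed; // 0 = active, 1 = unsubscribed

    private static final AtomicIntegerFieldUpdater<CasSubscription> UNSUBSCRIBED =
            AtomicIntegerFieldUpdater.newUpdater(CasSubscription.class, "unsubscribed");

    private final Runnable onUnsubscribe;

    CasSubscription(Runnable onUnsubscribe) {
        this.onUnsubscribe = onUnsubscribe;
    }

    /** A volatile read on every call: paid even when only one thread ever touches it. */
    boolean isUnsubscribed() {
        return unsubscribed == 1;
    }

    /** Only the caller that wins the CAS (on whatever thread) runs the cleanup action. */
    void unsubscribe() {
        if (UNSUBSCRIBED.compareAndSet(this, 0, 1)) {
            onUnsubscribe.run();
        }
    }
}
```

In a purely synchronous pipeline the CAS always succeeds on the first try, but the volatile read in `isUnsubscribed()` is still paid on every check, which is the kind of per-item overhead the benchmark is sensitive to.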

Since this is a non-trivial increase, I'd like to determine whether we can adopt the same single-threaded assumption for Subscription as we do for Observer.

The observeOn operator does schedule the unsubscribe for the worker (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L190), but it passes the same Subscriber through to the parent (https://github.com/Netflix/RxJava/blob/master/rxjava-core/src/main/java/rx/internal/operators/OperatorObserveOn.java#L75), which means the Subscriber crosses thread boundaries. This is one place where Subscription, as currently used, requires synchronization.

Similarly, with subscribeOn a user can receive the returned Subscription on one thread while the work happens on a second. The user can therefore call unsubscribe from the first thread, and that change must be made visible to the second.

Most of the time, though, execution is synchronous and synchronization is not needed, yet we always pay the cost.

Does anyone else have insights on this, or on how we can improve things so that we only pay the cost of synchronization when needed? For example, I wonder if we can fix the observeOn implementation, decorate the Subscription resulting from subscribeOn to include synchronization, and leave it unsynchronized the rest of the time. Is that sufficient to be safe?
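The proposal above (keep the hot path unsynchronized and add locking only to the Subscription that escapes to another thread) could look roughly like the following. This is a hypothetical JDK-only sketch: `Subscription`, `UnsafeSubscriptionList`, and `SynchronizedSubscription` here are simplified stand-ins, not the RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

interface Subscription {
    void unsubscribe();
    boolean isUnsubscribed();
}

/** Unsynchronized list: correct only if every call happens on a single thread. */
class UnsafeSubscriptionList implements Subscription {
    private List<Subscription> subscriptions = new ArrayList<>();
    private boolean unsubscribed; // plain field: no volatile, no CAS

    void add(Subscription s) {
        if (unsubscribed) {
            s.unsubscribe(); // already unsubscribed: release the newcomer immediately
        } else {
            subscriptions.add(s);
        }
    }

    @Override public boolean isUnsubscribed() { return unsubscribed; }

    @Override public void unsubscribe() {
        if (!unsubscribed) {
            unsubscribed = true;
            List<Subscription> toRelease = subscriptions;
            subscriptions = null;
            for (Subscription s : toRelease) {
                s.unsubscribe();
            }
        }
    }
}

/** Hypothetical decorator applied only where a Subscription crosses threads (e.g. the subscribeOn result). */
final class SynchronizedSubscription implements Subscription {
    private final Subscription actual;

    SynchronizedSubscription(Subscription actual) { this.actual = actual; }

    @Override public synchronized void unsubscribe() { actual.unsubscribe(); }
    @Override public synchronized boolean isUnsubscribed() { return actual.isUnsubscribed(); }
}
```

The sketch also shows why "Is that sufficient to be safe?" is subtle: the lock only orders calls that go through the wrapper. If the worker thread reads the inner list's plain `unsubscribed` field directly, the Java Memory Model does not guarantee it will ever see a write made under the wrapper's lock, so the worker side would need the same lock (or a volatile read) as well.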

@benjchristensen
Member Author

/cc @akarnokd @headinthebox

@benjchristensen
Member Author

Please take a look at an exploration of this in #1384

@headinthebox
Contributor

It makes a lot of sense to make the same assumptions for subscribers and observers, and only synchronize in operators like observeOn and subscribeOn. In fact, I kind of hit myself in the head for not observing this (pun intended).

One question: is there something like .serialize that users can apply to synchronize access?

Would you be able to release this as a canary in production?

@benjchristensen
Member Author

I have created a 'SynchronizedSubscription' for internal use right now. We also have the 'unsubscribeOn' operator, but I have not proven to myself yet that it is all 100% correct.

@benjchristensen
Member Author

Yes, I could canary this, but the types of issues that this could cause probably would not be easily seen in our environment since it is still mostly request/response and an unsubscribe being missed wouldn't severely affect anything (it would just get filtered out by take for example). Where we do use streams they are infinite and thus never exercise the unsubscribe.

@headinthebox
Contributor

Makes sense.

Schedulers are another place where both subscriptions and concurrency exist.

@benjchristensen
Member Author

Yes I'll need to review those.

@benjchristensen
Member Author

I've been spending more time on this and it definitely appears to consistently help performance. The one area where I think this change makes us vulnerable is in this signature:

public Subscription subscribe(Subscriber s)
public Subscription subscribe(Observer s)

I propose changing these to:

public void subscribe(Subscriber s)
public void subscribe(Observer s)

Returning the Subscription is useless on synchronous streams, and on asynchronous streams it requires synchronization on unsubscribe for the rare occasion someone uses it. I can't find a way to apply that protection only when it is used, short of wrapping the Subscription in a SynchronizedSubscription. That adds another object allocation per subscribe, which significantly impacts some benchmarks (a tight loop of observable.create.subscribe).

Is there any reason (other than being a bad breaking change) for keeping the Subscription return? We almost removed them when we did 0.17, but kept them just to not do the breaking change.

Can we keep them without impeding performance?

@headinthebox
Contributor

Mmmm, need to think about this. I quite often unsubscribe via the Subscription returned from subscribe, and it would be a massive breaking change.

Since most operators are defined using lift, and thus unsafeSubscribe, it is not an issue there. Can't we introduce an unsafeCreate operator that does not protect, use that internally, and have user code use the safe variant?

@DylanSale

I tend to save all current subscriptions in my Android apps on the Activity, then when the Activity is destroyed, it unsubscribes from them all in order to free up resources. It gets used extensively there.

I think it is required because if the Subscription holds a reference to the Activity (via onNext, etc.), then the Activity may never be deallocated. I have an app with a long-lasting TCP connection pumping events into a hot Observable that survives multiple Activities, and if I could not unsubscribe from it, those Activities would be leaked. I think overcoming that would require using WeakReferences to the Activity inside the subscriptions. That approach was used in the Android library previously, but was removed because it caused other problems (I cannot recall what they were).

@benjchristensen
Member Author

@DylanSale Thanks for helping on this. I'm pretty certain we can't do this change as it is a very significant breaking change for this late in the game. Putting that aside, on your use case, would it work if you use takeWhile or takeUntil instead of capturing all of the subscriptions? In short, something like this:

someObservable.takeUntil(activityDestroyed).subscribe(s)
... or ...
someObservable.takeWhile(activityIsActive).subscribe(s)
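In RxJava these are the `takeUntil` and `takeWhile` operators. The JDK-only sketch below is not RxJava; it mimics the takeWhile idea with a hypothetical `emitWhile` helper, where the stop condition is evaluated by the pipeline itself before each onNext, so completion happens in-band and no code outside the stream has to call unsubscribe on a shared Subscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

final class TakeWhileSketch {
    /**
     * Hypothetical helper: pushes items to onNext while keepGoing accepts them, then
     * calls onCompleted. The predicate runs on the emitting thread, before each item.
     */
    static <T> void emitWhile(Iterable<T> source, Predicate<? super T> keepGoing,
                              Consumer<? super T> onNext, Runnable onCompleted) {
        for (T item : source) {
            if (!keepGoing.test(item)) {
                break; // stop in-band instead of relying on an external unsubscribe()
            }
            onNext.accept(item);
        }
        onCompleted.run();
    }

    public static void main(String[] args) {
        List<Integer> received = new ArrayList<>();
        // Stand-in for "activityIsActive": the activity counts as destroyed after event 3.
        emitWhile(Arrays.asList(1, 2, 3, 4, 5), event -> event <= 3,
                  received::add, () -> System.out.println("completed"));
        System.out.println(received); // [1, 2, 3]
    }
}
```

If the predicate reads state written by another thread (for example an Activity lifecycle flag), that state still needs proper publication, such as a volatile field; what this pattern removes is the need to share and synchronize the Subscription itself.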

@DylanSale

Yes, it looks like that would work. Cheers for that!

@benjchristensen
Member Author

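I did some further testing on different approaches, and here are the results (manually formatted for easier comparison; values are JMH throughput in ops/second, so higher is better, and the number after each benchmark name is its size parameter):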

                                                             non-volatile           volatile        synchronized     synchronized   AtomicIntegerFieldUpdater
                                                          SubscriptionList  SubscriptionList    SubscriptionList   SafeSubscriber            SubscriptionList
---------------------------------------------------------------------------------------------------------------------------------------------------------------

Baseline without RxJava:

r.PerfBaseline.iterableViaForLoopConsumption            1      248,286,785 loops over iterable of 1 item consumed into Observer.onNext
r.PerfBaseline.iterableViaForLoopConsumption         1000          182,798 loops over iterable of 1000 items consumed into Observer.onNext
r.PerfBaseline.iterableViaForLoopConsumption      1000000              172 loops over iterable of 1,000,000 items consumed into Observer.onNext  

r.PerfBaseline.iterableViaHasNextConsumption            1      325,036,745
r.PerfBaseline.iterableViaHasNextConsumption         1000          298,493
r.PerfBaseline.iterableViaHasNextConsumption      1000000              268

---------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                             non-volatile           volatile        synchronized     synchronized   AtomicIntegerFieldUpdater
                                                          SubscriptionList  SubscriptionList    SubscriptionList   SafeSubscriber            SubscriptionList
---------------------------------------------------------------------------------------------------------------------------------------------------------------                                                       
Fastest we should expect from RxJava. Single Observable subscribed to repeatedly via `unsafeSubscribe` without any wrapping (new Subscriber each time):

r.PerfBaseline.observableConsumptionUnsafe              1      121,447,376        66,666,838         121,434,744      122,254,109                  68,761,481
r.PerfBaseline.observableConsumptionUnsafe           1000          318,131           319,493             318,508          320,140                     318,099
r.PerfBaseline.observableConsumptionUnsafe        1000000              315               308                 314              315                         315

r.PerfBaseline.observableViaRangeUnsafe                 1      230,223,319        96,690,586         223,722,344      230,588,005                  99,951,920
r.PerfBaseline.observableViaRangeUnsafe              1000          310,078           297,840             231,158          311,091                     309,594
r.PerfBaseline.observableViaRangeUnsafe           1000000              280               268                 200              267                         246

---------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                             non-volatile           volatile        synchronized     synchronized   AtomicIntegerFieldUpdater
                                                          SubscriptionList  SubscriptionList    SubscriptionList   SafeSubscriber            SubscriptionList
---------------------------------------------------------------------------------------------------------------------------------------------------------------

r.PerfBaseline.observableConsumption                    1       54,346,582        29,219,633          37,542,983       32,687,078                  26,254,636
r.PerfBaseline.observableConsumption                 1000          246,747           231,651             266,484          245,754                     246,173
r.PerfBaseline.observableConsumption              1000000              252               237                 259              249                         251

r.PerfBaseline.observableViaRange                       1       55,475,749        29,832,040          34,142,305       32,975,147                  29,737,054
r.PerfBaseline.observableViaRange                    1000          255,971           234,519             168,015          214,055                     255,824
r.PerfBaseline.observableViaRange                 1000000              222               214                 170              174                         226


---------------------------------------------------------------------------------------------------------------------------------------------------------------
                                                             non-volatile           volatile        synchronized     synchronized   AtomicIntegerFieldUpdater
                                                          SubscriptionList  SubscriptionList    SubscriptionList   SafeSubscriber            SubscriptionList
---------------------------------------------------------------------------------------------------------------------------------------------------------------

r.o.OperatorMergePerf.merge1SyncStreamOfN               1        8,598,036         8,221,514           8,324,789        8,587,566                   7,908,497 
r.o.OperatorMergePerf.merge1SyncStreamOfN            1000           81,244           11,1945              98,456           81,036                      81,396 
r.o.OperatorMergePerf.merge1SyncStreamOfN         1000000               72                78                  64               75                          73 

r.o.OperatorMergePerf.mergeNAsyncStreamsOfN             1          102,689           105,055             103,722          103,979                     101,734 
r.o.OperatorMergePerf.mergeNAsyncStreamsOfN          1000                8                 8                   5                8                           8 

r.o.OperatorMergePerf.mergeNSyncStreamsOf1              1       10,586,891         9,149,101           9,365,997        9,961,394                   9,045,580 
r.o.OperatorMergePerf.mergeNSyncStreamsOf1            100        1,139,139         1,060,454             962,148        1,134,602                   1,051,191 
r.o.OperatorMergePerf.mergeNSyncStreamsOf1           1000           95,961            99,052              79,349          101,080                      98,040 

r.o.OperatorMergePerf.mergeNSyncStreamsOfN              1        9,794,588         8,852,408           9,040,839        8,803,122                   9,054,888 
r.o.OperatorMergePerf.mergeNSyncStreamsOfN           1000               83                82                  74               72                          83 

r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN           1           81,126            81,421              81,029           81,481                      80,370 
r.o.OperatorMergePerf.mergeTwoAsyncStreamsOfN        1000            5,837             5,670               3,808            5,721                       4,873 

r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1         1        9,195,390         7,852,711           8,360,268        8,650,547                   8,238,710 
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1      1000           77,743            75,857              70,337           79,576                      76,601 
r.o.OperatorMergePerf.oneStreamOfNthatMergesIn1   1000000               79                79                  73               71                          79 

Considering it unlikely that we can make the changes necessary to allow a Subscription to not be thread-safe, it seems that using synchronized instead of volatile is better in most situations, though all of the alternatives I pursued obviously carry a performance penalty.

Unless we make a design change that removes direct use of Subscription.unsubscribe() and pushes those use cases to takeWhile/takeUntil and other such operators, I think Subscription needs to guarantee thread-safety.

It cannot be hidden inside something like an unsafeCreate, because create has nothing to do with the Subscription that is passed into it at subscription time. The issue is that the Subscription is handed back to the user, where, by definition, a call to unsubscribe happens from a different thread. Since isUnsubscribed is checked inside tight loops, the flag must be at least volatile to ensure that changes are visible across threads, and that comes with a cost.
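The visibility argument can be illustrated with a small JDK-only sketch (not RxJava code; `LoopingSource` is hypothetical): a producer checks `isUnsubscribed()` before each emission in a tight loop while another thread calls `unsubscribe()`. Because the flag is volatile, the Java Memory Model guarantees the producer eventually observes the write; with a plain field, the JIT is allowed to hoist the read out of the loop, which can make the loop spin forever.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical source that emits in a tight loop until unsubscribed from another thread. */
final class LoopingSource {
    // Without 'volatile' the JIT may hoist this read out of the loop and never see the update.
    private volatile boolean unsubscribed;

    boolean isUnsubscribed() { return unsubscribed; }

    void unsubscribe() { unsubscribed = true; }

    /** Counts "emissions" until the flag flips; the counter stands in for onNext calls. */
    long run(CountDownLatch started) {
        long emitted = 0;
        started.countDown();
        while (!isUnsubscribed()) { // one volatile read per item: the hot-path cost
            emitted++;
        }
        return emitted;
    }

    /** Starts the producer, unsubscribes from the calling thread, and reports whether it stopped. */
    static boolean demo() {
        LoopingSource source = new LoopingSource();
        CountDownLatch started = new CountDownLatch(1);
        Thread producer = new Thread(() -> source.run(started));
        producer.setDaemon(true); // do not keep the JVM alive if the loop failed to stop
        producer.start();
        try {
            started.await();
            source.unsubscribe(); // called "from the outside", on a different thread
            producer.join(TimeUnit.SECONDS.toMillis(5));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !producer.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("producer stopped: " + demo());
    }
}
```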

Does anyone disagree with my assessment and testing above? Is there a better alternative I'm not considering?

(I can give the raw JMH results if interested. And sorry about the wrong post that I posted then deleted ... and the closing/reopening of this issue ... fat fingered it).

@benjchristensen
Member Author

By the way, the tests I included above show two different perspectives of RxJava. The PerfBaseline ones show the fastest possible use cases of RxJava, with just the overhead of Observable and Subscriber. I then included merge since it is commonly used via flatMap.

The results show merge is impacted far less by this decision, primarily because merge already carries considerable overhead from its bookkeeping and inherent concurrency. As a result, the impact of volatile is hardly noticeable.

The point of this issue is to present and discuss the impact of requiring a thread-safe Subscription for pipelines that are trying to be as fast as possible, pure pub-sub, or simple map transformations, without going through anything "heavy" like merge, flatMap or observeOn.

Ideally, if I could go back in time, I think I would change the signature of Subscription subscribe(Subscriber s) to void subscribe(Subscriber s) and say that takeWhile and takeUntil are the general approach to handling unsubscribe, or calling unsubscribe directly from within Subscriber.onNext, but rarely or never "from the outside" like we allow by returning a Subscription.
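The "unsubscribe directly from within Subscriber.onNext" style keeps the read and the write of the flag on the same thread, so a plain field is enough. A hypothetical JDK-only sketch (`MiniSubscriber` and `emit` are simplified stand-ins, not RxJava's Subscriber API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical minimal subscriber; the flag is only touched on the emitting thread. */
abstract class MiniSubscriber<T> {
    private boolean unsubscribed; // plain field: no cross-thread access in this model

    final boolean isUnsubscribed() { return unsubscribed; }
    final void unsubscribe() { unsubscribed = true; }

    abstract void onNext(T item);
}

final class SelfUnsubscribeSketch {
    /** Synchronous source: checks the flag before each item, on the same thread that runs onNext. */
    static <T> void emit(Iterable<T> items, MiniSubscriber<T> subscriber) {
        for (T item : items) {
            if (subscriber.isUnsubscribed()) {
                return;
            }
            subscriber.onNext(item);
        }
    }

    public static void main(String[] args) {
        List<String> seen = new ArrayList<>();
        emit(Arrays.asList("a", "b", "stop", "c"), new MiniSubscriber<String>() {
            @Override void onNext(String item) {
                seen.add(item);
                if (item.equals("stop")) {
                    unsubscribe(); // decided inside onNext; no other thread is involved
                }
            }
        });
        System.out.println(seen); // [a, b, stop]
    }
}
```

The catch, as discussed above, is that this only holds as long as no Subscription reference is handed to code running on another thread.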

benjchristensen added a commit to benjchristensen/RxJava that referenced this issue Jul 1, 2014
As per discussion in ReactiveX#1383 ... despite the performance hit.

3 participants