replay/refCount subscribe/unsubscribe exhibit O(N) behavior #3469

Closed
ybayk opened this issue Oct 26, 2015 · 5 comments

ybayk commented Oct 26, 2015

Subscribing to a replay/refCount chain slows down as O(N) with the number of existing subscribers.

This is reproducible on a simple chain that never emits an item, so data traffic should not be a factor in the performance degradation:

Observable<?> shared = Observable.never().replay(1).refCount();

After a certain number of subscribers, the shared observable becomes unusable and wastes a lot of CPU cycles. This in turn renders some caching scenarios useless, specifically those with a large number of subscribers.

Here is a simple test:

    // Requires java.util.ArrayList, java.util.List, rx.Observable, rx.Subscription (RxJava 1.x).
    final Observable<?> shared = Observable.never().replay(1).refCount();

    final int count = 300000; // total number of subscribers
    final int page = 10000;   // report timing every `page` operations
    List<Subscription> ss = new ArrayList<>();
    System.out.println("subscribing...");
    long prev = System.currentTimeMillis();
    for (int i = 0; i < count; i++) {
      ss.add(shared.subscribe());
      if ((i+1) % page == 0) {
        long now = System.currentTimeMillis();
        long elapsed = Math.max(1, now - prev); // avoid division by zero on very fast pages
        System.out.println("subscribed so far " + (i+1) + "; took " + (now-prev) + "ms to subscribe last " + page + 
            " at rate: " + page*1000/elapsed + " per sec...");
        prev = now;
      }
    }
    System.out.println("unsubscribing...");
    prev = System.currentTimeMillis();
    for (int i = 0; i < count; i++) {
      ss.get(i).unsubscribe();
      if ((i+1) % page == 0) {
        long now = System.currentTimeMillis();
        long elapsed = Math.max(1, now - prev); // avoid division by zero on very fast pages
        System.out.println("unsubscribed so far " + (i+1) + "; took " + (now-prev) + "ms to unsubscribe last " + page + 
            " at rate: " + page*1000/elapsed + " per sec...");
        prev = now;
      }
    }

Here is the output on a 2015 MacBook Pro (PhysMem: 16G, JVM launched with -Xmx8000m):


subscribing...
subscribed so far 10000; took 374ms to subscribe last 10000 at rate: 26737 per sec...
subscribed so far 20000; took 905ms to subscribe last 10000 at rate: 11049 per sec...
subscribed so far 30000; took 1646ms to subscribe last 10000 at rate: 6075 per sec...
subscribed so far 40000; took 2557ms to subscribe last 10000 at rate: 3910 per sec...
...
subscribed so far 290000; took 29635ms to subscribe last 10000 at rate: 337 per sec...
subscribed so far 300000; took 30998ms to subscribe last 10000 at rate: 322 per sec...
unsubscribing...
unsubscribed so far 10000; took 32352ms to unsubscribe last 10000 at rate: 309 per sec...
unsubscribed so far 20000; took 28413ms to unsubscribe last 10000 at rate: 351 per sec...
unsubscribed so far 30000; took 27722ms to unsubscribe last 10000 at rate: 360 per sec...
...
unsubscribed so far 260000; took 2979ms to unsubscribe last 10000 at rate: 3356 per sec...
unsubscribed so far 270000; took 2012ms to unsubscribe last 10000 at rate: 4970 per sec...
unsubscribed so far 280000; took 1335ms to unsubscribe last 10000 at rate: 7490 per sec...
unsubscribed so far 290000; took 649ms to unsubscribe last 10000 at rate: 15408 per sec...
unsubscribed so far 300000; took 198ms to unsubscribe last 10000 at rate: 50505 per sec...

The main suspect is manageRequest() in OperatorReplay:

https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorReplay.java#L481-L546

That is where the stack traces point during both subscribe() and unsubscribe() once the rate slows down.
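
To illustrate why a per-subscription scan would produce this kind of slowdown, here is a minimal, library-free model. It is not the OperatorReplay code: `ScanningHub`, `subscribe` and `scanSteps` are hypothetical names, and the only assumption is that each subscribe walks every existing subscriber once, as a request-coordination loop might.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (not RxJava code): every subscribe scans all current subscribers. */
public class ScanningHub {
    // One request counter per subscriber (hypothetical stand-in for per-subscriber state).
    private final List<long[]> requested = new ArrayList<>();
    // Total number of subscribers visited across all scans.
    private long scanSteps;

    /** Adds a subscriber, then recomputes the maximum request by visiting everyone. */
    public long subscribe(long initialRequest) {
        requested.add(new long[] { initialRequest });
        long max = 0;
        for (long[] r : requested) { // O(N) walk on every subscription
            max = Math.max(max, r[0]);
            scanSteps++;
        }
        return max;
    }

    public long scanSteps() {
        return scanSteps;
    }

    public static void main(String[] args) {
        for (int n : new int[] { 1_000, 2_000, 4_000 }) {
            ScanningHub hub = new ScanningHub();
            for (int i = 0; i < n; i++) {
                hub.subscribe(1);
            }
            System.out.println(n + " subscribers -> " + hub.scanSteps() + " scan steps");
        }
    }
}
```

In this model, N subscriptions cost N(N+1)/2 scan steps in total, so doubling the subscriber count roughly quadruples the work. That is the same shape as the per-page times above, which keep growing as more subscribers are attached.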

ybayk changed the title from "replay/refCount subscribe/unsubscribe exhibit Log(N) behavior" to "replay/refCount subscribe/unsubscribe exhibit O(N) behavior" on Oct 26, 2015

akarnokd (Member) commented

Hi, and thanks for the feedback. I've posted PR #3470, which improves the performance (up to ~20x).

ybayk (Author) commented Oct 26, 2015

A 20x improvement is great, but making it constant or near-constant would also be nice. :) At this time we do not have any resources to look at it, but if no one else is able to pick this up, we might try to investigate and possibly contribute.

For now, we will have to put the concept of caching observables via replay() aside. That was a pretty elegant way of doing an async cache, though.

akarnokd (Member) commented

I've updated #3470 to have O(1) subscription cost without losing too much on the dispatching side, by using the internals of an OpenHashSet. Let me know if it's fast enough for you.
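
The difference between a linear-cost and a constant-cost subscriber registry can be sketched without RxJava. This is an illustrative model only: `RegistryCost`, `CopyOnWriteRegistry` and `HashRegistry` are hypothetical names, `java.util.HashSet` stands in for the OpenHashSet mentioned above, and the copy-on-write array is an assumed picture of a linear-cost registry, not a quote of the code before or after the PR. The model is single-threaded and ignores the concurrency concerns a real operator has.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

/** Toy comparison (not RxJava code) of two ways to track subscribers. */
public class RegistryCost {

    /** Linear-cost registry: every add or remove copies the whole array. */
    static final class CopyOnWriteRegistry {
        private Object[] items = new Object[0];
        long copiedSlots; // number of array slots copied so far

        void add(Object o) {
            Object[] next = Arrays.copyOf(items, items.length + 1);
            copiedSlots += items.length;
            next[items.length] = o;
            items = next;
        }

        void remove(Object o) {
            for (int i = 0; i < items.length; i++) {
                if (items[i] == o) {
                    Object[] next = new Object[items.length - 1];
                    System.arraycopy(items, 0, next, 0, i);
                    System.arraycopy(items, i + 1, next, i, items.length - i - 1);
                    copiedSlots += next.length;
                    items = next;
                    return;
                }
            }
        }

        int size() {
            return items.length;
        }
    }

    /** Hash-based registry: add and remove take expected constant time. */
    static final class HashRegistry {
        private final Set<Object> items = new HashSet<>();

        void add(Object o) {
            items.add(o);
        }

        void remove(Object o) {
            items.remove(o);
        }

        int size() {
            return items.size();
        }
    }

    public static void main(String[] args) {
        int n = 10_000;
        Object[] subscribers = new Object[n];
        CopyOnWriteRegistry cow = new CopyOnWriteRegistry();
        HashRegistry hash = new HashRegistry();
        for (int i = 0; i < n; i++) {
            subscribers[i] = new Object();
            cow.add(subscribers[i]);
            hash.add(subscribers[i]);
        }
        System.out.println("slots copied for " + n + " adds: " + cow.copiedSlots);
        for (Object s : subscribers) {
            cow.remove(s);
            hash.remove(s);
        }
        System.out.println("remaining: " + cow.size() + " / " + hash.size());
    }
}
```

In the array-based model, n adds copy n(n-1)/2 slots in total, while the hash-based registry does a bounded amount of expected work per operation. The trade-off hinted at in the comment above is that iterating a hash structure when dispatching can be somewhat slower than walking a plain array.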

ybayk (Author) commented Oct 27, 2015

Sweet!!! Hope this is merged into 1.x

akarnokd (Member) commented

Closing via #3470, should be available in 1.1.6.
