Skip to content

IO Scheduler breaks long running Subscriber onNext #3152

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
thasmin opened this issue Aug 13, 2015 · 32 comments
Closed

IO Scheduler breaks long running Subscriber onNext #3152

thasmin opened this issue Aug 13, 2015 · 32 comments
Labels

Comments

@thasmin
Copy link

thasmin commented Aug 13, 2015

I'm new to Rx and I'm having trouble finding a solution with a scenario. I'm trying to make a network call, parse the result into objects, and send it to the UI. Parsing the response is quick and updating the UI is slower. The problem is that there's backpressure. Using onBackpressureBuffer doesn't fix the problem, and it doesn't even get rid of the MissingBackpressureException. When I tried to reduce the problem in a unit test, I found that I was able to get it to work by removing the subscribeOn(Schedulers.io()) line. I would expect this unit test to either fail or write "12345" to the consooe. Instead, it writes "1" and stops without any exceptions.

I'm also not sure why using a backpressure buffer isn't working, so there may be two problems. At this point, all I know is that I'm in over my head and I can't find out any more information on the web. Am I doing this wrong or are there bugs here?

Observable.from(new Integer[]{1, 2, 3, 4, 5})
    .subscribeOn(Schedulers.io())
    .subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { }
        @Override public void onError(Throwable e) {
            Assert.fail("error: " + e.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            try {
                System.out.print(integer);
                Thread.sleep(100);
            } catch (InterruptedException e) {
                System.out.print("interrupted");
                Assert.fail(e.getMessage());
            }
        }
    });
@headinthebox
Copy link
Contributor

Did you try to add a System.in.read(); after the subscribe?
(Also it is better to use observeOn)

@abersnaze
Copy link
Contributor

What @headinthebox is getting at is that the main(String[] args) is exiting before the io threads get a chance to do anything because they are daemon threads and don't block the JVM from stopping.

@thasmin
Copy link
Author

thasmin commented Aug 13, 2015

That makes sense. I'll try to duplicate my issue tomorrow then. It's on Android and I was trying to avoid requiring a device for the test, so it wasn't exactly my scenario.

@thasmin
Copy link
Author

thasmin commented Aug 13, 2015

Here's another test that confuses me, although I'm not sure it's related to the problem in my Android app. This test counts to 16 instead of 20. If I remove Observable.create() in the flatMap() call and change it to Observable.just(), then it works as expected.

Integer[] source = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20};
Observable.from(source)
    .subscribeOn(Schedulers.io())
    .flatMap(new Func1<Integer, Observable<Integer>>() {
        @Override
        public Observable<Integer> call(final Integer integer) {
            return Observable.create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(integer);
                }
            });
        }
    })
    .subscribe(new Subscriber<Integer>() {
        @Override public void onCompleted() { }

        @Override
        public void onError(Throwable e) {
            Assert.fail("error: " + e.getMessage());
        }

        @Override
        public void onNext(Integer integer) {
            try {
                System.out.print(integer.toString() + " ");
                Thread.sleep(50);
            } catch (InterruptedException e) {
                System.out.print("interrupted");
                Assert.fail(e.getMessage());
            }
        }
    });

Thread.sleep(10000);

@akarnokd
Copy link
Member

flatMap defaults to a concurrency level equal to the platform buffer size. Since you don't onComplete the created observable, flatMap won't continue with newer sources.

@headinthebox
Copy link
Contributor

Nice little sample that I can use in https://www.youtube.com/watch?v=pOl4E8x3fmw to show that with back pressure RxJava is essentially pull rather than push.

@headinthebox
Copy link
Contributor

@thasmin you should try retrolambda, 2 minutes to install will save you 50% or more on typing effort (in both meanings of the word typing)

@thasmin
Copy link
Author

thasmin commented Aug 13, 2015

I'll check out retrolambda. That's not the problem with my app if it's a buffer issue, although I'm not using onComplete, I'm only getting one item out of the chain before hitting the backpressure exception. I'll watch the video and try again tonight.

@stealthcode
Copy link

@thasmin Observable.just(1) calls subscriber.onNext(1) and then subscriber.onCompleted(). Your inner observables are not calling subscriber.onCompleted(). The reason you are only seeing 16 emissions from your flatMap is because merge has an internal buffer of inner observables (which has a capacity of 16 on android platforms). When an inner observable calls onCompleted then it's a space is freed in the buffer and another observable is subscribed to.

@headinthebox
Copy link
Contributor

Say I am writing a game, and I have a stream of game entities that each generate events, and I want to flatMap that stream of streams into a single stream (say to update the screen). At any point in time there are n active characters (i.e. that have not completed their inner streams) but that n is varying. Of course I want to receive the events from all active characters. How do I solve that using flatMap?

@akarnokd
Copy link
Member

@headinthebox if n is always less than the platform default buffer size, the flatMap(mapper) will happily merge the values. Otherwise, the excess sources will be dealt with according to how the main source reacts to backpressure (drop, buffer, latest, etc).

@headinthebox
Copy link
Contributor

Since n varies, it can be below or above the default buffer size. I can not drop or buffer them because then I would only see a small portion of my game being updated.

@artem-zinnatullin
Copy link
Contributor

@headinthebox increase buffer size to max possible value?

@davidmoten
Copy link
Collaborator

@headinthebox System property rx.ring-buffer.size controls the default buffer size. Would be nice of course if we could control that buffer size for individual flatMap calls.

@headinthebox
Copy link
Contributor

@artem-zinnatullin in which case why have back pressure at all?
@davidmoten in the scenario you cannot statically predict the required buffer size. RxJava is now essentially pull-based; Nothing wrong with that, just different from pure push, and makes it harder to do pure event stream processing.

@artem-zinnatullin
Copy link
Contributor

@artem-zinnatullin in which case why have back pressure at all?

It's a tricky question, personally I'd prefer if back pressure detection will be turned of by default and if we would have ability to use it via separate operator applied to the Observable.

But probably I'm just not in the target audience of the back pressure feature in RxJava.

As alternative — what about operator OnBackpressureDoNothing which will allow pure event processing when you just can't skip an event even if sometimes your code handles these events slower than they are coming to you.

@thasmin
Copy link
Author

thasmin commented Aug 14, 2015

I know the issue title is wrong and the conversation has strayed, but I think I found my problem. The ordering of my operators was messing things up. When I called .flatMap().observeOn().onBackpressureBuffer() in that order, I wasn't actually getting a backpressure buffer. Changing the order to .flatMap().onBackpressureBuffer().observeOn() made it work. I didn't expect that to make a difference. I only switched it randomly while testing to see whether another of the flatMap calls in the chain was causing the backpressure.

@thasmin thasmin closed this as completed Aug 14, 2015
@benjchristensen
Copy link
Member

what about operator OnBackpressureDoNothing

That is onBackpressureBuffer if you want to opt out of backpressure.

@benjchristensen
Copy link
Member

@akarnokd I don't understand the statement you made earlier:

flatMap defaults to a concurrency level equal to the platform buffer size. Since you don't onComplete the created observable, flatMap won't continue with newer sources.

It doesn't default to the buffer size, it has unbounded concurrency (well, up to Integer.MAX_VALUE): https://github.com/ReactiveX/RxJava/blob/1.x/src/main/java/rx/internal/operators/OperatorMerge.java#L57

If it didn't, we could deadlock the system. Only when the overloaded flatMap/merge operators that inject maxConcurrent are used does it restrict concurrency.

@akarnokd
Copy link
Member

@benjchristensen
Copy link
Member

@akarnokd Then that's a problem and a bug that was missed when reviewing the new merge code released in 1.0.13. We can't do that. We do bounded vertical backpressure, but must do unbounded horizontal backpressure. This applies to merge and groupBy. Always has.

I have opened an issue: #3156

@benjchristensen
Copy link
Member

@thasmin I'd like to understand the mental model that led to thinking the ordering of onBackpressureBuffer didn't matter. That suggests we are not communicating things well and the mental model of how Rx works is incorrect.

The order of operators definitely matters, as events flow down the pipe, each operator affects them. Since observeOn is where the async scheduling and buffering occurs, onBackpressureBuffer must be placed before observeOn to allow unbounded buffering before the scheduling and bounded buffering inside observeOn.

Can you help me understand how we need to better educate, document and communicate things to make this clear?

@JakeWharton
Copy link
Contributor

We run into this mental model all the time. In my experience it's almost as if people think they are in a builder of sorts. From randomly plugging in observeOn/subscribeOn until it "works" to making statements like "I thought you could only call observeOn once" it becomes clear they aren't thinking like a stream.

I'm giving a (recorded) talk soon dedicated to subscriptions, how they link together to create the chain, and how operators alter the behavior of each link.

@thasmin
Copy link
Author

thasmin commented Aug 14, 2015

Here's some advice, but of course it's my personal opinion and I barely know how Rx works. This is a few ways of saying the same thing, but I'm trying to be explicit in my thinking to paint the best picture I could for you guys.

  1. No tutorials on the net seem to pay any attention to the ordering of observeOn and subscribeOn.
  2. The terms observeOn and subscribeOn are misleading. In my first example, the code calls Observable.from, subscribeOn, and subscribe. I don't understand why I was told to use observeOn instead of subscribeOn when the only remaining call is subscribe. Why wouldn't I use the subscribeOn to specify the thread that subscribe uses?
  3. The first example on RxAndroid's github page uses just, subscribeOn, observeOn, and subscribe. Now that I think about it, that doesn't make sense to me. If it's a stream, does that mean that subscribeOn and observeOn affects the subscribe? Does subscribeOn only affect the observeOn?
  4. observeOn and subscribeOn don't touch the data, so it makes sense if they don't work the same way that the rest of the calls do.
  5. Here's what I want to specify and how it's usually explained: do the work an IO thread and then the final step (the only subscribe) on the main thread so I can update the UI. In my chain, I call just, observeOn, map, flatMap, flatMap, subscribeOn, onBackpressureBuffer, and subscribe. (This was the broken chain.) I expected the subscribeOn to affect only the subscribe call. I thought that putting it directly after the observeOn would have had the same affect.

Let me know if I can clarify anything.

@JakeWharton
Copy link
Contributor

Subscriptions go "upward". That is, they start where you call subscribe and follow the chain upwards to the source observable(s). So when the operation inside of the observable does something blocking, you need the thread on which it is subscribed to be a background thread. Thus, subscribeOn controls the thread on which the observable does its work (note: there are exceptions to this).

Conversely, data goes "downward". When an observable emits data, the subscriber observes it in the form of a callback. By default this happens synchronously (like a normal callback) on the same thread as the work is being done inside the observer. observeOn changes the thread on which an observer receives the callback.

If you understand these two facts, it should start to make sense why order matters and why you can do things like call these operators multiple times in a single stream.

For example,

Observable.create(s -> {
       // This runs on whatever thread calls 'subscribe' because there is no subscribeOn
      String name = readNameFromDb();
      s.onNext(name);
      s.onComplete();
    })
    // Move data from the above observable to "io" scheduler.
    .observeOn(io())
     // This map (and thus DB call) happens on "io" scheduler.
    .map(v -> readAddressForNameFromDb(v))
     // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
     // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

This stream is created synchronously and it reads and emits the first name synchronously when subscribe is called. Once that happens it moves data across threads twice resulting in the second DB read to happen on a background thread and then the print happening later in time on the main thread.

If you wanted the first DB read on a background thread (as you would in Android), you need to tell the stream to do the work which happens when you subscribe to the top observable on a background scheduler. This is done by putting subscribeOn in the chain, and in this example you can actually do it anywhere. I find it's most intuitive to put it as close to the source observable as possible.

Observable.create(s -> {
       // This now runs on the "io" scheduler
      String name = readNameFromDb();
      s.onNext(name);
      s.onComplete();
    })
    // When hooking up everything below to the observable above, do it on "io" scheduler.
    .subscribeOn(io())
    // Move data from the above observable to "io" scheduler.
    .observeOn(io())
     // This map (and thus DB call) happens on "io" scheduler.
    .map(v -> readAddressForNameFromDb(v))
     // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
     // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

Now the first DB read inside the observable happens on a background thread. Now hopefully you notice from the comments that we are doing the first DB read on the "io" scheduler and then we are telling it to observe that data on the "io" scheduler. This is redundant and can be removed:

Observable.create(s -> {
       // This now runs on the "io" scheduler
      String name = readNameFromDb();
      s.onNext(name);
      s.onComplete();
    })
    // When hooking up everything below to the observable above, do it on "io" scheduler.
    .subscribeOn(io())
     // This map still happens on "io" scheduler because that's the thread the data is emitted on.
    .map(v -> readAddressForNameFromDb(v))
     // Move data from the above map to "mainThread" scheduler.
    .observeOn(mainThread())
     // This happens on "mainThread" scheduler.
    .subscribe(d -> System.out.println(d));

Hope this helps. Like I said, I'm preparing a 45m talk on these concepts with some (hopefully) nice visualizations of the concepts.

DavidMGross added a commit to ReactiveX/reactivex.github.io that referenced this issue Aug 14, 2015
@thasmin
Copy link
Author

thasmin commented Aug 14, 2015

That explains it very well. So subscribeOn only applies to the first operation and observeOn affects all the following operations until observeOn is called again. That makes sense.

Is this because there's no way to set the scheduler before calling create? Perhaps the subscribeOn call can be removed by adding an Observable.createOn(Func1, Scheduler) call. This would add a lot of methods. Maybe Observable.observeOn(io).create would also be an acceptable API. It may be a good idea if you can remove the subscribeOn call completely and have one linear way to handle schedulers. Just some ideas for you guys.

@JakeWharton
Copy link
Contributor

subscribeOn applies to everything "upwards" in the chain, not just the first. In this case there was only the first observable above it but there easily could have been more. Most of the transformational operators (like map, filter, etc.) don't do anything when they are subscribed to beyond continuing to subscribe "upwards" so it doesn't matter on which thread they run.

The reason that subscribeOn exists is because it should not be up to the operator to determine on what thread it runs. In this example everything is in one place, but often times the creation of the observable and the person who actually subscribes are far, far away from each other. Because of this, calling subscribeOn allows the subscriber to dictate on what thread they want the work to happen without the person who created the observable having to care.

@thasmin
Copy link
Author

thasmin commented Aug 14, 2015

Here's something to answer in your talk: does subscribeOn go all the way to the top of the chain, does it affect only the preceding operation, does it go up until it hits a previous subscribeOn, or does it go up until it hits a previous observeOn?

@abersnaze
Copy link
Contributor

I like to explain subscribeOn as being the iterable dual of putting the execute() outside the for loop.

executorService.execute(() -> {
  items = source.getItems();
  for (Object item : items) {
    do(item);
  }
});

that is why subscribeOn() affects what thread the whole Observable chain runs on from creation to processing of the data.

Conversely observeOn() is inside the for loop and why it only affects what thread the work after is done on.

items = source.getItems();
for (Object item : items) {
  executorService.execute(() -> {
    do(item);
  });
}

It is important to keep in mind that a Worker from a Scheduler will not let scheduled items execute concurrently where an Executor doesn't make that guarantee.

@JakeWharton
Copy link
Contributor

Oooo that's clever. Very nice!

On Sat, Aug 15, 2015, 2:25 AM George Campbell [email protected]
wrote:

I like to explain subscribeOn as being the iterable dual of putting the
execute() outside the for loop.

executorService.execute(() -> {
items = source.getItems();
for (Object item : items) {
do(item);
}
});

that is why subscribeOn() affects what thread the whole Observable chain
runs on from creation to processing of the data.

Conversely observeOn() is inside the for loop and why it only affects
what thread the work after is done on.

items = source.getItems();
for (Object item : items) {
executorService.execute(() -> {
do(item);
});
}

It is important to keep in mind that a Worker from a Scheduler will not
let scheduled items execute concurrently where an Executor doesn't make
that guarantee.


Reply to this email directly or view it on GitHub
#3152 (comment).

@thasmin
Copy link
Author

thasmin commented Aug 15, 2015

It's more like startOn and continueOn. I'd say subscribeOn and observeOn are not newbie friendly. It still doesn't explain the part where you lose the backpressure buffer and probably other things.

@headinthebox
Copy link
Contributor

You start when you subscribe, and you continue when you observe.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

9 participants