zip: doOnTerminate is not called on some observables #3124

Closed
vleushin opened this issue Jul 30, 2015 · 14 comments

@vleushin

    @Test
    public void test() {
        Observable.zip(
                Observable.just("1")
                        .doOnTerminate(() -> System.out.println("TERMINATE 1")),
                Observable.just("2")
                        .delay(1, TimeUnit.SECONDS)
                        .doOnTerminate(() -> System.out.println("TERMINATE 2")),
                (result1, result2) -> null)
                .doOnTerminate(() -> System.out.println("TERMINATE"))
                .toBlocking()
                .single();
    }

Output:

TERMINATE 1
TERMINATE

Expected output:

TERMINATE 1
TERMINATE 2
TERMINATE

Or maybe my thinking is wrong?

Context of the problem: I use Hystrix observable commands in zip:

    Observable.zip(
            new MyHystrixObservableCommand(arg1).toObservable(),
            new MyHystrixObservableCommand(arg2).toObservable(),
            (result1, result2) -> null)
            .toBlocking()
            .single()

and the Hystrix command's semaphore release happens in doOnTerminate. For one of the two commands doOnTerminate is never called, so its semaphore is not released.

@akarnokd
Member

Once the delayed second value reaches zip, zip detects that the first source has already terminated, so it completes and unsubscribes from the second source. At that point the second source still has an onCompleted scheduled, but because of the unsubscription it is never delivered, so you don't see its doOnTerminate called.
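
As a rough, pull-based analogy only (this is not how RxJava implements zip, and the class and method names below are made up for illustration), zipping two plain iterators shows a similar effect: the loop stops as soon as the first source reports its end and never asks the second source whether it has ended too.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical illustration; not RxJava code.
final class ZipAnalogy {

    // Wraps an iterator and records whether its end was ever observed,
    // loosely standing in for "doOnTerminate was called".
    static final class Tracking<T> implements Iterator<T> {
        private final Iterator<T> inner;
        boolean endObserved = false;

        Tracking(Iterator<T> inner) { this.inner = inner; }

        @Override public boolean hasNext() {
            boolean has = inner.hasNext();
            if (!has) { endObserved = true; }
            return has;
        }

        @Override public T next() { return inner.next(); }
    }

    // Pairs elements until either side is exhausted. Because && short-circuits,
    // b.hasNext() is not evaluated once a.hasNext() returns false.
    static <A, B> List<String> zip(Iterator<A> a, Iterator<B> b) {
        List<String> out = new ArrayList<>();
        while (a.hasNext() && b.hasNext()) {
            out.add(a.next() + "+" + b.next());
        }
        return out;
    }

    // Returns { first.endObserved, second.endObserved } after zipping
    // two single-element sources, mirroring the example in this issue.
    static boolean[] run() {
        Tracking<String> first = new Tracking<>(Arrays.asList("1").iterator());
        Tracking<String> second = new Tracking<>(Arrays.asList("2").iterator());
        zip(first, second);
        return new boolean[] { first.endObserved, second.endObserved };
    }

    public static void main(String[] args) {
        boolean[] seen = run();
        System.out.println("first end observed:  " + seen[0]);
        System.out.println("second end observed: " + seen[1]);
    }
}
```

Both sources have the same length, yet only the first one's end is ever observed; the second is simply abandoned once the result is known to be complete.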

@vleushin
Author

I fell back to merge:

        Observable.merge(
                Observable.just("1")
                        .doOnTerminate(() -> System.out.println("TERMINATE 1")),
                Observable.just("2")
                        .delay(1, TimeUnit.SECONDS)
                        .doOnTerminate(() -> System.out.println("TERMINATE 2")))
                .doOnTerminate(() ->
                        System.out.println("TERMINATE"))
                .ignoreElements()
                .singleOrDefault(null)
                .toBlocking()
                .single();

Output:

TERMINATE 1
TERMINATE 2
TERMINATE

This code looks strange to me. Please advise how to properly run several observables in parallel and wait for them to finish. I can go with merge for now because I do not need the observables' results.

@zsxwing
Member

zsxwing commented Aug 12, 2015

You can use lastOrDefault to shorten your code, e.g., o1.mergeWith(o2).toBlocking().lastOrDefault(null).

@amitcse

amitcse commented Sep 20, 2015

@vleushin Is your issue resolved? I am facing the same issue. What is the drawback of using merge?

@vleushin
Author

I use merge. The drawback of merge is that it's hard to combine the results if you need them. If you don't need them (as in my case), merge is good enough.

@tommack

tommack commented Oct 21, 2015

I just encountered this same thing. I want to zip together two Observables and do some simple logging on each when complete. You don't even need the delay to make this happen.

        Observable<Integer> nums = Observable.just(1, 2, 3, 4)
                .doOnCompleted(() -> System.out.println("done with nums"));
        Observable<String> letters = Observable.just("a", "b", "c", "d")
                .doOnCompleted(() -> System.out.println("done with letters")); // this won't appear

        nums.zipWith(letters, (n, l) -> "got " + n + " and " + l)
                .toBlocking()
                .forEach(System.out::println);

I found this behavior to be very surprising. Could zip be changed to allow the source Observables to fully complete in the case where they are the same length?

@akarnokd
Member

akarnokd commented Feb 9, 2016

I'd add doOnUnsubscribe so you can execute the cleanup action, or simply use the using operator, which executes the cleanup whether the source completes normally or is cancelled.

@ultimate-deej

ultimate-deej commented May 21, 2016

Why is this closed? I think the current zip behavior is incorrect.
My point is, if the inner observables of zip emit the same number of items, then onCompleted (and hence doOnCompleted, doOnTerminate, etc.) should be called on each of them.

@abersnaze
Contributor

doOnTerminate won't be called if the observable is unsubscribed before onCompleted/onError. Use doOnUnsubscribe if you always need to get a callback.

@ultimate-deej

ultimate-deej commented May 22, 2016

That is what I'm talking about.
Why unsubscribe just before completion? An event is lost without a reason.
Also, there is an obvious inconsistency: given two identical inner observables, the first completes and is then unsubscribed, while the second is unsubscribed before completion, even though it was about to complete.

@akarnokd
Member

With collections over time, you can't know that you are just before completion. The zip operator behaves correctly, and you need a different operator, doOnTerminate+doOnUnsubscribe or using, to handle the completion and unsubscription cases together.
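
One thing to watch when attaching the same cleanup to both doOnTerminate and doOnUnsubscribe is that both callbacks may fire for a single subscription, so a semaphore release like the one described earlier in this thread could run twice. A minimal sketch of a once-only guard follows, using only the JDK; the class name OnceOnlyRelease and its demo method are hypothetical and not part of RxJava or Hystrix.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical helper: releases one semaphore permit at most once,
// no matter how many times run() is invoked.
final class OnceOnlyRelease implements Runnable {
    private final Semaphore semaphore;
    private final AtomicBoolean released = new AtomicBoolean(false);

    OnceOnlyRelease(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        // compareAndSet succeeds only for the first caller.
        if (released.compareAndSet(false, true)) {
            semaphore.release();
        }
    }

    // Simulates a subscription whose terminate and unsubscribe
    // callbacks both fire, and returns the permits left afterwards.
    static int permitsAfterDoubleCleanup() {
        Semaphore semaphore = new Semaphore(1);
        semaphore.acquireUninterruptibly();
        OnceOnlyRelease cleanup = new OnceOnlyRelease(semaphore);
        cleanup.run(); // as if called from doOnTerminate
        cleanup.run(); // as if called from doOnUnsubscribe
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("permits: " + permitsAfterDoubleCleanup());
    }
}
```

Assuming RxJava 1.x, such a guard could be passed to both hooks, for example .doOnTerminate(cleanup::run).doOnUnsubscribe(cleanup::run), so the permit count stays correct whichever path ends the subscription.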

@vleushin
Author

It is clear to me now. Thank you all for the clarifications. I think we can close this issue.

@ultimate-deej

@abersnaze doOnUnsubscribe can be an acceptable workaround, but what if I don't want it to trigger on error?

@akarnokd Formally, zip behaves correctly, since it is not documented whether onCompleted (and so on) should be called on the inner observables.
The fact is that the behavior is unspecified and unpredictable. The problem is, I want to know exactly what my code does.

@akarnokd
Member

akarnokd commented Jun 1, 2016

See the proposed documentation changes in #3981.

zsxwing pushed a commit that referenced this issue Jun 2, 2016