Skip to content

delaySubscription(Observable) breaks upstream unsubscription #3844

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
hannesstruss opened this issue Apr 8, 2016 · 6 comments
Closed

delaySubscription(Observable) breaks upstream unsubscription #3844

hannesstruss opened this issue Apr 8, 2016 · 6 comments
Labels

Comments

@hannesstruss
Copy link

This test case is failing (tested with 1.1.2):

  @Test
  public void testWithSubjects() {
    PublishSubject<Integer> delayUntil = PublishSubject.create();
    PublishSubject<Integer> interrupt = PublishSubject.create();
    final AtomicBoolean subscribed = new AtomicBoolean(false);

    Observable.just(1)
        .doOnSubscribe(() -> subscribed.set(true))
        .delaySubscription(delayUntil)
        .takeUntil(interrupt)
        .subscribe();

    interrupt.onNext(9000);
    delayUntil.onNext(1);

    assertFalse(subscribed.get());
  }

I stumbled upon this using Completable.andThen (which delegates to delaySubscription).

@akarnokd
Copy link
Member

akarnokd commented Apr 8, 2016

TakeUntil subscribes to its source before the subscription to upstream could happen. This is partly due to how lift works and partly to allow interrupting a synchronous source as it would otherwise possibly rush through without letting the operator subscribe to the until source.

@hannesstruss
Copy link
Author

In other words, takeUntil only guarantees to suppress emission of items given a condition, but does not guarantee what happens upstream?

(I was confused, as the analogous example with time based delaySubscription works:)

  @Test
  public void testWithSubjects() {
    PublishSubject<Integer> interrupt = PublishSubject.create();
    final AtomicBoolean subscribed = new AtomicBoolean(false);
    TestScheduler testScheduler = new TestScheduler();

    Observable.just(1)
        .doOnSubscribe(() -> subscribed.set(true))
        .delaySubscription(1, TimeUnit.SECONDS, testScheduler)
        .takeUntil(interrupt)
        .subscribe();

    interrupt.onNext(9000);
    testScheduler.advanceTimeBy(1, TimeUnit.SECONDS);

    assertFalse(subscribed.get());
  }

@akarnokd
Copy link
Member

akarnokd commented Apr 8, 2016

Interesting. The second case shouldn't pass either. I'll investigate.

@akarnokd
Copy link
Member

akarnokd commented Apr 8, 2016

Okay, I was wrong. Both tests should pass because they verify that the subscription doesn't happen - as expected. There is a bug in the non-timed delaySubscription that doesn't propagate the unsubscription properly. I'll post a PR to fix that.

@akarnokd
Copy link
Member

akarnokd commented Apr 8, 2016

Fix posted: #3845

@hannesstruss
Copy link
Author

This works in 1.1.3, thanks so much! ✨

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants