diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java index 35a8f3db29..808f4605cc 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableCombineLatest.java @@ -124,9 +124,9 @@ public void subscribe(ObservableSource[] sources) { public void dispose() { if (!cancelled) { cancelled = true; - + cancelSources(); if (getAndIncrement() == 0) { - cancel(queue); + clear(queue); } } } @@ -138,6 +138,10 @@ public boolean isDisposed() { void cancel(SpscLinkedArrayQueue q) { clear(q); + cancelSources(); + } + + void cancelSources() { for (CombinerObserver s : observers) { s.dispose(); } diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java index 1fd39ccbab..64d6cb39ea 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableCombineLatestTest.java @@ -1517,4 +1517,38 @@ public void run() throws Exception { RxJavaPlugins.reset(); } } + + @Test + public void eagerDispose() { + final PublishProcessor pp1 = PublishProcessor.create(); + final PublishProcessor pp2 = PublishProcessor.create(); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + if (pp1.hasSubscribers()) { + onError(new IllegalStateException("pp1 not disposed")); + } else + if (pp2.hasSubscribers()) { + onError(new IllegalStateException("pp2 not disposed")); + } else { + onComplete(); + } + } + }; + + Flowable.combineLatest(pp1, pp2, new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }) + .subscribe(ts); + + pp1.onNext(1); + pp2.onNext(2); + ts.assertResult(3); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java index 0938934939..5a6c7263bb 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableCombineLatestTest.java @@ -1165,4 +1165,37 @@ public void run() throws Exception { } } + @Test + public void eagerDispose() { + final PublishSubject ps1 = PublishSubject.create(); + final PublishSubject ps2 = PublishSubject.create(); + + TestObserver ts = new TestObserver() { + @Override + public void onNext(Integer t) { + super.onNext(t); + cancel(); + if (ps1.hasObservers()) { + onError(new IllegalStateException("ps1 not disposed")); + } else + if (ps2.hasObservers()) { + onError(new IllegalStateException("ps2 not disposed")); + } else { + onComplete(); + } + } + }; + + Observable.combineLatest(ps1, ps2, new BiFunction() { + @Override + public Integer apply(Integer t1, Integer t2) throws Exception { + return t1 + t2; + } + }) + .subscribe(ts); + + ps1.onNext(1); + ps2.onNext(2); + ts.assertResult(3); + } }