diff --git a/src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java b/src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java index 722fa860a0..00f27480b5 100644 --- a/src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java +++ b/src/main/java/io/reactivex/internal/observers/ToNotificationObserver.java @@ -13,6 +13,8 @@ package io.reactivex.internal.observers; +import java.util.concurrent.atomic.AtomicReference; + import io.reactivex.*; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; @@ -20,10 +22,12 @@ import io.reactivex.internal.disposables.DisposableHelper; import io.reactivex.plugins.RxJavaPlugins; -public final class ToNotificationObserver implements Observer { - final Consumer> consumer; +public final class ToNotificationObserver +extends AtomicReference +implements Observer, Disposable { + private static final long serialVersionUID = -7420197867343208289L; - Disposable s; + final Consumer> consumer; public ToNotificationObserver(Consumer> consumer) { this.consumer = consumer; @@ -31,22 +35,20 @@ public ToNotificationObserver(Consumer> consumer) { @Override public void onSubscribe(Disposable s) { - if (DisposableHelper.validate(this.s, s)) { - this.s = s; - } + DisposableHelper.setOnce(this, s); } @Override public void onNext(T t) { if (t == null) { - s.dispose(); + get().dispose(); onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources.")); } else { try { consumer.accept(Notification.createOnNext(t)); } catch (Throwable ex) { Exceptions.throwIfFatal(ex); - s.dispose(); + get().dispose(); onError(ex); } } @@ -71,4 +73,14 @@ public void onComplete() { RxJavaPlugins.onError(ex); } } + + @Override + public void dispose() { + DisposableHelper.dispose(this); + } + + @Override + public boolean isDisposed() { + return DisposableHelper.isDisposed(get()); + } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java index 83691b50b8..59c8ebf05f 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableRedo.java @@ -19,7 +19,7 @@ import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.Exceptions; import io.reactivex.functions.*; -import io.reactivex.internal.disposables.SequentialDisposable; +import io.reactivex.internal.disposables.*; import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.observers.ToNotificationObserver; import io.reactivex.subjects.*; @@ -40,7 +40,14 @@ public void subscribeActual(Observer s) { final RedoObserver parent = new RedoObserver(s, subject, source); - s.onSubscribe(parent.arbiter); + ToNotificationObserver actionObserver = new ToNotificationObserver(new Consumer>() { + @Override + public void accept(Notification o) { + parent.handle(o); + } + }); + ListCompositeDisposable cd = new ListCompositeDisposable(parent.arbiter, actionObserver); + s.onSubscribe(cd); ObservableSource action; @@ -52,12 +59,7 @@ public void subscribeActual(Observer s) { return; } - action.subscribe(new ToNotificationObserver(new Consumer>() { - @Override - public void accept(Notification o) { - parent.handle(o); - } - })); + action.subscribe(actionObserver); // trigger first subscription parent.handle(Notification.createOnNext(0)); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java index fede189c54..96d577eb0a 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRepeatTest.java @@ -25,9 +25,11 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.internal.subscriptions.BooleanSubscription; +import io.reactivex.processors.PublishProcessor; import io.reactivex.schedulers.Schedulers; import io.reactivex.subscribers.TestSubscriber; @@ -306,4 +308,25 @@ public boolean getAsBoolean() throws Exception { .assertFailure(TestException.class, 1); } + @Test + public void shouldDisposeInnerObservable() { + final PublishProcessor subject = PublishProcessor.create(); + final Disposable disposable = Flowable.just("Leak") + .repeatWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable completions) throws Exception { + return completions.switchMap(new Function>() { + @Override + public Flowable apply(Object ignore) throws Exception { + return subject; + } + }); + } + }) + .subscribe(); + + assertTrue(subject.hasSubscribers()); + disposable.dispose(); + assertFalse(subject.hasSubscribers()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java index ef88b0e8f0..262627c41b 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRetryTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -997,4 +998,27 @@ public boolean getAsBoolean() throws Exception { .test() .assertResult(1, 1, 1, 1, 1); } + + + @Test + public void shouldDisposeInnerObservable() { + final PublishProcessor subject = PublishProcessor.create(); + final Disposable disposable = Flowable.error(new RuntimeException("Leak")) + .retryWhen(new Function, Flowable>() { + @Override + public Flowable apply(Flowable errors) throws Exception { + return errors.switchMap(new Function>() { + @Override + public Flowable apply(Throwable ignore) throws Exception { + return subject; + } + }); + } + }) + .subscribe(); + + assertTrue(subject.hasSubscribers()); + disposable.dispose(); + assertFalse(subject.hasSubscribers()); + } } \ No newline at end of file diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java index 81243f011c..acf87c8992 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRepeatTest.java @@ -14,6 +14,7 @@ package io.reactivex.internal.operators.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.*; import java.util.*; @@ -25,11 +26,12 @@ import io.reactivex.*; import io.reactivex.Observable; import io.reactivex.Observer; -import io.reactivex.disposables.Disposables; +import io.reactivex.disposables.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.*; import io.reactivex.observers.TestObserver; import io.reactivex.schedulers.Schedulers; +import io.reactivex.subjects.PublishSubject; public class ObservableRepeatTest { @@ -257,4 +259,25 @@ public boolean getAsBoolean() throws Exception { .assertFailure(TestException.class, 1); } + @Test + public void shouldDisposeInnerObservable() { + final PublishSubject subject = PublishSubject.create(); + final Disposable disposable = Observable.just("Leak") + .repeatWhen(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable completions) throws Exception { + return completions.switchMap(new Function>() { + @Override + public ObservableSource apply(Object ignore) throws Exception { + return subject; + } + }); + } + }) + .subscribe(); + + assertTrue(subject.hasObservers()); + disposable.dispose(); + assertFalse(subject.hasObservers()); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java index bcd1788d68..a2d00f1096 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRetryTest.java @@ -908,4 +908,26 @@ public boolean test(Throwable e) throws Exception { } } + @Test + public void shouldDisposeInnerObservable() { + final PublishSubject subject = PublishSubject.create(); + final Disposable disposable = Observable.error(new RuntimeException("Leak")) + .retryWhen(new Function, ObservableSource>() { + @Override + public ObservableSource apply(Observable errors) throws Exception { + return errors.switchMap(new Function>() { + @Override + public ObservableSource apply(Throwable ignore) throws Exception { + return subject; + } + }); + } + }) + .subscribe(); + + assertTrue(subject.hasObservers()); + disposable.dispose(); + assertFalse(subject.hasObservers()); + } + }