From 4369e1c9e051ee2b42f5dbe2062458303219b382 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Tue, 2 Feb 2016 09:53:48 +0100 Subject: [PATCH] 1.x: fix sample(other) backpressure and unsubscription behavior --- .../OperatorSampleWithObservable.java | 26 ++- .../operators/OperatorSampleTest.java | 154 ++++++++++++++++++ 2 files changed, 172 insertions(+), 8 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java index 3b3e295dd3..45614dfc28 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithObservable.java @@ -16,9 +16,9 @@ package rx.internal.operators; import java.util.concurrent.atomic.AtomicReference; -import rx.Observable; + +import rx.*; import rx.Observable.Operator; -import rx.Subscriber; import rx.observers.SerializedSubscriber; /** @@ -44,7 +44,9 @@ public Subscriber call(Subscriber child) { final AtomicReference value = new AtomicReference(EMPTY_TOKEN); - Subscriber samplerSub = new Subscriber(child) { + final AtomicReference main = new AtomicReference(); + + final Subscriber samplerSub = new Subscriber() { @Override public void onNext(U t) { Object localValue = value.getAndSet(EMPTY_TOKEN); @@ -58,15 +60,17 @@ public void onNext(U t) { @Override public void onError(Throwable e) { s.onError(e); - unsubscribe(); + // no need to null check, main is assigned before any of the two gets subscribed + main.get().unsubscribe(); } @Override public void onCompleted() { + // onNext(null); // emit the very last value? s.onCompleted(); - unsubscribe(); + // no need to null check, main is assigned before any of the two gets subscribed + main.get().unsubscribe(); } - }; Subscriber result = new Subscriber() { @@ -78,17 +82,23 @@ public void onNext(T t) { @Override public void onError(Throwable e) { s.onError(e); - unsubscribe(); + + samplerSub.unsubscribe(); } @Override public void onCompleted() { + // samplerSub.onNext(null); // emit the very last value? s.onCompleted(); - unsubscribe(); + + samplerSub.unsubscribe(); } }; + main.lazySet(result); + child.add(result); + child.add(samplerSub); sampler.unsafeSubscribe(samplerSub); diff --git a/src/test/java/rx/internal/operators/OperatorSampleTest.java b/src/test/java/rx/internal/operators/OperatorSampleTest.java index 1db795cbfb..78d3633d6f 100644 --- a/src/test/java/rx/internal/operators/OperatorSampleTest.java +++ b/src/test/java/rx/internal/operators/OperatorSampleTest.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.InOrder; @@ -26,8 +27,10 @@ import rx.*; import rx.Observable.OnSubscribe; import rx.functions.*; +import rx.observers.TestSubscriber; import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; +import rx.subscriptions.Subscriptions; public class OperatorSampleTest { private TestScheduler scheduler; @@ -312,4 +315,155 @@ public void call(Long t) { Assert.assertEquals(Long.MAX_VALUE, requested[0]); } + + @Test + public void dontUnsubscribeChild1() { + TestSubscriber ts = new TestSubscriber(); + + PublishSubject source = PublishSubject.create(); + + PublishSubject sampler = PublishSubject.create(); + + source.sample(sampler).unsafeSubscribe(ts); + + source.onCompleted(); + + Assert.assertFalse("Source has subscribers?", source.hasObservers()); + Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers()); + + Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed()); + } + + @Test + public void dontUnsubscribeChild2() { + TestSubscriber ts = new TestSubscriber(); + + PublishSubject source = PublishSubject.create(); + + PublishSubject sampler = PublishSubject.create(); + + source.sample(sampler).unsafeSubscribe(ts); + + sampler.onCompleted(); + + Assert.assertFalse("Source has subscribers?", source.hasObservers()); + Assert.assertFalse("Sampler has subscribers?", sampler.hasObservers()); + + Assert.assertFalse("TS unsubscribed?", ts.isUnsubscribed()); + } + + @Test + public void neverSetProducer() { + Observable neverBackpressure = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + t.setProducer(new Producer() { + @Override + public void request(long n) { + // irrelevant in this test + } + }); + } + }); + + final AtomicInteger count = new AtomicInteger(); + + neverBackpressure.sample(neverBackpressure).unsafeSubscribe(new Subscriber() { + @Override + public void onNext(Integer t) { + // irrelevant + } + + @Override + public void onError(Throwable e) { + // irrelevant + } + + @Override + public void onCompleted() { + // irrelevant + } + + @Override + public void setProducer(Producer p) { + count.incrementAndGet(); + } + }); + + Assert.assertEquals(0, count.get()); + } + + @Test + public void unsubscribeMainAfterCompleted() { + final AtomicBoolean unsubscribed = new AtomicBoolean(); + + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + t.add(Subscriptions.create(new Action0() { + @Override + public void call() { + unsubscribed.set(true); + } + })); + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onCompleted() { + if (unsubscribed.get()) { + onError(new IllegalStateException("Resource unsubscribed!")); + } else { + super.onCompleted(); + } + } + }; + + PublishSubject sampler = PublishSubject.create(); + + source.sample(sampler).unsafeSubscribe(ts); + + sampler.onCompleted(); + + ts.assertNoErrors(); + ts.assertCompleted(); + } + + @Test + public void unsubscribeSamplerAfterCompleted() { + final AtomicBoolean unsubscribed = new AtomicBoolean(); + + Observable source = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + t.add(Subscriptions.create(new Action0() { + @Override + public void call() { + unsubscribed.set(true); + } + })); + } + }); + + TestSubscriber ts = new TestSubscriber() { + @Override + public void onCompleted() { + if (unsubscribed.get()) { + onError(new IllegalStateException("Resource unsubscribed!")); + } else { + super.onCompleted(); + } + } + }; + + PublishSubject sampled = PublishSubject.create(); + + sampled.sample(source).unsafeSubscribe(ts); + + sampled.onCompleted(); + + ts.assertNoErrors(); + ts.assertCompleted(); + } }