diff --git a/src/main/java/rx/Single.java b/src/main/java/rx/Single.java index 8f0db3e56b..20b983c063 100644 --- a/src/main/java/rx/Single.java +++ b/src/main/java/rx/Single.java @@ -12,6 +12,9 @@ */ package rx; +import java.util.Collection; +import java.util.concurrent.*; + import rx.Observable.Operator; import rx.annotations.Beta; import rx.annotations.Experimental; @@ -23,15 +26,13 @@ import rx.internal.util.ScalarSynchronousSingle; import rx.internal.util.UtilityFunctions; import rx.observers.SafeSubscriber; +import rx.observers.SerializedSubscriber; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.schedulers.Schedulers; import rx.singles.BlockingSingle; import rx.subscriptions.Subscriptions; -import java.util.Collection; -import java.util.concurrent.*; - /** * The Single class implements the Reactive Pattern for a single value response. See {@link Observable} for the * implementation of the Reactive Pattern for a stream or vector of values. @@ -1800,6 +1801,229 @@ public void onError(Throwable error) { } }); } + + /** + * Returns a Single that emits the item emitted by the source Single until a Completable terminates. Upon + * termination of {@code other}, this will emit a {@link CancellationException} rather than go to + * {@link SingleSubscriber#onSuccess(Object)}. + *

+ * + *

+ *
Scheduler:
+ *
{@code takeUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param other + * the Completable whose termination will cause {@code takeUntil} to emit the item from the source + * Single + * @return a Single that emits the item emitted by the source Single until such time as {@code other} terminates. + * @see ReactiveX operators documentation: TakeUntil + */ + public final Single takeUntil(final Completable other) { + return lift(new Operator() { + @Override + public Subscriber call(Subscriber child) { + final Subscriber serial = new SerializedSubscriber(child, false); + + final Subscriber main = new Subscriber(serial, false) { + @Override + public void onNext(T t) { + serial.onNext(t); + } + @Override + public void onError(Throwable e) { + try { + serial.onError(e); + } finally { + serial.unsubscribe(); + } + } + @Override + public void onCompleted() { + try { + serial.onCompleted(); + } finally { + serial.unsubscribe(); + } + } + }; + + final Completable.CompletableSubscriber so = new Completable.CompletableSubscriber() { + @Override + public void onCompleted() { + onError(new CancellationException("Stream was canceled before emitting a terminal event.")); + } + + @Override + public void onError(Throwable e) { + main.onError(e); + } + + @Override + public void onSubscribe(Subscription d) { + serial.add(d); + } + }; + + serial.add(main); + child.add(serial); + + other.subscribe(so); + + return main; + } + }); + } + + /** + * Returns a Single that emits the item emitted by the source Single until an Observable emits an item. Upon + * emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to + * {@link SingleSubscriber#onSuccess(Object)}. + *

+ * + *

+ *
Scheduler:
+ *
{@code takeUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param other + * the Observable whose first emitted item will cause {@code takeUntil} to emit the item from the source + * Single + * @param + * the type of items emitted by {@code other} + * @return a Single that emits the item emitted by the source Single until such time as {@code other} emits + * its first item + * @see ReactiveX operators documentation: TakeUntil + */ + public final Single takeUntil(final Observable other) { + return lift(new Operator() { + @Override + public Subscriber call(Subscriber child) { + final Subscriber serial = new SerializedSubscriber(child, false); + + final Subscriber main = new Subscriber(serial, false) { + @Override + public void onNext(T t) { + serial.onNext(t); + } + @Override + public void onError(Throwable e) { + try { + serial.onError(e); + } finally { + serial.unsubscribe(); + } + } + @Override + public void onCompleted() { + try { + serial.onCompleted(); + } finally { + serial.unsubscribe(); + } + } + }; + + final Subscriber so = new Subscriber() { + + @Override + public void onCompleted() { + onError(new CancellationException("Stream was canceled before emitting a terminal event.")); + } + + @Override + public void onError(Throwable e) { + main.onError(e); + } + + @Override + public void onNext(E e) { + onError(new CancellationException("Stream was canceled before emitting a terminal event.")); + } + }; + + serial.add(main); + serial.add(so); + + child.add(serial); + + other.unsafeSubscribe(so); + + return main; + } + }); + } + + /** + * Returns a Single that emits the item emitted by the source Single until a second Single emits an item. Upon + * emission of an item from {@code other}, this will emit a {@link CancellationException} rather than go to + * {@link SingleSubscriber#onSuccess(Object)}. + *

+ * + *

+ *
Scheduler:
+ *
{@code takeUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param other + * the Single whose emitted item will cause {@code takeUntil} to emit the item from the source Single + * @param + * the type of item emitted by {@code other} + * @return a Single that emits the item emitted by the source Single until such time as {@code other} emits its item + * @see ReactiveX operators documentation: TakeUntil + */ + public final Single takeUntil(final Single other) { + return lift(new Operator() { + @Override + public Subscriber call(Subscriber child) { + final Subscriber serial = new SerializedSubscriber(child, false); + + final Subscriber main = new Subscriber(serial, false) { + @Override + public void onNext(T t) { + serial.onNext(t); + } + @Override + public void onError(Throwable e) { + try { + serial.onError(e); + } finally { + serial.unsubscribe(); + } + } + @Override + public void onCompleted() { + try { + serial.onCompleted(); + } finally { + serial.unsubscribe(); + } + } + }; + + final SingleSubscriber so = new SingleSubscriber() { + @Override + public void onSuccess(E value) { + onError(new CancellationException("Stream was canceled before emitting a terminal event.")); + } + + @Override + public void onError(Throwable e) { + main.onError(e); + } + }; + + serial.add(main); + serial.add(so); + + child.add(serial); + + other.subscribe(so); + + return main; + } + }); + } /** * Converts this Single into an {@link Observable}. diff --git a/src/test/java/rx/SingleTest.java b/src/test/java/rx/SingleTest.java index 393088562c..3ce86e9772 100644 --- a/src/test/java/rx/SingleTest.java +++ b/src/test/java/rx/SingleTest.java @@ -15,6 +15,12 @@ import org.junit.Test; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.*; + import rx.Single.OnSubscribe; import rx.exceptions.CompositeException; import rx.functions.*; @@ -22,13 +28,9 @@ import rx.schedulers.Schedulers; import rx.schedulers.TestScheduler; import rx.singles.BlockingSingle; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; -import java.io.IOException; -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.*; - import static org.junit.Assert.*; import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; @@ -1353,4 +1355,293 @@ public Observable call(Throwable throwable) { int numberOfErrors = retryCounter.getOnErrorEvents().size(); assertEquals(retryCount, numberOfErrors); } + + @Test + public void takeUntilCompletableFires() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.toCompletable()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + until.onCompleted(); + + ts.assertError(CancellationException.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilObservableFires() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1)).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + until.onNext(1); + + ts.assertError(CancellationException.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSingleFires() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1).toSingle()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + until.onNext(1); + + ts.assertError(CancellationException.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilObservableCompletes() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1)).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + until.onCompleted(); + + ts.assertError(CancellationException.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceUnsubscribes_withCompletable() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.toCompletable()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + source.onNext(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceUnsubscribes_withObservable() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + source.onNext(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceUnsubscribes_withSingle() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1).toSingle()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + source.onNext(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceErrorUnsubscribes_withCompletable() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.toCompletable()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + Exception e = new Exception(); + source.onError(e); + + ts.assertNoValues(); + ts.assertError(e); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceErrorUnsubscribes_withObservable() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + source.onError(new Throwable()); + + ts.assertNoValues(); + ts.assertError(Throwable.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilSourceErrorUnsubscribes_withSingle() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1).toSingle()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + source.onError(new Throwable()); + + ts.assertNoValues(); + ts.assertError(Throwable.class); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilError_withCompletable_shouldMatch() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.toCompletable()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + Exception e = new Exception(); + until.onError(e); + + ts.assertNoValues(); + ts.assertError(e); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilError_withObservable_shouldMatch() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.asObservable()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + Exception e = new Exception(); + until.onError(e); + + ts.assertNoValues(); + ts.assertError(e); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } + + @Test + public void takeUntilError_withSingle_shouldMatch() { + PublishSubject source = PublishSubject.create(); + PublishSubject until = PublishSubject.create(); + + TestSubscriber ts = new TestSubscriber(); + + source.take(1).toSingle().takeUntil(until.take(1).toSingle()).unsafeSubscribe(ts); + + assertTrue(source.hasObservers()); + assertTrue(until.hasObservers()); + + Exception e = new Exception(); + until.onError(e); + + ts.assertNoValues(); + ts.assertError(e); + + assertFalse(source.hasObservers()); + assertFalse(until.hasObservers()); + assertFalse(ts.isUnsubscribed()); + } }