diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 0a2ebab8ce..0da58ca7b0 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5595,6 +5595,24 @@ public final Observable groupJoin(Observable right, Func1 public final Observable ignoreElements() { return lift(OperatorIgnoreElements. instance()); } + + /** + * Returns the source with all {@code onNext} emissions ignored concatenated with the Observable {@code following}. + *

+ * + *

+ *
Scheduler:
+ *
{@code ignoreElementsThen} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the concatenation of {@code source.ignoreElements()} with {@code following} + * @see ReactiveX operators documentation: IgnoreElementsThen + */ + @SuppressWarnings("unchecked") + @Experimental + public final Observable ignoreElementsThen(Observable following) { + return ((Observable) (Observable) ignoreElements()).concatWith(following); + } /** * Returns an Observable that emits {@code true} if the source Observable is empty, otherwise {@code false}. diff --git a/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java b/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java index 818f228ba8..b7d82522d9 100644 --- a/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java +++ b/src/test/java/rx/internal/operators/OperatorIgnoreElementsTest.java @@ -1,9 +1,11 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import java.util.Arrays; +import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -127,4 +129,39 @@ public void onNext(Integer t) { assertEquals(0, count.get()); } + @Test + public void testIgnoreElementsThen() { + final AtomicBoolean firstCompleted = new AtomicBoolean(false); + List list = Observable + .just("a","b","c") + .doOnCompleted(new Action0() { + @Override + public void call() { + firstCompleted.set(true); + }}) + .ignoreElementsThen(Observable.just(1, 2, 3)) + .toList().toBlocking().single(); + assertEquals(Arrays.asList(1, 2, 3), list ); + assertTrue(firstCompleted.get()); + } + + @Test + public void testIgnoreElementsThenWhenFirstObservableErrorsThatSecondObservableDoesNotGetSubscribedTo() { + final AtomicBoolean secondSubscribed = new AtomicBoolean(false); + TestSubscriber ts = TestSubscriber.create(); + Observable + .error(new RuntimeException()) + .ignoreElementsThen( + Observable.just(1, 2, 3) + .doOnSubscribe(new Action0() { + @Override + public void call() { + secondSubscribed.set(true); + }})) + .subscribe(ts); + ts.assertError(RuntimeException.class); + ts.assertNoValues(); + assertFalse(secondSubscribed.get()); + } + }