diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d5d8fb8297..23166ef074 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -77,6 +77,7 @@ import rx.operators.OperationZip; import rx.operators.SafeObservableSubscription; import rx.operators.SafeObserver; +import rx.operators.OperationAny; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; @@ -4187,4 +4188,35 @@ private boolean isInternalImplementation(Object o) { return p != null && p.getName().startsWith("rx.operators"); } + /** + * Returns an {@link Observable} that emits true if the source + * {@link Observable} is not empty, otherwise false. + * + * @return A subscription function for creating the target Observable. + * @see MSDN: Observable.Any + */ + public Observable any() { + return create(OperationAny.any(this)); + } + + /** + * Returns an {@link Observable} that emits true if any element + * of the source {@link Observable} satisfies the given condition, otherwise + * false. Note: always emit false if the source + * {@link Observable} is empty. + * + * @param predicate + * The condition to test every element. + * @return A subscription function for creating the target Observable. + * @see MSDN: Observable.Any Note: the description in this page is + * wrong. + */ + public Observable any(Func1 predicate) { + return create(OperationAny.any(this, predicate)); + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationAny.java b/rxjava-core/src/main/java/rx/operators/OperationAny.java new file mode 100644 index 0000000000..5423af2f8d --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationAny.java @@ -0,0 +1,230 @@ +package rx.operators; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static rx.util.functions.Functions.alwaysTrue; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.junit.Test; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.util.functions.Func1; + +/** + * Returns an {@link Observable} that emits true if any element of + * an observable sequence satisfies a condition, otherwise false. + */ +public final class OperationAny { + + /** + * Returns an {@link Observable} that emits true if the source + * {@link Observable} is not empty, otherwise false. + * + * @param source + * The source {@link Observable} to check if not empty. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc any( + Observable source) { + return new Any(source, alwaysTrue()); + } + + /** + * Returns an {@link Observable} that emits true if any element + * of the source {@link Observable} satisfies the given condition, otherwise + * false. Note: always emit false if the source + * {@link Observable} is empty. + * + * @param source + * The source {@link Observable} to check if any element + * satisfies the given condition. + * @param predicate + * The condition to test every element. + * @return A subscription function for creating the target Observable. + */ + public static OnSubscribeFunc any( + Observable source, Func1 predicate) { + return new Any(source, predicate); + } + + private static class Any implements OnSubscribeFunc { + + private final Observable source; + private final Func1 predicate; + + private Any(Observable source, + Func1 predicate) { + this.source = source; + this.predicate = predicate; + } + + @Override + public Subscription onSubscribe(final Observer observer) { + final SafeObservableSubscription subscription = new SafeObservableSubscription(); + return subscription.wrap(source.subscribe(new Observer() { + + private final AtomicBoolean hasEmitted = new AtomicBoolean( + false); + + @Override + public void onNext(T value) { + try { + if (hasEmitted.get() == false) { + if (predicate.call(value) == true + && hasEmitted.getAndSet(true) == false) { + observer.onNext(true); + observer.onCompleted(); + // this will work if the sequence is + // asynchronous, it + // will have no effect on a synchronous + // observable + subscription.unsubscribe(); + } + } + } catch (Throwable ex) { + observer.onError(ex); + // this will work if the sequence is asynchronous, it + // will have no effect on a synchronous observable + subscription.unsubscribe(); + } + + } + + @Override + public void onError(Throwable ex) { + observer.onError(ex); + } + + @Override + public void onCompleted() { + if (!hasEmitted.get()) { + observer.onNext(false); + observer.onCompleted(); + } + } + })); + } + + } + + public static class UnitTest { + + @Test + public void testAnyWithTwoItems() { + Observable w = Observable.from(1, 2); + Observable observable = Observable.create(any(w)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(false); + verify(aObserver, times(1)).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testAnyWithOneItem() { + Observable w = Observable.from(1); + Observable observable = Observable.create(any(w)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(false); + verify(aObserver, times(1)).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testAnyWithEmpty() { + Observable w = Observable.empty(); + Observable observable = Observable.create(any(w)); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(false); + verify(aObserver, never()).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testAnyWithPredicate1() { + Observable w = Observable.from(1, 2, 3); + Observable observable = Observable.create(any(w, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 < 2; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, never()).onNext(false); + verify(aObserver, times(1)).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testAnyWithPredicate2() { + Observable w = Observable.from(1, 2, 3); + Observable observable = Observable.create(any(w, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return t1 < 1; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(false); + verify(aObserver, never()).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + + @Test + public void testAnyWithEmptyAndPredicate() { + // If the source is empty, always output false. + Observable w = Observable.empty(); + Observable observable = Observable.create(any(w, + new Func1() { + + @Override + public Boolean call(Integer t1) { + return true; + } + })); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + verify(aObserver, times(1)).onNext(false); + verify(aObserver, never()).onNext(true); + verify(aObserver, never()).onError( + org.mockito.Matchers.any(Throwable.class)); + verify(aObserver, times(1)).onCompleted(); + } + } +} diff --git a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java index 4bd6e11106..ee14a6e166 100644 --- a/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/AsyncSubject.java @@ -168,7 +168,7 @@ public void testCompleted() { private void assertCompletedObserver(Observer aObserver) { verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -221,7 +221,7 @@ public void testUnsubscribeBeforeCompleted() { private void assertNoOnNextEventsReceived(Observer aObserver) { verify(aObserver, Mockito.never()).onNext(anyString()); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java index 4a3729c24e..6fa45cd6c7 100644 --- a/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java @@ -15,7 +15,6 @@ */ package rx.subjects; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.concurrent.ConcurrentHashMap; @@ -199,7 +198,7 @@ private void assertCompletedObserver(Observer aObserver) { verify(aObserver, times(1)).onNext("default"); verify(aObserver, times(1)).onNext("one"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } diff --git a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java index c196d83cdd..dab76b5ec5 100644 --- a/rxjava-core/src/main/java/rx/subjects/PublishSubject.java +++ b/rxjava-core/src/main/java/rx/subjects/PublishSubject.java @@ -16,7 +16,6 @@ package rx.subjects; import static org.junit.Assert.*; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.ArrayList; @@ -273,7 +272,7 @@ private void assertCompletedObserver(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -340,7 +339,7 @@ private void assertCompletedStartingWithThreeObserver(Observer aObserver verify(aObserver, Mockito.never()).onNext("one"); verify(aObserver, Mockito.never()).onNext("two"); verify(aObserver, times(1)).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } @@ -374,7 +373,7 @@ private void assertObservedUntilTwo(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); } @@ -404,7 +403,7 @@ public void testUnsubscribeAfterOnCompleted() { inOrder.verify(anObserver, times(1)).onNext("one"); inOrder.verify(anObserver, times(1)).onNext("two"); inOrder.verify(anObserver, times(1)).onCompleted(); - inOrder.verify(anObserver, Mockito.never()).onError(any(Throwable.class)); + inOrder.verify(anObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); @SuppressWarnings("unchecked") Observer anotherObserver = mock(Observer.class); @@ -414,7 +413,7 @@ public void testUnsubscribeAfterOnCompleted() { inOrder.verify(anotherObserver, Mockito.never()).onNext("one"); inOrder.verify(anotherObserver, Mockito.never()).onNext("two"); inOrder.verify(anotherObserver, times(1)).onCompleted(); - inOrder.verify(anotherObserver, Mockito.never()).onError(any(Throwable.class)); + inOrder.verify(anotherObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); } @Test diff --git a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java index 51ac519357..28fbceccdf 100644 --- a/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java +++ b/rxjava-core/src/main/java/rx/subjects/ReplaySubject.java @@ -15,7 +15,6 @@ */ package rx.subjects; -import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; import java.util.ArrayList; @@ -215,7 +214,7 @@ private void assertCompletedObserver(Observer aObserver) inOrder.verify(aObserver, times(1)).onNext("one"); inOrder.verify(aObserver, times(1)).onNext("two"); inOrder.verify(aObserver, times(1)).onNext("three"); - inOrder.verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + inOrder.verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); inOrder.verify(aObserver, times(1)).onCompleted(); inOrder.verifyNoMoreInteractions(); } @@ -307,7 +306,7 @@ private void assertObservedUntilTwo(Observer aObserver) verify(aObserver, times(1)).onNext("one"); verify(aObserver, times(1)).onNext("two"); verify(aObserver, Mockito.never()).onNext("three"); - verify(aObserver, Mockito.never()).onError(any(Throwable.class)); + verify(aObserver, Mockito.never()).onError(org.mockito.Matchers.any(Throwable.class)); verify(aObserver, Mockito.never()).onCompleted(); }