diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java
index c125682bd0..5d11c7b9ed 100644
--- a/rxjava-core/src/main/java/rx/Observable.java
+++ b/rxjava-core/src/main/java/rx/Observable.java
@@ -40,6 +40,7 @@
import rx.operators.OperationDefer;
import rx.operators.OperationDematerialize;
import rx.operators.OperationFilter;
+import rx.operators.OperationWhere;
import rx.operators.OperationMap;
import rx.operators.OperationMaterialize;
import rx.operators.OperationMerge;
@@ -722,6 +723,21 @@ public Boolean call(T t1) {
});
}
+ /**
+ * Filters an Observable by discarding any of its emissions that do not meet some test.
+ *
+ *
+ *
+ * @param that
+ * the Observable to filter
+ * @param predicate
+ * a function that evaluates the items emitted by the source Observable, returning true
if they pass the filter
+ * @return an Observable that emits only those items in the original Observable that the filter evaluates as true
+ */
+ public static Observable where(Observable that, Func1 predicate) {
+ return _create(OperationWhere.where(that, predicate));
+ }
+
/**
* Converts an {@link Iterable} sequence to an Observable sequence.
*
@@ -2419,6 +2435,21 @@ public Boolean call(T t1) {
});
}
+ /**
+ * Filters an Observable by discarding any of its emissions that do not meet some test.
+ *
+ *
+ *
+ * @param predicate
+ * a function that evaluates the items emitted by the source Observable, returning
+ * true
if they pass the filter
+ * @return an Observable that emits only those items in the original Observable that the filter
+ * evaluates as true
+ */
+ public Observable where(Func1 predicate) {
+ return where(this, predicate);
+ }
+
/**
* Returns the last element of an observable sequence with a specified source.
*
diff --git a/rxjava-core/src/main/java/rx/operators/OperationWhere.java b/rxjava-core/src/main/java/rx/operators/OperationWhere.java
new file mode 100644
index 0000000000..8fd7d2b36b
--- /dev/null
+++ b/rxjava-core/src/main/java/rx/operators/OperationWhere.java
@@ -0,0 +1,61 @@
+/**
+ * Copyright 2013 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package rx.operators;
+
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import rx.Observable;
+import rx.Observer;
+import rx.Subscription;
+import rx.util.functions.Func1;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public final class OperationWhere {
+
+ public static Func1, Subscription> where(Observable that, Func1 predicate) {
+ return OperationFilter.filter(that, predicate);
+ }
+
+ public static class UnitTest {
+
+ @Test
+ public void testWhere() {
+ Observable w = Observable.toObservable("one", "two", "three");
+ Observable observable = Observable.create(where(w, new Func1() {
+
+ @Override
+ public Boolean call(String t1) {
+ return t1.equals("two");
+ }
+ }));
+
+ @SuppressWarnings("unchecked")
+ Observer aObserver = mock(Observer.class);
+ observable.subscribe(aObserver);
+ verify(aObserver, Mockito.never()).onNext("one");
+ verify(aObserver, times(1)).onNext("two");
+ verify(aObserver, Mockito.never()).onNext("three");
+ verify(aObserver, Mockito.never()).onError(any(Exception.class));
+ verify(aObserver, times(1)).onCompleted();
+ }
+ }
+
+}