diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0cef17b1d3..5c52376ac7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -76,6 +76,7 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; +import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; @@ -6119,4 +6120,18 @@ public Observable>> toMultimap(Func1 Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory, Func1> collectionFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory, collectionFactory)); } + + /** + * Return an Observable that skips elements from the source Observable until the secondary + * observable emits an element. + * + * @param other the other Observable that has to emit an element before this + * Observable's elements are relayed + * @return an Observable that skips elements from the source Observable until the secondary + * observable emits an element. + * @see MSDN: Observable.SkipUntil + */ + public Observable skipUntil(Observable other) { + return create(new OperationSkipUntil(this, other)); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java new file mode 100644 index 0000000000..e8f04fd383 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -0,0 +1,121 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; + +/** + * Skip elements from the source Observable until the secondary + * observable fires an element. + * + * @see MSDN: Observable.SkipUntil + */ +public class OperationSkipUntil implements OnSubscribeFunc { + protected final Observable source; + protected final Observable other; + public OperationSkipUntil(Observable source, Observable other) { + this.source = source; + this.other = other; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return new ResultManager(t1).init(); + } + /** Manage the source and other observers. */ + private class ResultManager implements Subscription, Observer { + final Observer observer; + final CompositeSubscription cancel; + final Object guard = new Object(); + final AtomicBoolean running = new AtomicBoolean(); + public ResultManager(Observer observer) { + this.observer = observer; + this.cancel = new CompositeSubscription(); + } + public ResultManager init() { + + SerialSubscription toSource = new SerialSubscription(); + SerialSubscription toOther = new SerialSubscription(); + + cancel.add(toSource); + cancel.add(toOther); + + toSource.setSubscription(source.subscribe(this)); + toOther.setSubscription(other.subscribe(new OtherObserver(toOther))); + + return this; + } + + @Override + public void unsubscribe() { + cancel.unsubscribe(); + } + + @Override + public void onNext(T args) { + if (running.get()) { + observer.onNext(args); + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + observer.onError(e); + unsubscribe(); + } + } + + @Override + public void onCompleted() { + synchronized (guard) { + observer.onCompleted(); + unsubscribe(); + } + } + + /** Observe the other stream. */ + private class OtherObserver implements Observer { + final Subscription self; + public OtherObserver(Subscription self) { + this.self = self; + } + + @Override + public void onNext(U args) { + running.set(true); + self.unsubscribe(); + } + + @Override + public void onError(Throwable e) { + ResultManager.this.onError(e); + } + + @Override + public void onCompleted() { + self.unsubscribe(); + } + + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java new file mode 100644 index 0000000000..2acca312df --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java @@ -0,0 +1,157 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.subjects.PublishSubject; + +public class OperationSkipUntilTest { + @Mock + Observer observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void normal1() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onCompleted(); + } + @Test + public void otherNeverFires() { + PublishSubject source = PublishSubject.create(); + + Observable m = source.skipUntil(Observable.never()); + + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + @Test + public void otherEmpty() { + PublishSubject source = PublishSubject.create(); + + Observable m = source.skipUntil(Observable.empty()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, never()).onCompleted(); + } + @Test + public void otherFiresAndCompletes() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + other.onCompleted(); + + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onCompleted(); + } + @Test + public void sourceThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + other.onCompleted(); + + source.onNext(2); + source.onError(new RuntimeException("Forced failure")); + + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } + @Test + public void otherThrowsImmediately() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onError(new RuntimeException("Forced failure")); + + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } +}