From cdf7ee760b7003a072f882c577c1460e27978699 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 29 Nov 2013 09:47:25 +0100 Subject: [PATCH 1/3] Operation SkipUntil --- rxjava-core/src/main/java/rx/Observable.java | 15 ++ .../java/rx/operators/OperationSkipUntil.java | 125 ++++++++++++++ .../rx/operators/OperationSkipUntilTest.java | 157 ++++++++++++++++++ 3 files changed, 297 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java create mode 100644 rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 0cef17b1d3..5c52376ac7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -76,6 +76,7 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSkipLast; +import rx.operators.OperationSkipUntil; import rx.operators.OperationSkipWhile; import rx.operators.OperationSubscribeOn; import rx.operators.OperationSum; @@ -6119,4 +6120,18 @@ public Observable>> toMultimap(Func1 Observable>> toMultimap(Func1 keySelector, Func1 valueSelector, Func0>> mapFactory, Func1> collectionFactory) { return create(OperationToMultimap.toMultimap(this, keySelector, valueSelector, mapFactory, collectionFactory)); } + + /** + * Return an Observable that skips elements from the source Observable until the secondary + * observable emits an element. + * + * @param other the other Observable that has to emit an element before this + * Observable's elements are relayed + * @return an Observable that skips elements from the source Observable until the secondary + * observable emits an element. + * @see MSDN: Observable.SkipUntil + */ + public Observable skipUntil(Observable other) { + return create(new OperationSkipUntil(this, other)); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java new file mode 100644 index 0000000000..77c8e43c7f --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -0,0 +1,125 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import java.util.concurrent.atomic.AtomicBoolean; +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.SerialSubscription; + +/** + * Skip elements from the source Observable until the secondary + * observable fires an element. + * + * @see MSDN: Observable.SkipUntil + */ +public class OperationSkipUntil implements OnSubscribeFunc { + protected final Observable source; + protected final Observable other; + public OperationSkipUntil(Observable source, Observable other) { + this.source = source; + this.other = other; + } + + @Override + public Subscription onSubscribe(Observer t1) { + return new ResultManager(t1).init(); + } + /** Manage the source and other observers. */ + class ResultManager implements Subscription, Observer { + final Observer observer; + final CompositeSubscription cancel; + final Object guard = new Object(); + final AtomicBoolean running = new AtomicBoolean(); + public ResultManager(Observer observer) { + this.observer = observer; + this.cancel = new CompositeSubscription(); + } + public ResultManager init() { + + SerialSubscription toSource = new SerialSubscription(); + SerialSubscription toOther = new SerialSubscription(); + + cancel.add(toSource); + cancel.add(toOther); + + toSource.setSubscription(source.subscribe(this)); + toOther.setSubscription(other.subscribe(new OtherObserver(toOther))); + + return this; + } + + @Override + public void unsubscribe() { + cancel.unsubscribe(); + } + + @Override + public void onNext(T args) { + if (running.get()) { + observer.onNext(args); + } + } + + @Override + public void onError(Throwable e) { + synchronized (guard) { + observer.onError(e); + unsubscribe(); + } + } + + @Override + public void onCompleted() { + synchronized (guard) { + observer.onCompleted(); + unsubscribe(); + } + } + + /** Observe the other stream. */ + class OtherObserver implements Observer { + final Subscription self; + public OtherObserver(Subscription self) { + this.self = self; + } + + @Override + public void onNext(U args) { + running.set(true); + self.unsubscribe(); + } + + @Override + public void onError(Throwable e) { + ResultManager.this.onError(e); + } + + @Override + public void onCompleted() { + if (!running.get()) { + ResultManager.this.onCompleted(); + } else { + self.unsubscribe(); + } + } + + } + } +} diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java new file mode 100644 index 0000000000..b1b570e7bf --- /dev/null +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java @@ -0,0 +1,157 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import org.junit.Before; +import org.junit.Test; +import static org.mockito.Matchers.any; +import org.mockito.Mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import org.mockito.MockitoAnnotations; +import rx.Observable; +import rx.Observer; +import rx.subjects.PublishSubject; + +public class OperationSkipUntilTest { + @Mock + Observer observer; + + @Before + public void before() { + MockitoAnnotations.initMocks(this); + } + + @Test + public void normal1() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onCompleted(); + } + @Test + public void otherNeverFires() { + PublishSubject source = PublishSubject.create(); + + Observable m = source.skipUntil(Observable.never()); + + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + @Test + public void otherEmpty() { + PublishSubject source = PublishSubject.create(); + + Observable m = source.skipUntil(Observable.empty()); + + m.subscribe(observer); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onCompleted(); + } + @Test + public void otherFiresAndCompletes() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + other.onCompleted(); + + source.onNext(2); + source.onNext(3); + source.onNext(4); + source.onCompleted(); + + verify(observer, never()).onError(any(Throwable.class)); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onNext(4); + verify(observer, times(1)).onCompleted(); + } + @Test + public void sourceThrows() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onNext(100); + other.onCompleted(); + + source.onNext(2); + source.onError(new RuntimeException("Forced failure")); + + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } + @Test + public void otherThrowsImmediately() { + PublishSubject source = PublishSubject.create(); + PublishSubject other = PublishSubject.create(); + + Observable m = source.skipUntil(other); + m.subscribe(observer); + + source.onNext(0); + source.onNext(1); + + other.onError(new RuntimeException("Forced failure")); + + verify(observer, never()).onNext(any()); + verify(observer, times(1)).onError(any(Throwable.class)); + verify(observer, never()).onCompleted(); + } +} From b11b30adce8292887c3b4488c03f18f49ab6e14e Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 29 Nov 2013 12:12:29 +0100 Subject: [PATCH 2/3] Inner classes set to private --- .../src/main/java/rx/operators/OperationSkipUntil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java index 77c8e43c7f..787877dfdb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -42,7 +42,7 @@ public Subscription onSubscribe(Observer t1) { return new ResultManager(t1).init(); } /** Manage the source and other observers. */ - class ResultManager implements Subscription, Observer { + private class ResultManager implements Subscription, Observer { final Observer observer; final CompositeSubscription cancel; final Object guard = new Object(); @@ -94,7 +94,7 @@ public void onCompleted() { } /** Observe the other stream. */ - class OtherObserver implements Observer { + private class OtherObserver implements Observer { final Subscription self; public OtherObserver(Subscription self) { this.self = self; From a17b592387be5b16b72892728a85748deb19e4d9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Sun, 1 Dec 2013 10:34:01 +0100 Subject: [PATCH 3/3] Fixed case with skipUntil(empty()) == never() (matches Rx.NET) --- .../src/main/java/rx/operators/OperationSkipUntil.java | 6 +----- .../src/test/java/rx/operators/OperationSkipUntilTest.java | 2 +- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java index 787877dfdb..e8f04fd383 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSkipUntil.java @@ -113,11 +113,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (!running.get()) { - ResultManager.this.onCompleted(); - } else { - self.unsubscribe(); - } + self.unsubscribe(); } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java index b1b570e7bf..2acca312df 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationSkipUntilTest.java @@ -89,7 +89,7 @@ public void otherEmpty() { verify(observer, never()).onError(any(Throwable.class)); verify(observer, never()).onNext(any()); - verify(observer, times(1)).onCompleted(); + verify(observer, never()).onCompleted(); } @Test public void otherFiresAndCompletes() {