From bb78b96ee1ba091b2efb199bc85a802561b34441 Mon Sep 17 00:00:00 2001 From: Nitesh Kant Date: Fri, 9 Oct 2015 14:17:52 -0700 Subject: [PATCH] New operators: `concatEmptyWith` and `mergeEmptyWith` As discussed in issue #3037, the primary use of these operators is to be applied to `Observable` so that they can be merged and concatenated with an Observable of a different type. Both these operators raise an error if the source Observable emits any item. --- src/main/java/rx/Observable.java | 69 ++++++ .../operators/OperatorConcatEmptyWith.java | 203 ++++++++++++++++++ .../operators/OperatorMergeEmptyWith.java | 132 ++++++++++++ .../OperatorConcatEmptyWithTest.java | 167 ++++++++++++++ .../operators/OperatorMergeEmptyWithTest.java | 145 +++++++++++++ 5 files changed, 716 insertions(+) create mode 100644 src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java create mode 100644 src/main/java/rx/internal/operators/OperatorMergeEmptyWith.java create mode 100644 src/test/java/rx/internal/operators/OperatorConcatEmptyWithTest.java create mode 100644 src/test/java/rx/internal/operators/OperatorMergeEmptyWithTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 02142ec6ce..d8b863b499 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -9986,6 +9986,75 @@ public final Observable zipWith(Observable other, Func2 return zip(this, other, zipFunction); } + /** + * Returns an Observable that upon completion of the source Observable subscribes to the passed {@code other} + * Observable and then emits all items emitted by that Observable. This function does not expect the source + * Observable to emit any item, in case, the source Observable, emits any item, an {@link IllegalStateException} + * is raised. + *

+ * + * This is different than {@link #concatWith(Observable)} as it does not expect the source Observable to ever emit + * an item. So, this usually is useful for {@code Observable} and results in cleaner code as opposed to using + * a {@link #cast(Class)}, something like: + * + * {@code Observable.empty().cast(String.class).concatWith(Observable.just("Hello"))} + * + *

+ *
Scheduler:
+ *
{@code concatEmptyWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + *
Backpressure:
+ *
{@code concatEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable} + * as it never expects the source to ever emit an item. All demands are sent to the {@code other} + * {@code Observable}.
+ * + * @return an Observable that upon completion of the source, starts emitting items from the {@code other} + * Observable. + * @throws IllegalStateException If the source emits any item. + * + * @see #mergeEmptyWith(Observable) + */ + @Experimental + public final Observable concatEmptyWith(Observable other) { + return lift(new OperatorConcatEmptyWith(other)); + } + + /** + * Returns an Observable that only listens for error from the source Observable and emit items only from the passed + * {@code other} Observable. This function does not expect the source Observable to emit any item, in case, the + * source Observable, emits any item, an {@link IllegalStateException} is raised. + *

+ * + * This is different than {@link #mergeWith(Observable)} as it does not expect the source Observable to ever emit + * an item. So, this usually is useful for using on {@code Observable} and results in cleaner code as opposed + * to using a {@link #cast(Class)}, something like: + * {@code Observable.empty().cast(String.class).mergeWith(Observable.just("Hello"))} + * + *

+ *
Scheduler:
+ *
{@code mergeEmptyWith} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + *
+ *
Backpressure:
+ *
{@code mergeEmptyWith} does not propagate any demands from the subscriber to the source {@code Observable} + * as it never expects the source to ever emit an item. All demands are sent to the {@code other} + * {@code Observable}.
+ *
+ * + * @return an Observable that only listens for errors from the source and starts emitting items from the + * {@code other} Observable on subscription. + * Observable. + * @throws IllegalStateException If the source emits any item. + * + * @see #concatEmptyWith(Observable) + */ + @Experimental + public final Observable mergeEmptyWith(Observable other) { + return lift(new OperatorMergeEmptyWith(other)); + } + /** * An Observable that never sends any information to an {@link Observer}. * This Observable is useful primarily for testing purposes. diff --git a/src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java b/src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java new file mode 100644 index 0000000000..8f6a5d18e1 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorConcatEmptyWith.java @@ -0,0 +1,203 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; +import rx.internal.producers.ProducerArbiter; +import rx.subscriptions.SerialSubscription; + +/** + * Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied + * alternate {@code Observable} after the source completes. + * + * @param the source value type + * @param the result value type + */ +public final class OperatorConcatEmptyWith implements Operator { + + private final Observable alternate; + + public OperatorConcatEmptyWith(Observable alternate) { + this.alternate = alternate; + } + + @Override + public Subscriber call(Subscriber child) { + final SerialSubscription ssub = new SerialSubscription(); + final ParentSubscriber parent = new ParentSubscriber(child, ssub, alternate); + ssub.set(parent); + child.add(ssub); + child.setProducer(parent.emptyProducer); + return parent; + } + + private final class ParentSubscriber extends Subscriber { + + private final Subscriber child; + private final SerialSubscription ssub; + private final EmptyProducer emptyProducer; + private final Observable alternate; + + ParentSubscriber(Subscriber child, final SerialSubscription ssub, Observable alternate) { + this.child = child; + this.ssub = ssub; + this.emptyProducer = new EmptyProducer(); + this.alternate = alternate; + } + + @Override + public void setProducer(final Producer producer) { + /* + * Always request Max from the parent as we never really expect the parent to emit an item, so the + * actual value does not matter. However, if the parent producer is waiting for a request to emit + * a terminal event, not requesting the same will cause a deadlock of the parent never completing and + * the child never subscribed. + */ + producer.request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + if (!child.isUnsubscribed()) { + AlternateSubscriber as = new AlternateSubscriber(child, emptyProducer); + ssub.set(as); + alternate.unsafeSubscribe(as); + } + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + onError(new IllegalStateException("Concat empty with source emitted an item: " + t)); + } + } + + private final class AlternateSubscriber extends Subscriber { + + private final EmptyProducer emptyProducer; + private final Subscriber child; + + AlternateSubscriber(Subscriber child, EmptyProducer emptyProducer) { + this.child = child; + this.emptyProducer = emptyProducer; + } + + @Override + public void setProducer(final Producer producer) { + emptyProducer.setAltProducer(producer); + } + + @Override + public void onCompleted() { + child.onCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(R r) { + child.onNext(r); + } + } + + /** + * This is a producer implementation that does the following: + * + *
    + *
  • If the alternate producer has not yet arrived, store the total requested count from downstream.
  • + *
  • If the alternate producer has arrived, then relay the request demand to it.
  • + *
  • Request {@link Long#MAX_VALUE} from the parent producer, the first time the child requests anything.
  • + *
+ * + * Since, this is only applicable to this operator, it does not check for emissions from the source, as the source + * is never expected to emit any item. Thus it is "lighter" weight than {@link ProducerArbiter} + */ + private static final class EmptyProducer implements Producer { + + /*Total requested items till the time the alternate producer arrives.*/ + private long missedRequested; /*Guarded by this*/ + /*Producer from the alternate Observable for this operator*/ + private Producer altProducer; /*Guarded by this*/ + + @Override + public void request(final long requested) { + if (requested < 0) { + throw new IllegalArgumentException("Requested items can not be negative."); + } + + if (requested == 0) { + return; + } + + boolean requestToAlternate = false; + Producer _altProducer; + synchronized (this) { + if (null == altProducer) { + /*Accumulate requested till the time an alternate producer arrives.*/ + long r = this.missedRequested; + long u = r + requested; + if (u < 0) { + u = Long.MAX_VALUE; + } + this.missedRequested = u; + } else { + /*If the alternate producer exists, then relay a valid request. The missed requested will be + requested from the alt producer on setProducer()*/ + requestToAlternate = true; + } + + _altProducer = altProducer; + } + + if (requestToAlternate) { + _altProducer.request(requested); + } + } + + private void setAltProducer(final Producer altProducer) { + if (null == altProducer) { + throw new IllegalArgumentException("Producer can not be null."); + } + + boolean requestToAlternate = false; + long _missedRequested; + + synchronized (this) { + if (0 != missedRequested) { + /*Something was requested from the source Observable, relay that to the new producer*/ + requestToAlternate = true; + } + + this.altProducer = altProducer; + _missedRequested = missedRequested; + } + + if (requestToAlternate) { + altProducer.request(_missedRequested); + } + } + } +} \ No newline at end of file diff --git a/src/main/java/rx/internal/operators/OperatorMergeEmptyWith.java b/src/main/java/rx/internal/operators/OperatorMergeEmptyWith.java new file mode 100644 index 0000000000..db9faf24c4 --- /dev/null +++ b/src/main/java/rx/internal/operators/OperatorMergeEmptyWith.java @@ -0,0 +1,132 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package rx.internal.operators; + +import rx.Observable; +import rx.Observable.Operator; +import rx.Producer; +import rx.Subscriber; +import rx.observers.SerializedSubscriber; + +/** + * Returns an Observable that emits an error if any item is emitted by the source and emits items from the supplied + * alternate {@code Observable}. The errors from source are propagated as-is. + * + * @param the source value type + * @param the result value type + */ +public final class OperatorMergeEmptyWith implements Operator { + + private final Observable alternate; + + public OperatorMergeEmptyWith(Observable alternate) { + this.alternate = alternate; + } + + @Override + public Subscriber call(final Subscriber child) { + final ChildSubscriber wrappedChild = new ChildSubscriber(child); + final ParentSubscriber parent = new ParentSubscriber(wrappedChild); + wrappedChild.add(parent); + alternate.unsafeSubscribe(wrappedChild); + return parent; + } + + private final class ParentSubscriber extends Subscriber { + + private final ChildSubscriber child; + + ParentSubscriber(ChildSubscriber child) { + this.child = child; + } + + @Override + public void setProducer(final Producer producer) { + /* + * Always request Max from the parent as we never really expect the parent to emit an item, so the + * actual value does not matter. However, if the parent producer is waiting for a request to emit + * a terminal event, not requesting the same will cause the merged Observable to never complete. + */ + producer.request(Long.MAX_VALUE); + } + + @Override + public void onCompleted() { + child.parentCompleted(); + } + + @Override + public void onError(Throwable e) { + child.onError(e); + } + + @Override + public void onNext(T t) { + onError(new IllegalStateException("Merge empty with source emitted an item: " + t)); + } + } + + private final class ChildSubscriber extends Subscriber { + + private final SerializedSubscriber delegate; + private boolean parentCompleted; /*Guarded by this*/ + private boolean childCompleted; /*Guarded by this*/ + + ChildSubscriber(Subscriber delegate) { + super(delegate); + this.delegate = new SerializedSubscriber(delegate); + } + + @Override + public void onCompleted() { + boolean bothCompleted = false; + synchronized (this) { + if (parentCompleted) { + bothCompleted = true; + } + childCompleted = true; + } + + if (bothCompleted) { + delegate.onCompleted(); + } + } + + @Override + public void onError(Throwable e) { + delegate.onError(e); + } + + @Override + public void onNext(R r) { + delegate.onNext(r); + } + + public void parentCompleted() { + boolean bothCompleted = false; + synchronized (this) { + if (childCompleted) { + bothCompleted = true; + } + parentCompleted = true; + } + + if (bothCompleted) { + delegate.onCompleted(); + } + } + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorConcatEmptyWithTest.java b/src/test/java/rx/internal/operators/OperatorConcatEmptyWithTest.java new file mode 100644 index 0000000000..bc38a497a0 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorConcatEmptyWithTest.java @@ -0,0 +1,167 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Producer; +import rx.Subscriber; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +public class OperatorConcatEmptyWithTest { + + @Rule + public final ExpectedException expectedException = ExpectedException.none(); + + @Test(timeout = 60000) + public void testWithVoid() { + final String soleValue = "Hello"; + Observable source = Observable.empty() + .concatEmptyWith(Observable.just(soleValue)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } + + @Test(timeout = 60000) + public void testErrorOnSourceEmitItem() { + Observable source = Observable.just(1) + .concatEmptyWith(Observable.just("Hello")); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); + } + + @Test(timeout = 60000) + public void testSourceError() throws Exception { + Observable source = Observable.error(new IllegalStateException()) + .concatEmptyWith(Observable.just("Hello")); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); + } + + @Test(timeout = 60000) + public void testNoSubscribeBeforeSourceCompletion() { + final String soleValue = "Hello"; + final TestScheduler testScheduler = Schedulers.test(); + + /*Delaying on complete event so to check that the subscription does not happen before completion*/ + Observable source = Observable.empty() + .observeOn(testScheduler) + .concatEmptyWith(Observable.just(soleValue)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.assertNoTerminalEvent(); + testSubscriber.assertNoValues(); + + testScheduler.triggerActions(); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } + + @Test(timeout = 60000) + public void testRequestNSingle() throws Exception { + final String[] values = {"Hello1", "Hello2"}; + Observable source = Observable.empty() + .concatEmptyWith(Observable.from(values)); + + TestSubscriber testSubscriber = new TestSubscriber(0); + source.subscribe(testSubscriber); + + testSubscriber.assertNoTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertNoValues(); + + testSubscriber.requestMore(2); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValues(values); + } + + @Test(timeout = 60000) + public void testRequestNMulti() throws Exception { + final String[] values = {"Hello1", "Hello2"}; + Observable source = Observable.empty() + .concatEmptyWith(Observable.from(values)); + + TestSubscriber testSubscriber = new TestSubscriber(0); + source.subscribe(testSubscriber); + + testSubscriber.assertNoTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertNoValues(); + + testSubscriber.requestMore(1); + + testSubscriber.assertNoTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValues(values[0]); + + testSubscriber.requestMore(1); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValues(values); + } + + @Test(timeout = 60000) + public void testSourceDontCompleteWithoutRequest() throws Exception { + + TestSubscriber testSubscriber = new TestSubscriber(0); + + String soleValue = "Hello"; + Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + @Override + public void request(long n) { + subscriber.onCompleted(); + } + }); + } + }).concatEmptyWith(Observable.just(soleValue)).subscribe(testSubscriber); + + testSubscriber.requestMore(1); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OperatorMergeEmptyWithTest.java b/src/test/java/rx/internal/operators/OperatorMergeEmptyWithTest.java new file mode 100644 index 0000000000..6cc6707e63 --- /dev/null +++ b/src/test/java/rx/internal/operators/OperatorMergeEmptyWithTest.java @@ -0,0 +1,145 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import org.junit.Test; +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Producer; +import rx.Subscriber; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.schedulers.TestScheduler; + +import java.util.concurrent.TimeUnit; + +public class OperatorMergeEmptyWithTest { + + @Test(timeout = 60000) + public void testWithVoid() { + final String soleValue = "Hello"; + Observable source = Observable.empty() + .mergeEmptyWith(Observable.just(soleValue)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } + + @Test(timeout = 60000) + public void testErrorOnSourceEmitItem() { + TestScheduler testScheduler = Schedulers.test(); + Observable source = Observable.just(1) + .mergeEmptyWith(Observable.just("Hello").observeOn(testScheduler)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testScheduler.triggerActions(); + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); + } + + @Test(timeout = 60000) + public void testSourceError() throws Exception { + TestScheduler testScheduler = Schedulers.test(); + Observable source = Observable.error(new IllegalStateException()) + .mergeEmptyWith(Observable.just("Hello").observeOn(testScheduler)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testScheduler.triggerActions(); + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoValues(); + testSubscriber.assertError(IllegalStateException.class); + } + + @Test(timeout = 60000) + public void testSourceComplete() throws Exception { + final String soleValue = "Hello"; + Observable source = Observable.empty() + .mergeEmptyWith(Observable.just(soleValue)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } + + @Test(timeout = 60000) + public void testErrorFromSourcePostEmission() { + final String soleValue = "Hello"; + final TestScheduler testScheduler = Schedulers.test(); + + /*Delaying error event*/ + Observable source = Observable.error(new IllegalArgumentException()) + .observeOn(testScheduler) + .mergeEmptyWith(Observable.just(soleValue)); + + TestSubscriber testSubscriber = new TestSubscriber(); + source.subscribe(testSubscriber); + + testSubscriber.assertNotCompleted(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + + testScheduler.advanceTimeBy(1, TimeUnit.HOURS); + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertError(IllegalArgumentException.class); + } + + @Test(timeout = 60000) + public void testSourceNeverCompletes() throws Exception { + TestSubscriber subscriber = new TestSubscriber(); + Observable.never() + .mergeEmptyWith(Observable.just("Hello")) + .subscribe(subscriber); + + subscriber.assertValue("Hello"); + subscriber.assertNoTerminalEvent(); + } + + @Test(timeout = 60000) + public void testSourceDoesntCompleteWithoutRequest() throws Exception { + TestSubscriber testSubscriber = new TestSubscriber(0); + + String soleValue = "Hello"; + Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber subscriber) { + subscriber.setProducer(new Producer() { + @Override + public void request(long n) { + subscriber.onCompleted(); + } + }); + } + }).mergeEmptyWith(Observable.just(soleValue)).subscribe(testSubscriber); + + testSubscriber.requestMore(1); + + testSubscriber.awaitTerminalEvent(); + testSubscriber.assertNoErrors(); + testSubscriber.assertValue(soleValue); + } +} \ No newline at end of file