diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 421a5ea89d..776026fdb4 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -19,6 +19,7 @@ import rx.exceptions.*; import rx.functions.*; import rx.internal.operators.*; +import rx.internal.producers.SingleProducer; import rx.internal.util.*; import rx.observables.*; import rx.observers.SafeSubscriber; @@ -3857,8 +3858,14 @@ public final Observable debounce(long timeout, TimeUnit unit, Scheduler sched * items, or the items emitted by the source Observable * @see ReactiveX operators documentation: DefaultIfEmpty */ - public final Observable defaultIfEmpty(T defaultValue) { - return lift(new OperatorDefaultIfEmpty(defaultValue)); + public final Observable defaultIfEmpty(final T defaultValue) { + //if empty switch to an observable that emits defaultValue and supports backpressure + return switchIfEmpty(Observable.create(new OnSubscribe() { + + @Override + public void call(Subscriber subscriber) { + subscriber.setProducer(new SingleProducer(subscriber, defaultValue)); + }})); } /** diff --git a/src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java b/src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java deleted file mode 100644 index 1265f81907..0000000000 --- a/src/main/java/rx/internal/operators/OperatorDefaultIfEmpty.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.operators; - -import rx.Observable.Operator; -import rx.Subscriber; - -/** - * Returns the elements of the specified sequence or the specified default value - * in a singleton sequence if the sequence is empty. - * @param the value type - */ -public class OperatorDefaultIfEmpty implements Operator { - final T defaultValue; - - public OperatorDefaultIfEmpty(T defaultValue) { - this.defaultValue = defaultValue; - } - - @Override - public Subscriber call(final Subscriber child) { - return new Subscriber(child) { - boolean hasValue; - @Override - public void onNext(T t) { - hasValue = true; - child.onNext(t); - } - - @Override - public void onError(Throwable e) { - child.onError(e); - } - - @Override - public void onCompleted() { - if (!hasValue) { - try { - child.onNext(defaultValue); - } catch (Throwable e) { - child.onError(e); - return; - } - } - child.onCompleted(); - } - - }; - } - -} diff --git a/src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java b/src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java index a180f933e2..ec6bb0486f 100644 --- a/src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java +++ b/src/test/java/rx/internal/operators/OperatorDefaultIfEmptyTest.java @@ -26,6 +26,7 @@ import rx.Observer; import rx.Subscriber; import rx.exceptions.TestException; +import rx.observers.TestSubscriber; public class OperatorDefaultIfEmptyTest { @@ -85,4 +86,28 @@ public void onCompleted() { verify(o, never()).onNext(any(Integer.class)); verify(o, never()).onCompleted(); } + + @Test + public void testBackpressureEmpty() { + TestSubscriber ts = TestSubscriber.create(0); + Observable.empty().defaultIfEmpty(1).subscribe(ts); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + ts.requestMore(1); + ts.assertValue(1); + ts.assertCompleted(); + } + + @Test + public void testBackpressureNonEmpty() { + TestSubscriber ts = TestSubscriber.create(0); + Observable.just(1,2,3).defaultIfEmpty(1).subscribe(ts); + ts.assertNoValues(); + ts.assertNoTerminalEvent(); + ts.requestMore(2); + ts.assertValues(1, 2); + ts.requestMore(1); + ts.assertValues(1, 2, 3); + ts.assertCompleted(); + } }