diff --git a/src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java b/src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java index 0f6e2e3f76..573c4065cd 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java +++ b/src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java @@ -22,6 +22,7 @@ import rx.exceptions.Exceptions; import rx.Subscriber; import rx.functions.Action0; +import rx.internal.producers.SingleProducer; import rx.subscriptions.Subscriptions; /** @@ -72,8 +73,7 @@ public void call() { return; } T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit); - subscriber.onNext(value); - subscriber.onCompleted(); + subscriber.setProducer(new SingleProducer(subscriber, value)); } catch (Throwable e) { // If this Observable is unsubscribed, we will receive an CancellationException. // However, CancellationException will not be passed to the final Subscriber diff --git a/src/test/java/rx/internal/operators/OnSubscribeToObservableFutureTest.java b/src/test/java/rx/internal/operators/OnSubscribeToObservableFutureTest.java index ff37d8fab3..4740b97aa0 100644 --- a/src/test/java/rx/internal/operators/OnSubscribeToObservableFutureTest.java +++ b/src/test/java/rx/internal/operators/OnSubscribeToObservableFutureTest.java @@ -16,27 +16,16 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; -import static org.mockito.Mockito.any; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; - -import java.util.concurrent.CancellationException; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Test; -import rx.Observable; -import rx.Observer; -import rx.Subscription; -import rx.observers.TestObserver; -import rx.observers.TestSubscriber; +import rx.*; +import rx.observers.*; import rx.schedulers.Schedulers; public class OnSubscribeToObservableFutureTest { @@ -139,4 +128,28 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec assertEquals(0, testSubscriber.getOnCompletedEvents().size()); assertEquals(0, testSubscriber.getOnNextEvents().size()); } + + @Test + public void backpressure() { + TestSubscriber ts = new TestSubscriber(0); + + FutureTask f = new FutureTask(new Runnable() { + @Override + public void run() { + + } + }, 1); + + f.run(); + + Observable.from(f).subscribe(ts); + + ts.assertNoValues(); + + ts.requestMore(1); + + ts.assertValue(1); + ts.assertNoErrors(); + ts.assertCompleted(); + } }