Skip to content

Commit d82f1be

Browse files
committed
1.x: enable backpressure with from(Future). (#3893)
1 parent 1d7e889 commit d82f1be

File tree

2 files changed

+32
-19
lines changed

2 files changed

+32
-19
lines changed

src/main/java/rx/internal/operators/OnSubscribeToObservableFuture.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import rx.exceptions.Exceptions;
2323
import rx.Subscriber;
2424
import rx.functions.Action0;
25+
import rx.internal.producers.SingleProducer;
2526
import rx.subscriptions.Subscriptions;
2627

2728
/**
@@ -72,8 +73,7 @@ public void call() {
7273
return;
7374
}
7475
T value = (unit == null) ? (T) that.get() : (T) that.get(time, unit);
75-
subscriber.onNext(value);
76-
subscriber.onCompleted();
76+
subscriber.setProducer(new SingleProducer<T>(subscriber, value));
7777
} catch (Throwable e) {
7878
// If this Observable is unsubscribed, we will receive an CancellationException.
7979
// However, CancellationException will not be passed to the final Subscriber

src/test/java/rx/internal/operators/OnSubscribeToObservableFutureTest.java

Lines changed: 30 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -16,27 +16,16 @@
1616
package rx.internal.operators;
1717

1818
import static org.junit.Assert.assertEquals;
19-
import static org.mockito.Mockito.any;
20-
import static org.mockito.Mockito.mock;
21-
import static org.mockito.Mockito.never;
22-
import static org.mockito.Mockito.times;
23-
import static org.mockito.Mockito.verify;
24-
import static org.mockito.Mockito.when;
25-
26-
import java.util.concurrent.CancellationException;
27-
import java.util.concurrent.ExecutionException;
28-
import java.util.concurrent.Future;
29-
import java.util.concurrent.TimeUnit;
30-
import java.util.concurrent.TimeoutException;
19+
import static org.mockito.Matchers.any;
20+
import static org.mockito.Mockito.*;
21+
22+
import java.util.concurrent.*;
3123
import java.util.concurrent.atomic.AtomicBoolean;
3224

3325
import org.junit.Test;
3426

35-
import rx.Observable;
36-
import rx.Observer;
37-
import rx.Subscription;
38-
import rx.observers.TestObserver;
39-
import rx.observers.TestSubscriber;
27+
import rx.*;
28+
import rx.observers.*;
4029
import rx.schedulers.Schedulers;
4130

4231
public class OnSubscribeToObservableFutureTest {
@@ -139,4 +128,28 @@ public Object get(long timeout, TimeUnit unit) throws InterruptedException, Exec
139128
assertEquals(0, testSubscriber.getOnCompletedEvents().size());
140129
assertEquals(0, testSubscriber.getOnNextEvents().size());
141130
}
131+
132+
@Test
133+
public void backpressure() {
134+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
135+
136+
FutureTask<Integer> f = new FutureTask<Integer>(new Runnable() {
137+
@Override
138+
public void run() {
139+
140+
}
141+
}, 1);
142+
143+
f.run();
144+
145+
Observable.from(f).subscribe(ts);
146+
147+
ts.assertNoValues();
148+
149+
ts.requestMore(1);
150+
151+
ts.assertValue(1);
152+
ts.assertNoErrors();
153+
ts.assertCompleted();
154+
}
142155
}

0 commit comments

Comments
 (0)