Skip to content

Commit 9a8daca

Browse files
committed
1.x: observeOn now replenishes with constant rate
1 parent d72a41e commit 9a8daca

File tree

3 files changed

+92
-82
lines changed

3 files changed

+92
-82
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 19 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
8383
final NotificationLite<T> on;
8484
final boolean delayError;
8585
final Queue<Object> queue;
86-
final int bufferSize;
86+
/** The emission threshold that should trigger a replenishing request. */
87+
final int limit;
8788

8889
// the status of the current stream
8990
volatile boolean finished;
@@ -97,6 +98,9 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
9798
* reading finished (acquire).
9899
*/
99100
Throwable error;
101+
102+
/** Remembers how many elements have been emitted before the requests run out. */
103+
long emitted;
100104

101105
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
102106
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
@@ -105,12 +109,15 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
105109
this.recursiveScheduler = scheduler.createWorker();
106110
this.delayError = delayError;
107111
this.on = NotificationLite.instance();
108-
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
112+
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
113+
this.limit = calculatedSize - (calculatedSize >> 2);
109114
if (UnsafeAccess.isUnsafeAvailable()) {
110-
queue = new SpscArrayQueue<Object>(this.bufferSize);
115+
queue = new SpscArrayQueue<Object>(calculatedSize);
111116
} else {
112-
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
117+
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
113118
}
119+
// signal that this is an async operator capable of receiving this many
120+
request(calculatedSize);
114121
}
115122

116123
void init() {
@@ -133,12 +140,6 @@ public void request(long n) {
133140
localChild.add(this);
134141
}
135142

136-
@Override
137-
public void onStart() {
138-
// signal that this is an async operator capable of receiving this many
139-
request(this.bufferSize);
140-
}
141-
142143
@Override
143144
public void onNext(final T t) {
144145
if (isUnsubscribed() || finished) {
@@ -180,9 +181,8 @@ protected void schedule() {
180181
// only execute this from schedule()
181182
@Override
182183
public void call() {
183-
long emitted = 0L;
184-
185184
long missed = 1L;
185+
long currentEmission = emitted;
186186

187187
// these are accessed in a tight loop around atomics so
188188
// loading them into local variables avoids the mandatory re-reading
@@ -197,7 +197,6 @@ public void call() {
197197

198198
for (;;) {
199199
long requestAmount = requested.get();
200-
long currentEmission = 0L;
201200

202201
while (requestAmount != currentEmission) {
203202
boolean done = finished;
@@ -215,28 +214,25 @@ public void call() {
215214
localChild.onNext(localOn.getValue(v));
216215

217216
currentEmission++;
218-
emitted++;
217+
if (currentEmission == limit) {
218+
requestAmount = BackpressureUtils.produced(requested, currentEmission);
219+
request(currentEmission);
220+
currentEmission = 0L;
221+
}
219222
}
220223

221224
if (requestAmount == currentEmission) {
222225
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
223226
return;
224227
}
225228
}
226-
227-
if (currentEmission != 0L) {
228-
BackpressureUtils.produced(requested, currentEmission);
229-
}
230-
229+
230+
emitted = currentEmission;
231231
missed = counter.addAndGet(-missed);
232232
if (missed == 0L) {
233233
break;
234234
}
235235
}
236-
237-
if (emitted != 0L) {
238-
request(emitted);
239-
}
240236
}
241237

242238
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 70 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -581,65 +581,29 @@ public void onNext(Integer t) {
581581
}
582582

583583
@Test
584-
public void testQueueFullEmitsErrorWithVaryingBufferSize() {
585-
final CountDownLatch latch = new CountDownLatch(1);
586-
// randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
587-
// which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
584+
public void overflownQueueSignalsMBE() {
588585
for (int i = 1; i <= 1024; i = i * 2) {
589586
final int capacity = i;
590-
Observable<Integer> observable = Observable.create(new OnSubscribe<Integer>() {
591-
592-
@Override
593-
public void call(Subscriber<? super Integer> o) {
594-
for (int i = 0; i < capacity + 10; i++) {
595-
o.onNext(i);
596-
}
597-
latch.countDown();
598-
o.onCompleted();
599-
}
600-
601-
});
602-
603-
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>(new Observer<Integer>() {
604-
605-
@Override
606-
public void onCompleted() {
607-
608-
}
609-
610-
@Override
611-
public void onError(Throwable e) {
612-
613-
}
614-
615-
@Override
616-
public void onNext(Integer t) {
617-
try {
618-
// force it to be slow wait until we have queued everything
619-
latch.await(500, TimeUnit.MILLISECONDS);
620-
} catch (InterruptedException e) {
621-
e.printStackTrace();
622-
}
623-
}
624-
625-
});
626-
System.out.println("Using capacity " + capacity); // for post-failure debugging
627-
observable.observeOn(Schedulers.newThread(), capacity).subscribe(testSubscriber);
628-
629-
testSubscriber.awaitTerminalEvent();
630-
List<Throwable> errors = testSubscriber.getOnErrorEvents();
631-
assertEquals(1, errors.size());
632-
System.out.println("Errors: " + errors);
633-
Throwable t = errors.get(0);
634-
if (t instanceof MissingBackpressureException) {
635-
// success, we expect this
636-
} else {
637-
if (t.getCause() instanceof MissingBackpressureException) {
638-
// this is also okay
639-
} else {
640-
fail("Expecting MissingBackpressureException");
641-
}
587+
System.out.println(">> overflownQueueSignalsMBE @ " + i);
588+
589+
PublishSubject<Integer> ps = PublishSubject.create();
590+
591+
TestSubscriber<Integer> ts = new TestSubscriber<Integer>(0);
592+
593+
TestScheduler test = Schedulers.test();
594+
595+
ps.observeOn(test, capacity).subscribe(ts);
596+
597+
for (int j = 0; j < capacity + 10; j++) {
598+
ps.onNext(j);
642599
}
600+
ps.onCompleted();
601+
602+
test.advanceTimeBy(1, TimeUnit.SECONDS);
603+
604+
ts.assertNoValues();
605+
ts.assertError(MissingBackpressureException.class);
606+
ts.assertNotCompleted();
643607
}
644608
}
645609

@@ -900,7 +864,7 @@ public void testErrorDelayedAsync() {
900864

901865
@Test
902866
public void requestExactCompletesImmediately() {
903-
TestSubscriber<Integer> ts = TestSubscriber.create(0);
867+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
904868

905869
TestScheduler test = Schedulers.test();
906870

@@ -920,4 +884,53 @@ public void requestExactCompletesImmediately() {
920884
ts.assertNoErrors();
921885
ts.assertCompleted();
922886
}
887+
888+
@Test
889+
public void fixedReplenishPattern() {
890+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
891+
892+
TestScheduler test = Schedulers.test();
893+
894+
final List<Long> requests = new ArrayList<Long>();
895+
896+
Observable.range(1, 100)
897+
.doOnRequest(new Action1<Long>() {
898+
@Override
899+
public void call(Long v) {
900+
requests.add(v);
901+
}
902+
})
903+
.observeOn(test, 16).subscribe(ts);
904+
905+
test.advanceTimeBy(1, TimeUnit.SECONDS);
906+
ts.requestMore(20);
907+
test.advanceTimeBy(1, TimeUnit.SECONDS);
908+
ts.requestMore(10);
909+
test.advanceTimeBy(1, TimeUnit.SECONDS);
910+
ts.requestMore(50);
911+
test.advanceTimeBy(1, TimeUnit.SECONDS);
912+
ts.requestMore(35);
913+
test.advanceTimeBy(1, TimeUnit.SECONDS);
914+
915+
ts.assertValueCount(100);
916+
ts.assertCompleted();
917+
ts.assertNoErrors();
918+
919+
assertEquals(Arrays.asList(16L, 12L, 12L, 12L, 12L, 12L, 12L, 12L, 12L), requests);
920+
}
921+
922+
@Test
923+
public void bufferSizesWork() {
924+
for (int i = 1; i <= 1024; i = i * 2) {
925+
TestSubscriber<Integer> ts = TestSubscriber.create();
926+
927+
Observable.range(1, 1000 * 1000).observeOn(Schedulers.computation(), i)
928+
.subscribe(ts);
929+
930+
ts.awaitTerminalEvent();
931+
ts.assertValueCount(1000 * 1000);
932+
ts.assertCompleted();
933+
ts.assertNoErrors();
934+
}
935+
}
923936
}

src/test/java/rx/observables/AsyncOnSubscribeTest.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ public void call() {
383383
}}));
384384
break;
385385
case 2:
386-
observer.onNext(Observable.<Integer>never()
386+
observer.onNext(Observable.just(1)
387+
.concatWith(Observable.<Integer>never())
387388
.subscribeOn(scheduler)
388389
.doOnUnsubscribe(new Action0(){
389390
@Override
@@ -397,7 +398,7 @@ public void call() {
397398
return state + 1;
398399
}});
399400
Subscription subscription = Observable.create(os)
400-
.observeOn(scheduler)
401+
.observeOn(scheduler, 1)
401402
.subscribe(subscriber);
402403
sub.set(subscription);
403404
subscriber.assertNoValues();

0 commit comments

Comments
 (0)