Skip to content

Commit e2331e3

Browse files
committed
1.x: observeOn now replenishes with constant rate
1 parent add272d commit e2331e3

File tree

3 files changed

+75
-28
lines changed

3 files changed

+75
-28
lines changed

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,8 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
8383
final NotificationLite<T> on;
8484
final boolean delayError;
8585
final Queue<Object> queue;
86-
final int bufferSize;
86+
/** The emission threshold that should trigger a replenishing request. */
87+
final int limit;
8788

8889
// the status of the current stream
8990
volatile boolean finished;
@@ -97,6 +98,9 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
9798
* reading finished (acquire).
9899
*/
99100
Throwable error;
101+
102+
/** Remembers how many elements have been emitted before the requests run out. */
103+
long emitted;
100104

101105
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
102106
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
@@ -105,12 +109,16 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
105109
this.recursiveScheduler = scheduler.createWorker();
106110
this.delayError = delayError;
107111
this.on = NotificationLite.instance();
108-
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
112+
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
113+
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
114+
this.limit = calculatedSize - (calculatedSize >> 2);
109115
if (UnsafeAccess.isUnsafeAvailable()) {
110-
queue = new SpscArrayQueue<Object>(this.bufferSize);
116+
queue = new SpscArrayQueue<Object>(calculatedSize);
111117
} else {
112-
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
118+
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
113119
}
120+
// signal that this is an async operator capable of receiving this many
121+
request(calculatedSize);
114122
}
115123

116124
void init() {
@@ -133,12 +141,6 @@ public void request(long n) {
133141
localChild.add(this);
134142
}
135143

136-
@Override
137-
public void onStart() {
138-
// signal that this is an async operator capable of receiving this many
139-
request(this.bufferSize);
140-
}
141-
142144
@Override
143145
public void onNext(final T t) {
144146
if (isUnsubscribed() || finished) {
@@ -180,9 +182,8 @@ protected void schedule() {
180182
// only execute this from schedule()
181183
@Override
182184
public void call() {
183-
long emitted = 0L;
184-
185185
long missed = 1L;
186+
long currentEmission = emitted;
186187

187188
// these are accessed in a tight loop around atomics so
188189
// loading them into local variables avoids the mandatory re-reading
@@ -197,7 +198,6 @@ public void call() {
197198

198199
for (;;) {
199200
long requestAmount = requested.get();
200-
long currentEmission = 0L;
201201

202202
while (requestAmount != currentEmission) {
203203
boolean done = finished;
@@ -215,28 +215,25 @@ public void call() {
215215
localChild.onNext(localOn.getValue(v));
216216

217217
currentEmission++;
218-
emitted++;
218+
if (currentEmission == limit) {
219+
requestAmount = BackpressureUtils.produced(requested, currentEmission);
220+
request(currentEmission);
221+
currentEmission = 0L;
222+
}
219223
}
220224

221225
if (requestAmount == currentEmission) {
222226
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
223227
return;
224228
}
225229
}
226-
227-
if (currentEmission != 0L) {
228-
BackpressureUtils.produced(requested, currentEmission);
229-
}
230-
230+
231+
emitted = currentEmission;
231232
missed = counter.addAndGet(-missed);
232233
if (missed == 0L) {
233234
break;
234235
}
235236
}
236-
237-
if (emitted != 0L) {
238-
request(emitted);
239-
}
240237
}
241238

242239
boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
@@ -285,4 +282,4 @@ boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a,
285282
return false;
286283
}
287284
}
288-
}
285+
}

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 50 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -864,7 +864,7 @@ public void testErrorDelayedAsync() {
864864

865865
@Test
866866
public void requestExactCompletesImmediately() {
867-
TestSubscriber<Integer> ts = TestSubscriber.create(0);
867+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
868868

869869
TestScheduler test = Schedulers.test();
870870

@@ -884,4 +884,53 @@ public void requestExactCompletesImmediately() {
884884
ts.assertNoErrors();
885885
ts.assertCompleted();
886886
}
887+
888+
@Test
889+
public void fixedReplenishPattern() {
890+
TestSubscriber<Integer> ts = TestSubscriber.create(0);
891+
892+
TestScheduler test = Schedulers.test();
893+
894+
final List<Long> requests = new ArrayList<Long>();
895+
896+
Observable.range(1, 100)
897+
.doOnRequest(new Action1<Long>() {
898+
@Override
899+
public void call(Long v) {
900+
requests.add(v);
901+
}
902+
})
903+
.observeOn(test, 16).subscribe(ts);
904+
905+
test.advanceTimeBy(1, TimeUnit.SECONDS);
906+
ts.requestMore(20);
907+
test.advanceTimeBy(1, TimeUnit.SECONDS);
908+
ts.requestMore(10);
909+
test.advanceTimeBy(1, TimeUnit.SECONDS);
910+
ts.requestMore(50);
911+
test.advanceTimeBy(1, TimeUnit.SECONDS);
912+
ts.requestMore(35);
913+
test.advanceTimeBy(1, TimeUnit.SECONDS);
914+
915+
ts.assertValueCount(100);
916+
ts.assertCompleted();
917+
ts.assertNoErrors();
918+
919+
assertEquals(Arrays.asList(16L, 12L, 12L, 12L, 12L, 12L, 12L, 12L, 12L), requests);
920+
}
921+
922+
@Test
923+
public void bufferSizesWork() {
924+
for (int i = 1; i <= 1024; i = i * 2) {
925+
TestSubscriber<Integer> ts = TestSubscriber.create();
926+
927+
Observable.range(1, 1000 * 1000).observeOn(Schedulers.computation(), i)
928+
.subscribe(ts);
929+
930+
ts.awaitTerminalEvent();
931+
ts.assertValueCount(1000 * 1000);
932+
ts.assertCompleted();
933+
ts.assertNoErrors();
934+
}
935+
}
887936
}

src/test/java/rx/observables/AsyncOnSubscribeTest.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -383,7 +383,8 @@ public void call() {
383383
}}));
384384
break;
385385
case 2:
386-
observer.onNext(Observable.<Integer>never()
386+
observer.onNext(Observable.just(1)
387+
.concatWith(Observable.<Integer>never())
387388
.subscribeOn(scheduler)
388389
.doOnUnsubscribe(new Action0(){
389390
@Override
@@ -397,7 +398,7 @@ public void call() {
397398
return state + 1;
398399
}});
399400
Subscription subscription = Observable.create(os)
400-
.observeOn(scheduler)
401+
.observeOn(scheduler, 1)
401402
.subscribe(subscriber);
402403
sub.set(subscription);
403404
subscriber.assertNoValues();
@@ -465,4 +466,4 @@ public Integer call(Integer state, Long requested, Observer<Observable<? extends
465466

466467
subscriber.assertNotCompleted();
467468
}
468-
}
469+
}

0 commit comments

Comments
 (0)