Skip to content

Commit d72a41e

Browse files
committed
Merge pull request #3777 from srvaroa/1.x
observeOn: allow configurable buffer size
2 parents 97331fb + ef1d418 commit d72a41e

File tree

3 files changed

+150
-12
lines changed

3 files changed

+150
-12
lines changed

src/main/java/rx/Observable.java

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6291,7 +6291,8 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
62916291

62926292
/**
62936293
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
6294-
* asynchronously with a bounded buffer.
6294+
* asynchronously with a bounded buffer of {@link RxRingBuffer.SIZE} slots.
6295+
*
62956296
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
62966297
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
62976298
* <p>
@@ -6308,13 +6309,41 @@ public final Observable<T> mergeWith(Observable<? extends T> t1) {
63086309
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
63096310
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
63106311
* @see #subscribeOn
6312+
* @see #observeOn(Scheduler, int)
63116313
* @see #observeOn(Scheduler, boolean)
6314+
* @see #observeOn(Scheduler, boolean, int)
63126315
*/
63136316
public final Observable<T> observeOn(Scheduler scheduler) {
6314-
if (this instanceof ScalarSynchronousObservable) {
6315-
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
6316-
}
6317-
return lift(new OperatorObserveOn<T>(scheduler, false));
6317+
return observeOn(scheduler, RxRingBuffer.SIZE);
6318+
}
6319+
6320+
/**
6321+
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
6322+
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
6323+
* default.
6324+
*
6325+
* <p>Note that onError notifications will cut ahead of onNext notifications on the emission thread if Scheduler is truly
6326+
* asynchronous. If strict event ordering is required, consider using the {@link #observeOn(Scheduler, boolean)} overload.
6327+
* <p>
6328+
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
6329+
* <dl>
6330+
* <dt><b>Scheduler:</b></dt>
6331+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
6332+
* </dl>
6333+
*
6334+
* @param scheduler the {@link Scheduler} to notify {@link Observer}s on
6335+
* @param bufferSize the size of the buffer.
6336+
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
6337+
* {@link Scheduler}
6338+
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
6339+
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
6340+
* @see #subscribeOn
6341+
* @see #observeOn(Scheduler)
6342+
* @see #observeOn(Scheduler, boolean)
6343+
* @see #observeOn(Scheduler, boolean, int)
6344+
*/
6345+
public final Observable<T> observeOn(Scheduler scheduler, int bufferSize) {
6346+
return observeOn(scheduler, false, bufferSize);
63186347
}
63196348

63206349
/**
@@ -6339,12 +6368,45 @@ public final Observable<T> observeOn(Scheduler scheduler) {
63396368
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
63406369
* @see #subscribeOn
63416370
* @see #observeOn(Scheduler)
6371+
* @see #observeOn(Scheduler, int)
6372+
* @see #observeOn(Scheduler, boolean, int)
63426373
*/
63436374
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError) {
6375+
return observeOn(scheduler, delayError, RxRingBuffer.SIZE);
6376+
}
6377+
6378+
/**
6379+
* Modifies an Observable to perform its emissions and notifications on a specified {@link Scheduler},
6380+
* asynchronously with a bounded buffer of configurable size other than the {@link RxRingBuffer.SIZE}
6381+
* default, and optionally delays onError notifications.
6382+
* <p>
6383+
* <img width="640" height="308" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/observeOn.png" alt="">
6384+
* <dl>
6385+
* <dt><b>Scheduler:</b></dt>
6386+
* <dd>you specify which {@link Scheduler} this operator will use</dd>
6387+
* </dl>
6388+
*
6389+
* @param scheduler
6390+
* the {@link Scheduler} to notify {@link Observer}s on
6391+
* @param delayError
6392+
* indicates if the onError notification may not cut ahead of onNext notification on the other side of the
6393+
* scheduling boundary. If true a sequence ending in onError will be replayed in the same order as was received
6394+
* from upstream
6395+
* @param bufferSize the size of the buffer.
6396+
* @return the source Observable modified so that its {@link Observer}s are notified on the specified
6397+
* {@link Scheduler}
6398+
* @see <a href="http://reactivex.io/documentation/operators/observeon.html">ReactiveX operators documentation: ObserveOn</a>
6399+
* @see <a href="http://www.grahamlea.com/2014/07/rxjava-threading-examples/">RxJava Threading Examples</a>
6400+
* @see #subscribeOn
6401+
* @see #observeOn(Scheduler)
6402+
* @see #observeOn(Scheduler, int)
6403+
* @see #observeOn(Scheduler, boolean)
6404+
*/
6405+
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
63446406
if (this instanceof ScalarSynchronousObservable) {
63456407
return ((ScalarSynchronousObservable<T>)this).scalarScheduleOn(scheduler);
63466408
}
6347-
return lift(new OperatorObserveOn<T>(scheduler, delayError));
6409+
return lift(new OperatorObserveOn<T>(scheduler, delayError, bufferSize));
63486410
}
63496411

63506412
/**

src/main/java/rx/internal/operators/OperatorObserveOn.java

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,25 @@ public final class OperatorObserveOn<T> implements Operator<T, T> {
4040

4141
private final Scheduler scheduler;
4242
private final boolean delayError;
43+
private final int bufferSize;
4344

4445
/**
4546
* @param scheduler the scheduler to use
4647
* @param delayError delay errors until all normal events are emitted in the other thread?
4748
*/
4849
public OperatorObserveOn(Scheduler scheduler, boolean delayError) {
50+
this(scheduler, delayError, RxRingBuffer.SIZE);
51+
}
52+
53+
/**
54+
* @param scheduler the scheduler to use
55+
* @param delayError delay errors until all normal events are emitted in the other thread?
56+
* @param bufferSize for the buffer feeding the Scheduler workers, defaults to {@code RxRingBuffer.MAX} if <= 0
57+
*/
58+
public OperatorObserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
4959
this.scheduler = scheduler;
5060
this.delayError = delayError;
61+
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
5162
}
5263

5364
@Override
@@ -59,7 +70,7 @@ public Subscriber<? super T> call(Subscriber<? super T> child) {
5970
// avoid overhead, execute directly
6071
return child;
6172
} else {
62-
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError);
73+
ObserveOnSubscriber<T> parent = new ObserveOnSubscriber<T>(scheduler, child, delayError, bufferSize);
6374
parent.init();
6475
return parent;
6576
}
@@ -72,6 +83,7 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
7283
final NotificationLite<T> on;
7384
final boolean delayError;
7485
final Queue<Object> queue;
86+
final int bufferSize;
7587

7688
// the status of the current stream
7789
volatile boolean finished;
@@ -88,15 +100,16 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
88100

89101
// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
90102
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
91-
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError) {
103+
public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boolean delayError, int bufferSize) {
92104
this.child = child;
93105
this.recursiveScheduler = scheduler.createWorker();
94106
this.delayError = delayError;
95107
this.on = NotificationLite.instance();
108+
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
96109
if (UnsafeAccess.isUnsafeAvailable()) {
97-
queue = new SpscArrayQueue<Object>(RxRingBuffer.SIZE);
110+
queue = new SpscArrayQueue<Object>(this.bufferSize);
98111
} else {
99-
queue = new SpscAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
112+
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
100113
}
101114
}
102115

@@ -123,7 +136,7 @@ public void request(long n) {
123136
@Override
124137
public void onStart() {
125138
// signal that this is an async operator capable of receiving this many
126-
request(RxRingBuffer.SIZE);
139+
request(this.bufferSize);
127140
}
128141

129142
@Override
@@ -180,7 +193,7 @@ public void call() {
180193

181194
// requested and counter are not included to avoid JIT issues with register spilling
182195
// and their access is is amortized because they are part of the outer loop which runs
183-
// less frequently (usually after each RxRingBuffer.SIZE elements)
196+
// less frequently (usually after each bufferSize elements)
184197

185198
for (;;) {
186199
long requestAmount = requested.get();

src/test/java/rx/internal/operators/OperatorObserveOnTest.java

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -580,6 +580,69 @@ public void onNext(Integer t) {
580580
}
581581
}
582582

583+
@Test
584+
public void testQueueFullEmitsErrorWithVaryingBufferSize() {
585+
final CountDownLatch latch = new CountDownLatch(1);
586+
// randomize buffer size, note that underlying implementations may be tuning the real size to a power of 2
587+
// which can lead to unexpected results when adding excess capacity (e.g.: see ConcurrentCircularArrayQueue)
588+
for (int i = 1; i <= 1024; i = i * 2) {
589+
final int capacity = i;
590+
Observable<Integer> observable = Observable.create(new OnSubscribe<Integer>() {
591+
592+
@Override
593+
public void call(Subscriber<? super Integer> o) {
594+
for (int i = 0; i < capacity + 10; i++) {
595+
o.onNext(i);
596+
}
597+
latch.countDown();
598+
o.onCompleted();
599+
}
600+
601+
});
602+
603+
TestSubscriber<Integer> testSubscriber = new TestSubscriber<Integer>(new Observer<Integer>() {
604+
605+
@Override
606+
public void onCompleted() {
607+
608+
}
609+
610+
@Override
611+
public void onError(Throwable e) {
612+
613+
}
614+
615+
@Override
616+
public void onNext(Integer t) {
617+
try {
618+
// force it to be slow wait until we have queued everything
619+
latch.await(500, TimeUnit.MILLISECONDS);
620+
} catch (InterruptedException e) {
621+
e.printStackTrace();
622+
}
623+
}
624+
625+
});
626+
System.out.println("Using capacity " + capacity); // for post-failure debugging
627+
observable.observeOn(Schedulers.newThread(), capacity).subscribe(testSubscriber);
628+
629+
testSubscriber.awaitTerminalEvent();
630+
List<Throwable> errors = testSubscriber.getOnErrorEvents();
631+
assertEquals(1, errors.size());
632+
System.out.println("Errors: " + errors);
633+
Throwable t = errors.get(0);
634+
if (t instanceof MissingBackpressureException) {
635+
// success, we expect this
636+
} else {
637+
if (t.getCause() instanceof MissingBackpressureException) {
638+
// this is also okay
639+
} else {
640+
fail("Expecting MissingBackpressureException");
641+
}
642+
}
643+
}
644+
}
645+
583646
@Test
584647
public void testAsyncChild() {
585648
TestSubscriber<Integer> ts = new TestSubscriber<Integer>();

0 commit comments

Comments
 (0)