Skip to content

1.x: observeOn now replenishes with constant rate #3795

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Apr 8, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 21 additions & 24 deletions src/main/java/rx/internal/operators/OperatorObserveOn.java
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
final NotificationLite<T> on;
final boolean delayError;
final Queue<Object> queue;
final int bufferSize;
/** The emission threshold that should trigger a replenishing request. */
final int limit;

// the status of the current stream
volatile boolean finished;
Expand All @@ -97,6 +98,9 @@ private static final class ObserveOnSubscriber<T> extends Subscriber<T> implemen
* reading finished (acquire).
*/
Throwable error;

/** Remembers how many elements have been emitted before the requests run out. */
long emitted;

// do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should
// not prevent anything downstream from consuming, which will happen if the Subscription is chained
Expand All @@ -105,12 +109,16 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber<? super T> child, boo
this.recursiveScheduler = scheduler.createWorker();
this.delayError = delayError;
this.on = NotificationLite.instance();
this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
int calculatedSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE;
// this formula calculates the 75% of the bufferSize, rounded up to the next integer
this.limit = calculatedSize - (calculatedSize >> 2);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about a less efficient (but more readable): (int) (calculatedSize * 0.75) ?

if (UnsafeAccess.isUnsafeAvailable()) {
queue = new SpscArrayQueue<Object>(this.bufferSize);
queue = new SpscArrayQueue<Object>(calculatedSize);
} else {
queue = new SpscAtomicArrayQueue<Object>(this.bufferSize);
queue = new SpscAtomicArrayQueue<Object>(calculatedSize);
}
// signal that this is an async operator capable of receiving this many
request(calculatedSize);
}

void init() {
Expand All @@ -133,12 +141,6 @@ public void request(long n) {
localChild.add(this);
}

@Override
public void onStart() {
// signal that this is an async operator capable of receiving this many
request(this.bufferSize);
}

@Override
public void onNext(final T t) {
if (isUnsubscribed() || finished) {
Expand Down Expand Up @@ -180,9 +182,8 @@ protected void schedule() {
// only execute this from schedule()
@Override
public void call() {
long emitted = 0L;

long missed = 1L;
long currentEmission = emitted;

// these are accessed in a tight loop around atomics so
// loading them into local variables avoids the mandatory re-reading
Expand All @@ -197,7 +198,6 @@ public void call() {

for (;;) {
long requestAmount = requested.get();
long currentEmission = 0L;

while (requestAmount != currentEmission) {
boolean done = finished;
Expand All @@ -215,28 +215,25 @@ public void call() {
localChild.onNext(localOn.getValue(v));

currentEmission++;
emitted++;
if (currentEmission == limit) {
requestAmount = BackpressureUtils.produced(requested, currentEmission);
request(currentEmission);
currentEmission = 0L;
}
}

if (requestAmount == currentEmission) {
if (checkTerminated(finished, q.isEmpty(), localChild, q)) {
return;
}
}

if (currentEmission != 0L) {
BackpressureUtils.produced(requested, currentEmission);
}


emitted = currentEmission;
missed = counter.addAndGet(-missed);
if (missed == 0L) {
break;
}
}

if (emitted != 0L) {
request(emitted);
}
}

boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a, Queue<Object> q) {
Expand Down Expand Up @@ -285,4 +282,4 @@ boolean checkTerminated(boolean done, boolean isEmpty, Subscriber<? super T> a,
return false;
}
}
}
}
51 changes: 50 additions & 1 deletion src/test/java/rx/internal/operators/OperatorObserveOnTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -864,7 +864,7 @@ public void testErrorDelayedAsync() {

@Test
public void requestExactCompletesImmediately() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);
TestSubscriber<Integer> ts = TestSubscriber.create(0);

TestScheduler test = Schedulers.test();

Expand All @@ -884,4 +884,53 @@ public void requestExactCompletesImmediately() {
ts.assertNoErrors();
ts.assertCompleted();
}

@Test
public void fixedReplenishPattern() {
TestSubscriber<Integer> ts = TestSubscriber.create(0);

TestScheduler test = Schedulers.test();

final List<Long> requests = new ArrayList<Long>();

Observable.range(1, 100)
.doOnRequest(new Action1<Long>() {
@Override
public void call(Long v) {
requests.add(v);
}
})
.observeOn(test, 16).subscribe(ts);

test.advanceTimeBy(1, TimeUnit.SECONDS);
ts.requestMore(20);
test.advanceTimeBy(1, TimeUnit.SECONDS);
ts.requestMore(10);
test.advanceTimeBy(1, TimeUnit.SECONDS);
ts.requestMore(50);
test.advanceTimeBy(1, TimeUnit.SECONDS);
ts.requestMore(35);
test.advanceTimeBy(1, TimeUnit.SECONDS);

ts.assertValueCount(100);
ts.assertCompleted();
ts.assertNoErrors();

assertEquals(Arrays.asList(16L, 12L, 12L, 12L, 12L, 12L, 12L, 12L, 12L), requests);
}

@Test
public void bufferSizesWork() {
for (int i = 1; i <= 1024; i = i * 2) {
TestSubscriber<Integer> ts = TestSubscriber.create();

Observable.range(1, 1000 * 1000).observeOn(Schedulers.computation(), i)
.subscribe(ts);

ts.awaitTerminalEvent();
ts.assertValueCount(1000 * 1000);
ts.assertCompleted();
ts.assertNoErrors();
}
}
}
7 changes: 4 additions & 3 deletions src/test/java/rx/observables/AsyncOnSubscribeTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,8 @@ public void call() {
}}));
break;
case 2:
observer.onNext(Observable.<Integer>never()
observer.onNext(Observable.just(1)
.concatWith(Observable.<Integer>never())
.subscribeOn(scheduler)
.doOnUnsubscribe(new Action0(){
@Override
Expand All @@ -397,7 +398,7 @@ public void call() {
return state + 1;
}});
Subscription subscription = Observable.create(os)
.observeOn(scheduler)
.observeOn(scheduler, 1)
.subscribe(subscriber);
sub.set(subscription);
subscriber.assertNoValues();
Expand Down Expand Up @@ -465,4 +466,4 @@ public Integer call(Integer state, Long requested, Observer<Observable<? extends

subscriber.assertNotCompleted();
}
}
}