diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 13a78ca14c..e15c2f93cf 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber extends Subscriber { final NotificationLite on = NotificationLite.instance(); final Queue queue; - volatile boolean completed = false; - volatile boolean failure = false; + + // the status of the current stream + volatile boolean finished = false; + @SuppressWarnings("unused") volatile long requested = 0; + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested"); @SuppressWarnings("unused") volatile long counter; + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); @@ -127,7 +131,7 @@ public void onStart() { @Override public void onNext(final T t) { - if (isUnsubscribed() || completed) { + if (isUnsubscribed()) { return; } if (!queue.offer(on.next(t))) { @@ -139,30 +143,23 @@ public void onNext(final T t) { @Override public void onCompleted() { - if (isUnsubscribed() || completed) { + if (isUnsubscribed() || finished) { return; } - if (error != null) { - return; - } - completed = true; + finished = true; schedule(); } @Override public void onError(final Throwable e) { - if (isUnsubscribed() || completed) { - return; - } - if (error != null) { + if (isUnsubscribed() || finished) { return; } error = e; // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event unsubscribe(); - // mark failure so the polling thread will skip onNext still in the queue - completed = true; - failure = true; + finished = true; + // polling thread should skip any onNext still in the queue schedule(); } @@ -185,52 +182,42 @@ protected void schedule() { void pollQueue() { int emitted = 0; do { - /* - * Set to 1 otherwise it could have grown very large while in the last poll loop - * and then we can end up looping all those times again here before exiting even once we've drained - */ counter = 1; - -// middle: - while (!scheduledUnsubscribe.isUnsubscribed()) { - if (failure) { - child.onError(error); - return; - } else { - if (requested == 0 && completed && queue.isEmpty()) { + long produced = 0; + long r = requested; + while (!child.isUnsubscribed()) { + Throwable error; + if (finished) { + if ((error = this.error) != null) { + // errors shortcut the queue so + // release the elements in the queue for gc + queue.clear(); + child.onError(error); + return; + } else + if (queue.isEmpty()) { child.onCompleted(); return; } - if (REQUESTED.getAndDecrement(this) != 0) { - Object o = queue.poll(); - if (o == null) { - if (completed) { - if (failure) { - child.onError(error); - } else { - child.onCompleted(); - } - return; - } - // nothing in queue - REQUESTED.incrementAndGet(this); - break; - } else { - if (!on.accept(child, o)) { - // non-terminal event so let's increment count - emitted++; - } - } + } + if (r > 0) { + Object o = queue.poll(); + if (o != null) { + child.onNext(on.getValue(o)); + r--; + emitted++; + produced++; } else { - // we hit the end ... so increment back to 0 again - REQUESTED.incrementAndGet(this); break; } + } else { + break; } } + if (produced > 0 && requested != Long.MAX_VALUE) { + REQUESTED.addAndGet(this, -produced); + } } while (COUNTER_UPDATER.decrementAndGet(this) > 0); - - // request the number of items that we emitted in this poll loop if (emitted > 0) { request(emitted); } diff --git a/src/perf/java/rx/operators/OperatorObserveOnPerf.java b/src/perf/java/rx/operators/OperatorObserveOnPerf.java index 80feb8ecb6..a105f09548 100644 --- a/src/perf/java/rx/operators/OperatorObserveOnPerf.java +++ b/src/perf/java/rx/operators/OperatorObserveOnPerf.java @@ -66,5 +66,26 @@ public void observeOnImmediate(Input input) throws InterruptedException { input.observable.observeOn(Schedulers.immediate()).subscribe(o); o.latch.await(); } + + @Benchmark + public void observeOnComputationSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).subscribe(o); + o.latch.await(); + } + + @Benchmark + public void observeOnNewThreadSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).subscribe(o); + o.latch.await(); + } + + @Benchmark + public void observeOnImmediateSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.immediate()).subscribe(o); + o.latch.await(); + } }