From 8eb6482ca7e343a4ab76f5daafb1866878337182 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Fri, 1 May 2015 10:04:00 +1000 Subject: [PATCH 1/3] fix OperatorObserveOn race condition where onComplete could be emitted despite onError being called --- .../internal/operators/OperatorObserveOn.java | 89 ++++++++++--------- .../rx/operators/OperatorObserveOnPerf.java | 21 +++++ 2 files changed, 69 insertions(+), 41 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 13a78ca14c..ae013f9d3a 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -75,15 +75,19 @@ private static final class ObserveOnSubscriber extends Subscriber { final NotificationLite on = NotificationLite.instance(); final Queue queue; - volatile boolean completed = false; - volatile boolean failure = false; + + // the status of the current stream + volatile boolean finished = false; + @SuppressWarnings("unused") volatile long requested = 0; + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested"); @SuppressWarnings("unused") volatile long counter; + @SuppressWarnings("rawtypes") static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); @@ -127,7 +131,7 @@ public void onStart() { @Override public void onNext(final T t) { - if (isUnsubscribed() || completed) { + if (isUnsubscribed()) { return; } if (!queue.offer(on.next(t))) { @@ -139,30 +143,23 @@ public void onNext(final T t) { @Override public void onCompleted() { - if (isUnsubscribed() || completed) { + if (isUnsubscribed() || finished) { return; } - if (error != null) { - return; - } - completed = true; + finished = true; schedule(); } @Override public void onError(final Throwable e) { - if (isUnsubscribed() || completed) { - return; - } - if (error != null) { + if (isUnsubscribed() || finished) { return; } error = e; // unsubscribe eagerly since time will pass before the scheduled onError results in an unsubscribe event unsubscribe(); - // mark failure so the polling thread will skip onNext still in the queue - completed = true; - failure = true; + finished = true; + // polling thread should skip any onNext still in the queue schedule(); } @@ -191,41 +188,51 @@ void pollQueue() { */ counter = 1; -// middle: while (!scheduledUnsubscribe.isUnsubscribed()) { - if (failure) { - child.onError(error); - return; - } else { - if (requested == 0 && completed && queue.isEmpty()) { + if (finished) { + // only read volatile error once + Throwable err = error; + if (err != null) { + // clear the queue to enable gc + queue.clear(); + // even if there are onNext in the queue we eagerly notify of error + child.onError(err); + return; + } else if (queue.isEmpty()) { child.onCompleted(); return; } - if (REQUESTED.getAndDecrement(this) != 0) { - Object o = queue.poll(); - if (o == null) { - if (completed) { - if (failure) { - child.onError(error); - } else { - child.onCompleted(); - } + } + if (REQUESTED.getAndDecrement(this) != 0) { + Object o = queue.poll(); + if (o == null) { + // nothing in queue (but be careful, something could be added concurrently right now) + if (finished) { + // only read volatile error once + Throwable err = error; + if (err != null) { + // clear the queue to enable gc + queue.clear(); + // even if there are onNext in the queue we eagerly notify of error + child.onError(err); + return; + } else if (queue.isEmpty()) { + child.onCompleted(); return; - } - // nothing in queue - REQUESTED.incrementAndGet(this); - break; - } else { - if (!on.accept(child, o)) { - // non-terminal event so let's increment count - emitted++; } } - } else { - // we hit the end ... so increment back to 0 again - REQUESTED.incrementAndGet(this); + BackpressureUtils.getAndAddRequest(REQUESTED, this, 1); break; + } else { + if (!on.accept(child, o)) { + // non-terminal event so let's increment count + emitted++; + } } + } else { + // we hit the end ... so increment back to 0 again + BackpressureUtils.getAndAddRequest(REQUESTED, this, 1); + break; } } } while (COUNTER_UPDATER.decrementAndGet(this) > 0); diff --git a/src/perf/java/rx/operators/OperatorObserveOnPerf.java b/src/perf/java/rx/operators/OperatorObserveOnPerf.java index 80feb8ecb6..a105f09548 100644 --- a/src/perf/java/rx/operators/OperatorObserveOnPerf.java +++ b/src/perf/java/rx/operators/OperatorObserveOnPerf.java @@ -66,5 +66,26 @@ public void observeOnImmediate(Input input) throws InterruptedException { input.observable.observeOn(Schedulers.immediate()).subscribe(o); o.latch.await(); } + + @Benchmark + public void observeOnComputationSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.computation()).subscribe(o); + o.latch.await(); + } + + @Benchmark + public void observeOnNewThreadSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.newThread()).subscribe(o); + o.latch.await(); + } + + @Benchmark + public void observeOnImmediateSubscribedOnComputation(Input input) throws InterruptedException { + LatchedObserver o = input.newLatchedObserver(); + input.observable.subscribeOn(Schedulers.computation()).observeOn(Schedulers.immediate()).subscribe(o); + o.latch.await(); + } } From 18caf0e77ae1f7323095c7bd2fa93464a8146368 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Sun, 10 May 2015 07:49:37 +1000 Subject: [PATCH 2/3] use new pollQueue from @akarnokd --- .../internal/operators/OperatorObserveOn.java | 60 +++++++------------ 1 file changed, 20 insertions(+), 40 deletions(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index ae013f9d3a..08f4088ceb 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -182,62 +182,42 @@ protected void schedule() { void pollQueue() { int emitted = 0; do { - /* - * Set to 1 otherwise it could have grown very large while in the last poll loop - * and then we can end up looping all those times again here before exiting even once we've drained - */ counter = 1; - - while (!scheduledUnsubscribe.isUnsubscribed()) { + long produced = 0; + long r = requested; + while (!child.isUnsubscribed()) { + Throwable error; if (finished) { - // only read volatile error once - Throwable err = error; - if (err != null) { - // clear the queue to enable gc + if ((error = this.error) != null) { + // errors shortcut the queue so + // release the elements in the queue for gc queue.clear(); - // even if there are onNext in the queue we eagerly notify of error - child.onError(err); + child.onError(error); return; - } else if (queue.isEmpty()) { + } else + if (queue.isEmpty()) { child.onCompleted(); return; } } - if (REQUESTED.getAndDecrement(this) != 0) { + if (r > 0) { Object o = queue.poll(); - if (o == null) { - // nothing in queue (but be careful, something could be added concurrently right now) - if (finished) { - // only read volatile error once - Throwable err = error; - if (err != null) { - // clear the queue to enable gc - queue.clear(); - // even if there are onNext in the queue we eagerly notify of error - child.onError(err); - return; - } else if (queue.isEmpty()) { - child.onCompleted(); - return; - } - } - BackpressureUtils.getAndAddRequest(REQUESTED, this, 1); - break; + if (o != null) { + child.onNext(on.getValue(o)); + r--; + emitted++; + produced++; } else { - if (!on.accept(child, o)) { - // non-terminal event so let's increment count - emitted++; - } + break; } } else { - // we hit the end ... so increment back to 0 again - BackpressureUtils.getAndAddRequest(REQUESTED, this, 1); break; } } + if (produced > 0) { + REQUESTED.addAndGet(this, -produced); + } } while (COUNTER_UPDATER.decrementAndGet(this) > 0); - - // request the number of items that we emitted in this poll loop if (emitted > 0) { request(emitted); } From c9d932420cb68e4c9703224cdfefc51cdcb161b8 Mon Sep 17 00:00:00 2001 From: Dave Moten Date: Wed, 13 May 2015 16:28:14 +1000 Subject: [PATCH 3/3] don't reduce requested by produced if requested is Long.MAX_VALUE --- src/main/java/rx/internal/operators/OperatorObserveOn.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 08f4088ceb..e15c2f93cf 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -214,7 +214,7 @@ void pollQueue() { break; } } - if (produced > 0) { + if (produced > 0 && requested != Long.MAX_VALUE) { REQUESTED.addAndGet(this, -produced); } } while (COUNTER_UPDATER.decrementAndGet(this) > 0);