From d01cd061e378494d342e6dd03d469ff31a42184a Mon Sep 17 00:00:00 2001 From: Mark Rietveld Date: Mon, 2 Nov 2015 17:57:07 -0800 Subject: [PATCH] 1.x Remove all instances of Atomic*FieldUpdater Replace them all with their respective Atomic* counterparts For example AtomicLongFieldUpdater -> AtomicLong Addresses https://github.com/ReactiveX/RxJava/issues/3459 --- .../operators/BlockingOperatorLatest.java | 12 +- .../operators/BlockingOperatorNext.java | 12 +- .../operators/BufferUntilSubscriber.java | 28 ++-- .../operators/OnSubscribeCombineLatest.java | 11 +- .../rx/internal/operators/OperatorConcat.java | 35 ++--- .../operators/OperatorMaterialize.java | 17 +- .../internal/operators/OperatorObserveOn.java | 38 ++--- .../operators/OperatorRetryWithPredicate.java | 11 +- .../operators/OperatorSampleWithTime.java | 12 +- .../operators/OperatorTimeoutBase.java | 27 ++-- .../rx/internal/operators/OperatorZip.java | 15 +- .../operators/TakeLastQueueProducer.java | 26 ++-- .../util/BackpressureDrainManager.java | 19 +-- .../rx/internal/util/PaddedAtomicInteger.java | 30 ---- .../util/PaddedAtomicIntegerBase.java | 84 ---------- .../rx/internal/util/RxThreadFactory.java | 9 +- .../util/SubscriptionIndexedRingBuffer.java | 145 ------------------ .../rx/schedulers/TrampolineScheduler.java | 7 +- src/main/java/rx/subjects/AsyncSubject.java | 14 +- .../java/rx/subjects/BehaviorSubject.java | 22 +-- src/main/java/rx/subjects/PublishSubject.java | 8 +- src/main/java/rx/subjects/ReplaySubject.java | 45 +++--- .../subjects/SubjectSubscriptionManager.java | 37 ++--- src/main/java/rx/subjects/TestSubject.java | 2 +- .../rx/subscriptions/BooleanSubscription.java | 26 ++-- .../MultipleAssignmentSubscription.java | 20 +-- .../subscriptions/RefCountSubscription.java | 34 ++-- .../rx/subscriptions/SerialSubscription.java | 20 +-- 28 files changed, 223 insertions(+), 543 deletions(-) delete mode 100644 src/main/java/rx/internal/util/PaddedAtomicInteger.java delete mode 100644 src/main/java/rx/internal/util/PaddedAtomicIntegerBase.java delete mode 100644 src/main/java/rx/internal/util/SubscriptionIndexedRingBuffer.java diff --git a/src/main/java/rx/internal/operators/BlockingOperatorLatest.java b/src/main/java/rx/internal/operators/BlockingOperatorLatest.java index c5f90f3828..5b2b798995 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorLatest.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorLatest.java @@ -18,7 +18,7 @@ import java.util.Iterator; import java.util.NoSuchElementException; import java.util.concurrent.Semaphore; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Notification; import rx.Observable; @@ -59,15 +59,11 @@ public Iterator iterator() { static final class LatestObserverIterator extends Subscriber> implements Iterator { final Semaphore notify = new Semaphore(0); // observer's notification - volatile Notification value; - /** Updater for the value field. */ - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater REFERENCE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value"); + final AtomicReference> value = new AtomicReference>(); @Override public void onNext(Notification args) { - boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null; + boolean wasntAvailable = value.getAndSet(args) == null; if (wasntAvailable) { notify.release(); } @@ -103,7 +99,7 @@ public boolean hasNext() { } @SuppressWarnings("unchecked") - Notification n = REFERENCE_UPDATER.getAndSet(this, null); + Notification n = value.getAndSet(null); iNotif = n; if (iNotif.isOnError()) { throw Exceptions.propagate(iNotif.getThrowable()); diff --git a/src/main/java/rx/internal/operators/BlockingOperatorNext.java b/src/main/java/rx/internal/operators/BlockingOperatorNext.java index 05b5b8f1d8..abfab09f2c 100644 --- a/src/main/java/rx/internal/operators/BlockingOperatorNext.java +++ b/src/main/java/rx/internal/operators/BlockingOperatorNext.java @@ -19,7 +19,7 @@ import java.util.NoSuchElementException; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; import rx.Notification; import rx.Observable; @@ -147,11 +147,7 @@ public void remove() { private static class NextObserver extends Subscriber> { private final BlockingQueue> buf = new ArrayBlockingQueue>(1); - @SuppressWarnings("unused") - volatile int waiting; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WAITING_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting"); + final AtomicInteger waiting = new AtomicInteger(); @Override public void onCompleted() { @@ -166,7 +162,7 @@ public void onError(Throwable e) { @Override public void onNext(Notification args) { - if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) { + if (waiting.getAndSet(0) == 1 || !args.isOnNext()) { Notification toOffer = args; while (!buf.offer(toOffer)) { Notification concurrentItem = buf.poll(); @@ -185,7 +181,7 @@ public Notification takeNext() throws InterruptedException { return buf.take(); } void setWaiting(int value) { - waiting = value; + waiting.set(value); } } } diff --git a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java index e4722c9a60..f486c397f7 100644 --- a/src/main/java/rx/internal/operators/BufferUntilSubscriber.java +++ b/src/main/java/rx/internal/operators/BufferUntilSubscriber.java @@ -16,7 +16,7 @@ package rx.internal.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Observer; import rx.Subscriber; @@ -59,15 +59,9 @@ public static BufferUntilSubscriber create() { } /** The common state. */ - static final class State { - volatile Observer observerRef = null; - /** Field updater for observerRef. */ - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater OBSERVER_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef"); - + static final class State extends AtomicReference> { boolean casObserverRef(Observer expected, Observer next) { - return OBSERVER_UPDATER.compareAndSet(this, expected, next); + return compareAndSet(expected, next); } final Object guard = new Object(); @@ -92,7 +86,7 @@ public void call(final Subscriber s) { @SuppressWarnings("unchecked") @Override public void call() { - state.observerRef = EMPTY_OBSERVER; + state.set(EMPTY_OBSERVER); } })); boolean win = false; @@ -107,7 +101,7 @@ public void call() { while(true) { Object o; while ((o = state.buffer.poll()) != null) { - nl.accept(state.observerRef, o); + nl.accept(state.get(), o); } synchronized (state.guard) { if (state.buffer.isEmpty()) { @@ -138,7 +132,7 @@ private BufferUntilSubscriber(State state) { private void emit(Object v) { synchronized (state.guard) { state.buffer.add(v); - if (state.observerRef != null && !state.emitting) { + if (state.get() != null && !state.emitting) { // Have an observer and nobody is emitting, // should drain the `buffer` forward = true; @@ -148,7 +142,7 @@ private void emit(Object v) { if (forward) { Object o; while ((o = state.buffer.poll()) != null) { - state.nl.accept(state.observerRef, o); + state.nl.accept(state.get(), o); } // Because `emit(Object v)` will be called in sequence, // no event will be put into `buffer` after we drain it. @@ -158,7 +152,7 @@ private void emit(Object v) { @Override public void onCompleted() { if (forward) { - state.observerRef.onCompleted(); + state.get().onCompleted(); } else { emit(state.nl.completed()); @@ -168,7 +162,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { if (forward) { - state.observerRef.onError(e); + state.get().onError(e); } else { emit(state.nl.error(e)); @@ -178,7 +172,7 @@ public void onError(Throwable e) { @Override public void onNext(T t) { if (forward) { - state.observerRef.onNext(t); + state.get().onNext(t); } else { emit(state.nl.next(t)); @@ -188,7 +182,7 @@ public void onNext(T t) { @Override public boolean hasObservers() { synchronized (state.guard) { - return state.observerRef != null; + return state.get() != null; } } diff --git a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java index 54e1335205..5df99b2585 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java +++ b/src/main/java/rx/internal/operators/OnSubscribeCombineLatest.java @@ -19,7 +19,6 @@ import java.util.List; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Observable.OnSubscribe; @@ -90,10 +89,7 @@ final static class MultiSourceProducer implements Producer { private final BitSet completion; private volatile int completionCount; // does this need to be volatile or is WIP sufficient? - @SuppressWarnings("unused") - private volatile long counter; - @SuppressWarnings("rawtypes") - private static final AtomicLongFieldUpdater WIP = AtomicLongFieldUpdater.newUpdater(MultiSourceProducer.class, "counter"); + private final AtomicLong counter = new AtomicLong(); @SuppressWarnings("unchecked") public MultiSourceProducer(final Subscriber child, final List> sources, FuncN combinator) { @@ -139,7 +135,8 @@ public void request(long n) { * that there is always once who acts on each `tick`. Same concept as used in OperationObserveOn. */ void tick() { - if (WIP.getAndIncrement(this) == 0) { + AtomicLong localCounter = this.counter; + if (localCounter.getAndIncrement() == 0) { int emitted = 0; do { // we only emit if requested > 0 @@ -155,7 +152,7 @@ void tick() { } } } - } while (WIP.decrementAndGet(this) > 0); + } while (localCounter.decrementAndGet() > 0); if (emitted > 0) { for (MultiSourceRequestableSubscriber s : subscribers) { s.requestUpTo(emitted); diff --git a/src/main/java/rx/internal/operators/OperatorConcat.java b/src/main/java/rx/internal/operators/OperatorConcat.java index e91e669bba..398cbacf4d 100644 --- a/src/main/java/rx/internal/operators/OperatorConcat.java +++ b/src/main/java/rx/internal/operators/OperatorConcat.java @@ -16,8 +16,8 @@ package rx.internal.operators; import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Observable; import rx.Observable.Operator; @@ -84,14 +84,10 @@ static final class ConcatSubscriber extends Subscriber currentSubscriber; - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = AtomicIntegerFieldUpdater.newUpdater(ConcatSubscriber.class, "wip"); + final AtomicInteger wip = new AtomicInteger(); // accessed by REQUESTED - private volatile long requested; - @SuppressWarnings("rawtypes") - private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ConcatSubscriber.class, "requested"); + private final AtomicLong requested = new AtomicLong(); private final ProducerArbiter arbiter; public ConcatSubscriber(Subscriber s, SerialSubscription current) { @@ -118,10 +114,10 @@ public void onStart() { private void requestFromChild(long n) { if (n <=0) return; // we track 'requested' so we know whether we should subscribe the next or not - long previous = BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + long previous = BackpressureUtils.getAndAddRequest(requested, n); arbiter.request(n); if (previous == 0) { - if (currentSubscriber == null && wip > 0) { + if (currentSubscriber == null && wip.get() > 0) { // this means we may be moving from one subscriber to another after having stopped processing // so need to kick off the subscribe via this request notification subscribeNext(); @@ -130,13 +126,13 @@ private void requestFromChild(long n) { } private void decrementRequested() { - REQUESTED.decrementAndGet(this); + requested.decrementAndGet(); } @Override public void onNext(Observable t) { queue.add(nl.next(t)); - if (WIP.getAndIncrement(this) == 0) { + if (wip.getAndIncrement() == 0) { subscribeNext(); } } @@ -150,7 +146,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { queue.add(nl.completed()); - if (WIP.getAndIncrement(this) == 0) { + if (wip.getAndIncrement() == 0) { subscribeNext(); } } @@ -158,14 +154,14 @@ public void onCompleted() { void completeInner() { currentSubscriber = null; - if (WIP.decrementAndGet(this) > 0) { + if (wip.decrementAndGet() > 0) { subscribeNext(); } request(1); } void subscribeNext() { - if (requested > 0) { + if (requested.get() > 0) { Object o = queue.poll(); if (nl.isCompleted(o)) { child.onCompleted(); @@ -189,10 +185,7 @@ static class ConcatInnerSubscriber extends Subscriber { private final Subscriber child; private final ConcatSubscriber parent; - @SuppressWarnings("unused") - private volatile int once = 0; - @SuppressWarnings("rawtypes") - private final static AtomicIntegerFieldUpdater ONCE = AtomicIntegerFieldUpdater.newUpdater(ConcatInnerSubscriber.class, "once"); + private final AtomicInteger once = new AtomicInteger(); private final ProducerArbiter arbiter; public ConcatInnerSubscriber(ConcatSubscriber parent, Subscriber child, ProducerArbiter arbiter) { @@ -210,7 +203,7 @@ public void onNext(T t) { @Override public void onError(Throwable e) { - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(0, 1)) { // terminal error through parent so everything gets cleaned up, including this inner parent.onError(e); } @@ -218,7 +211,7 @@ public void onError(Throwable e) { @Override public void onCompleted() { - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(0, 1)) { // terminal completion to parent so it continues to the next parent.completeInner(); } diff --git a/src/main/java/rx/internal/operators/OperatorMaterialize.java b/src/main/java/rx/internal/operators/OperatorMaterialize.java index e074cd5816..32b49c6c77 100644 --- a/src/main/java/rx/internal/operators/OperatorMaterialize.java +++ b/src/main/java/rx/internal/operators/OperatorMaterialize.java @@ -15,7 +15,7 @@ */ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import rx.Notification; import rx.Observable.Operator; @@ -76,10 +76,7 @@ private static class ParentSubscriber extends Subscriber { // guarded by this private boolean missed = false; - private volatile long requested; - @SuppressWarnings("rawtypes") - private static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater - .newUpdater(ParentSubscriber.class, "requested"); + private final AtomicLong requested = new AtomicLong(); ParentSubscriber(Subscriber> child) { this.child = child; @@ -91,7 +88,7 @@ public void onStart() { } void requestMore(long n) { - BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + BackpressureUtils.getAndAddRequest(requested, n); request(n); drain(); } @@ -117,12 +114,13 @@ public void onNext(T t) { private void decrementRequested() { // atomically decrement requested + AtomicLong localRequested = this.requested; while (true) { - long r = requested; + long r = localRequested.get(); if (r == Long.MAX_VALUE) { // don't decrement if unlimited requested return; - } else if (REQUESTED.compareAndSet(this, r, r - 1)) { + } else if (localRequested.compareAndSet(r, r - 1)) { return; } } @@ -137,11 +135,12 @@ private void drain() { } } // drain loop + final AtomicLong localRequested = this.requested; while (!child.isUnsubscribed()) { Notification tn; tn = terminalNotification; if (tn != null) { - if (requested > 0) { + if (localRequested.get() > 0) { // allow tn to be GC'd after the onNext call terminalNotification = null; // emit the terminal notification diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 1f1f380ff0..8aff74e67f 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -16,8 +16,8 @@ package rx.internal.operators; import java.util.Queue; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Observable.Operator; import rx.Producer; @@ -79,18 +79,10 @@ private static final class ObserveOnSubscriber extends Subscriber { // the status of the current stream volatile boolean finished = false; - @SuppressWarnings("unused") - volatile long requested = 0; + final AtomicLong requested = new AtomicLong(); - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "requested"); - - @SuppressWarnings("unused") - volatile long counter; + final AtomicLong counter = new AtomicLong(); - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(ObserveOnSubscriber.class, "counter"); - volatile Throwable error; // do NOT pass the Subscriber through to couple the subscription chain ... unsubscribing on the parent should @@ -114,7 +106,7 @@ void init() { @Override public void request(long n) { - BackpressureUtils.getAndAddRequest(REQUESTED, ObserveOnSubscriber.this, n); + BackpressureUtils.getAndAddRequest(requested, n); schedule(); } @@ -173,7 +165,7 @@ public void call() { }; protected void schedule() { - if (COUNTER_UPDATER.getAndIncrement(this) == 0) { + if (counter.getAndIncrement() == 0) { recursiveScheduler.schedule(action); } } @@ -181,10 +173,12 @@ protected void schedule() { // only execute this from schedule() void pollQueue() { int emitted = 0; + final AtomicLong localRequested = this.requested; + final AtomicLong localCounter = this.counter; do { - counter = 1; + localCounter.set(1); long produced = 0; - long r = requested; + long r = localRequested.get(); for (;;) { if (child.isUnsubscribed()) return; @@ -216,20 +210,18 @@ void pollQueue() { break; } } - if (produced > 0 && requested != Long.MAX_VALUE) { - REQUESTED.addAndGet(this, -produced); + if (produced > 0 && localRequested.get() != Long.MAX_VALUE) { + localRequested.addAndGet(-produced); } - } while (COUNTER_UPDATER.decrementAndGet(this) > 0); + } while (localCounter.decrementAndGet() > 0); if (emitted > 0) { request(emitted); } } } - static final class ScheduledUnsubscribe implements Subscription { + static final class ScheduledUnsubscribe extends AtomicInteger implements Subscription { final Scheduler.Worker worker; - volatile int once; - static final AtomicIntegerFieldUpdater ONCE_UPDATER = AtomicIntegerFieldUpdater.newUpdater(ScheduledUnsubscribe.class, "once"); volatile boolean unsubscribed = false; public ScheduledUnsubscribe(Scheduler.Worker worker) { @@ -243,7 +235,7 @@ public boolean isUnsubscribed() { @Override public void unsubscribe() { - if (ONCE_UPDATER.getAndSet(this, 1) == 0) { + if (getAndSet(1) == 0) { worker.schedule(new Action0() { @Override public void call() { diff --git a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java index bdfcd3dbeb..0e5111b6c4 100644 --- a/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java +++ b/src/main/java/rx/internal/operators/OperatorRetryWithPredicate.java @@ -15,7 +15,7 @@ */ package rx.internal.operators; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; import rx.Observable; import rx.Producer; @@ -53,10 +53,7 @@ static final class SourceSubscriber extends Subscriber> { final SerialSubscription serialSubscription; final ProducerArbiter pa; - volatile int attempts; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ATTEMPTS_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts"); + final AtomicInteger attempts = new AtomicInteger(); public SourceSubscriber(Subscriber child, final Func2 predicate, @@ -88,7 +85,7 @@ public void onNext(final Observable o) { @Override public void call() { final Action0 _self = this; - ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this); + attempts.incrementAndGet(); // new subscription each time so if it unsubscribes itself it does not prevent retries // by unsubscribing the child subscription @@ -106,7 +103,7 @@ public void onCompleted() { public void onError(Throwable e) { if (!done) { done = true; - if (predicate.call(attempts, e) && !inner.isUnsubscribed()) { + if (predicate.call(attempts.get(), e) && !inner.isUnsubscribed()) { // retry again inner.schedule(_self); } else { diff --git a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java index f3130cbb97..0fdcbd2c68 100644 --- a/src/main/java/rx/internal/operators/OperatorSampleWithTime.java +++ b/src/main/java/rx/internal/operators/OperatorSampleWithTime.java @@ -16,7 +16,8 @@ package rx.internal.operators; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; + import rx.Observable.Operator; import rx.Scheduler; import rx.Scheduler.Worker; @@ -64,11 +65,8 @@ static final class SamplerSubscriber extends Subscriber implements Action0 /** Indicates that no value is available. */ private static final Object EMPTY_TOKEN = new Object(); /** The shared value between the observer and the timed action. */ - volatile Object value = EMPTY_TOKEN; + final AtomicReference value = new AtomicReference(EMPTY_TOKEN); /** Updater for the value field. */ - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater VALUE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(SamplerSubscriber.class, Object.class, "value"); public SamplerSubscriber(Subscriber subscriber) { this.subscriber = subscriber; } @@ -80,7 +78,7 @@ public void onStart() { @Override public void onNext(T t) { - value = t; + value.set(t); } @Override @@ -97,7 +95,7 @@ public void onCompleted() { @Override public void call() { - Object localValue = VALUE_UPDATER.getAndSet(this, EMPTY_TOKEN); + Object localValue = value.getAndSet(EMPTY_TOKEN); if (localValue != EMPTY_TOKEN) { try { @SuppressWarnings("unchecked") diff --git a/src/main/java/rx/internal/operators/OperatorTimeoutBase.java b/src/main/java/rx/internal/operators/OperatorTimeoutBase.java index 038bf88a0c..65b940640c 100644 --- a/src/main/java/rx/internal/operators/OperatorTimeoutBase.java +++ b/src/main/java/rx/internal/operators/OperatorTimeoutBase.java @@ -16,8 +16,8 @@ package rx.internal.operators; import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Observable; import rx.Observable.Operator; @@ -90,16 +90,9 @@ public Subscriber call(Subscriber subscriber) { private final Observable other; private final Scheduler.Worker inner; - volatile int terminated; - volatile long actual; + final AtomicInteger terminated = new AtomicInteger(); + final AtomicLong actual = new AtomicLong(); - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater TERMINATED_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(TimeoutSubscriber.class, "terminated"); - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater ACTUAL_UPDATER - = AtomicLongFieldUpdater.newUpdater(TimeoutSubscriber.class, "actual"); - private TimeoutSubscriber( SerializedSubscriber serializedSubscriber, TimeoutStub timeoutStub, SerialSubscription serial, @@ -117,14 +110,14 @@ private TimeoutSubscriber( public void onNext(T value) { boolean onNextWins = false; synchronized (gate) { - if (terminated == 0) { - ACTUAL_UPDATER.incrementAndGet(this); + if (terminated.get() == 0) { + actual.incrementAndGet(); onNextWins = true; } } if (onNextWins) { serializedSubscriber.onNext(value); - serial.set(timeoutStub.call(this, actual, value, inner)); + serial.set(timeoutStub.call(this, actual.get(), value, inner)); } } @@ -132,7 +125,7 @@ public void onNext(T value) { public void onError(Throwable error) { boolean onErrorWins = false; synchronized (gate) { - if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) { + if (terminated.getAndSet(1) == 0) { onErrorWins = true; } } @@ -146,7 +139,7 @@ public void onError(Throwable error) { public void onCompleted() { boolean onCompletedWins = false; synchronized (gate) { - if (TERMINATED_UPDATER.getAndSet(this, 1) == 0) { + if (terminated.getAndSet(1) == 0) { onCompletedWins = true; } } @@ -160,7 +153,7 @@ public void onTimeout(long seqId) { long expected = seqId; boolean timeoutWins = false; synchronized (gate) { - if (expected == actual && TERMINATED_UPDATER.getAndSet(this, 1) == 0) { + if (expected == actual.get() && terminated.getAndSet(1) == 0) { timeoutWins = true; } } diff --git a/src/main/java/rx/internal/operators/OperatorZip.java b/src/main/java/rx/internal/operators/OperatorZip.java index d4f0560718..df9dc4a00d 100644 --- a/src/main/java/rx/internal/operators/OperatorZip.java +++ b/src/main/java/rx/internal/operators/OperatorZip.java @@ -16,14 +16,14 @@ package rx.internal.operators; import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; import rx.Observable; import rx.Observable.Operator; -import rx.exceptions.*; import rx.Observer; import rx.Producer; import rx.Subscriber; +import rx.exceptions.Exceptions; +import rx.exceptions.MissingBackpressureException; import rx.functions.Func2; import rx.functions.Func3; import rx.functions.Func4; @@ -175,16 +175,11 @@ public void request(long n) { } - private static final class Zip { + private static final class Zip extends AtomicLong { private final Observer child; private final FuncN zipFunction; private final CompositeSubscription childSubscription = new CompositeSubscription(); - @SuppressWarnings("unused") - volatile long counter; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater COUNTER_UPDATER = AtomicLongFieldUpdater.newUpdater(Zip.class, "counter"); - static final int THRESHOLD = (int) (RxRingBuffer.SIZE * 0.7); int emitted = 0; // not volatile/synchronized as accessed inside COUNTER_UPDATER block @@ -227,7 +222,7 @@ void tick() { // nothing yet to do (initial request from Producer) return; } - if (COUNTER_UPDATER.getAndIncrement(this) == 0) { + if (getAndIncrement() == 0) { final int length = observers.length; final Observer child = this.child; final AtomicLong requested = this.requested; @@ -290,7 +285,7 @@ void tick() { break; } } - } while (COUNTER_UPDATER.decrementAndGet(this) > 0); + } while (decrementAndGet() > 0); } } diff --git a/src/main/java/rx/internal/operators/TakeLastQueueProducer.java b/src/main/java/rx/internal/operators/TakeLastQueueProducer.java index 7fc5ce9235..664dfd0e3a 100644 --- a/src/main/java/rx/internal/operators/TakeLastQueueProducer.java +++ b/src/main/java/rx/internal/operators/TakeLastQueueProducer.java @@ -16,14 +16,14 @@ package rx.internal.operators; +import java.util.Deque; +import java.util.concurrent.atomic.AtomicLong; + import rx.Producer; import rx.Subscriber; import rx.exceptions.Exceptions; -import java.util.Deque; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; - -final class TakeLastQueueProducer implements Producer { +final class TakeLastQueueProducer extends AtomicLong implements Producer { private final NotificationLite notification; private final Deque deque; @@ -36,10 +36,6 @@ public TakeLastQueueProducer(NotificationLite n, Deque q, Subscriber< this.subscriber = subscriber; } - private volatile long requested = 0; - @SuppressWarnings("rawtypes") - private static final AtomicLongFieldUpdater REQUESTED_UPDATER = AtomicLongFieldUpdater.newUpdater(TakeLastQueueProducer.class, "requested"); - void startEmitting() { if (!emittingStarted) { emittingStarted = true; @@ -49,14 +45,14 @@ void startEmitting() { @Override public void request(long n) { - if (requested == Long.MAX_VALUE) { + if (get() == Long.MAX_VALUE) { return; } long _c; if (n == Long.MAX_VALUE) { - _c = REQUESTED_UPDATER.getAndSet(this, Long.MAX_VALUE); + _c = getAndSet(Long.MAX_VALUE); } else { - _c = BackpressureUtils.getAndAddRequest(REQUESTED_UPDATER, this, n); + _c = BackpressureUtils.getAndAddRequest(this, n); } if (!emittingStarted) { // we haven't started yet, so record what was requested and return @@ -66,7 +62,7 @@ public void request(long n) { } void emit(long previousRequested) { - if (requested == Long.MAX_VALUE) { + if (get() == Long.MAX_VALUE) { // fast-path without backpressure if (previousRequested == 0) { try { @@ -91,7 +87,7 @@ void emit(long previousRequested) { * This complicated logic is done to avoid touching the volatile `requested` value * during the loop itself. If it is touched during the loop the performance is impacted significantly. */ - long numToEmit = requested; + long numToEmit = get(); int emitted = 0; Object o; while (--numToEmit >= 0 && (o = deque.poll()) != null) { @@ -106,14 +102,14 @@ void emit(long previousRequested) { } } for (; ; ) { - long oldRequested = requested; + long oldRequested = get(); long newRequested = oldRequested - emitted; if (oldRequested == Long.MAX_VALUE) { // became unbounded during the loop // continue the outer loop to emit the rest events. break; } - if (REQUESTED_UPDATER.compareAndSet(this, oldRequested, newRequested)) { + if (compareAndSet(oldRequested, newRequested)) { if (newRequested == 0) { // we're done emitting the number requested so return return; diff --git a/src/main/java/rx/internal/util/BackpressureDrainManager.java b/src/main/java/rx/internal/util/BackpressureDrainManager.java index f4a95573e7..38f714b67f 100644 --- a/src/main/java/rx/internal/util/BackpressureDrainManager.java +++ b/src/main/java/rx/internal/util/BackpressureDrainManager.java @@ -15,7 +15,7 @@ */ package rx.internal.util; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; import rx.Producer; import rx.annotations.Experimental; @@ -26,7 +26,7 @@ * terminal events. */ @Experimental -public final class BackpressureDrainManager implements Producer { +public final class BackpressureDrainManager extends AtomicLong implements Producer { /** * Interface representing the minimal callbacks required * to operate the drain part of a backpressure system. @@ -61,11 +61,6 @@ public interface BackpressureQueueCallback { void complete(Throwable exception); } - /** The request counter, updated via REQUESTED_COUNTER. */ - protected volatile long requestedCount; - /** Atomically updates the the requestedCount field. */ - protected static final AtomicLongFieldUpdater REQUESTED_COUNT - = AtomicLongFieldUpdater.newUpdater(BackpressureDrainManager.class, "requestedCount"); /** Indicates if one is in emitting phase, guarded by this. */ protected boolean emitting; /** Indicates a terminal state. */ @@ -138,7 +133,7 @@ public final void request(long n) { long r; long u; do { - r = requestedCount; + r = get(); mayDrain = r == 0; if (r == Long.MAX_VALUE) { break; @@ -153,7 +148,7 @@ public final void request(long n) { u = r + n; } } - } while (!REQUESTED_COUNT.compareAndSet(this, r, u)); + } while (!compareAndSet(r, u)); // since we implement producer, we have to call drain // on a 0-n request transition if (mayDrain) { @@ -174,7 +169,7 @@ public final void drain() { emitting = true; term = terminated; } - n = requestedCount; + n = get(); boolean skipFinal = false; try { BackpressureQueueCallback a = actual; @@ -210,7 +205,7 @@ public final void drain() { term = terminated; boolean more = a.peek() != null; // if no backpressure below - if (requestedCount == Long.MAX_VALUE) { + if (get() == Long.MAX_VALUE) { // no new data arrived since the last poll if (!more && !term) { skipFinal = true; @@ -219,7 +214,7 @@ public final void drain() { } n = Long.MAX_VALUE; } else { - n = REQUESTED_COUNT.addAndGet(this, -emitted); + n = addAndGet(-emitted); if ((n == 0 || !more) && (!term || more)) { skipFinal = true; emitting = false; diff --git a/src/main/java/rx/internal/util/PaddedAtomicInteger.java b/src/main/java/rx/internal/util/PaddedAtomicInteger.java deleted file mode 100644 index e0ebdd3a21..0000000000 --- a/src/main/java/rx/internal/util/PaddedAtomicInteger.java +++ /dev/null @@ -1,30 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package rx.internal.util; - -/** - * A padded atomic integer to fill in 4 cache lines to avoid any false sharing or - * adjacent prefetch. - * Based on Netty's implementation. - */ -public final class PaddedAtomicInteger extends PaddedAtomicIntegerBase { - /** */ - private static final long serialVersionUID = 8781891581317286855L; - /** Padding. */ - public transient long p16, p17, p18, p19, p20, p21, p22; // 56 bytes (the remaining 8 is in the base) - /** Padding. */ - public transient long p24, p25, p26, p27, p28, p29, p30, p31; // 64 bytes -} diff --git a/src/main/java/rx/internal/util/PaddedAtomicIntegerBase.java b/src/main/java/rx/internal/util/PaddedAtomicIntegerBase.java deleted file mode 100644 index afa67e4b81..0000000000 --- a/src/main/java/rx/internal/util/PaddedAtomicIntegerBase.java +++ /dev/null @@ -1,84 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ - -package rx.internal.util; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -/** - * The atomic integer base padded at the front. - * Based on Netty's implementation. - */ -abstract class PaddedAtomicIntegerBase extends FrontPadding { - - private static final long serialVersionUID = 6513142711280243198L; - - private static final AtomicIntegerFieldUpdater updater; - - static { - updater = AtomicIntegerFieldUpdater.newUpdater(PaddedAtomicIntegerBase.class, "value"); - } - - private volatile int value; // 8-byte object field (or 4-byte + padding) - - public final int get() { - return value; - } - - public final void set(int newValue) { - this.value = newValue; - } - - public final void lazySet(int newValue) { - updater.lazySet(this, newValue); - } - - public final boolean compareAndSet(int expect, int update) { - return updater.compareAndSet(this, expect, update); - } - - public final boolean weakCompareAndSet(int expect, int update) { - return updater.weakCompareAndSet(this, expect, update); - } - - public final int getAndSet(int newValue) { - return updater.getAndSet(this, value); - } - - public final int getAndAdd(int delta) { - return updater.getAndAdd(this, delta); - } - public final int incrementAndGet() { - return updater.incrementAndGet(this); - } - public final int decrementAndGet() { - return updater.decrementAndGet(this); - } - public final int getAndIncrement() { - return updater.getAndIncrement(this); - } - public final int getAndDecrement() { - return updater.getAndDecrement(this); - } - public final int addAndGet(int delta) { - return updater.addAndGet(this, delta); - } - - @Override - public String toString() { - return String.valueOf(get()); - } -} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/RxThreadFactory.java b/src/main/java/rx/internal/util/RxThreadFactory.java index 16f7551bcb..cc6d45d486 100644 --- a/src/main/java/rx/internal/util/RxThreadFactory.java +++ b/src/main/java/rx/internal/util/RxThreadFactory.java @@ -16,13 +16,10 @@ package rx.internal.util; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicLong; -public final class RxThreadFactory implements ThreadFactory { +public final class RxThreadFactory extends AtomicLong implements ThreadFactory { final String prefix; - volatile long counter; - static final AtomicLongFieldUpdater COUNTER_UPDATER - = AtomicLongFieldUpdater.newUpdater(RxThreadFactory.class, "counter"); public RxThreadFactory(String prefix) { this.prefix = prefix; @@ -30,7 +27,7 @@ public RxThreadFactory(String prefix) { @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, prefix + COUNTER_UPDATER.incrementAndGet(this)); + Thread t = new Thread(r, prefix + incrementAndGet()); t.setDaemon(true); return t; } diff --git a/src/main/java/rx/internal/util/SubscriptionIndexedRingBuffer.java b/src/main/java/rx/internal/util/SubscriptionIndexedRingBuffer.java deleted file mode 100644 index 6dcb2d566d..0000000000 --- a/src/main/java/rx/internal/util/SubscriptionIndexedRingBuffer.java +++ /dev/null @@ -1,145 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.util; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import rx.Subscription; -import rx.functions.Func1; - -/** - * Similar to CompositeSubscription but giving extra access to internals so we can reuse a datastructure. - *

- * NOTE: This purposefully is leaking the internal data structure through the API for efficiency reasons to avoid extra object allocations. - */ -public final class SubscriptionIndexedRingBuffer implements Subscription { - - private volatile IndexedRingBuffer subscriptions = IndexedRingBuffer.getInstance(); - private volatile int unsubscribed = 0; - @SuppressWarnings("rawtypes") - private final static AtomicIntegerFieldUpdater UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(SubscriptionIndexedRingBuffer.class, "unsubscribed"); - - public SubscriptionIndexedRingBuffer() { - } - - @Override - public boolean isUnsubscribed() { - return unsubscribed == 1; - } - - /** - * Adds a new {@link Subscription} to this {@code CompositeSubscription} if the {@code CompositeSubscription} is not yet unsubscribed. If the {@code CompositeSubscription} is - * unsubscribed, {@code add} will indicate this by explicitly unsubscribing the new {@code Subscription} as - * well. - * - * @param s - * the {@link Subscription} to add - * - * @return int index that can be used to remove a Subscription - */ - public synchronized int add(final T s) { - // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420 - if (unsubscribed == 1 || subscriptions == null) { - s.unsubscribe(); - return -1; - } else { - int n = subscriptions.add(s); - // double check for race condition - if (unsubscribed == 1) { - s.unsubscribe(); - } - return n; - } - } - - /** - * Uses the Node received from `add` to remove this Subscription. - *

- * Unsubscribes the Subscription after removal - */ - public void remove(final int n) { - if (unsubscribed == 1 || subscriptions == null || n < 0) { - return; - } - Subscription t = subscriptions.remove(n); - if (t != null) { - // if we removed successfully we then need to call unsubscribe on it - if (t != null) { - t.unsubscribe(); - } - } - } - - /** - * Uses the Node received from `add` to remove this Subscription. - *

- * Does not unsubscribe the Subscription after removal. - */ - public void removeSilently(final int n) { - if (unsubscribed == 1 || subscriptions == null || n < 0) { - return; - } - subscriptions.remove(n); - } - - @Override - public void unsubscribe() { - if (UNSUBSCRIBED.compareAndSet(this, 0, 1) && subscriptions != null) { - // we will only get here once - unsubscribeFromAll(subscriptions); - - IndexedRingBuffer s = subscriptions; - subscriptions = null; - s.unsubscribe(); - } - } - - public int forEach(Func1 action) { - return forEach(action, 0); - } - - /** - * - * @param action - * @return int of last index seen if forEach exited early - */ - public synchronized int forEach(Func1 action, int startIndex) { - // TODO figure out how to remove synchronized here. See https://github.com/ReactiveX/RxJava/issues/1420 - if (unsubscribed == 1 || subscriptions == null) { - return 0; - } - return subscriptions.forEach(action, startIndex); - } - - private static void unsubscribeFromAll(IndexedRingBuffer subscriptions) { - if (subscriptions == null) { - return; - } - - // TODO migrate to drain (remove while we're doing this) so we don't have to immediately clear it in IndexedRingBuffer.releaseToPool? - subscriptions.forEach(UNSUBSCRIBE); - } - - private final static Func1 UNSUBSCRIBE = new Func1() { - - @Override - public Boolean call(Subscription s) { - s.unsubscribe(); - return Boolean.TRUE; - } - }; - -} diff --git a/src/main/java/rx/schedulers/TrampolineScheduler.java b/src/main/java/rx/schedulers/TrampolineScheduler.java index 1482d34756..9f7b14eb43 100644 --- a/src/main/java/rx/schedulers/TrampolineScheduler.java +++ b/src/main/java/rx/schedulers/TrampolineScheduler.java @@ -18,7 +18,6 @@ import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import rx.Scheduler; import rx.Subscription; @@ -47,9 +46,7 @@ public Worker createWorker() { private static class InnerCurrentThreadScheduler extends Scheduler.Worker implements Subscription { - private static final AtomicIntegerFieldUpdater COUNTER_UPDATER = AtomicIntegerFieldUpdater.newUpdater(InnerCurrentThreadScheduler.class, "counter"); - @SuppressWarnings("unused") - volatile int counter; + final AtomicInteger counter = new AtomicInteger(); private final PriorityBlockingQueue queue = new PriorityBlockingQueue(); private final BooleanSubscription innerSubscription = new BooleanSubscription(); private final AtomicInteger wip = new AtomicInteger(); @@ -70,7 +67,7 @@ private Subscription enqueue(Action0 action, long execTime) { if (innerSubscription.isUnsubscribed()) { return Subscriptions.unsubscribed(); } - final TimedAction timedAction = new TimedAction(action, execTime, COUNTER_UPDATER.incrementAndGet(this)); + final TimedAction timedAction = new TimedAction(action, execTime, counter.incrementAndGet()); queue.add(timedAction); if (wip.getAndIncrement() == 0) { diff --git a/src/main/java/rx/subjects/AsyncSubject.java b/src/main/java/rx/subjects/AsyncSubject.java index e3e508164f..b124b8966c 100644 --- a/src/main/java/rx/subjects/AsyncSubject.java +++ b/src/main/java/rx/subjects/AsyncSubject.java @@ -67,7 +67,7 @@ public static AsyncSubject create() { state.onTerminated = new Action1>() { @Override public void call(SubjectObserver o) { - Object v = state.get(); + Object v = state.getLatest(); NotificationLite nl = state.nl; o.accept(v, nl); if (v == null || (!nl.isCompleted(v) && !nl.isError(v))) { @@ -145,7 +145,7 @@ public boolean hasObservers() { @Override public boolean hasValue() { Object v = lastValue; - Object o = state.get(); + Object o = state.getLatest(); return !nl.isError(o) && nl.isNext(v); } /** @@ -155,7 +155,7 @@ public boolean hasValue() { @Experimental @Override public boolean hasThrowable() { - Object o = state.get(); + Object o = state.getLatest(); return nl.isError(o); } /** @@ -165,7 +165,7 @@ public boolean hasThrowable() { @Experimental @Override public boolean hasCompleted() { - Object o = state.get(); + Object o = state.getLatest(); return o != null && !nl.isError(o); } /** @@ -181,7 +181,7 @@ public boolean hasCompleted() { @Override public T getValue() { Object v = lastValue; - Object o = state.get(); + Object o = state.getLatest(); if (!nl.isError(o) && nl.isNext(v)) { return nl.getValue(v); } @@ -195,7 +195,7 @@ public T getValue() { @Experimental @Override public Throwable getThrowable() { - Object o = state.get(); + Object o = state.getLatest(); if (nl.isError(o)) { return nl.getError(o); } @@ -207,7 +207,7 @@ public Throwable getThrowable() { @SuppressWarnings("unchecked") public T[] getValues(T[] a) { Object v = lastValue; - Object o = state.get(); + Object o = state.getLatest(); if (!nl.isError(o) && nl.isNext(v)) { T val = nl.getValue(v); if (a.length == 0) { diff --git a/src/main/java/rx/subjects/BehaviorSubject.java b/src/main/java/rx/subjects/BehaviorSubject.java index 218eef5eba..d912e81411 100644 --- a/src/main/java/rx/subjects/BehaviorSubject.java +++ b/src/main/java/rx/subjects/BehaviorSubject.java @@ -97,13 +97,13 @@ public static BehaviorSubject create(T defaultValue) { private static BehaviorSubject create(T defaultValue, boolean hasDefault) { final SubjectSubscriptionManager state = new SubjectSubscriptionManager(); if (hasDefault) { - state.set(NotificationLite.instance().next(defaultValue)); + state.setLatest(NotificationLite.instance().next(defaultValue)); } state.onAdded = new Action1>() { @Override public void call(SubjectObserver o) { - o.emitFirst(state.get(), state.nl); + o.emitFirst(state.getLatest(), state.nl); } }; @@ -121,7 +121,7 @@ protected BehaviorSubject(OnSubscribe onSubscribe, SubjectSubscriptionManager @Override public void onCompleted() { - Object last = state.get(); + Object last = state.getLatest(); if (last == null || state.active) { Object n = nl.completed(); for (SubjectObserver bo : state.terminate(n)) { @@ -132,7 +132,7 @@ public void onCompleted() { @Override public void onError(Throwable e) { - Object last = state.get(); + Object last = state.getLatest(); if (last == null || state.active) { Object n = nl.error(e); List errors = null; @@ -153,7 +153,7 @@ public void onError(Throwable e) { @Override public void onNext(T v) { - Object last = state.get(); + Object last = state.getLatest(); if (last == null || state.active) { Object n = nl.next(v); for (SubjectObserver bo : state.next(n)) { @@ -180,7 +180,7 @@ public boolean hasObservers() { @Experimental @Override public boolean hasValue() { - Object o = state.get(); + Object o = state.getLatest(); return nl.isNext(o); } /** @@ -190,7 +190,7 @@ public boolean hasValue() { @Experimental @Override public boolean hasThrowable() { - Object o = state.get(); + Object o = state.getLatest(); return nl.isError(o); } /** @@ -200,7 +200,7 @@ public boolean hasThrowable() { @Experimental @Override public boolean hasCompleted() { - Object o = state.get(); + Object o = state.getLatest(); return nl.isCompleted(o); } /** @@ -215,7 +215,7 @@ public boolean hasCompleted() { @Experimental @Override public T getValue() { - Object o = state.get(); + Object o = state.getLatest(); if (nl.isNext(o)) { return nl.getValue(o); } @@ -229,7 +229,7 @@ public T getValue() { @Experimental @Override public Throwable getThrowable() { - Object o = state.get(); + Object o = state.getLatest(); if (nl.isError(o)) { return nl.getError(o); } @@ -239,7 +239,7 @@ public Throwable getThrowable() { @Experimental @SuppressWarnings("unchecked") public T[] getValues(T[] a) { - Object o = state.get(); + Object o = state.getLatest(); if (nl.isNext(o)) { if (a.length == 0) { a = (T[])Array.newInstance(a.getClass().getComponentType(), 1); diff --git a/src/main/java/rx/subjects/PublishSubject.java b/src/main/java/rx/subjects/PublishSubject.java index 6ec0af1608..f9dd1f0e4f 100644 --- a/src/main/java/rx/subjects/PublishSubject.java +++ b/src/main/java/rx/subjects/PublishSubject.java @@ -63,7 +63,7 @@ public static PublishSubject create() { @Override public void call(SubjectObserver o) { - o.emitFirst(state.get(), state.nl); + o.emitFirst(state.getLatest(), state.nl); } }; @@ -127,7 +127,7 @@ public boolean hasObservers() { @Experimental @Override public boolean hasThrowable() { - Object o = state.get(); + Object o = state.getLatest(); return nl.isError(o); } /** @@ -137,7 +137,7 @@ public boolean hasThrowable() { @Experimental @Override public boolean hasCompleted() { - Object o = state.get(); + Object o = state.getLatest(); return o != null && !nl.isError(o); } /** @@ -148,7 +148,7 @@ public boolean hasCompleted() { @Experimental @Override public Throwable getThrowable() { - Object o = state.get(); + Object o = state.getLatest(); if (nl.isError(o)) { return nl.getError(o); } diff --git a/src/main/java/rx/subjects/ReplaySubject.java b/src/main/java/rx/subjects/ReplaySubject.java index f2230f4bba..d683db0b12 100644 --- a/src/main/java/rx/subjects/ReplaySubject.java +++ b/src/main/java/rx/subjects/ReplaySubject.java @@ -16,15 +16,17 @@ package rx.subjects; import java.lang.reflect.Array; -import java.util.*; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; -import rx.*; import rx.Observer; +import rx.Scheduler; import rx.annotations.Experimental; import rx.exceptions.Exceptions; -import rx.functions.*; +import rx.functions.Action1; +import rx.functions.Func1; import rx.internal.operators.NotificationLite; import rx.internal.util.UtilityFunctions; import rx.schedulers.Timestamped; @@ -113,15 +115,17 @@ public void call(SubjectObserver o) { } boolean skipFinal = false; try { + //noinspection UnnecessaryLocalVariable - Avoid re-read from outside this scope + final UnboundedReplayState localState = state; for (;;) { int idx = o.index(); - int sidx = state.index; + int sidx = localState.get(); if (idx != sidx) { - Integer j = state.replayObserverFromIndex(idx, o); + Integer j = localState.replayObserverFromIndex(idx, o); o.index(j); } synchronized (o) { - if (sidx == state.index) { + if (sidx == localState.get()) { o.emitting = false; skipFinal = true; break; @@ -410,7 +414,7 @@ public void onCompleted() { * @return Returns the number of subscribers. */ /* Support test. */int subscriberCount() { - return ssm.state.observers.length; + return ssm.get().observers.length; } @Override @@ -439,17 +443,12 @@ private boolean caughtUp(SubjectObserver o) { * The unbounded replay state. * @param the input and output type */ - static final class UnboundedReplayState implements ReplayState { + static final class UnboundedReplayState extends AtomicInteger implements ReplayState { private final NotificationLite nl = NotificationLite.instance(); /** The buffer. */ private final ArrayList list; /** The termination flag. */ private volatile boolean terminated; - /** The size of the buffer. */ - volatile int index; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater INDEX_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(UnboundedReplayState.class, "index"); public UnboundedReplayState(int initialCapacity) { list = new ArrayList(initialCapacity); } @@ -458,7 +457,7 @@ public UnboundedReplayState(int initialCapacity) { public void next(T n) { if (!terminated) { list.add(nl.next(n)); - INDEX_UPDATER.getAndIncrement(this); // release index + getAndIncrement(); // release index } } @@ -471,7 +470,7 @@ public void complete() { if (!terminated) { terminated = true; list.add(nl.completed()); - INDEX_UPDATER.getAndIncrement(this); // release index + getAndIncrement(); // release index } } @Override @@ -479,7 +478,7 @@ public void error(Throwable e) { if (!terminated) { terminated = true; list.add(nl.error(e)); - INDEX_UPDATER.getAndIncrement(this); // release index + getAndIncrement(); // release index } } @@ -511,7 +510,7 @@ public boolean replayObserver(SubjectObserver observer) { @Override public Integer replayObserverFromIndex(Integer idx, SubjectObserver observer) { int i = idx; - while (i < index) { + while (i < get()) { accept(observer, i); i++; } @@ -526,7 +525,7 @@ public Integer replayObserverFromIndexTest(Integer idx, SubjectObserver 0) { Object o = list.get(idx - 1); if (nl.isCompleted(o) || nl.isError(o)) { @@ -561,7 +560,7 @@ public T[] toArray(T[] a) { } @Override public T latest() { - int idx = index; + int idx = get(); if (idx > 0) { Object o = list.get(idx - 1); if (nl.isCompleted(o) || nl.isError(o)) { @@ -1102,7 +1101,7 @@ public void evictFinal(NodeList list) { @Override public boolean hasThrowable() { NotificationLite nl = ssm.nl; - Object o = ssm.get(); + Object o = ssm.getLatest(); return nl.isError(o); } /** @@ -1113,7 +1112,7 @@ public boolean hasThrowable() { @Override public boolean hasCompleted() { NotificationLite nl = ssm.nl; - Object o = ssm.get(); + Object o = ssm.getLatest(); return o != null && !nl.isError(o); } /** @@ -1125,7 +1124,7 @@ public boolean hasCompleted() { @Override public Throwable getThrowable() { NotificationLite nl = ssm.nl; - Object o = ssm.get(); + Object o = ssm.getLatest(); if (nl.isError(o)) { return nl.getError(o); } diff --git a/src/main/java/rx/subjects/SubjectSubscriptionManager.java b/src/main/java/rx/subjects/SubjectSubscriptionManager.java index 542d050c39..9a0c90ece7 100644 --- a/src/main/java/rx/subjects/SubjectSubscriptionManager.java +++ b/src/main/java/rx/subjects/SubjectSubscriptionManager.java @@ -17,7 +17,7 @@ import java.util.ArrayList; import java.util.List; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Observable.OnSubscribe; import rx.Observer; @@ -33,11 +33,7 @@ * @param the source and return value type */ @SuppressWarnings({"unchecked", "rawtypes"}) -/* package */final class SubjectSubscriptionManager implements OnSubscribe { - /** Contains the unsubscription flag and the array of active subscribers. */ - volatile State state = State.EMPTY; - static final AtomicReferenceFieldUpdater STATE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(SubjectSubscriptionManager.class, State.class, "state"); +/* package */final class SubjectSubscriptionManager extends AtomicReference> implements OnSubscribe { /** Stores the latest value or the terminal value for some Subjects. */ volatile Object latest; /** Indicates that the subject is active (cheaper than checking the state).*/ @@ -50,6 +46,11 @@ Action1> onTerminated = Actions.empty(); /** The notification lite. */ public final NotificationLite nl = NotificationLite.instance(); + + public SubjectSubscriptionManager() { + super(State.EMPTY); + } + @Override public void call(final Subscriber child) { SubjectObserver bo = new SubjectObserver(child); @@ -71,16 +72,16 @@ public void call() { })); } /** Set the latest NotificationLite value. */ - void set(Object value) { + void setLatest(Object value) { latest = value; } /** @return Retrieve the latest NotificationLite value */ - Object get() { + Object getLatest() { return latest; } /** @return the array of active subscribers, don't write into the array! */ SubjectObserver[] observers() { - return state.observers; + return get().observers; } /** * Try to atomically add a SubjectObserver to the active state. @@ -89,13 +90,13 @@ SubjectObserver[] observers() { */ boolean add(SubjectObserver o) { do { - State oldState = state; + State oldState = get(); if (oldState.terminated) { onTerminated.call(o); return false; } State newState = oldState.add(o); - if (STATE_UPDATER.compareAndSet(this, oldState, newState)) { + if (compareAndSet(oldState, newState)) { onAdded.call(o); return true; } @@ -107,12 +108,12 @@ boolean add(SubjectObserver o) { */ void remove(SubjectObserver o) { do { - State oldState = state; + State oldState = get(); if (oldState.terminated) { return; } State newState = oldState.remove(o); - if (newState == oldState || STATE_UPDATER.compareAndSet(this, oldState, newState)) { + if (newState == oldState || compareAndSet(oldState, newState)) { return; } } while (true); @@ -123,8 +124,8 @@ void remove(SubjectObserver o) { * @return the array of SubjectObservers, don't write into the array! */ SubjectObserver[] next(Object n) { - set(n); - return state.observers; + setLatest(n); + return get().observers; } /** * Atomically set the terminal NotificationLite value (which could be any of the 3), @@ -133,14 +134,14 @@ SubjectObserver[] next(Object n) { * @return the last active SubjectObservers */ SubjectObserver[] terminate(Object n) { - set(n); + setLatest(n); active = false; - State oldState = state; + State oldState = get(); if (oldState.terminated) { return State.NO_OBSERVERS; } - return STATE_UPDATER.getAndSet(this, State.TERMINATED).observers; + return getAndSet(State.TERMINATED).observers; } /** State-machine representing the termination state and active SubjectObservers. */ diff --git a/src/main/java/rx/subjects/TestSubject.java b/src/main/java/rx/subjects/TestSubject.java index 2de860c602..2cc32b007c 100644 --- a/src/main/java/rx/subjects/TestSubject.java +++ b/src/main/java/rx/subjects/TestSubject.java @@ -49,7 +49,7 @@ public static TestSubject create(TestScheduler scheduler) { @Override public void call(SubjectObserver o) { - o.emitFirst(state.get(), state.nl); + o.emitFirst(state.getLatest(), state.nl); } }; diff --git a/src/main/java/rx/subscriptions/BooleanSubscription.java b/src/main/java/rx/subscriptions/BooleanSubscription.java index ef0b082f79..9ba4100a66 100644 --- a/src/main/java/rx/subscriptions/BooleanSubscription.java +++ b/src/main/java/rx/subscriptions/BooleanSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Observable; import rx.Subscription; @@ -27,17 +27,14 @@ */ public final class BooleanSubscription implements Subscription { - private final Action0 action; - volatile int unsubscribed; - static final AtomicIntegerFieldUpdater UNSUBSCRIBED_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(BooleanSubscription.class, "unsubscribed"); + final AtomicReference actionRef; public BooleanSubscription() { - action = null; + actionRef = new AtomicReference(); } private BooleanSubscription(Action0 action) { - this.action = action; + actionRef = new AtomicReference(action); } /** @@ -62,16 +59,25 @@ public static BooleanSubscription create(Action0 onUnsubscribe) { @Override public boolean isUnsubscribed() { - return unsubscribed != 0; + return actionRef.get() == EMPTY_ACTION; } @Override public final void unsubscribe() { - if (UNSUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { - if (action != null) { + Action0 action = actionRef.get(); + if (action != EMPTY_ACTION) { + action = actionRef.getAndSet(EMPTY_ACTION); + if (action != null && action != EMPTY_ACTION) { action.call(); } } } + static final Action0 EMPTY_ACTION = new Action0() { + @Override + public void call() { + + } + }; + } diff --git a/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java b/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java index 8591b062d7..ec0ea7c6df 100644 --- a/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java +++ b/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Observable; import rx.Subscription; @@ -26,9 +26,7 @@ */ public final class MultipleAssignmentSubscription implements Subscription { - volatile State state = new State(false, Subscriptions.empty()); - static final AtomicReferenceFieldUpdater STATE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(MultipleAssignmentSubscription.class, State.class, "state"); + final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); private static final class State { final boolean isUnsubscribed; @@ -50,21 +48,22 @@ State set(Subscription s) { } @Override public boolean isUnsubscribed() { - return state.isUnsubscribed; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { return; } else { newState = oldState.unsubscribe(); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); oldState.subscription.unsubscribe(); } @@ -81,15 +80,16 @@ public void set(Subscription s) { } State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { s.unsubscribe(); return; } else { newState = oldState.set(s); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); } /** @@ -98,7 +98,7 @@ public void set(Subscription s) { * @return the {@link Subscription} that underlies the {@code MultipleAssignmentSubscription} */ public Subscription get() { - return state.subscription; + return state.get().subscription; } } diff --git a/src/main/java/rx/subscriptions/RefCountSubscription.java b/src/main/java/rx/subscriptions/RefCountSubscription.java index af225fa1a7..a45c6d3b66 100644 --- a/src/main/java/rx/subscriptions/RefCountSubscription.java +++ b/src/main/java/rx/subscriptions/RefCountSubscription.java @@ -15,8 +15,8 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -27,9 +27,7 @@ public final class RefCountSubscription implements Subscription { private final Subscription actual; static final State EMPTY_STATE = new State(false, 0); - volatile State state = EMPTY_STATE; - static final AtomicReferenceFieldUpdater STATE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(RefCountSubscription.class, State.class, "state"); + final AtomicReference state = new AtomicReference(EMPTY_STATE); private static final class State { final boolean isUnsubscribed; @@ -77,34 +75,36 @@ public RefCountSubscription(Subscription s) { public Subscription get() { State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { return Subscriptions.unsubscribed(); } else { newState = oldState.addChild(); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); return new InnerSubscription(this); } @Override public boolean isUnsubscribed() { - return state.isUnsubscribed; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { return; } newState = oldState.unsubscribe(); - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); unsubscribeActualIfApplicable(newState); } @@ -116,32 +116,30 @@ private void unsubscribeActualIfApplicable(State state) { void unsubscribeAChild() { State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); newState = oldState.removeChild(); - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); unsubscribeActualIfApplicable(newState); } /** The individual sub-subscriptions. */ - private static final class InnerSubscription implements Subscription { + private static final class InnerSubscription extends AtomicInteger implements Subscription { final RefCountSubscription parent; - volatile int innerDone; - static final AtomicIntegerFieldUpdater INNER_DONE_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(InnerSubscription.class, "innerDone"); public InnerSubscription(RefCountSubscription parent) { this.parent = parent; } @Override public void unsubscribe() { - if (INNER_DONE_UPDATER.compareAndSet(this, 0, 1)) { + if (compareAndSet(0, 1)) { parent.unsubscribeAChild(); } } @Override public boolean isUnsubscribed() { - return innerDone != 0; + return get() != 0; } }; } diff --git a/src/main/java/rx/subscriptions/SerialSubscription.java b/src/main/java/rx/subscriptions/SerialSubscription.java index 6cc5019092..f8aff9b67e 100644 --- a/src/main/java/rx/subscriptions/SerialSubscription.java +++ b/src/main/java/rx/subscriptions/SerialSubscription.java @@ -15,7 +15,7 @@ */ package rx.subscriptions; -import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import rx.Subscription; @@ -24,9 +24,7 @@ * the previous underlying subscription to be unsubscribed. */ public final class SerialSubscription implements Subscription { - volatile State state = new State(false, Subscriptions.empty()); - static final AtomicReferenceFieldUpdater STATE_UPDATER - = AtomicReferenceFieldUpdater.newUpdater(SerialSubscription.class, State.class, "state"); + final AtomicReference state = new AtomicReference(new State(false, Subscriptions.empty())); private static final class State { final boolean isUnsubscribed; @@ -49,21 +47,22 @@ State set(Subscription s) { @Override public boolean isUnsubscribed() { - return state.isUnsubscribed; + return state.get().isUnsubscribed; } @Override public void unsubscribe() { State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { return; } else { newState = oldState.unsubscribe(); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); oldState.subscription.unsubscribe(); } @@ -81,15 +80,16 @@ public void set(Subscription s) { } State oldState; State newState; + final AtomicReference localState = this.state; do { - oldState = state; + oldState = localState.get(); if (oldState.isUnsubscribed) { s.unsubscribe(); return; } else { newState = oldState.set(s); } - } while (!STATE_UPDATER.compareAndSet(this, oldState, newState)); + } while (!localState.compareAndSet(oldState, newState)); oldState.subscription.unsubscribe(); } @@ -99,7 +99,7 @@ public void set(Subscription s) { * @return the current {@link Subscription} that is being represented by this {@code SerialSubscription} */ public Subscription get() { - return state.subscription; + return state.get().subscription; } }