diff --git a/src/main/java/rx/internal/operators/BackpressureUtils.java b/src/main/java/rx/internal/operators/BackpressureUtils.java index 4a2fa90f48..6a13709649 100644 --- a/src/main/java/rx/internal/operators/BackpressureUtils.java +++ b/src/main/java/rx/internal/operators/BackpressureUtils.java @@ -45,7 +45,9 @@ private BackpressureUtils() { * @param n * the number of requests to add to the requested count * @return requested value just prior to successful addition + * @deprecated Android has issues with reflection-based atomics */ + @Deprecated public static long getAndAddRequest(AtomicLongFieldUpdater requested, T object, long n) { // add n to field but check for overflow while (true) { diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java index 85d2509264..dd7ff32b5b 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeConcat.java @@ -52,9 +52,7 @@ static final class CompletableConcatSubscriber volatile boolean done; - volatile int once; - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(CompletableConcatSubscriber.class, "once"); + final AtomicBoolean once; final ConcatInnerSubscriber inner; @@ -67,6 +65,7 @@ public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) { this.sr = new SerialSubscription(); this.inner = new ConcatInnerSubscriber(); this.wip = new AtomicInteger(); + this.once = new AtomicBoolean(); add(sr); request(prefetch); } @@ -84,7 +83,7 @@ public void onNext(Completable t) { @Override public void onError(Throwable t) { - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { actual.onError(t); return; } @@ -121,7 +120,7 @@ void next() { Completable c = queue.poll(); if (c == null) { if (d) { - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { actual.onCompleted(); } return; diff --git a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java index 651ae49bc1..c59578b2e5 100644 --- a/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java +++ b/src/main/java/rx/internal/operators/CompletableOnSubscribeMerge.java @@ -55,14 +55,9 @@ static final class CompletableMergeSubscriber volatile boolean done; - volatile Queue errors; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ERRORS = - AtomicReferenceFieldUpdater.newUpdater(CompletableMergeSubscriber.class, Queue.class, "errors"); + final AtomicReference> errors; - volatile int once; - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(CompletableMergeSubscriber.class, "once"); + final AtomicBoolean once; final AtomicInteger wip; @@ -72,6 +67,8 @@ public CompletableMergeSubscriber(CompletableSubscriber actual, int maxConcurren this.delayErrors = delayErrors; this.set = new CompositeSubscription(); this.wip = new AtomicInteger(1); + this.once = new AtomicBoolean(); + this.errors = new AtomicReference>(); if (maxConcurrency == Integer.MAX_VALUE) { request(Long.MAX_VALUE); } else { @@ -80,17 +77,17 @@ public CompletableMergeSubscriber(CompletableSubscriber actual, int maxConcurren } Queue getOrCreateErrors() { - Queue q = errors; + Queue q = errors.get(); if (q != null) { return q; } q = new ConcurrentLinkedQueue(); - if (ERRORS.compareAndSet(this, null, q)) { + if (errors.compareAndSet(null, q)) { return q; } - return errors; + return errors.get(); } @Override @@ -167,12 +164,12 @@ public void onCompleted() { void terminate() { if (wip.decrementAndGet() == 0) { - Queue q = errors; + Queue q = errors.get(); if (q == null || q.isEmpty()) { actual.onCompleted(); } else { Throwable e = collectErrors(q); - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { actual.onError(e); } else { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); @@ -180,10 +177,10 @@ void terminate() { } } else if (!delayErrors) { - Queue q = errors; + Queue q = errors.get(); if (q != null && !q.isEmpty()) { Throwable e = collectErrors(q); - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { actual.onError(e); } else { RxJavaPlugins.getInstance().getErrorHandler().handleError(e); diff --git a/src/main/java/rx/internal/operators/OperatorGroupBy.java b/src/main/java/rx/internal/operators/OperatorGroupBy.java index 05fbd6445f..bf474619bb 100644 --- a/src/main/java/rx/internal/operators/OperatorGroupBy.java +++ b/src/main/java/rx/internal/operators/OperatorGroupBy.java @@ -106,28 +106,16 @@ public static final class GroupBySubscriber final ProducerArbiter s; - volatile int cancelled; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater CANCELLED = - AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "cancelled"); + final AtomicBoolean cancelled; - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(GroupBySubscriber.class, "requested"); - - volatile int groupCount; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater GROUP_COUNT = - AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "groupCount"); + final AtomicLong requested; + + final AtomicInteger groupCount; Throwable error; volatile boolean done; - volatile int wip; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater WIP = - AtomicIntegerFieldUpdater.newUpdater(GroupBySubscriber.class, "wip"); + final AtomicInteger wip; public GroupBySubscriber(Subscriber> actual, Func1 keySelector, Func1 valueSelector, int bufferSize, boolean delayError) { this.actual = actual; @@ -137,10 +125,13 @@ public GroupBySubscriber(Subscriber> actual, Fun this.delayError = delayError; this.groups = new ConcurrentHashMap>(); this.queue = new ConcurrentLinkedQueue>(); - GROUP_COUNT.lazySet(this, 1); this.s = new ProducerArbiter(); this.s.request(bufferSize); this.producer = new GroupByProducer(this); + this.cancelled = new AtomicBoolean(); + this.requested = new AtomicLong(); + this.groupCount = new AtomicInteger(1); + this.wip = new AtomicInteger(); } @Override @@ -172,11 +163,11 @@ public void onNext(T t) { if (group == null) { // if the main has been cancelled, stop creating groups // and skip this value - if (cancelled == 0) { + if (!cancelled.get()) { group = GroupedUnicast.createWith(key, bufferSize, this, delayError); groups.put(mapKey, group); - GROUP_COUNT.getAndIncrement(this); + groupCount.getAndIncrement(); notNew = false; q.offer(group); @@ -210,7 +201,7 @@ public void onError(Throwable t) { } error = t; done = true; - GROUP_COUNT.decrementAndGet(this); + groupCount.decrementAndGet(); drain(); } @@ -226,7 +217,7 @@ public void onCompleted() { groups.clear(); done = true; - GROUP_COUNT.decrementAndGet(this); + groupCount.decrementAndGet(); drain(); } @@ -235,15 +226,15 @@ public void requestMore(long n) { throw new IllegalArgumentException("n >= 0 required but it was " + n); } - BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + BackpressureUtils.getAndAddRequest(requested, n); drain(); } public void cancel() { // cancelling the main source means we don't want any more groups // but running groups still require new values - if (CANCELLED.compareAndSet(this, 0, 1)) { - if (GROUP_COUNT.decrementAndGet(this) == 0) { + if (cancelled.compareAndSet(false, true)) { + if (groupCount.decrementAndGet() == 0) { unsubscribe(); } } @@ -252,14 +243,14 @@ public void cancel() { public void cancel(K key) { Object mapKey = key != null ? key : NULL_KEY; if (groups.remove(mapKey) != null) { - if (GROUP_COUNT.decrementAndGet(this) == 0) { + if (groupCount.decrementAndGet() == 0) { unsubscribe(); } } } void drain() { - if (WIP.getAndIncrement(this) != 0) { + if (wip.getAndIncrement() != 0) { return; } @@ -274,7 +265,7 @@ void drain() { return; } - long r = requested; + long r = requested.get(); boolean unbounded = r == Long.MAX_VALUE; long e = 0L; @@ -301,12 +292,12 @@ void drain() { if (e != 0L) { if (!unbounded) { - REQUESTED.addAndGet(this, e); + requested.addAndGet(e); } s.request(-e); } - missed = WIP.addAndGet(this, -missed); + missed = wip.addAndGet(-missed); if (missed == 0) { break; } @@ -378,28 +369,16 @@ static final class State extends AtomicInteger implements Producer, Subscr final GroupBySubscriber parent; final boolean delayError; - volatile long requested; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater REQUESTED = - AtomicLongFieldUpdater.newUpdater(State.class, "requested"); + final AtomicLong requested; volatile boolean done; Throwable error; - volatile int cancelled; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater CANCELLED = - AtomicIntegerFieldUpdater.newUpdater(State.class, "cancelled"); - - volatile Subscriber actual; - @SuppressWarnings("rawtypes") - static final AtomicReferenceFieldUpdater ACTUAL = - AtomicReferenceFieldUpdater.newUpdater(State.class, Subscriber.class, "actual"); + final AtomicBoolean cancelled; + + final AtomicReference> actual; - volatile int once; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater ONCE = - AtomicIntegerFieldUpdater.newUpdater(State.class, "once"); + final AtomicBoolean once; public State(int bufferSize, GroupBySubscriber parent, K key, boolean delayError) { @@ -407,6 +386,10 @@ public State(int bufferSize, GroupBySubscriber parent, K key, boolean d this.parent = parent; this.key = key; this.delayError = delayError; + this.cancelled = new AtomicBoolean(); + this.actual = new AtomicReference>(); + this.once = new AtomicBoolean(); + this.requested = new AtomicLong(); } @Override @@ -415,19 +398,19 @@ public void request(long n) { throw new IllegalArgumentException("n >= required but it was " + n); } if (n != 0L) { - BackpressureUtils.getAndAddRequest(REQUESTED, this, n); + BackpressureUtils.getAndAddRequest(requested, n); drain(); } } @Override public boolean isUnsubscribed() { - return cancelled != 0; + return cancelled.get(); } @Override public void unsubscribe() { - if (CANCELLED.compareAndSet(this, 0, 1)) { + if (cancelled.compareAndSet(false, true)) { if (getAndIncrement() == 0) { parent.cancel(key); } @@ -436,10 +419,10 @@ public void unsubscribe() { @Override public void call(Subscriber s) { - if (ONCE.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { s.add(this); s.setProducer(this); - ACTUAL.lazySet(this, s); + actual.lazySet(s); drain(); } else { s.onError(new IllegalStateException("Only one Subscriber allowed!")); @@ -475,7 +458,7 @@ void drain() { final Queue q = queue; final boolean delayError = this.delayError; - Subscriber a = actual; + Subscriber a = actual.get(); NotificationLite nl = NotificationLite.instance(); for (;;) { if (a != null) { @@ -483,7 +466,7 @@ void drain() { return; } - long r = requested; + long r = requested.get(); boolean unbounded = r == Long.MAX_VALUE; long e = 0; @@ -508,7 +491,7 @@ void drain() { if (e != 0L) { if (!unbounded) { - REQUESTED.addAndGet(this, e); + requested.addAndGet(e); } parent.s.request(-e); } @@ -519,13 +502,13 @@ void drain() { break; } if (a == null) { - a = actual; + a = actual.get(); } } } boolean checkTerminated(boolean d, boolean empty, Subscriber a, boolean delayError) { - if (cancelled != 0) { + if (cancelled.get()) { queue.clear(); parent.cancel(key); return true; diff --git a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java index 1ebfa56fe4..c1655f0aa3 100644 --- a/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java +++ b/src/main/java/rx/internal/schedulers/CachedThreadScheduler.java @@ -177,19 +177,17 @@ private static final class EventLoopWorker extends Scheduler.Worker { private final CompositeSubscription innerSubscription = new CompositeSubscription(); private final CachedWorkerPool pool; private final ThreadWorker threadWorker; - @SuppressWarnings("unused") - volatile int once; - static final AtomicIntegerFieldUpdater ONCE_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(EventLoopWorker.class, "once"); + final AtomicBoolean once; EventLoopWorker(CachedWorkerPool pool) { this.pool = pool; + this.once = new AtomicBoolean(); this.threadWorker = pool.get(); } @Override public void unsubscribe() { - if (ONCE_UPDATER.compareAndSet(this, 0, 1)) { + if (once.compareAndSet(false, true)) { // unsubscribe should be idempotent, so only do this once pool.release(threadWorker); } diff --git a/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java index 1151a7a3a9..53f9b719ce 100644 --- a/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java +++ b/src/main/java/rx/internal/util/atomic/SpscExactAtomicArrayQueue.java @@ -33,21 +33,16 @@ public final class SpscExactAtomicArrayQueue extends AtomicReferenceArray private static final long serialVersionUID = 6210984603741293445L; final int mask; final int capacitySkip; - volatile long producerIndex; - volatile long consumerIndex; - - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater PRODUCER_INDEX = - AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "producerIndex"); - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater CONSUMER_INDEX = - AtomicLongFieldUpdater.newUpdater(SpscExactAtomicArrayQueue.class, "consumerIndex"); + final AtomicLong producerIndex; + final AtomicLong consumerIndex; public SpscExactAtomicArrayQueue(int capacity) { super(Pow2.roundToPowerOfTwo(capacity)); int len = length(); this.mask = len - 1; - this.capacitySkip = len - capacity; + this.capacitySkip = len - capacity; + this.producerIndex = new AtomicLong(); + this.consumerIndex = new AtomicLong(); } @@ -57,7 +52,7 @@ public boolean offer(T value) { throw new NullPointerException(); } - long pi = producerIndex; + long pi = producerIndex.get(); int m = mask; int fullCheck = (int)(pi + capacitySkip) & m; @@ -65,25 +60,25 @@ public boolean offer(T value) { return false; } int offset = (int)pi & m; - PRODUCER_INDEX.lazySet(this, pi + 1); + producerIndex.lazySet(pi + 1); lazySet(offset, value); return true; } @Override public T poll() { - long ci = consumerIndex; + long ci = consumerIndex.get(); int offset = (int)ci & mask; T value = get(offset); if (value == null) { return null; } - CONSUMER_INDEX.lazySet(this, ci + 1); + consumerIndex.lazySet(ci + 1); lazySet(offset, null); return value; } @Override public T peek() { - return get((int)consumerIndex & mask); + return get((int)consumerIndex.get() & mask); } @Override public void clear() { @@ -96,10 +91,10 @@ public boolean isEmpty() { @Override public int size() { - long ci = consumerIndex; + long ci = consumerIndex.get(); for (;;) { - long pi = producerIndex; - long ci2 = consumerIndex; + long pi = producerIndex.get(); + long ci2 = consumerIndex.get(); if (ci == ci2) { return (int)(pi - ci2); } diff --git a/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java index 36bba49862..728591794e 100644 --- a/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java +++ b/src/main/java/rx/internal/util/atomic/SpscUnboundedAtomicArrayQueue.java @@ -32,25 +32,21 @@ */ public final class SpscUnboundedAtomicArrayQueue implements Queue { static final int MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096); - protected volatile long producerIndex; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater PRODUCER_INDEX = - AtomicLongFieldUpdater.newUpdater(SpscUnboundedAtomicArrayQueue.class, "producerIndex"); + final AtomicLong producerIndex; protected int producerLookAheadStep; protected long producerLookAhead; protected int producerMask; protected AtomicReferenceArray producerBuffer; protected int consumerMask; protected AtomicReferenceArray consumerBuffer; - protected volatile long consumerIndex; - @SuppressWarnings("rawtypes") - static final AtomicLongFieldUpdater CONSUMER_INDEX = - AtomicLongFieldUpdater.newUpdater(SpscUnboundedAtomicArrayQueue.class, "consumerIndex"); + final AtomicLong consumerIndex; private static final Object HAS_NEXT = new Object(); public SpscUnboundedAtomicArrayQueue(final int bufferSize) { int p2capacity = Pow2.roundToPowerOfTwo(Math.max(8, bufferSize)); // lookahead doesn't work with capacity < 8 int mask = p2capacity - 1; + this.producerIndex = new AtomicLong(); + this.consumerIndex = new AtomicLong(); AtomicReferenceArray buffer = new AtomicReferenceArray(p2capacity + 1); producerBuffer = buffer; producerMask = mask; @@ -221,27 +217,27 @@ private void adjustLookAheadStep(int capacity) { } private long lvProducerIndex() { - return producerIndex; + return producerIndex.get(); } private long lvConsumerIndex() { - return consumerIndex; + return consumerIndex.get(); } private long lpProducerIndex() { - return producerIndex; + return producerIndex.get(); } private long lpConsumerIndex() { - return consumerIndex; + return consumerIndex.get(); } private void soProducerIndex(long v) { - PRODUCER_INDEX.lazySet(this, v); + producerIndex.lazySet(v); } private void soConsumerIndex(long v) { - CONSUMER_INDEX.lazySet(this, v); + consumerIndex.lazySet(v); } private static int calcWrappedOffset(long index, int mask) { diff --git a/src/main/java/rx/observables/AsyncOnSubscribe.java b/src/main/java/rx/observables/AsyncOnSubscribe.java index 56b963429e..9b8926622f 100644 --- a/src/main/java/rx/observables/AsyncOnSubscribe.java +++ b/src/main/java/rx/observables/AsyncOnSubscribe.java @@ -17,7 +17,7 @@ package rx.observables; import java.util.*; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; +import java.util.concurrent.atomic.AtomicBoolean; import rx.*; import rx.Observable; @@ -25,8 +25,8 @@ import rx.Observer; import rx.annotations.Experimental; import rx.functions.*; -import rx.internal.operators.*; -import rx.observers.*; +import rx.internal.operators.BufferUntilSubscriber; +import rx.observers.SerializedObserver; import rx.plugins.RxJavaPlugins; import rx.subscriptions.CompositeSubscription; @@ -359,9 +359,7 @@ public Observable call(Observable v) { static final class AsyncOuterManager implements Producer, Subscription, Observer> { - private volatile int isUnsubscribed; - @SuppressWarnings("rawtypes") - private static final AtomicIntegerFieldUpdater IS_UNSUBSCRIBED = AtomicIntegerFieldUpdater.newUpdater(AsyncOuterManager.class, "isUnsubscribed"); + final AtomicBoolean isUnsubscribed; private final AsyncOnSubscribe parent; private final SerializedObserver> serializedSubscriber; @@ -385,11 +383,12 @@ public AsyncOuterManager(AsyncOnSubscribe parent, S initialState, UnicastS this.serializedSubscriber = new SerializedObserver>(this); this.state = initialState; this.merger = merger; + this.isUnsubscribed = new AtomicBoolean(); } @Override public void unsubscribe() { - if (IS_UNSUBSCRIBED.compareAndSet(this, 0, 1)) { + if (isUnsubscribed.compareAndSet(false, true)) { synchronized (this) { if (emitting) { requests = new ArrayList(); @@ -411,7 +410,7 @@ void setConcatProducer(Producer p) { @Override public boolean isUnsubscribed() { - return isUnsubscribed != 0; + return isUnsubscribed.get(); } public void nextIteration(long requestCount) {