diff --git a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java index 001058763b..d6845303f2 100644 --- a/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java +++ b/src/main/java/rx/internal/operators/OnSubscribeConcatMap.java @@ -125,7 +125,7 @@ public ConcatMapSubscriber(Subscriber actual, this.wip = new AtomicInteger(); this.error = new AtomicReference(); Queue q; - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { q = new SpscArrayQueue(prefetch); } else { q = new SpscAtomicArrayQueue(prefetch); diff --git a/src/main/java/rx/internal/operators/OnSubscribePublishMulticast.java b/src/main/java/rx/internal/operators/OnSubscribePublishMulticast.java index c1d14b29fa..90a061eb13 100644 --- a/src/main/java/rx/internal/operators/OnSubscribePublishMulticast.java +++ b/src/main/java/rx/internal/operators/OnSubscribePublishMulticast.java @@ -104,7 +104,7 @@ public OnSubscribePublishMulticast(int prefetch, boolean delayError) { } this.prefetch = prefetch; this.delayError = delayError; - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { this.queue = new SpscArrayQueue(prefetch); } else { this.queue = new SpscAtomicArrayQueue(prefetch); diff --git a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java index bbf2bcc48b..c9a8e0b242 100644 --- a/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java +++ b/src/main/java/rx/internal/operators/OperatorEagerConcatMap.java @@ -282,7 +282,7 @@ public EagerInnerSubscriber(EagerOuterSubscriber parent, int bufferSize) { super(); this.parent = parent; Queue q; - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { q = new SpscArrayQueue(bufferSize); } else { q = new SpscAtomicArrayQueue(bufferSize); diff --git a/src/main/java/rx/internal/operators/OperatorMerge.java b/src/main/java/rx/internal/operators/OperatorMerge.java index d0bbf38a24..3ba3e015a8 100644 --- a/src/main/java/rx/internal/operators/OperatorMerge.java +++ b/src/main/java/rx/internal/operators/OperatorMerge.java @@ -464,7 +464,7 @@ protected void queueScalar(T value) { q = new SpscUnboundedAtomicArrayQueue(RxRingBuffer.SIZE); } else { if (Pow2.isPowerOfTwo(mc)) { - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { q = new SpscArrayQueue(mc); } else { q = new SpscAtomicArrayQueue(mc); diff --git a/src/main/java/rx/internal/operators/OperatorObserveOn.java b/src/main/java/rx/internal/operators/OperatorObserveOn.java index 2a7c7684dd..cf26c99042 100644 --- a/src/main/java/rx/internal/operators/OperatorObserveOn.java +++ b/src/main/java/rx/internal/operators/OperatorObserveOn.java @@ -106,7 +106,7 @@ public ObserveOnSubscriber(Scheduler scheduler, Subscriber child, boo this.delayError = delayError; this.on = NotificationLite.instance(); this.bufferSize = (bufferSize > 0) ? bufferSize : RxRingBuffer.SIZE; - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { queue = new SpscArrayQueue(this.bufferSize); } else { queue = new SpscAtomicArrayQueue(this.bufferSize); diff --git a/src/main/java/rx/internal/operators/OperatorPublish.java b/src/main/java/rx/internal/operators/OperatorPublish.java index 347905e06e..98987d2588 100644 --- a/src/main/java/rx/internal/operators/OperatorPublish.java +++ b/src/main/java/rx/internal/operators/OperatorPublish.java @@ -244,7 +244,7 @@ static final class PublishSubscriber extends Subscriber implements Subscri boolean missed; public PublishSubscriber(AtomicReference> current) { - this.queue = UnsafeAccess.isUnsafeAvailable() + this.queue = UnsafeAccess.IS_UNSAFE_AVAILABLE ? new SpscArrayQueue(RxRingBuffer.SIZE) : new SynchronizedQueue(RxRingBuffer.SIZE); diff --git a/src/main/java/rx/internal/operators/OperatorScan.java b/src/main/java/rx/internal/operators/OperatorScan.java index 547edf5c1b..f176eccb65 100644 --- a/src/main/java/rx/internal/operators/OperatorScan.java +++ b/src/main/java/rx/internal/operators/OperatorScan.java @@ -186,7 +186,7 @@ public InitialProducer(R initialValue, Subscriber child) { this.child = child; Queue q; // TODO switch to the linked-array based queue once available - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { q = new SpscLinkedQueue(); // new SpscUnboundedArrayQueue(8); } else { q = new SpscLinkedAtomicQueue(); // new SpscUnboundedAtomicArrayQueue(8); diff --git a/src/main/java/rx/internal/operators/UnicastSubject.java b/src/main/java/rx/internal/operators/UnicastSubject.java index 569745358e..e82dfdc89b 100644 --- a/src/main/java/rx/internal/operators/UnicastSubject.java +++ b/src/main/java/rx/internal/operators/UnicastSubject.java @@ -146,11 +146,11 @@ public State(int capacityHint, Action0 onTerminated) { Queue q; if (capacityHint > 1) { - q = UnsafeAccess.isUnsafeAvailable() + q = UnsafeAccess.IS_UNSAFE_AVAILABLE ? new SpscUnboundedArrayQueue(capacityHint) : new SpscUnboundedAtomicArrayQueue(capacityHint); } else { - q = UnsafeAccess.isUnsafeAvailable() + q = UnsafeAccess.IS_UNSAFE_AVAILABLE ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue(); } diff --git a/src/main/java/rx/internal/producers/QueuedProducer.java b/src/main/java/rx/internal/producers/QueuedProducer.java index ed892bab30..34530dc3cc 100644 --- a/src/main/java/rx/internal/producers/QueuedProducer.java +++ b/src/main/java/rx/internal/producers/QueuedProducer.java @@ -50,7 +50,7 @@ public final class QueuedProducer extends AtomicLong implements Producer, Obs * @param child the target child subscriber */ public QueuedProducer(Subscriber child) { - this(child, UnsafeAccess.isUnsafeAvailable() + this(child, UnsafeAccess.IS_UNSAFE_AVAILABLE ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue()); } /** diff --git a/src/main/java/rx/internal/producers/QueuedValueProducer.java b/src/main/java/rx/internal/producers/QueuedValueProducer.java index 53853f2ccc..667d9764e4 100644 --- a/src/main/java/rx/internal/producers/QueuedValueProducer.java +++ b/src/main/java/rx/internal/producers/QueuedValueProducer.java @@ -47,7 +47,7 @@ public final class QueuedValueProducer extends AtomicLong implements Producer * @param child the target child subscriber */ public QueuedValueProducer(Subscriber child) { - this(child, UnsafeAccess.isUnsafeAvailable() + this(child, UnsafeAccess.IS_UNSAFE_AVAILABLE ? new SpscLinkedQueue() : new SpscLinkedAtomicQueue()); } /** diff --git a/src/main/java/rx/internal/util/ObjectPool.java b/src/main/java/rx/internal/util/ObjectPool.java index 0aa005208e..c882c9a7ae 100644 --- a/src/main/java/rx/internal/util/ObjectPool.java +++ b/src/main/java/rx/internal/util/ObjectPool.java @@ -139,7 +139,7 @@ public void call() { protected abstract T createObject(); private void initialize(final int min) { - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { pool = new MpmcArrayQueue(Math.max(maxSize, 1024)); } else { pool = new ConcurrentLinkedQueue(); diff --git a/src/main/java/rx/internal/util/RxRingBuffer.java b/src/main/java/rx/internal/util/RxRingBuffer.java index 5f35c7f6e5..183d102e24 100644 --- a/src/main/java/rx/internal/util/RxRingBuffer.java +++ b/src/main/java/rx/internal/util/RxRingBuffer.java @@ -33,7 +33,7 @@ public class RxRingBuffer implements Subscription { public static RxRingBuffer getSpscInstance() { - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { return new RxRingBuffer(SPSC_POOL, SIZE); } else { return new RxRingBuffer(); @@ -41,7 +41,7 @@ public static RxRingBuffer getSpscInstance() { } public static RxRingBuffer getSpmcInstance() { - if (UnsafeAccess.isUnsafeAvailable()) { + if (UnsafeAccess.IS_UNSAFE_AVAILABLE) { return new RxRingBuffer(SPMC_POOL, SIZE); } else { return new RxRingBuffer(); diff --git a/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java b/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java index a13989f4f1..0fc654c7b3 100644 --- a/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java +++ b/src/main/java/rx/internal/util/unsafe/UnsafeAccess.java @@ -20,7 +20,7 @@ import sun.misc.Unsafe; /** - * All use of this class MUST first check that UnsafeAccess.isUnsafeAvailable() == true + * All use of this class MUST first check that {@code UnsafeAccess.IS_UNSAFE_AVAILABLE == true} * otherwise NPEs will happen in environments without "suc.misc.Unsafe" such as Android. */ public final class UnsafeAccess { @@ -28,7 +28,9 @@ private UnsafeAccess() { throw new IllegalStateException("No instances!"); } - public static final Unsafe UNSAFE; + static final Unsafe UNSAFE; + public static final boolean IS_UNSAFE_AVAILABLE; + static { Unsafe u = null; try { @@ -45,10 +47,7 @@ private UnsafeAccess() { // do nothing, hasUnsafe() will return false } UNSAFE = u; - } - - public static boolean isUnsafeAvailable() { - return UNSAFE != null; + IS_UNSAFE_AVAILABLE = UNSAFE != null; } /* diff --git a/src/test/java/rx/internal/util/JCToolsQueueTests.java b/src/test/java/rx/internal/util/JCToolsQueueTests.java index fdf844bf81..a6db51068b 100644 --- a/src/test/java/rx/internal/util/JCToolsQueueTests.java +++ b/src/test/java/rx/internal/util/JCToolsQueueTests.java @@ -41,7 +41,7 @@ static void await(CyclicBarrier cb) { } @Test public void casBasedUnsafe() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } long offset = UnsafeAccess.addressOf(IntField.class, "value"); @@ -74,7 +74,7 @@ public void powerOfTwo() { @Test(expected = NullPointerException.class) public void testMpmcArrayQueueNull() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } MpmcArrayQueue q = new MpmcArrayQueue(16); @@ -83,7 +83,7 @@ public void testMpmcArrayQueueNull() { @Test(expected = UnsupportedOperationException.class) public void testMpmcArrayQueueIterator() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } MpmcArrayQueue q = new MpmcArrayQueue(16); @@ -92,7 +92,7 @@ public void testMpmcArrayQueueIterator() { @Test public void testMpmcArrayQueueOfferPoll() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } Queue q = new MpmcArrayQueue(128); @@ -102,7 +102,7 @@ public void testMpmcArrayQueueOfferPoll() { @Test public void testMpmcOfferUpToCapacity() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } int n = 128; @@ -176,7 +176,7 @@ public void run() { @Test(expected = UnsupportedOperationException.class) public void testMpscLinkedQueueIterator() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } MpscLinkedQueue q = new MpscLinkedQueue(); @@ -185,7 +185,7 @@ public void testMpscLinkedQueueIterator() { @Test(expected = NullPointerException.class) public void testMpscLinkedQueueNull() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } MpscLinkedQueue q = new MpscLinkedQueue(); @@ -194,7 +194,7 @@ public void testMpscLinkedQueueNull() { @Test public void testMpscLinkedQueueOfferPoll() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } MpscLinkedQueue q = new MpscLinkedQueue(); @@ -203,7 +203,7 @@ public void testMpscLinkedQueueOfferPoll() { } @Test(timeout = 2000) public void testMpscLinkedQueuePipelined() throws InterruptedException { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } final MpscLinkedQueue q = new MpscLinkedQueue(); @@ -273,7 +273,7 @@ protected void testOfferPoll(Queue q) { @Test(expected = NullPointerException.class) public void testSpmcArrayQueueNull() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpmcArrayQueue q = new SpmcArrayQueue(16); @@ -282,7 +282,7 @@ public void testSpmcArrayQueueNull() { @Test public void testSpmcArrayQueueOfferPoll() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } Queue q = new SpmcArrayQueue(128); @@ -291,7 +291,7 @@ public void testSpmcArrayQueueOfferPoll() { } @Test(expected = UnsupportedOperationException.class) public void testSpmcArrayQueueIterator() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpmcArrayQueue q = new SpmcArrayQueue(16); @@ -300,7 +300,7 @@ public void testSpmcArrayQueueIterator() { @Test public void testSpmcOfferUpToCapacity() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } int n = 128; @@ -313,7 +313,7 @@ public void testSpmcOfferUpToCapacity() { @Test(expected = NullPointerException.class) public void testSpscArrayQueueNull() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpscArrayQueue q = new SpscArrayQueue(16); @@ -322,7 +322,7 @@ public void testSpscArrayQueueNull() { @Test public void testSpscArrayQueueOfferPoll() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } Queue q = new SpscArrayQueue(128); @@ -331,7 +331,7 @@ public void testSpscArrayQueueOfferPoll() { } @Test(expected = UnsupportedOperationException.class) public void testSpscArrayQueueIterator() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpscArrayQueue q = new SpscArrayQueue(16); @@ -384,7 +384,7 @@ public void run() { @Test(expected = UnsupportedOperationException.class) public void testSpscLinkedQueueIterator() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpscLinkedQueue q = new SpscLinkedQueue(); @@ -393,7 +393,7 @@ public void testSpscLinkedQueueIterator() { @Test(expected = NullPointerException.class) public void testSpscLinkedQueueNull() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpscLinkedQueue q = new SpscLinkedQueue(); @@ -402,7 +402,7 @@ public void testSpscLinkedQueueNull() { @Test public void testSpscLinkedQueueOfferPoll() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } SpscLinkedQueue q = new SpscLinkedQueue(); @@ -412,7 +412,7 @@ public void testSpscLinkedQueueOfferPoll() { @Test(timeout = 2000) public void testSpscLinkedQueuePipelined() throws InterruptedException { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } final SpscLinkedQueue q = new SpscLinkedQueue(); @@ -442,7 +442,7 @@ public void run() { @Test public void testSpscOfferUpToCapacity() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } int n = 128; @@ -455,7 +455,7 @@ public void testSpscOfferUpToCapacity() { @Test(expected = InternalError.class) public void testUnsafeAccessAddressOf() { - if (!UnsafeAccess.isUnsafeAvailable()) { + if (!UnsafeAccess.IS_UNSAFE_AVAILABLE) { return; } UnsafeAccess.addressOf(Object.class, "field");