From f768f4b69fc533c26ab20d4d615b0166e33795c3 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Mon, 13 Jun 2016 12:51:04 +0200 Subject: [PATCH] 1.x: fix Spsc queues reporting not empty but then poll() returns null --- .../util/atomic/SpscAtomicArrayQueue.java | 4 +-- .../util/atomic/SpscLinkedArrayQueue.java | 14 ++++----- .../internal/util/unsafe/SpscArrayQueue.java | 4 +-- .../util/unsafe/SpscUnboundedArrayQueue.java | 8 ++--- .../operators/OperatorSwitchTest.java | 29 ++++++++++++++++++- 5 files changed, 43 insertions(+), 16 deletions(-) diff --git a/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java index cadf772d49..6240a5f156 100644 --- a/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java +++ b/src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java @@ -64,8 +64,8 @@ else if (null != lvElement(buffer, offset)){ return false; } } - soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() soElement(buffer, offset, e); // StoreStore + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() return true; } @@ -79,8 +79,8 @@ public E poll() { if (null == e) { return null; } - soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() soElement(lElementBuffer, offset, null);// StoreStore + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() return e; } diff --git a/src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java b/src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java index 23d8ad7c9e..4e4943dfcb 100644 --- a/src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java +++ b/src/main/java/rx/internal/util/atomic/SpscLinkedArrayQueue.java @@ -90,8 +90,8 @@ public final boolean offer(final T e) { } private boolean writeToQueue(final AtomicReferenceArray buffer, final T e, final long index, final int offset) { - soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms soElement(buffer, offset, e);// StoreStore + soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms return true; } @@ -101,11 +101,11 @@ private void resize(final AtomicReferenceArray oldBuffer, final long cur final AtomicReferenceArray newBuffer = new AtomicReferenceArray(capacity); producerBuffer = newBuffer; producerLookAhead = currIndex + mask - 1; - soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms soElement(newBuffer, offset, e);// StoreStore soNext(oldBuffer, newBuffer); soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is // inserted + soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms } private void soNext(AtomicReferenceArray curr, AtomicReferenceArray next) { @@ -131,8 +131,8 @@ public final T poll() { final Object e = lvElement(buffer, offset);// LoadLoad boolean isNextBuffer = e == HAS_NEXT; if (null != e && !isNextBuffer) { - soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms soElement(buffer, offset, null);// StoreStore + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms return (T) e; } else if (isNextBuffer) { return newBufferPoll(lvNext(buffer), index, mask); @@ -149,8 +149,8 @@ private T newBufferPoll(AtomicReferenceArray nextBuffer, final long inde if (null == n) { return null; } else { - soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms soElement(nextBuffer, offsetInNew, null);// StoreStore + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms return n; } } @@ -330,8 +330,8 @@ public boolean offer(T first, T second) { if (null == lvElement(buffer, pi)) { pi = calcWrappedOffset(p, m); soElement(buffer, pi + 1, second); - soProducerIndex(p + 2); soElement(buffer, pi, first); + soProducerIndex(p + 2); } else { final int capacity = buffer.length(); final AtomicReferenceArray newBuffer = new AtomicReferenceArray(capacity); @@ -342,9 +342,9 @@ public boolean offer(T first, T second) { soElement(newBuffer, pi, first); soNext(buffer, newBuffer); - soProducerIndex(p + 2);// this ensures correctness on 32bit platforms - soElement(buffer, pi, HAS_NEXT); // new buffer is visible after element is + + soProducerIndex(p + 2);// this ensures correctness on 32bit platforms } return true; diff --git a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java index 17fee1c804..ae9a5b771a 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscArrayQueue.java @@ -110,8 +110,8 @@ public boolean offer(final E e) { if (null != lvElement(lElementBuffer, offset)){ return false; } - soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() soElement(lElementBuffer, offset, e); // StoreStore + soProducerIndex(index + 1); // ordered store -> atomic and ordered for size() return true; } @@ -130,8 +130,8 @@ public E poll() { if (null == e) { return null; } - soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() soElement(lElementBuffer, offset, null);// StoreStore + soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size() return e; } diff --git a/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java b/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java index 680f62860a..6175bab455 100644 --- a/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java +++ b/src/main/java/rx/internal/util/unsafe/SpscUnboundedArrayQueue.java @@ -132,8 +132,8 @@ public final boolean offer(final E e) { } private boolean writeToQueue(final E[] buffer, final E e, final long index, final long offset) { - soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms soElement(buffer, offset, e);// StoreStore + soProducerIndex(index + 1);// this ensures atomic write of long on 32bit platforms return true; } @@ -144,11 +144,11 @@ private void resize(final E[] oldBuffer, final long currIndex, final long offset final E[] newBuffer = (E[]) new Object[capacity]; producerBuffer = newBuffer; producerLookAhead = currIndex + mask - 1; - soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms soElement(newBuffer, offset, e);// StoreStore soNext(oldBuffer, newBuffer); soElement(oldBuffer, offset, HAS_NEXT); // new buffer is visible after element is // inserted + soProducerIndex(currIndex + 1);// this ensures correctness on 32bit platforms } private void soNext(E[] curr, E[] next) { @@ -174,8 +174,8 @@ public final E poll() { final Object e = lvElement(buffer, offset);// LoadLoad boolean isNextBuffer = e == HAS_NEXT; if (null != e && !isNextBuffer) { - soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms soElement(buffer, offset, null);// StoreStore + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms return (E) e; } else if (isNextBuffer) { return newBufferPoll(lvNext(buffer), index, mask); @@ -192,8 +192,8 @@ private E newBufferPoll(E[] nextBuffer, final long index, final long mask) { if (null == n) { return null; } else { - soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms soElement(nextBuffer, offsetInNew, null);// StoreStore + soConsumerIndex(index + 1);// this ensures correctness on 32bit platforms return n; } } diff --git a/src/test/java/rx/internal/operators/OperatorSwitchTest.java b/src/test/java/rx/internal/operators/OperatorSwitchTest.java index 836192d3e6..ae85d48864 100644 --- a/src/test/java/rx/internal/operators/OperatorSwitchTest.java +++ b/src/test/java/rx/internal/operators/OperatorSwitchTest.java @@ -22,7 +22,7 @@ import java.lang.ref.WeakReference; import java.util.*; import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.*; import org.junit.*; import org.mockito.InOrder; @@ -32,6 +32,7 @@ import rx.Observer; import rx.exceptions.*; import rx.functions.*; +import rx.internal.util.UtilityFunctions; import rx.observers.TestSubscriber; import rx.schedulers.*; import rx.subjects.PublishSubject; @@ -880,4 +881,30 @@ public void call(Throwable e) { } } } + + @Test + public void asyncInner() throws Throwable { + for (int i = 0; i < 100; i++) { + + final AtomicReference error = new AtomicReference(); + + Observable.just(Observable.range(1, 1000 * 1000).subscribeOn(Schedulers.computation())) + .switchMap(UtilityFunctions.>identity()) + .observeOn(Schedulers.computation()) + .ignoreElements() + .timeout(5, TimeUnit.SECONDS) + .toBlocking() + .subscribe(Actions.empty(), new Action1() { + @Override + public void call(Throwable e) { + error.set(e); + } + }); + + Throwable ex = error.get(); + if (ex != null) { + throw ex; + } + } + } }