Skip to content

Commit c89ea32

Browse files
committed
1.x: eager concatMap to choose safe or unsafe queue based on platform.
I forgot to add the choice because 2.x SpscArrayQueue doesn't use Unsafe. I copied the SpscAtomicArrayQueue from ReactiveX#3169 and I hope it won't conflict.
1 parent 3e2b3b1 commit c89ea32

File tree

3 files changed

+208
-2
lines changed

3 files changed

+208
-2
lines changed

src/main/java/rx/internal/operators/OperatorEagerConcatMap.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@
2424
import rx.Observable.Operator;
2525
import rx.exceptions.Exceptions;
2626
import rx.functions.*;
27-
import rx.internal.util.unsafe.SpscArrayQueue;
27+
import rx.internal.util.atomic.SpscAtomicArrayQueue;
28+
import rx.internal.util.unsafe.*;
2829
import rx.subscriptions.Subscriptions;
2930

3031
public final class OperatorEagerConcatMap<T, R> implements Operator<R, T> {
@@ -278,7 +279,13 @@ static final class EagerInnerSubscriber<T> extends Subscriber<T> {
278279
public EagerInnerSubscriber(EagerOuterSubscriber<?, T> parent, int bufferSize) {
279280
super();
280281
this.parent = parent;
281-
this.queue = new SpscArrayQueue<T>(bufferSize);
282+
Queue<T> q;
283+
if (UnsafeAccess.isUnsafeAvailable()) {
284+
q = new SpscArrayQueue<T>(bufferSize);
285+
} else {
286+
q = new SpscAtomicArrayQueue<T>(bufferSize);
287+
}
288+
this.queue = q;
282289
request(bufferSize);
283290
}
284291

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*
14+
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
15+
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/AtomicReferenceArrayQueue.java
16+
*/
17+
package rx.internal.util.atomic;
18+
19+
import java.util.*;
20+
import java.util.concurrent.atomic.AtomicReferenceArray;
21+
22+
import rx.internal.util.unsafe.Pow2;
23+
24+
abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> {
25+
protected final AtomicReferenceArray<E> buffer;
26+
protected final int mask;
27+
public AtomicReferenceArrayQueue(int capacity) {
28+
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
29+
this.mask = actualCapacity - 1;
30+
this.buffer = new AtomicReferenceArray<E>(actualCapacity);
31+
}
32+
@Override
33+
public Iterator<E> iterator() {
34+
throw new UnsupportedOperationException();
35+
}
36+
@Override
37+
public void clear() {
38+
// we have to test isEmpty because of the weaker poll() guarantee
39+
while (poll() != null || !isEmpty())
40+
;
41+
}
42+
protected final int calcElementOffset(long index, int mask) {
43+
return (int)index & mask;
44+
}
45+
protected final int calcElementOffset(long index) {
46+
return (int)index & mask;
47+
}
48+
protected final E lvElement(AtomicReferenceArray<E> buffer, int offset) {
49+
return buffer.get(offset);
50+
}
51+
protected final E lpElement(AtomicReferenceArray<E> buffer, int offset) {
52+
return buffer.get(offset); // no weaker form available
53+
}
54+
protected final E lpElement(int offset) {
55+
return buffer.get(offset); // no weaker form available
56+
}
57+
protected final void spElement(AtomicReferenceArray<E> buffer, int offset, E value) {
58+
buffer.lazySet(offset, value); // no weaker form available
59+
}
60+
protected final void spElement(int offset, E value) {
61+
buffer.lazySet(offset, value); // no weaker form available
62+
}
63+
protected final void soElement(AtomicReferenceArray<E> buffer, int offset, E value) {
64+
buffer.lazySet(offset, value);
65+
}
66+
protected final void soElement(int offset, E value) {
67+
buffer.lazySet(offset, value);
68+
}
69+
protected final void svElement(AtomicReferenceArray<E> buffer, int offset, E value) {
70+
buffer.set(offset, value);
71+
}
72+
protected final E lvElement(int offset) {
73+
return lvElement(buffer, offset);
74+
}
75+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*
14+
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
15+
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java
16+
*/
17+
package rx.internal.util.atomic;
18+
19+
import java.util.concurrent.atomic.*;
20+
21+
/**
22+
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
23+
* <p>
24+
* This implementation is a mashup of the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
25+
* algorithm with an optimization of the offer method taken from the <a
26+
* href="http://staff.ustc.edu.cn/~bhua/publications/IJPP_draft.pdf">BQueue</a> algorithm (a variation on Fast
27+
* Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.<br>
28+
* For convenience the relevant papers are available in the resources folder:<br>
29+
* <i>2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf<br>
30+
* 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf <br>
31+
* </i> This implementation is wait free.
32+
*
33+
* @param <E>
34+
*/
35+
public final class SpscAtomicArrayQueue<E> extends AtomicReferenceArrayQueue<E> {
36+
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
37+
final AtomicLong producerIndex;
38+
protected long producerLookAhead;
39+
final AtomicLong consumerIndex;
40+
final int lookAheadStep;
41+
public SpscAtomicArrayQueue(int capacity) {
42+
super(capacity);
43+
this.producerIndex = new AtomicLong();
44+
this.consumerIndex = new AtomicLong();
45+
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
46+
}
47+
48+
@Override
49+
public boolean offer(E e) {
50+
if (null == e) {
51+
throw new NullPointerException("Null is not a valid element");
52+
}
53+
// local load of field to avoid repeated loads after volatile reads
54+
final AtomicReferenceArray<E> buffer = this.buffer;
55+
final int mask = this.mask;
56+
final long index = producerIndex.get();
57+
final int offset = calcElementOffset(index, mask);
58+
if (index >= producerLookAhead) {
59+
int step = lookAheadStep;
60+
if (null == lvElement(buffer, calcElementOffset(index + step, mask))) {// LoadLoad
61+
producerLookAhead = index + step;
62+
}
63+
else if (null != lvElement(buffer, offset)){
64+
return false;
65+
}
66+
}
67+
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
68+
soElement(buffer, offset, e); // StoreStore
69+
return true;
70+
}
71+
72+
@Override
73+
public E poll() {
74+
final long index = consumerIndex.get();
75+
final int offset = calcElementOffset(index);
76+
// local load of field to avoid repeated loads after volatile reads
77+
final AtomicReferenceArray<E> lElementBuffer = buffer;
78+
final E e = lvElement(lElementBuffer, offset);// LoadLoad
79+
if (null == e) {
80+
return null;
81+
}
82+
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
83+
soElement(lElementBuffer, offset, null);// StoreStore
84+
return e;
85+
}
86+
87+
@Override
88+
public E peek() {
89+
return lvElement(calcElementOffset(consumerIndex.get()));
90+
}
91+
92+
@Override
93+
public int size() {
94+
/*
95+
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
96+
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
97+
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
98+
*/
99+
long after = lvConsumerIndex();
100+
while (true) {
101+
final long before = after;
102+
final long currentProducerIndex = lvProducerIndex();
103+
after = lvConsumerIndex();
104+
if (before == after) {
105+
return (int) (currentProducerIndex - after);
106+
}
107+
}
108+
}
109+
110+
private void soProducerIndex(long newIndex) {
111+
producerIndex.lazySet(newIndex);
112+
}
113+
114+
private void soConsumerIndex(long newIndex) {
115+
consumerIndex.lazySet(newIndex);
116+
}
117+
118+
private long lvConsumerIndex() {
119+
return consumerIndex.get();
120+
}
121+
private long lvProducerIndex() {
122+
return producerIndex.get();
123+
}
124+
}

0 commit comments

Comments
 (0)