Skip to content

Commit fb80b82

Browse files
committed
Merge now operates in horizontally unbounded mode.
1 parent adfabec commit fb80b82

File tree

8 files changed

+925
-24
lines changed

8 files changed

+925
-24
lines changed

src/main/java/rx/internal/operators/OperatorMerge.java

Lines changed: 25 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,15 @@
1717

1818
import java.util.*;
1919
import java.util.concurrent.ConcurrentLinkedQueue;
20-
import java.util.concurrent.atomic.*;
20+
import java.util.concurrent.atomic.AtomicLong;
2121

2222
import rx.*;
23-
import rx.Observable.Operator;
2423
import rx.Observable;
24+
import rx.Observable.Operator;
2525
import rx.exceptions.*;
2626
import rx.internal.util.*;
27+
import rx.internal.util.atomic.*;
28+
import rx.internal.util.unsafe.*;
2729
import rx.subscriptions.CompositeSubscription;
2830

2931
/**
@@ -144,7 +146,7 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
144146

145147
MergeProducer<T> producer;
146148

147-
volatile RxRingBuffer queue;
149+
volatile Queue<Object> queue;
148150

149151
/** Tracks the active subscriptions to sources. */
150152
volatile CompositeSubscription subscriptions;
@@ -182,8 +184,7 @@ public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int max
182184
this.nl = NotificationLite.instance();
183185
this.innerGuard = new Object();
184186
this.innerSubscribers = EMPTY;
185-
long r = Math.min(maxConcurrent, RxRingBuffer.SIZE);
186-
request(r);
187+
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
187188
}
188189

189190
Queue<Throwable> getOrCreateErrorQueue() {
@@ -443,23 +444,27 @@ protected void queueScalar(T value) {
443444
* due to lack of requests or an ongoing emission,
444445
* enqueue the value and try the slow emission path.
445446
*/
446-
RxRingBuffer q = this.queue;
447+
Queue<Object> q = this.queue;
447448
if (q == null) {
448-
q = RxRingBuffer.getSpscInstance();
449-
this.add(q);
449+
int mc = maxConcurrent;
450+
if (mc == Integer.MAX_VALUE) {
451+
q = new SpscUnboundedAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
452+
} else {
453+
if (Pow2.isPowerOfTwo(mc)) {
454+
if (UnsafeAccess.isUnsafeAvailable()) {
455+
q = new SpscArrayQueue<Object>(mc);
456+
} else {
457+
q = new SpscAtomicArrayQueue<Object>(mc);
458+
}
459+
} else {
460+
q = new SpscExactAtomicArrayQueue<Object>(mc);
461+
}
462+
}
450463
this.queue = q;
451464
}
452-
try {
453-
q.onNext(nl.next(value));
454-
} catch (MissingBackpressureException ex) {
455-
this.unsubscribe();
456-
this.onError(ex);
457-
return;
458-
} catch (IllegalStateException ex) {
459-
if (!this.isUnsubscribed()) {
460-
this.unsubscribe();
461-
this.onError(ex);
462-
}
465+
if (!q.offer(value)) {
466+
unsubscribe();
467+
onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value));
463468
return;
464469
}
465470
emit();
@@ -533,7 +538,7 @@ void emitLoop() {
533538
skipFinal = true;
534539
return;
535540
}
536-
RxRingBuffer svq = queue;
541+
Queue<Object> svq = queue;
537542

538543
long r = producer.get();
539544
boolean unbounded = r == Long.MAX_VALUE;
@@ -610,9 +615,6 @@ void emitLoop() {
610615
} else {
611616
reportError();
612617
}
613-
if (svq != null) {
614-
svq.release();
615-
}
616618
skipFinal = true;
617619
return;
618620
}
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*
14+
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
15+
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/AtomicReferenceArrayQueue.java
16+
*/
17+
package rx.internal.util.atomic;
18+
19+
import java.util.*;
20+
import java.util.concurrent.atomic.AtomicReferenceArray;
21+
22+
import rx.internal.util.unsafe.Pow2;
23+
24+
abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> {
25+
protected final AtomicReferenceArray<E> buffer;
26+
protected final int mask;
27+
public AtomicReferenceArrayQueue(int capacity) {
28+
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
29+
this.mask = actualCapacity - 1;
30+
this.buffer = new AtomicReferenceArray<E>(actualCapacity);
31+
}
32+
@Override
33+
public Iterator<E> iterator() {
34+
throw new UnsupportedOperationException();
35+
}
36+
@Override
37+
public void clear() {
38+
// we have to test isEmpty because of the weaker poll() guarantee
39+
while (poll() != null || !isEmpty())
40+
;
41+
}
42+
protected final int calcElementOffset(long index, int mask) {
43+
return (int)index & mask;
44+
}
45+
protected final int calcElementOffset(long index) {
46+
return (int)index & mask;
47+
}
48+
protected final E lvElement(AtomicReferenceArray<E> buffer, int offset) {
49+
return buffer.get(offset);
50+
}
51+
protected final E lpElement(AtomicReferenceArray<E> buffer, int offset) {
52+
return buffer.get(offset); // no weaker form available
53+
}
54+
protected final E lpElement(int offset) {
55+
return buffer.get(offset); // no weaker form available
56+
}
57+
protected final void spElement(AtomicReferenceArray<E> buffer, int offset, E value) {
58+
buffer.lazySet(offset, value); // no weaker form available
59+
}
60+
protected final void spElement(int offset, E value) {
61+
buffer.lazySet(offset, value); // no weaker form available
62+
}
63+
protected final void soElement(AtomicReferenceArray<E> buffer, int offset, E value) {
64+
buffer.lazySet(offset, value);
65+
}
66+
protected final void soElement(int offset, E value) {
67+
buffer.lazySet(offset, value);
68+
}
69+
protected final void svElement(AtomicReferenceArray<E> buffer, int offset, E value) {
70+
buffer.set(offset, value);
71+
}
72+
protected final E lvElement(int offset) {
73+
return lvElement(buffer, offset);
74+
}
75+
}
76+
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Licensed under the Apache License, Version 2.0 (the "License");
3+
* you may not use this file except in compliance with the License.
4+
* You may obtain a copy of the License at
5+
*
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* Unless required by applicable law or agreed to in writing, software
9+
* distributed under the License is distributed on an "AS IS" BASIS,
10+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11+
* See the License for the specific language governing permissions and
12+
* limitations under the License.
13+
*
14+
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
15+
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java
16+
*/
17+
package rx.internal.util.atomic;
18+
19+
import java.util.concurrent.atomic.*;
20+
21+
/**
22+
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
23+
* <p>
24+
* This implementation is a mashup of the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
25+
* algorithm with an optimization of the offer method taken from the <a
26+
* href="http://staff.ustc.edu.cn/~bhua/publications/IJPP_draft.pdf">BQueue</a> algorithm (a variation on Fast
27+
* Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.<br>
28+
* For convenience the relevant papers are available in the resources folder:<br>
29+
* <i>2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf<br>
30+
* 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf <br>
31+
* </i> This implementation is wait free.
32+
*
33+
* @param <E>
34+
*/
35+
public final class SpscAtomicArrayQueue<E> extends AtomicReferenceArrayQueue<E> {
36+
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
37+
final AtomicLong producerIndex;
38+
protected long producerLookAhead;
39+
final AtomicLong consumerIndex;
40+
final int lookAheadStep;
41+
public SpscAtomicArrayQueue(int capacity) {
42+
super(capacity);
43+
this.producerIndex = new AtomicLong();
44+
this.consumerIndex = new AtomicLong();
45+
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
46+
}
47+
48+
@Override
49+
public boolean offer(E e) {
50+
if (null == e) {
51+
throw new NullPointerException("Null is not a valid element");
52+
}
53+
// local load of field to avoid repeated loads after volatile reads
54+
final AtomicReferenceArray<E> buffer = this.buffer;
55+
final int mask = this.mask;
56+
final long index = producerIndex.get();
57+
final int offset = calcElementOffset(index, mask);
58+
if (index >= producerLookAhead) {
59+
int step = lookAheadStep;
60+
if (null == lvElement(buffer, calcElementOffset(index + step, mask))) {// LoadLoad
61+
producerLookAhead = index + step;
62+
}
63+
else if (null != lvElement(buffer, offset)){
64+
return false;
65+
}
66+
}
67+
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
68+
soElement(buffer, offset, e); // StoreStore
69+
return true;
70+
}
71+
72+
@Override
73+
public E poll() {
74+
final long index = consumerIndex.get();
75+
final int offset = calcElementOffset(index);
76+
// local load of field to avoid repeated loads after volatile reads
77+
final AtomicReferenceArray<E> lElementBuffer = buffer;
78+
final E e = lvElement(lElementBuffer, offset);// LoadLoad
79+
if (null == e) {
80+
return null;
81+
}
82+
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
83+
soElement(lElementBuffer, offset, null);// StoreStore
84+
return e;
85+
}
86+
87+
@Override
88+
public E peek() {
89+
return lvElement(calcElementOffset(consumerIndex.get()));
90+
}
91+
92+
@Override
93+
public int size() {
94+
/*
95+
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
96+
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
97+
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
98+
*/
99+
long after = lvConsumerIndex();
100+
while (true) {
101+
final long before = after;
102+
final long currentProducerIndex = lvProducerIndex();
103+
after = lvConsumerIndex();
104+
if (before == after) {
105+
return (int) (currentProducerIndex - after);
106+
}
107+
}
108+
}
109+
110+
private void soProducerIndex(long newIndex) {
111+
producerIndex.lazySet(newIndex);
112+
}
113+
114+
private void soConsumerIndex(long newIndex) {
115+
consumerIndex.lazySet(newIndex);
116+
}
117+
118+
private long lvConsumerIndex() {
119+
return consumerIndex.get();
120+
}
121+
private long lvProducerIndex() {
122+
return producerIndex.get();
123+
}
124+
}
125+

0 commit comments

Comments
 (0)