Skip to content

Merge can now operate in horizontally unbounded mode. #3169

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 25 additions & 23 deletions src/main/java/rx/internal/operators/OperatorMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@

import java.util.*;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.*;
import java.util.concurrent.atomic.AtomicLong;

import rx.*;
import rx.Observable.Operator;
import rx.Observable;
import rx.Observable.Operator;
import rx.exceptions.*;
import rx.internal.util.*;
import rx.internal.util.atomic.*;
import rx.internal.util.unsafe.*;
import rx.subscriptions.CompositeSubscription;

/**
Expand Down Expand Up @@ -144,7 +146,7 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>

MergeProducer<T> producer;

volatile RxRingBuffer queue;
volatile Queue<Object> queue;

/** Tracks the active subscriptions to sources. */
volatile CompositeSubscription subscriptions;
Expand Down Expand Up @@ -182,8 +184,7 @@ public MergeSubscriber(Subscriber<? super T> child, boolean delayErrors, int max
this.nl = NotificationLite.instance();
this.innerGuard = new Object();
this.innerSubscribers = EMPTY;
long r = Math.min(maxConcurrent, RxRingBuffer.SIZE);
request(r);
request(maxConcurrent == Integer.MAX_VALUE ? Long.MAX_VALUE : maxConcurrent);
}

Queue<Throwable> getOrCreateErrorQueue() {
Expand Down Expand Up @@ -443,23 +444,27 @@ protected void queueScalar(T value) {
* due to lack of requests or an ongoing emission,
* enqueue the value and try the slow emission path.
*/
RxRingBuffer q = this.queue;
Queue<Object> q = this.queue;
if (q == null) {
q = RxRingBuffer.getSpscInstance();
this.add(q);
int mc = maxConcurrent;
if (mc == Integer.MAX_VALUE) {
q = new SpscUnboundedAtomicArrayQueue<Object>(RxRingBuffer.SIZE);
} else {
if (Pow2.isPowerOfTwo(mc)) {
if (UnsafeAccess.isUnsafeAvailable()) {
q = new SpscArrayQueue<Object>(mc);
} else {
q = new SpscAtomicArrayQueue<Object>(mc);
}
} else {
q = new SpscExactAtomicArrayQueue<Object>(mc);
}
}
this.queue = q;
}
try {
q.onNext(nl.next(value));
} catch (MissingBackpressureException ex) {
this.unsubscribe();
this.onError(ex);
return;
} catch (IllegalStateException ex) {
if (!this.isUnsubscribed()) {
this.unsubscribe();
this.onError(ex);
}
if (!q.offer(value)) {
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(new MissingBackpressureException(), value));
return;
}
emit();
Expand Down Expand Up @@ -533,7 +538,7 @@ void emitLoop() {
skipFinal = true;
return;
}
RxRingBuffer svq = queue;
Queue<Object> svq = queue;

long r = producer.get();
boolean unbounded = r == Long.MAX_VALUE;
Expand Down Expand Up @@ -610,9 +615,6 @@ void emitLoop() {
} else {
reportError();
}
if (svq != null) {
svq.release();
}
skipFinal = true;
return;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/AtomicReferenceArrayQueue.java
*/
package rx.internal.util.atomic;

import java.util.*;
import java.util.concurrent.atomic.AtomicReferenceArray;

import rx.internal.util.unsafe.Pow2;

abstract class AtomicReferenceArrayQueue<E> extends AbstractQueue<E> {
protected final AtomicReferenceArray<E> buffer;
protected final int mask;
public AtomicReferenceArrayQueue(int capacity) {
int actualCapacity = Pow2.roundToPowerOfTwo(capacity);
this.mask = actualCapacity - 1;
this.buffer = new AtomicReferenceArray<E>(actualCapacity);
}
@Override
public Iterator<E> iterator() {
throw new UnsupportedOperationException();
}
@Override
public void clear() {
// we have to test isEmpty because of the weaker poll() guarantee
while (poll() != null || !isEmpty())
;
}
protected final int calcElementOffset(long index, int mask) {
return (int)index & mask;
}
protected final int calcElementOffset(long index) {
return (int)index & mask;
}
protected final E lvElement(AtomicReferenceArray<E> buffer, int offset) {
return buffer.get(offset);
}
protected final E lpElement(AtomicReferenceArray<E> buffer, int offset) {
return buffer.get(offset); // no weaker form available
}
protected final E lpElement(int offset) {
return buffer.get(offset); // no weaker form available
}
protected final void spElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.lazySet(offset, value); // no weaker form available
}
protected final void spElement(int offset, E value) {
buffer.lazySet(offset, value); // no weaker form available
}
protected final void soElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.lazySet(offset, value);
}
protected final void soElement(int offset, E value) {
buffer.lazySet(offset, value);
}
protected final void svElement(AtomicReferenceArray<E> buffer, int offset, E value) {
buffer.set(offset, value);
}
protected final E lvElement(int offset) {
return lvElement(buffer, offset);
}
}
124 changes: 124 additions & 0 deletions src/main/java/rx/internal/util/atomic/SpscAtomicArrayQueue.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Original License: https://github.com/JCTools/JCTools/blob/master/LICENSE
* Original location: https://github.com/JCTools/JCTools/blob/master/jctools-core/src/main/java/org/jctools/queues/atomic/SpscAtomicArrayQueue.java
*/
package rx.internal.util.atomic;

import java.util.concurrent.atomic.*;

/**
* A Single-Producer-Single-Consumer queue backed by a pre-allocated buffer.
* <p>
* This implementation is a mashup of the <a href="http://sourceforge.net/projects/mc-fastflow/">Fast Flow</a>
* algorithm with an optimization of the offer method taken from the <a
* href="http://staff.ustc.edu.cn/~bhua/publications/IJPP_draft.pdf">BQueue</a> algorithm (a variation on Fast
* Flow), and adjusted to comply with Queue.offer semantics with regards to capacity.<br>
* For convenience the relevant papers are available in the resources folder:<br>
* <i>2010 - Pisa - SPSC Queues on Shared Cache Multi-Core Systems.pdf<br>
* 2012 - Junchang- BQueue- Efficient and Practical Queuing.pdf <br>
* </i> This implementation is wait free.
*
* @param <E>
*/
public final class SpscAtomicArrayQueue<E> extends AtomicReferenceArrayQueue<E> {
private static final Integer MAX_LOOK_AHEAD_STEP = Integer.getInteger("jctools.spsc.max.lookahead.step", 4096);
final AtomicLong producerIndex;
protected long producerLookAhead;
final AtomicLong consumerIndex;
final int lookAheadStep;
public SpscAtomicArrayQueue(int capacity) {
super(capacity);
this.producerIndex = new AtomicLong();
this.consumerIndex = new AtomicLong();
lookAheadStep = Math.min(capacity / 4, MAX_LOOK_AHEAD_STEP);
}

@Override
public boolean offer(E e) {
if (null == e) {
throw new NullPointerException("Null is not a valid element");
}
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<E> buffer = this.buffer;
final int mask = this.mask;
final long index = producerIndex.get();
final int offset = calcElementOffset(index, mask);
if (index >= producerLookAhead) {
int step = lookAheadStep;
if (null == lvElement(buffer, calcElementOffset(index + step, mask))) {// LoadLoad
producerLookAhead = index + step;
}
else if (null != lvElement(buffer, offset)){
return false;
}
}
soProducerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(buffer, offset, e); // StoreStore
return true;
}

@Override
public E poll() {
final long index = consumerIndex.get();
final int offset = calcElementOffset(index);
// local load of field to avoid repeated loads after volatile reads
final AtomicReferenceArray<E> lElementBuffer = buffer;
final E e = lvElement(lElementBuffer, offset);// LoadLoad
if (null == e) {
return null;
}
soConsumerIndex(index + 1); // ordered store -> atomic and ordered for size()
soElement(lElementBuffer, offset, null);// StoreStore
return e;
}

@Override
public E peek() {
return lvElement(calcElementOffset(consumerIndex.get()));
}

@Override
public int size() {
/*
* It is possible for a thread to be interrupted or reschedule between the read of the producer and consumer
* indices, therefore protection is required to ensure size is within valid range. In the event of concurrent
* polls/offers to this method the size is OVER estimated as we read consumer index BEFORE the producer index.
*/
long after = lvConsumerIndex();
while (true) {
final long before = after;
final long currentProducerIndex = lvProducerIndex();
after = lvConsumerIndex();
if (before == after) {
return (int) (currentProducerIndex - after);
}
}
}

private void soProducerIndex(long newIndex) {
producerIndex.lazySet(newIndex);
}

private void soConsumerIndex(long newIndex) {
consumerIndex.lazySet(newIndex);
}

private long lvConsumerIndex() {
return consumerIndex.get();
}
private long lvProducerIndex() {
return producerIndex.get();
}
}
Loading