diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 4ca4bda3e9..48fb2ef030 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -21,9 +21,9 @@ import rx.*; import rx.Observable; -import rx.exceptions.Exceptions; -import rx.exceptions.OnErrorThrowable; +import rx.exceptions.*; import rx.functions.*; +import rx.internal.util.OpenHashSet; import rx.observables.ConnectableObservable; import rx.schedulers.Timestamped; import rx.subscriptions.Subscriptions; @@ -303,8 +303,16 @@ static final class ReplaySubscriber extends Subscriber implements Subscrip /** Indicates a terminated ReplaySubscriber. */ static final InnerProducer[] TERMINATED = new InnerProducer[0]; - /** Tracks the subscribed producers. */ - final AtomicReference producers; + /** Indicates no further InnerProducers are accepted. */ + volatile boolean terminated; + /** Tracks the subscribed producers. Guarded by itself. */ + final OpenHashSet> producers; + /** Contains a copy of the producers. Modified only from the source side. */ + InnerProducer[] producersCache; + /** Contains number of modifications to the producers set.*/ + volatile long producersVersion; + /** Contains the number of modifications that the producersCache holds. */ + long producersCacheVersion; /** * Atomically changed from false to true by connect to make sure the * connection is only performed by one thread. @@ -324,12 +332,19 @@ static final class ReplaySubscriber extends Subscriber implements Subscrip /** The upstream producer. */ volatile Producer producer; + /** The queue that holds producers with request changes that need to be coordinated. */ + List> coordinationQueue; + /** Indicate that all request amounts should be considered. */ + boolean coordinateAll; + + @SuppressWarnings("unchecked") public ReplaySubscriber(AtomicReference> current, ReplayBuffer buffer) { this.buffer = buffer; this.nl = NotificationLite.instance(); - this.producers = new AtomicReference(EMPTY); + this.producers = new OpenHashSet>(); + this.producersCache = EMPTY; this.shouldConnect = new AtomicBoolean(); // make sure the source doesn't produce values until the child subscribers // expressed their request amounts @@ -340,7 +355,15 @@ void init() { add(Subscriptions.create(new Action0() { @Override public void call() { - ReplaySubscriber.this.producers.getAndSet(TERMINATED); + if (!terminated) { + synchronized (producers) { + if (!terminated) { + producers.terminate(); + producersVersion++; + terminated = true; + } + } + } // unlike OperatorPublish, we can't null out the terminated so // late subscribers can still get replay // current.compareAndSet(ReplaySubscriber.this, null); @@ -359,27 +382,18 @@ boolean add(InnerProducer producer) { if (producer == null) { throw new NullPointerException(); } - // the state can change so we do a CAS loop to achieve atomicity - for (;;) { - // get the current producer array - InnerProducer[] c = producers.get(); - // if this subscriber-to-source reached a terminal state by receiving - // an onError or onCompleted, just refuse to add the new producer - if (c == TERMINATED) { + if (terminated) { + return false; + } + synchronized (producers) { + if (terminated) { return false; } - // we perform a copy-on-write logic - int len = c.length; - InnerProducer[] u = new InnerProducer[len + 1]; - System.arraycopy(c, 0, u, 0, len); - u[len] = producer; - // try setting the producers array - if (producers.compareAndSet(c, u)) { - return true; - } - // if failed, some other operation succeeded (another add, remove or termination) - // so retry + + producers.add(producer); + producersVersion++; } + return true; } /** @@ -387,48 +401,15 @@ boolean add(InnerProducer producer) { * @param producer the producer to remove */ void remove(InnerProducer producer) { - // the state can change so we do a CAS loop to achieve atomicity - for (;;) { - // let's read the current producers array - InnerProducer[] c = producers.get(); - // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { - return; - } - // let's find the supplied producer in the array - // although this is O(n), we don't expect too many child subscribers in general - int j = -1; - int len = c.length; - for (int i = 0; i < len; i++) { - if (c[i].equals(producer)) { - j = i; - break; - } - } - // we didn't find it so just quit - if (j < 0) { - return; - } - // we do copy-on-write logic here - InnerProducer[] u; - // we don't create a new empty array if producer was the single inhabitant - // but rather reuse an empty array - if (len == 1) { - u = EMPTY; - } else { - // otherwise, create a new array one less in size - u = new InnerProducer[len - 1]; - // copy elements being before the given producer - System.arraycopy(c, 0, u, 0, j); - // copy elements being after the given producer - System.arraycopy(c, j + 1, u, j, len - j - 1); - } - // try setting this new array as - if (producers.compareAndSet(c, u)) { + if (terminated) { + return; + } + synchronized (producers) { + if (terminated) { return; } - // if we failed, it means something else happened - // (a concurrent add/remove or termination), we need to retry + producers.remove(producer); + producersVersion++; } } @@ -439,7 +420,7 @@ public void setProducer(Producer p) { throw new IllegalStateException("Only a single producer can be set on a Subscriber."); } producer = p; - manageRequests(); + manageRequests(null); replay(); } @@ -482,81 +463,157 @@ public void onCompleted() { /** * Coordinates the request amounts of various child Subscribers. */ - void manageRequests() { + void manageRequests(InnerProducer inner) { // if the upstream has completed, no more requesting is possible if (isUnsubscribed()) { return; } synchronized (this) { if (emitting) { + if (inner != null) { + List> q = coordinationQueue; + if (q == null) { + q = new ArrayList>(); + coordinationQueue = q; + } + q.add(inner); + } else { + coordinateAll = true; + } missed = true; return; } emitting = true; } + + long ri = maxChildRequested; + long maxTotalRequested; + + if (inner != null) { + maxTotalRequested = Math.max(ri, inner.totalRequested.get()); + } else { + maxTotalRequested = ri; + + InnerProducer[] a = copyProducers(); + for (InnerProducer rp : a) { + if (rp != null) { + maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get()); + } + } + + } + makeRequest(maxTotalRequested, ri); + for (;;) { // if the upstream has completed, no more requesting is possible if (isUnsubscribed()) { return; } - @SuppressWarnings("unchecked") - InnerProducer[] a = producers.get(); - - long ri = maxChildRequested; - long maxTotalRequests = ri; - - for (InnerProducer rp : a) { - maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get()); + List> q; + boolean all; + synchronized (this) { + if (!missed) { + emitting = false; + return; + } + missed = false; + q = coordinationQueue; + coordinationQueue = null; + all = coordinateAll; + coordinateAll = false; } - long ur = maxUpstreamRequested; - Producer p = producer; + ri = maxChildRequested; + maxTotalRequested = ri; - long diff = maxTotalRequests - ri; - if (diff != 0) { - maxChildRequested = maxTotalRequests; - if (p != null) { - if (ur != 0L) { - maxUpstreamRequested = 0L; - p.request(ur + diff); - } else { - p.request(diff); - } - } else { - // collect upstream request amounts until there is a producer for them - long u = ur + diff; - if (u < 0) { - u = Long.MAX_VALUE; + if (q != null) { + for (InnerProducer rp : q) { + maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get()); + } + } + + if (all) { + InnerProducer[] a = copyProducers(); + for (InnerProducer rp : a) { + if (rp != null) { + maxTotalRequested = Math.max(maxTotalRequested, rp.totalRequested.get()); } - maxUpstreamRequested = u; } - } else - // if there were outstanding upstream requests and we have a producer - if (ur != 0L && p != null) { - maxUpstreamRequested = 0L; - // fire the accumulated requests - p.request(ur); } - synchronized (this) { - if (!missed) { - emitting = false; - return; + makeRequest(maxTotalRequested, ri); + } + } + + InnerProducer[] copyProducers() { + synchronized (producers) { + Object[] a = producers.values(); + int n = a.length; + @SuppressWarnings("unchecked") + InnerProducer[] result = new InnerProducer[n]; + System.arraycopy(a, 0, result, 0, n); + return result; + } + } + + void makeRequest(long maxTotalRequests, long previousTotalRequests) { + long ur = maxUpstreamRequested; + Producer p = producer; + + long diff = maxTotalRequests - previousTotalRequests; + if (diff != 0) { + maxChildRequested = maxTotalRequests; + if (p != null) { + if (ur != 0L) { + maxUpstreamRequested = 0L; + p.request(ur + diff); + } else { + p.request(diff); } - missed = false; + } else { + // collect upstream request amounts until there is a producer for them + long u = ur + diff; + if (u < 0) { + u = Long.MAX_VALUE; + } + maxUpstreamRequested = u; } + } else + // if there were outstanding upstream requests and we have a producer + if (ur != 0L && p != null) { + maxUpstreamRequested = 0L; + // fire the accumulated requests + p.request(ur); } } /** * Tries to replay the buffer contents to all known subscribers. */ + @SuppressWarnings("unchecked") void replay() { - @SuppressWarnings("unchecked") - InnerProducer[] a = producers.get(); - for (InnerProducer rp : a) { - buffer.replay(rp); + InnerProducer[] pc = producersCache; + if (producersCacheVersion != producersVersion) { + synchronized (producers) { + pc = producersCache; + // if the producers hasn't changed do nothing + // otherwise make a copy of the current set of producers + Object[] a = producers.values(); + int n = a.length; + if (pc.length != n) { + pc = new InnerProducer[n]; + producersCache = pc; + } + System.arraycopy(a, 0, pc, 0, n); + producersCacheVersion = producersVersion; + } + } + ReplayBuffer b = buffer; + for (InnerProducer rp : pc) { + if (rp != null) { + b.replay(rp); + } } } } @@ -635,7 +692,7 @@ public void request(long n) { addTotalRequested(n); // if successful, notify the parent dispatcher this child can receive more // elements - parent.manageRequests(); + parent.manageRequests(this); parent.buffer.replay(this); return; @@ -716,7 +773,7 @@ public void unsubscribe() { // let's assume this child had 0 requested before the unsubscription while // the others had non-zero. By removing this 'blocking' child, the others // are now free to receive events - parent.manageRequests(); + parent.manageRequests(this); } } } @@ -856,8 +913,6 @@ public void replay(InnerProducer output) { /** * Represents a node in a bounded replay buffer's linked list. - * - * @param the contained value type */ static final class Node extends AtomicReference { /** */ diff --git a/src/main/java/rx/internal/util/OpenHashSet.java b/src/main/java/rx/internal/util/OpenHashSet.java new file mode 100644 index 0000000000..1e79fdd4c1 --- /dev/null +++ b/src/main/java/rx/internal/util/OpenHashSet.java @@ -0,0 +1,211 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * Modified from http://www.javacodegeeks.com/2013/08/simple-and-lightweight-pool-implementation.html + */ + +package rx.internal.util; + +import java.util.Arrays; + +import rx.functions.Action1; +import rx.internal.util.unsafe.Pow2; + +/* + * Inspired by fastutils' OpenHashSet implementation at + * https://github.com/vigna/fastutil/blob/master/drv/OpenHashSet.drv + */ + +/** + * A simple open hash set with add, remove and clear capabilities only. + *

Doesn't support nor checks for {@code null}s. + * + * @param the element type + */ +public final class OpenHashSet { + final float loadFactor; + int mask; + int size; + int maxSize; + T[] keys; + + public OpenHashSet() { + this(16, 0.75f); + } + + /** + * Creates an OpenHashSet with the initial capacity and load factor of 0.75f. + * @param capacity the initial capacity + */ + public OpenHashSet(int capacity) { + this(capacity, 0.75f); + } + + @SuppressWarnings("unchecked") + public OpenHashSet(int capacity, float loadFactor) { + this.loadFactor = loadFactor; + int c = Pow2.roundToPowerOfTwo(capacity); + this.mask = c - 1; + this.maxSize = (int)(loadFactor * c); + this.keys = (T[])new Object[c]; + } + + public boolean add(T value) { + final T[] a = keys; + final int m = mask; + + int pos = mix(value.hashCode()) & m; + T curr = a[pos]; + if (curr != null) { + if (curr.equals(value)) { + return false; + } + for (;;) { + pos = (pos + 1) & m; + curr = a[pos]; + if (curr == null) { + break; + } + if (curr.equals(value)) { + return false; + } + } + } + a[pos] = value; + if (++size >= maxSize) { + rehash(); + } + return true; + } + public boolean remove(T value) { + T[] a = keys; + int m = mask; + int pos = mix(value.hashCode()) & m; + T curr = a[pos]; + if (curr == null) { + return false; + } + if (curr.equals(value)) { + return removeEntry(pos, a, m); + } + for (;;) { + pos = (pos + 1) & m; + curr = a[pos]; + if (curr == null) { + return false; + } + if (curr.equals(value)) { + return removeEntry(pos, a, m); + } + } + } + + boolean removeEntry(int pos, T[] a, int m) { + size--; + + int last; + int slot; + T curr; + for (;;) { + last = pos; + pos = (pos + 1) & m; + for (;;) { + curr = a[pos]; + if (curr == null) { + a[last] = null; + return true; + } + slot = mix(curr.hashCode()) & m; + + if (last <= pos ? last >= slot || slot > pos : last >= slot && slot > pos) { + break; + } + + pos = (pos + 1) & m; + } + a[last] = curr; + } + } + + public void clear(Action1 clearAction) { + if (size == 0) { + return; + } + T[] a = keys; + int len = a.length; + for (int i = 0; i < len; i++) { + T e = a[i]; + if (e != null) { + clearAction.call(e); + } + } + Arrays.fill(a, null); + size = 0; + } + + @SuppressWarnings("unchecked") + public void terminate() { + size = 0; + keys = (T[])new Object[0]; + } + + @SuppressWarnings("unchecked") + void rehash() { + T[] a = keys; + int i = a.length; + int newCap = i << 1; + int m = newCap - 1; + + T[] b = (T[])new Object[newCap]; + + + for (int j = size; j-- != 0; ) { + while (a[--i] == null); + int pos = mix(a[i].hashCode()) & m; + if (b[pos] != null) { + for (;;) { + pos = (pos + 1) & m; + if (b[pos] == null) { + break; + } + } + } + b[pos] = a[i]; + } + + this.mask = m; + this.maxSize = (int)(newCap * loadFactor); + this.keys = b; + } + + private static final int INT_PHI = 0x9E3779B9; + + static int mix(int x) { + final int h = x * INT_PHI; + return h ^ (h >>> 16); + } + + public boolean isEmpty() { + return size == 0; + } + + /** + * Returns the raw array of values of this set, watch out for null entires. + * @return the raw array of values of this set + */ + public T[] values() { + return keys; + } +} diff --git a/src/test/java/rx/internal/util/OpenHashSetTest.java b/src/test/java/rx/internal/util/OpenHashSetTest.java new file mode 100644 index 0000000000..b777bdbb50 --- /dev/null +++ b/src/test/java/rx/internal/util/OpenHashSetTest.java @@ -0,0 +1,58 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package rx.internal.util; + +import org.junit.Test; +import static org.junit.Assert.*; + +public class OpenHashSetTest { + @Test + public void addRemove() { + OpenHashSet set = new OpenHashSet(); + + for (int i = 0; i < 1000; i++) { + assertTrue(set.add(i)); + assertFalse(set.add(i)); + assertTrue(set.remove(i)); + assertFalse(set.remove(i)); + } + + Object[] values = set.values(); + for (Object i : values) { + assertNull(i); + } + } + + @Test + public void addAllRemoveAll() { + for (int i = 16; i < 128 * 1024; i *= 2) { + OpenHashSet set = new OpenHashSet(i); + for (int j = 0; j < i * 2; j++) { + assertTrue(set.add(j)); + assertFalse(set.add(j)); + } + for (int j = i * 2 - 1; j >= 0; j--) { + assertTrue(set.remove(j)); + assertFalse(set.remove(j)); + } + Object[] values = set.values(); + for (Object j : values) { + assertNull(j); + } + } + } +}