From 18ff5afd380625f9157d9e9a3144baf845c09086 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 10 Jun 2015 19:22:58 +0200 Subject: [PATCH] cache now supports backpressure --- src/main/java/rx/Observable.java | 4 +- .../internal/operators/OnSubscribeCache.java | 76 --- .../rx/internal/util/CachedObservable.java | 432 ++++++++++++++++++ .../rx/internal/util/LinkedArrayList.java | 136 ++++++ .../operators/OnSubscribeCacheTest.java | 164 ------- .../internal/util/CachedObservableTest.java | 264 +++++++++++ .../rx/internal/util/LinkedArrayListTest.java | 37 ++ 7 files changed, 871 insertions(+), 242 deletions(-) delete mode 100644 src/main/java/rx/internal/operators/OnSubscribeCache.java create mode 100644 src/main/java/rx/internal/util/CachedObservable.java create mode 100644 src/main/java/rx/internal/util/LinkedArrayList.java delete mode 100644 src/test/java/rx/internal/operators/OnSubscribeCacheTest.java create mode 100644 src/test/java/rx/internal/util/CachedObservableTest.java create mode 100644 src/test/java/rx/internal/util/LinkedArrayListTest.java diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 5537bca126..2ca7ffddf0 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -3504,7 +3504,7 @@ public final Observable> buffer(Observable boundary, int initialC * @see ReactiveX operators documentation: Replay */ public final Observable cache() { - return create(new OnSubscribeCache(this)); + return CachedObservable.from(this); } /** @@ -3539,7 +3539,7 @@ public final Observable cache() { * @see ReactiveX operators documentation: Replay */ public final Observable cache(int capacity) { - return create(new OnSubscribeCache(this, capacity)); + return CachedObservable.from(this, capacity); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeCache.java b/src/main/java/rx/internal/operators/OnSubscribeCache.java deleted file mode 100644 index a568fd0e0b..0000000000 --- a/src/main/java/rx/internal/operators/OnSubscribeCache.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - -import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Subscriber; -import rx.subjects.ReplaySubject; -import rx.subjects.Subject; - -/** - * This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes - * to the source Observable rather than returning a connectable Observable. - *

- * - *

- * This is useful with an Observable that you want to cache responses when you can't control the - * subscribe/unsubscribe behavior of all the Observers. - *

- * Note: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be - * careful not to use this operator on Observables that emit infinite or very large numbers of - * items, as this will use up memory. - * - * @param - * the cached value type - */ -public final class OnSubscribeCache implements OnSubscribe { - protected final Observable source; - protected final Subject cache; - volatile int sourceSubscribed; - @SuppressWarnings("rawtypes") - static final AtomicIntegerFieldUpdater SRC_SUBSCRIBED_UPDATER - = AtomicIntegerFieldUpdater.newUpdater(OnSubscribeCache.class, "sourceSubscribed"); - - public OnSubscribeCache(Observable source) { - this(source, ReplaySubject. create()); - } - - public OnSubscribeCache(Observable source, int capacity) { - this(source, ReplaySubject. create(capacity)); - } - - /* accessible to tests */OnSubscribeCache(Observable source, Subject cache) { - this.source = source; - this.cache = cache; - } - - @Override - public void call(Subscriber s) { - if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { - source.subscribe(cache); - /* - * Note that we will never unsubscribe from 'source' unless we receive `onCompleted` or `onError`, - * as we want to receive and cache all of its values. - * - * This means this should never be used on an infinite or very large sequence, similar to toList(). - */ - } - cache.unsafeSubscribe(s); - } -} diff --git a/src/main/java/rx/internal/util/CachedObservable.java b/src/main/java/rx/internal/util/CachedObservable.java new file mode 100644 index 0000000000..cda4b9d277 --- /dev/null +++ b/src/main/java/rx/internal/util/CachedObservable.java @@ -0,0 +1,432 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */package rx.internal.util; + +import java.util.concurrent.atomic.*; + +import rx.*; +import rx.internal.operators.NotificationLite; +import rx.subscriptions.SerialSubscription; + +/** + * An observable which auto-connects to another observable, caches the elements + * from that observable but allows terminating the connection and completing the cache. + * + * @param the source element type + */ +public final class CachedObservable extends Observable { + /** The cache and replay state. */ + private CacheState state; + + /** + * Creates a cached Observable with a default capacity hint of 16. + * @param source the source Observable to cache + * @return the CachedObservable instance + */ + public static CachedObservable from(Observable source) { + return from(source, 16); + } + + /** + * Creates a cached Observable with the given capacity hint. + * @param source the source Observable to cache + * @param capacityHint the hint for the internal buffer size + * @return the CachedObservable instance + */ + public static CachedObservable from(Observable source, int capacityHint) { + if (capacityHint < 1) { + throw new IllegalArgumentException("capacityHint > 0 required"); + } + CacheState state = new CacheState(source, capacityHint); + CachedSubscribe onSubscribe = new CachedSubscribe(state); + return new CachedObservable(onSubscribe, state); + } + + /** + * Private constructor because state needs to be shared between the Observable body and + * the onSubscribe function. + * @param onSubscribe + * @param state + */ + private CachedObservable(OnSubscribe onSubscribe, CacheState state) { + super(onSubscribe); + this.state = state; + } + + /** + * Check if this cached observable is connected to its source. + * @return true if already connected + */ + /* public */boolean isConnected() { + return state.isConnected; + } + + /** + * Returns true if there are observers subscribed to this observable. + * @return + */ + /* public */ boolean hasObservers() { + return state.producers.length != 0; + } + + /** + * Returns the number of events currently cached. + * @return + */ + /* public */ int cachedEventCount() { + return state.size(); + } + + /** + * Contains the active child producers and the values to replay. + * + * @param + */ + static final class CacheState extends LinkedArrayList implements Observer { + /** The source observable to connect to. */ + final Observable source; + /** Holds onto the subscriber connected to source. */ + final SerialSubscription connection; + /** Guarded by connection (not this). */ + volatile ReplayProducer[] producers; + /** The default empty array of producers. */ + static final ReplayProducer[] EMPTY = new ReplayProducer[0]; + + final NotificationLite nl; + + /** Set to true after connection. */ + volatile boolean isConnected; + /** + * Indicates that the source has completed emitting values or the + * Observable was forcefully terminated. + */ + boolean sourceDone; + + public CacheState(Observable source, int capacityHint) { + super(capacityHint); + this.source = source; + this.producers = EMPTY; + this.nl = NotificationLite.instance(); + this.connection = new SerialSubscription(); + } + /** + * Adds a ReplayProducer to the producers array atomically. + * @param p + */ + public void addProducer(ReplayProducer p) { + // guarding by connection to save on allocating another object + // thus there are two distinct locks guarding the value-addition and child come-and-go + synchronized (connection) { + ReplayProducer[] a = producers; + int n = a.length; + ReplayProducer[] b = new ReplayProducer[n + 1]; + System.arraycopy(a, 0, b, 0, n); + b[n] = p; + producers = b; + } + } + /** + * Removes the ReplayProducer (if present) from the producers array atomically. + * @param p + */ + public void removeProducer(ReplayProducer p) { + synchronized (connection) { + ReplayProducer[] a = producers; + int n = a.length; + int j = -1; + for (int i = 0; i < n; i++) { + if (a[i].equals(p)) { + j = i; + break; + } + } + if (j < 0) { + return; + } + if (n == 1) { + producers = EMPTY; + return; + } + ReplayProducer[] b = new ReplayProducer[n - 1]; + System.arraycopy(a, 0, b, 0, j); + System.arraycopy(a, j + 1, b, j, n - j - 1); + producers = b; + } + } + /** + * Connects the cache to the source. + * Make sure this is called only once. + */ + public void connect() { + connection.set(source.subscribe(this)); + isConnected = true; + } + @Override + public void onNext(T t) { + Object o = nl.next(t); + synchronized (this) { + if (!sourceDone) { + add(o); + } else { + return; + } + } + dispatch(); + } + @Override + public void onError(Throwable e) { + Object o = nl.error(e); + synchronized (this) { + if (!sourceDone) { + sourceDone = true; + add(o); + } else { + return; + } + } + connection.unsubscribe(); + dispatch(); + } + @Override + public void onCompleted() { + Object o = nl.completed(); + synchronized (this) { + if (!sourceDone) { + sourceDone = true; + add(o); + } else { + return; + } + } + connection.unsubscribe(); + dispatch(); + } + /** + * Signals all known children there is work to do. + */ + void dispatch() { + ReplayProducer[] a = producers; + for (ReplayProducer rp : a) { + rp.replay(); + } + } + } + + /** + * Manages the subscription of child subscribers by setting up a replay producer and + * performs auto-connection of the very first subscription. + * @param the value type emitted + */ + static final class CachedSubscribe extends AtomicBoolean implements OnSubscribe { + /** */ + private static final long serialVersionUID = -2817751667698696782L; + final CacheState state; + public CachedSubscribe(CacheState state) { + this.state = state; + } + @Override + public void call(Subscriber t) { + // we can connect first because we replay everything anyway + ReplayProducer rp = new ReplayProducer(t, state); + state.addProducer(rp); + + t.add(rp); + t.setProducer(rp); + + // we ensure a single connection here to save an instance field of AtomicBoolean in state. + if (!get() && compareAndSet(false, true)) { + state.connect(); + } + + // no need to call rp.replay() here because the very first request will trigger it anyway + } + } + + /** + * Keeps track of the current request amount and the replay position for a child Subscriber. + * + * @param + */ + static final class ReplayProducer extends AtomicLong implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -2557562030197141021L; + /** The actual child subscriber. */ + final Subscriber child; + /** The cache state object. */ + final CacheState state; + + /** + * Contains the reference to the buffer segment in replay. + * Accessed after reading state.size() and when emitting == true. + */ + Object[] currentBuffer; + /** + * Contains the index into the currentBuffer where the next value is expected. + * Accessed after reading state.size() and when emitting == true. + */ + int currentIndexInBuffer; + /** + * Contains the absolute index up until the values have been replayed so far. + */ + int index; + + /** Indicates there is a replay going on; guarded by this. */ + boolean emitting; + /** Indicates there were some state changes/replay attempts; guarded by this. */ + boolean missed; + + public ReplayProducer(Subscriber child, CacheState state) { + this.child = child; + this.state = state; + } + @Override + public void request(long n) { + for (;;) { + long r = get(); + if (r < 0) { + return; + } + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (compareAndSet(r, u)) { + replay(); + return; + } + } + } + /** + * Updates the request count to reflect values have been produced. + * @param n + * @return + */ + public long produced(long n) { + return addAndGet(-n); + } + + @Override + public boolean isUnsubscribed() { + return get() < 0; + } + @Override + public void unsubscribe() { + long r = get(); + if (r >= 0) { + r = getAndSet(-1L); // unsubscribed state is negative + if (r >= 0) { + state.removeProducer(this); + } + } + } + + /** + * Continue replaying available values if there are requests for them. + */ + public void replay() { + // make sure there is only a single thread emitting + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + } + boolean skipFinal = false; + try { + final NotificationLite nl = state.nl; + final Subscriber child = this.child; + + for (;;) { + + long r = get(); + // read the size, if it is non-zero, we can safely read the head and + // read values up to the given absolute index + int s = state.size(); + if (s != 0) { + Object[] b = currentBuffer; + + // latch onto the very first buffer now that it is available. + if (b == null) { + b = state.head(); + currentBuffer = b; + } + final int n = b.length - 1; + int j = index; + int k = currentIndexInBuffer; + // eagerly emit any terminal event + if (r == 0) { + Object o = b[k]; + if (nl.isCompleted(o)) { + child.onCompleted(); + skipFinal = true; + unsubscribe(); + return; + } else + if (nl.isError(o)) { + child.onError(nl.getError(o)); + skipFinal = true; + unsubscribe(); + return; + } + } else + if (r > 0) { + int valuesProduced = 0; + + while (j < s && r > 0 && !child.isUnsubscribed()) { + if (k == n) { + b = (Object[])b[n]; + k = 0; + } + Object o = b[k]; + + if (nl.accept(child, o)) { + skipFinal = true; + unsubscribe(); + return; + } + + k++; + j++; + r--; + valuesProduced++; + } + + index = j; + currentIndexInBuffer = k; + currentBuffer = b; + produced(valuesProduced); + } + } + + synchronized (this) { + if (!missed) { + emitting = false; + skipFinal = true; + return; + } + missed = false; + } + } + } finally { + if (!skipFinal) { + synchronized (this) { + emitting = false; + } + } + } + } + } +} diff --git a/src/main/java/rx/internal/util/LinkedArrayList.java b/src/main/java/rx/internal/util/LinkedArrayList.java new file mode 100644 index 0000000000..57a1289640 --- /dev/null +++ b/src/main/java/rx/internal/util/LinkedArrayList.java @@ -0,0 +1,136 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import java.util.*; + +/** + * A list implementation which combines an ArrayList with a LinkedList to + * avoid copying values when the capacity needs to be increased. + *

+ * The class is non final to allow embedding it directly and thus saving on object allocation. + */ +public class LinkedArrayList { + /** The capacity of each array segment. */ + final int capacityHint; + /** + * Contains the head of the linked array list if not null. The + * length is always capacityHint + 1 and the last element is an Object[] pointing + * to the next element of the linked array list. + */ + Object[] head; + /** The tail array where new elements will be added. */ + Object[] tail; + /** + * The total size of the list; written after elements have been added (release) and + * and when read, the value indicates how many elements can be safely read (acquire). + */ + volatile int size; + /** The next available slot in the current tail. */ + int indexInTail; + /** + * Constructor with the capacity hint of each array segment. + * @param capacityHint + */ + public LinkedArrayList(int capacityHint) { + this.capacityHint = capacityHint; + } + /** + * Adds a new element to this list. + * @param o the object to add, nulls are accepted + */ + public void add(Object o) { + // if no value yet, create the first array + if (size == 0) { + head = new Object[capacityHint + 1]; + tail = head; + head[0] = o; + indexInTail = 1; + size = 1; + } else + // if the tail is full, create a new tail and link + if (indexInTail == capacityHint) { + Object[] t = new Object[capacityHint + 1]; + t[0] = o; + tail[capacityHint] = t; + tail = t; + indexInTail = 1; + size++; + } else { + tail[indexInTail] = o; + indexInTail++; + size++; + } + } + /** + * Returns the head buffer segment or null if the list is empty. + * @return + */ + public Object[] head() { + return head; + } + /** + * Returns the tail buffer segment or null if the list is empty. + * @return + */ + public Object[] tail() { + return tail; + } + /** + * Returns the total size of the list. + * @return + */ + public int size() { + return size; + } + /** + * Returns the index of the next slot in the tail buffer segment. + * @return + */ + public int indexInTail() { + return indexInTail; + } + /** + * Returns the capacity hint that indicates the capacity of each buffer segment. + * @return + */ + public int capacityHint() { + return capacityHint; + } + /* Test support */List toList() { + final int cap = capacityHint; + final int s = size; + final List list = new ArrayList(s + 1); + + Object[] h = head(); + int j = 0; + int k = 0; + while (j < s) { + list.add(h[k]); + j++; + if (++k == cap) { + k = 0; + h = (Object[])h[cap]; + } + } + + return list; + } + @Override + public String toString() { + return toList().toString(); + } +} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java b/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java deleted file mode 100644 index 0d74cd878b..0000000000 --- a/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java +++ /dev/null @@ -1,164 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.operators; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.fail; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - -import java.util.Arrays; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.Test; - -import rx.Observable; -import rx.Subscriber; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; -import rx.functions.Func2; -import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; -import rx.subjects.AsyncSubject; -import rx.subjects.BehaviorSubject; -import rx.subjects.PublishSubject; -import rx.subjects.ReplaySubject; -import rx.subjects.Subject; - -public class OnSubscribeCacheTest { - - @Test - public void testCache() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - Observable o = Observable.create(new Observable.OnSubscribe() { - - @Override - public void call(final Subscriber observer) { - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - System.out.println("published observable being executed"); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - } - }).cache(); - - // we then expect the following 2 subscriptions to get that same value - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } - - private void testWithCustomSubjectAndRepeat(Subject subject, Integer... expected) { - Observable source0 = Observable.just(1, 2, 3) - .subscribeOn(Schedulers.io()) - .flatMap(new Func1>() { - @Override - public Observable call(final Integer i) { - return Observable.timer(i * 20, TimeUnit.MILLISECONDS).map(new Func1() { - @Override - public Integer call(Long t1) { - return i; - } - }); - } - }); - - Observable source1 = Observable.create(new OnSubscribeCache(source0, subject)); - - Observable source2 = source1 - .repeat(4) - .zipWith(Observable.interval(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { - @Override - public Integer call(Integer t1, Long t2) { - return t1; - } - - }); - TestSubscriber ts = new TestSubscriber(); - source2.subscribe(ts); - - ts.awaitTerminalEvent(); - ts.assertNoErrors(); - System.out.println(ts.getOnNextEvents()); - ts.assertReceivedOnNext(Arrays.asList(expected)); - } - - @Test(timeout = 10000) - public void testWithAsyncSubjectAndRepeat() { - testWithCustomSubjectAndRepeat(AsyncSubject. create(), 3, 3, 3, 3); - } - - @Test(timeout = 10000) - public void testWithBehaviorSubjectAndRepeat() { - // BehaviorSubject just completes when repeated - testWithCustomSubjectAndRepeat(BehaviorSubject.create(0), 0, 1, 2, 3); - } - - @Test(timeout = 10000) - public void testWithPublishSubjectAndRepeat() { - // PublishSubject just completes when repeated - testWithCustomSubjectAndRepeat(PublishSubject. create(), 1, 2, 3); - } - - @Test - public void testWithReplaySubjectAndRepeat() { - testWithCustomSubjectAndRepeat(ReplaySubject. create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); - } - - @Test - public void testUnsubscribeSource() { - Action0 unsubscribe = mock(Action0.class); - Observable o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache(); - o.subscribe(); - o.subscribe(); - o.subscribe(); - verify(unsubscribe, times(1)).call(); - } -} diff --git a/src/test/java/rx/internal/util/CachedObservableTest.java b/src/test/java/rx/internal/util/CachedObservableTest.java new file mode 100644 index 0000000000..c14018390f --- /dev/null +++ b/src/test/java/rx/internal/util/CachedObservableTest.java @@ -0,0 +1,264 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import static org.junit.Assert.*; +import static org.mockito.Mockito.*; + +import java.util.*; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.*; + +import rx.*; +import rx.Observable.OnSubscribe; +import rx.Observable; +import rx.exceptions.TestException; +import rx.functions.*; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; + +public class CachedObservableTest { + @Test + public void testColdReplayNoBackpressure() { + CachedObservable source = CachedObservable.from(Observable.range(0, 1000)); + + assertFalse("Source is connected!", source.isConnected()); + + TestSubscriber ts = new TestSubscriber(); + + source.subscribe(ts); + + assertTrue("Source is not connected!", source.isConnected()); + assertFalse("Subscribers retained!", source.hasObservers()); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + List onNextEvents = ts.getOnNextEvents(); + assertEquals(1000, onNextEvents.size()); + + for (int i = 0; i < 1000; i++) { + assertEquals((Integer)i, onNextEvents.get(i)); + } + } + @Test + public void testColdReplayBackpressure() { + CachedObservable source = CachedObservable.from(Observable.range(0, 1000)); + + assertFalse("Source is connected!", source.isConnected()); + + TestSubscriber ts = new TestSubscriber(); + ts.requestMore(10); + + source.subscribe(ts); + + assertTrue("Source is not connected!", source.isConnected()); + assertTrue("Subscribers not retained!", source.hasObservers()); + + ts.assertNoErrors(); + assertTrue(ts.getOnCompletedEvents().isEmpty()); + List onNextEvents = ts.getOnNextEvents(); + assertEquals(10, onNextEvents.size()); + + for (int i = 0; i < 10; i++) { + assertEquals((Integer)i, onNextEvents.get(i)); + } + + ts.unsubscribe(); + assertFalse("Subscribers retained!", source.hasObservers()); + } + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(final Subscriber observer) { + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + + @Test + public void testUnsubscribeSource() { + Action0 unsubscribe = mock(Action0.class); + Observable o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + verify(unsubscribe, times(1)).call(); + } + + @Test + public void testTake() { + TestSubscriber ts = new TestSubscriber(); + + CachedObservable cached = CachedObservable.from(Observable.range(1, 100)); + cached.take(10).subscribe(ts); + + ts.assertNoErrors(); + ts.assertTerminalEvent(); + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + ts.assertUnsubscribed(); + assertFalse(cached.hasObservers()); + } + + @Test + public void testAsync() { + Observable source = Observable.range(1, 10000); + for (int i = 0; i < 100; i++) { + TestSubscriber ts1 = new TestSubscriber(); + + CachedObservable cached = CachedObservable.from(source); + + cached.observeOn(Schedulers.computation()).subscribe(ts1); + + ts1.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts1.assertNoErrors(); + ts1.assertTerminalEvent(); + assertEquals(10000, ts1.getOnNextEvents().size()); + + TestSubscriber ts2 = new TestSubscriber(); + cached.observeOn(Schedulers.computation()).subscribe(ts2); + + ts2.awaitTerminalEvent(2, TimeUnit.SECONDS); + ts2.assertNoErrors(); + ts2.assertTerminalEvent(); + assertEquals(10000, ts2.getOnNextEvents().size()); + } + } + @Test + public void testAsyncComeAndGo() { + Observable source = Observable.timer(1, 1, TimeUnit.MILLISECONDS) + .take(1000) + .subscribeOn(Schedulers.io()); + CachedObservable cached = CachedObservable.from(source); + + Observable output = cached.observeOn(Schedulers.computation()); + + List> list = new ArrayList>(100); + for (int i = 0; i < 100; i++) { + TestSubscriber ts = new TestSubscriber(); + list.add(ts); + output.skip(i * 10).take(10).subscribe(ts); + } + + List expected = new ArrayList(); + for (int i = 0; i < 10; i++) { + expected.add((long)(i - 10)); + } + int j = 0; + for (TestSubscriber ts : list) { + ts.awaitTerminalEvent(3, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + for (int i = j * 10; i < j * 10 + 10; i++) { + expected.set(i - j * 10, (long)i); + } + + ts.assertReceivedOnNext(expected); + + j++; + } + } + + @Test + public void testNoMissingBackpressureException() { + final int m = 4 * 1000 * 1000; + Observable firehose = Observable.create(new OnSubscribe() { + @Override + public void call(Subscriber t) { + for (int i = 0; i < m; i++) { + t.onNext(i); + } + t.onCompleted(); + } + }); + + TestSubscriber ts = new TestSubscriber(); + firehose.cache().observeOn(Schedulers.computation()).takeLast(100).subscribe(ts); + + ts.awaitTerminalEvent(3, TimeUnit.SECONDS); + ts.assertNoErrors(); + ts.assertTerminalEvent(); + + assertEquals(100, ts.getOnNextEvents().size()); + } + + @Test + public void testValuesAndThenError() { + Observable source = Observable.range(1, 10) + .concatWith(Observable.error(new TestException())) + .cache(); + + + TestSubscriber ts = new TestSubscriber(); + source.subscribe(ts); + + ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); + Assert.assertEquals(1, ts.getOnErrorEvents().size()); + + TestSubscriber ts2 = new TestSubscriber(); + source.subscribe(ts2); + + ts2.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); + Assert.assertTrue(ts2.getOnCompletedEvents().isEmpty()); + Assert.assertEquals(1, ts2.getOnErrorEvents().size()); + } +} diff --git a/src/test/java/rx/internal/util/LinkedArrayListTest.java b/src/test/java/rx/internal/util/LinkedArrayListTest.java new file mode 100644 index 0000000000..af7e167c19 --- /dev/null +++ b/src/test/java/rx/internal/util/LinkedArrayListTest.java @@ -0,0 +1,37 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.util; + +import java.util.*; +import static org.junit.Assert.*; + +import org.junit.Test; + +public class LinkedArrayListTest { + @Test + public void testAdd() { + LinkedArrayList list = new LinkedArrayList(16); + + List expected = new ArrayList(32); + for (int i = 0; i < 32; i++) { + list.add(i); + expected.add(i); + } + + assertEquals(expected, list.toList()); + assertEquals(32, list.size()); + } +}