diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 11b6b43f00..92b50f6081 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -25,6 +25,7 @@ import rx.observers.SafeSubscriber; import rx.plugins.*; import rx.schedulers.*; +import rx.subjects.*; import rx.subscriptions.Subscriptions; /** @@ -3582,7 +3583,8 @@ public final Observable> buffer(Observable boundary, int initialC * of items that will use up memory. *
*
Backpressure Support:
- *
This operator supports backpressure.
+ *
This operator does not support upstream backpressure as it is purposefully requesting and caching + * everything emitted.
*
Scheduler:
*
{@code cache} does not operate by default on a particular {@link Scheduler}.
*
@@ -3592,7 +3594,7 @@ public final Observable> buffer(Observable boundary, int initialC * @see ReactiveX operators documentation: Replay */ public final Observable cache() { - return CachedObservable.from(this); + return create(new OnSubscribeCache(this)); } /** @@ -3615,7 +3617,8 @@ public final Observable cache() { * of items that will use up memory. *
*
Backpressure Support:
- *
This operator supports backpressure.
+ *
This operator does not support upstream backpressure as it is purposefully requesting and caching + * everything emitted.
*
Scheduler:
*
{@code cache} does not operate by default on a particular {@link Scheduler}.
*
@@ -3626,7 +3629,7 @@ public final Observable cache() { * @see ReactiveX operators documentation: Replay */ public final Observable cache(int capacityHint) { - return CachedObservable.from(this, capacityHint); + return create(new OnSubscribeCache(this, capacityHint)); } /** @@ -5975,9 +5978,9 @@ public Void call(Notification notification) { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -5987,7 +5990,14 @@ public Void call(Notification notification) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay() { - return OperatorReplay.create(this); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return ReplaySubject. create(); + } + + }); } /** @@ -5997,9 +6007,9 @@ public final ConnectableObservable replay() { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6014,12 +6024,12 @@ public final ConnectableObservable replay() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector) { - return OperatorReplay.multicastSelector(new Func0>() { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(); + public final Subject call() { + return ReplaySubject.create(); } - }, selector); + }, selector)); } /** @@ -6030,9 +6040,9 @@ public ConnectableObservable call() { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6050,12 +6060,12 @@ public ConnectableObservable call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { - return OperatorReplay.multicastSelector(new Func0>() { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize); + public final Subject call() { + return ReplaySubject.createWithSize(bufferSize); } - }, selector); + }, selector)); } /** @@ -6066,9 +6076,9 @@ public ConnectableObservable call() { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6102,9 +6112,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6134,12 +6144,12 @@ public final Observable replay(Func1, ? extends Obs if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperatorReplay.multicastSelector(new Func0>() { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize, time, unit, scheduler); + public final Subject call() { + return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); } - }, selector); + }, selector)); } /** @@ -6150,9 +6160,9 @@ public ConnectableObservable call() { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6171,18 +6181,13 @@ public ConnectableObservable call() { * replaying no more than {@code bufferSize} notifications * @see ReactiveX operators documentation: Replay */ - public final Observable replay(final Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { + public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(bufferSize); + public final Subject call() { + return OperatorReplay. createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); } - }, new Func1, Observable>() { - @Override - public Observable call(Observable t) { - return selector.call(t).observeOn(scheduler); - } - }); + }, selector)); } /** @@ -6193,9 +6198,9 @@ public Observable call(Observable t) { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6226,9 +6231,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6250,12 +6255,12 @@ public final Observable replay(Func1, ? extends Obs * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(time, unit, scheduler); + public final Subject call() { + return ReplaySubject.createWithTime(time, unit, scheduler); } - }, selector); + }, selector)); } /** @@ -6265,9 +6270,9 @@ public ConnectableObservable call() { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6284,18 +6289,13 @@ public ConnectableObservable call() { * replaying all items * @see ReactiveX operators documentation: Replay */ - public final Observable replay(final Func1, ? extends Observable> selector, final Scheduler scheduler) { - return OperatorReplay.multicastSelector(new Func0>() { + public final Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { + return create(new OnSubscribeMulticastSelector(this, new Func0>() { @Override - public ConnectableObservable call() { - return Observable.this.replay(); + public final Subject call() { + return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); } - }, new Func1, Observable>() { - @Override - public Observable call(Observable t) { - return selector.call(t).observeOn(scheduler); - } - }); + }, selector)); } /** @@ -6307,9 +6307,9 @@ public Observable call(Observable t) { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6321,7 +6321,14 @@ public Observable call(Observable t) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize) { - return OperatorReplay.create(this, bufferSize); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return ReplaySubject.createWithSize(bufferSize); + } + + }); } /** @@ -6333,9 +6340,9 @@ public final ConnectableObservable replay(final int bufferSize) { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6364,9 +6371,9 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6390,7 +6397,14 @@ public final ConnectableObservable replay(final int bufferSize, final long ti if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return OperatorReplay.create(this, time, unit, scheduler, bufferSize); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); + } + + }); } /** @@ -6402,9 +6416,9 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6418,7 +6432,14 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize, final Scheduler scheduler) { - return OperatorReplay.observeOn(replay(bufferSize), scheduler); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return OperatorReplay.createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); + } + + }); } /** @@ -6430,9 +6451,9 @@ public final ConnectableObservable replay(final int bufferSize, final Schedul * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6458,9 +6479,9 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6476,7 +6497,14 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final long time, final TimeUnit unit, final Scheduler scheduler) { - return OperatorReplay.create(this, time, unit, scheduler); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return ReplaySubject.createWithTime(time, unit, scheduler); + } + + }); } /** @@ -6488,9 +6516,9 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * *
*
Backpressure Support:
- *
This operator supports backpressure. Note that the upstream requests are determined by the child - * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will - * request 100 elements from the underlying Observable sequence.
+ *
This operator does not support backpressure because multicasting means the stream is "hot" with + * multiple subscribers. Each child will need to manage backpressure independently using operators such + * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6503,7 +6531,14 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final Scheduler scheduler) { - return OperatorReplay.observeOn(replay(), scheduler); + return new OperatorMulticast(this, new Func0>() { + + @Override + public Subject call() { + return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); + } + + }); } /** diff --git a/src/main/java/rx/internal/operators/OnSubscribeCache.java b/src/main/java/rx/internal/operators/OnSubscribeCache.java new file mode 100644 index 0000000000..a568fd0e0b --- /dev/null +++ b/src/main/java/rx/internal/operators/OnSubscribeCache.java @@ -0,0 +1,76 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + +import rx.Observable; +import rx.Observable.OnSubscribe; +import rx.Subscriber; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; + +/** + * This method has similar behavior to {@link Observable#replay()} except that this auto-subscribes + * to the source Observable rather than returning a connectable Observable. + *

+ * + *

+ * This is useful with an Observable that you want to cache responses when you can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * Note: You sacrifice the ability to unsubscribe from the origin when you use this operator, so be + * careful not to use this operator on Observables that emit infinite or very large numbers of + * items, as this will use up memory. + * + * @param + * the cached value type + */ +public final class OnSubscribeCache implements OnSubscribe { + protected final Observable source; + protected final Subject cache; + volatile int sourceSubscribed; + @SuppressWarnings("rawtypes") + static final AtomicIntegerFieldUpdater SRC_SUBSCRIBED_UPDATER + = AtomicIntegerFieldUpdater.newUpdater(OnSubscribeCache.class, "sourceSubscribed"); + + public OnSubscribeCache(Observable source) { + this(source, ReplaySubject. create()); + } + + public OnSubscribeCache(Observable source, int capacity) { + this(source, ReplaySubject. create(capacity)); + } + + /* accessible to tests */OnSubscribeCache(Observable source, Subject cache) { + this.source = source; + this.cache = cache; + } + + @Override + public void call(Subscriber s) { + if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) { + source.subscribe(cache); + /* + * Note that we will never unsubscribe from 'source' unless we receive `onCompleted` or `onError`, + * as we want to receive and cache all of its values. + * + * This means this should never be used on an infinite or very large sequence, similar to toList(). + */ + } + cache.unsafeSubscribe(s); + } +} diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 2989f50b9e..83c76dfe39 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -15,1161 +15,93 @@ */ package rx.internal.operators; -import java.util.*; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.*; -import rx.*; import rx.Observable; -import rx.exceptions.Exceptions; -import rx.functions.*; -import rx.observables.ConnectableObservable; -import rx.schedulers.Timestamped; -import rx.subscriptions.Subscriptions; +import rx.Observable.OnSubscribe; +import rx.Scheduler; +import rx.Subscriber; +import rx.subjects.Subject; -public final class OperatorReplay extends ConnectableObservable { - /** The source observable. */ - final Observable source; - /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ - final AtomicReference> current; - /** A factory that creates the appropriate buffer for the ReplaySubscriber. */ - final Func0> bufferFactory; - - @SuppressWarnings("rawtypes") - static final Func0 DEFAULT_UNBOUNDED_FACTORY = new Func0() { - @Override - public Object call() { - return new UnboundedReplayBuffer(16); - } - }; - - /** - * Given a connectable observable factory, it multicasts over the generated - * ConnectableObservable via a selector function. - * @param connectableFactory - * @param selector - * @return - */ - public static Observable multicastSelector( - final Func0> connectableFactory, - final Func1, ? extends Observable> selector) { - return Observable.create(new OnSubscribe() { - @Override - public void call(final Subscriber child) { - ConnectableObservable co; - Observable observable; - try { - co = connectableFactory.call(); - observable = selector.call(co); - } catch (Throwable e) { - Exceptions.throwIfFatal(e); - child.onError(e); - return; - } - - observable.subscribe(child); - - co.connect(new Action1() { - @Override - public void call(Subscription t) { - child.add(t); - } - }); - } - }); - } - - /** - * Child Subscribers will observe the events of the ConnectableObservable on the - * specified scheduler. - * @param co - * @param scheduler - * @return - */ - public static ConnectableObservable observeOn(final ConnectableObservable co, final Scheduler scheduler) { - final Observable observable = co.observeOn(scheduler); - OnSubscribe onSubscribe = new OnSubscribe() { - @Override - public void call(final Subscriber child) { - // apply observeOn and prevent calling onStart() again - observable.unsafeSubscribe(new Subscriber(child) { - @Override - public void onNext(T t) { - child.onNext(t); - } - @Override - public void onError(Throwable e) { - child.onError(e); - } - @Override - public void onCompleted() { - child.onCompleted(); - } - }); - } - }; - return new ConnectableObservable(onSubscribe) { - @Override - public void connect(Action1 connection) { - co.connect(connection); - } - }; - } - - /** - * Creates a replaying ConnectableObservable with an unbounded buffer. - * @param source - * @return - */ - @SuppressWarnings("unchecked") - public static ConnectableObservable create(Observable source) { - return create(source, DEFAULT_UNBOUNDED_FACTORY); - } - - /** - * Creates a replaying ConnectableObservable with a size bound buffer. - * @param source - * @param bufferSize - * @return - */ - public static ConnectableObservable create(Observable source, - final int bufferSize) { - if (bufferSize == Integer.MAX_VALUE) { - return create(source); - } - return create(source, new Func0>() { - @Override - public ReplayBuffer call() { - return new SizeBoundReplayBuffer(bufferSize); - } - }); +/** + * Replay with limited buffer and/or time constraints. + * + * + * @see MSDN: Observable.Replay overloads + */ +public final class OperatorReplay { + /** Utility class. */ + private OperatorReplay() { + throw new IllegalStateException("No instances!"); } /** - * Creates a replaying ConnectableObservable with a time bound buffer. - * @param source - * @param maxAge - * @param unit - * @param scheduler - * @return + * Creates a subject whose client observers will observe events + * propagated through the given wrapped subject. + * @param the element type + * @param subject the subject to wrap + * @param scheduler the target scheduler + * @return the created subject */ - public static ConnectableObservable create(Observable source, - long maxAge, TimeUnit unit, Scheduler scheduler) { - return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE); - } + public static Subject createScheduledSubject(Subject subject, Scheduler scheduler) { + final Observable observedOn = subject.observeOn(scheduler); + SubjectWrapper s = new SubjectWrapper(new OnSubscribe() { - /** - * Creates a replaying ConnectableObservable with a size and time bound buffer. - * @param source - * @param maxAge - * @param unit - * @param scheduler - * @param bufferSize - * @return - */ - public static ConnectableObservable create(Observable source, - long maxAge, TimeUnit unit, final Scheduler scheduler, final int bufferSize) { - final long maxAgeInMillis = unit.toMillis(maxAge); - return create(source, new Func0>() { @Override - public ReplayBuffer call() { - return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAgeInMillis, scheduler); + public void call(Subscriber o) { + subscriberOf(observedOn).call(o); } - }); + + }, subject); + return s; } /** - * Creates a OperatorReplay instance to replay values of the given source observable. - * @param source the source observable - * @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active - * @return the connectable observable + * Return an OnSubscribeFunc which delegates the subscription to the given observable. + * + * @param the value type + * @param target the target observable + * @return the function that delegates the subscription to the target */ - static ConnectableObservable create(Observable source, - final Func0> bufferFactory) { - // the current connection to source needs to be shared between the operator and its onSubscribe call - final AtomicReference> curr = new AtomicReference>(); - OnSubscribe onSubscribe = new OnSubscribe() { + public static OnSubscribe subscriberOf(final Observable target) { + return new OnSubscribe() { @Override - public void call(Subscriber child) { - // concurrent connection/disconnection may change the state, - // we loop to be atomic while the child subscribes - for (;;) { - // get the current subscriber-to-source - ReplaySubscriber r = curr.get(); - // if there isn't one - if (r == null) { - // create a new subscriber to source - ReplaySubscriber u = new ReplaySubscriber(curr, bufferFactory.call()); - // perform extra initialization to avoid 'this' to escape during construction - u.init(); - // let's try setting it as the current subscriber-to-source - if (!curr.compareAndSet(r, u)) { - // didn't work, maybe someone else did it or the current subscriber - // to source has just finished - continue; - } - // we won, let's use it going onwards - r = u; - } - - // create the backpressure-managing producer for this child - InnerProducer inner = new InnerProducer(r, child); - // we try to add it to the array of producers - // if it fails, no worries because we will still have its buffer - // so it is going to replay it for us - r.add(inner); - // the producer has been registered with the current subscriber-to-source so - // at least it will receive the next terminal event - child.add(inner); - // setting the producer will trigger the first request to be considered by - // the subscriber-to-source. - child.setProducer(inner); - break; - } + public void call(Subscriber t1) { + target.unsafeSubscribe(t1); } }; - return new OperatorReplay(onSubscribe, source, curr, bufferFactory); - } - private OperatorReplay(OnSubscribe onSubscribe, Observable source, - final AtomicReference> current, - final Func0> bufferFactory) { - super(onSubscribe); - this.source = source; - this.current = current; - this.bufferFactory = bufferFactory; - } - - @Override - public void connect(Action1 connection) { - boolean doConnect = false; - ReplaySubscriber ps; - // we loop because concurrent connect/disconnect and termination may change the state - for (;;) { - // retrieve the current subscriber-to-source instance - ps = current.get(); - // if there is none yet or the current has unsubscribed - if (ps == null || ps.isUnsubscribed()) { - // create a new subscriber-to-source - ReplaySubscriber u = new ReplaySubscriber(current, bufferFactory.call()); - // initialize out the constructor to avoid 'this' to escape - u.init(); - // try setting it as the current subscriber-to-source - if (!current.compareAndSet(ps, u)) { - // did not work, perhaps a new subscriber arrived - // and created a new subscriber-to-source as well, retry - continue; - } - ps = u; - } - // if connect() was called concurrently, only one of them should actually - // connect to the source - doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); - break; - } - /* - * Notify the callback that we have a (new) connection which it can unsubscribe - * but since ps is unique to a connection, multiple calls to connect() will return the - * same Subscription and even if there was a connect-disconnect-connect pair, the older - * references won't disconnect the newer connection. - * Synchronous source consumers have the opportunity to disconnect via unsubscribe on the - * Subscription as unsafeSubscribe may never return in its own. - * - * Note however, that asynchronously disconnecting a running source might leave - * child-subscribers without any terminal event; ReplaySubject does not have this - * issue because the unsubscription was always triggered by the child-subscribers - * themselves. - */ - connection.call(ps); - if (doConnect) { - source.unsafeSubscribe(ps); - } } - - @SuppressWarnings("rawtypes") - static final class ReplaySubscriber extends Subscriber implements Subscription { - /** Holds notifications from upstream. */ - final ReplayBuffer buffer; - /** The notification-lite factory. */ - final NotificationLite nl; - /** Contains either an onCompleted or an onError token from upstream. */ - boolean done; - - /** Indicates an empty array of inner producers. */ - static final InnerProducer[] EMPTY = new InnerProducer[0]; - /** Indicates a terminated ReplaySubscriber. */ - static final InnerProducer[] TERMINATED = new InnerProducer[0]; - - /** Tracks the subscribed producers. */ - final AtomicReference producers; - /** - * Atomically changed from false to true by connect to make sure the - * connection is only performed by one thread. - */ - final AtomicBoolean shouldConnect; - - /** Guarded by this. */ - boolean emitting; - /** Guarded by this. */ - boolean missed; - - - /** Contains the maximum element index the child Subscribers requested so far. Accessed while emitting is true. */ - long maxChildRequested; - /** Counts the outstanding upstream requests until the producer arrives. */ - long maxUpstreamRequested; - /** The upstream producer. */ - volatile Producer producer; - - public ReplaySubscriber(AtomicReference> current, - ReplayBuffer buffer) { - this.buffer = buffer; - - this.nl = NotificationLite.instance(); - this.producers = new AtomicReference(EMPTY); - this.shouldConnect = new AtomicBoolean(); - // make sure the source doesn't produce values until the child subscribers - // expressed their request amounts - this.request(0); - } - /** Should be called after the constructor finished to setup nulling-out the current reference. */ - void init() { - add(Subscriptions.create(new Action0() { - @Override - public void call() { - ReplaySubscriber.this.producers.getAndSet(TERMINATED); - // unlike OperatorPublish, we can't null out the terminated so - // late subscribers can still get replay - // current.compareAndSet(ReplaySubscriber.this, null); - // we don't care if it fails because it means the current has - // been replaced in the meantime - } - })); - } - /** - * Atomically try adding a new InnerProducer to this Subscriber or return false if this - * Subscriber was terminated. - * @param producer the producer to add - * @return true if succeeded, false otherwise - */ - boolean add(InnerProducer producer) { - if (producer == null) { - throw new NullPointerException(); - } - // the state can change so we do a CAS loop to achieve atomicity - for (;;) { - // get the current producer array - InnerProducer[] c = producers.get(); - // if this subscriber-to-source reached a terminal state by receiving - // an onError or onCompleted, just refuse to add the new producer - if (c == TERMINATED) { - return false; - } - // we perform a copy-on-write logic - int len = c.length; - InnerProducer[] u = new InnerProducer[len + 1]; - System.arraycopy(c, 0, u, 0, len); - u[len] = producer; - // try setting the producers array - if (producers.compareAndSet(c, u)) { - return true; - } - // if failed, some other operation succeded (another add, remove or termination) - // so retry - } - } - - /** - * Atomically removes the given producer from the producers array. - * @param producer the producer to remove - */ - void remove(InnerProducer producer) { - // the state can change so we do a CAS loop to achieve atomicity - for (;;) { - // let's read the current producers array - InnerProducer[] c = producers.get(); - // if it is either empty or terminated, there is nothing to remove so we quit - if (c == EMPTY || c == TERMINATED) { - return; - } - // let's find the supplied producer in the array - // although this is O(n), we don't expect too many child subscribers in general - int j = -1; - int len = c.length; - for (int i = 0; i < len; i++) { - if (c[i].equals(producer)) { - j = i; - break; - } - } - // we didn't find it so just quit - if (j < 0) { - return; - } - // we do copy-on-write logic here - InnerProducer[] u; - // we don't create a new empty array if producer was the single inhabitant - // but rather reuse an empty array - if (len == 1) { - u = EMPTY; - } else { - // otherwise, create a new array one less in size - u = new InnerProducer[len - 1]; - // copy elements being before the given producer - System.arraycopy(c, 0, u, 0, j); - // copy elements being after the given producer - System.arraycopy(c, j + 1, u, j, len - j - 1); - } - // try setting this new array as - if (producers.compareAndSet(c, u)) { - return; - } - // if we failed, it means something else happened - // (a concurrent add/remove or termination), we need to retry - } - } - - @Override - public void setProducer(Producer p) { - Producer p0 = producer; - if (p0 != null) { - throw new IllegalStateException("Only a single producer can be set on a Subscriber."); - } - producer = p; - manageRequests(); - replay(); - } - - @Override - public void onNext(T t) { - if (!done) { - buffer.next(t); - replay(); - } - } - @Override - public void onError(Throwable e) { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (!done) { - done = true; - try { - buffer.error(e); - replay(); - } finally { - unsubscribe(); // expectation of testIssue2191 - } - } - } - @Override - public void onCompleted() { - // The observer front is accessed serially as required by spec so - // no need to CAS in the terminal value - if (!done) { - done = true; - try { - buffer.complete(); - replay(); - } finally { - unsubscribe(); - } - } - } - - /** - * Coordinates the request amounts of various child Subscribers. - */ - void manageRequests() { - // if the upstream has completed, no more requesting is possible - if (isUnsubscribed()) { - return; - } - synchronized (this) { - if (emitting) { - missed = true; - return; - } - emitting = true; - } - for (;;) { - // if the upstream has completed, no more requesting is possible - if (isUnsubscribed()) { - return; - } - - @SuppressWarnings("unchecked") - InnerProducer[] a = producers.get(); - - long ri = maxChildRequested; - long maxTotalRequests = 0; - - for (InnerProducer rp : a) { - maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get()); - } - - long ur = maxUpstreamRequested; - Producer p = producer; - long diff = maxTotalRequests - ri; - if (diff != 0) { - maxChildRequested = maxTotalRequests; - if (p != null) { - if (ur != 0L) { - maxUpstreamRequested = 0L; - p.request(ur + diff); - } else { - p.request(diff); - } - } else { - // collect upstream request amounts until there is a producer for them - long u = ur + diff; - if (u < 0) { - u = Long.MAX_VALUE; - } - maxUpstreamRequested = u; - } - } else - // if there were outstanding upstream requests and we have a producer - if (ur != 0L && p != null) { - maxUpstreamRequested = 0L; - // fire the accumulated requests - p.request(ur); - } - - synchronized (this) { - if (!missed) { - emitting = false; - return; - } - missed = false; - } - } - } - - /** - * Tries to replay the buffer contents to all known subscribers. - */ - void replay() { - @SuppressWarnings("unchecked") - InnerProducer[] a = producers.get(); - for (InnerProducer rp : a) { - buffer.replay(rp); - } - } - } /** - * A Producer and Subscription that manages the request and unsubscription state of a - * child subscriber in thread-safe manner. - * We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also - * save the overhead of the AtomicIntegerFieldUpdater. + * A subject that wraps another subject. * @param the value type */ - static final class InnerProducer extends AtomicLong implements Producer, Subscription { - /** */ - private static final long serialVersionUID = -4453897557930727610L; - /** - * The parent subscriber-to-source used to allow removing the child in case of - * child unsubscription. - */ - final ReplaySubscriber parent; - /** The actual child subscriber. */ - final Subscriber child; - /** - * Holds an object that represents the current location in the buffer. - * Guarded by the emitter loop. - */ - Object index; - /** - * Keeps the sum of all requested amounts. - */ - final AtomicLong totalRequested; - /** Indicates an emission state. Guarded by this. */ - boolean emitting; - /** Indicates a missed update. Guarded by this. */ - boolean missed; - /** - * Indicates this child has been unsubscribed: the state is swapped in atomically and - * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. - */ - static final long UNSUBSCRIBED = Long.MIN_VALUE; - - public InnerProducer(ReplaySubscriber parent, Subscriber child) { - this.parent = parent; - this.child = child; - this.totalRequested = new AtomicLong(); - } - - @Override - public void request(long n) { - // ignore negative requests - if (n < 0) { - return; - } - // In general, RxJava doesn't prevent concurrent requests (with each other or with - // an unsubscribe) so we need a CAS-loop, but we need to handle - // request overflow and unsubscribed/not requested state as well. - for (;;) { - // get the current request amount - long r = get(); - // if child called unsubscribe() do nothing - if (r == UNSUBSCRIBED) { - return; - } - // ignore zero requests except any first that sets in zero - if (r >= 0L && n == 0) { - return; - } - // otherwise, increase the request count - long u = r + n; - // and check for long overflow - if (u < 0) { - // cap at max value, which is essentially unlimited - u = Long.MAX_VALUE; - } - // try setting the new request value - if (compareAndSet(r, u)) { - // increment the total request counter - addTotalRequested(n); - // if successful, notify the parent dispacher this child can receive more - // elements - parent.manageRequests(); - - parent.buffer.replay(this); - return; - } - // otherwise, someone else changed the state (perhaps a concurrent - // request or unsubscription so retry - } - } - - /** - * Increments the total requested amount. - * @param n the additional request amount - */ - void addTotalRequested(long n) { - for (;;) { - long r = totalRequested.get(); - long u = r + n; - if (u < 0) { - u = Long.MAX_VALUE; - } - if (totalRequested.compareAndSet(r, u)) { - return; - } - } - } - - /** - * Indicate that values have been emitted to this child subscriber by the dispatch() method. - * @param n the number of items emitted - * @return the updated request value (may indicate how much can be produced or a terminal state) - */ - public long produced(long n) { - // we don't allow producing zero or less: it would be a bug in the operator - if (n <= 0) { - throw new IllegalArgumentException("Cant produce zero or less"); - } - for (;;) { - // get the current request value - long r = get(); - // if the child has unsubscribed, simply return and indicate this - if (r == UNSUBSCRIBED) { - return UNSUBSCRIBED; - } - // reduce the requested amount - long u = r - n; - // if the new amount is less than zero, we have a bug in this operator - if (u < 0) { - throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); - } - // try updating the request value - if (compareAndSet(r, u)) { - // and return the udpated value - return u; - } - // otherwise, some concurrent activity happened and we need to retry - } - } - - @Override - public boolean isUnsubscribed() { - return get() == UNSUBSCRIBED; - } - @Override - public void unsubscribe() { - long r = get(); - // let's see if we are unsubscribed - if (r != UNSUBSCRIBED) { - // if not, swap in the terminal state, this is idempotent - // because other methods using CAS won't overwrite this value, - // concurrent calls to unsubscribe will atomically swap in the same - // terminal value - r = getAndSet(UNSUBSCRIBED); - // and only one of them will see a non-terminated value before the swap - if (r != UNSUBSCRIBED) { - // remove this from the parent - parent.remove(this); - // After removal, we might have unblocked the other child subscribers: - // let's assume this child had 0 requested before the unsubscription while - // the others had non-zero. By removing this 'blocking' child, the others - // are now free to receive events - parent.manageRequests(); - } - } - } - /** - * Convenience method to auto-cast the index object. - * @return - */ - @SuppressWarnings("unchecked") - U index() { - return (U)index; - } - } - /** - * The interface for interacting with various buffering logic. - * - * @param the value type - */ - interface ReplayBuffer { - /** - * Adds a regular value to the buffer. - * @param value - */ - void next(T value); - /** - * Adds a terminal exception to the buffer - * @param e - */ - void error(Throwable e); - /** - * Adds a completion event to the buffer - */ - void complete(); - /** - * Tries to replay the buffered values to the - * subscriber inside the output if there - * is new value and requests available at the - * same time. - * @param output - */ - void replay(InnerProducer output); - } - - /** - * Holds an unbounded list of events. - * - * @param the value type - */ - static final class UnboundedReplayBuffer extends ArrayList implements ReplayBuffer { - /** */ - private static final long serialVersionUID = 7063189396499112664L; - final NotificationLite nl; - /** The total number of events in the buffer. */ - volatile int size; - - public UnboundedReplayBuffer(int capacityHint) { - super(capacityHint); - nl = NotificationLite.instance(); - } - @Override - public void next(T value) { - add(nl.next(value)); - size++; - } - - @Override - public void error(Throwable e) { - add(nl.error(e)); - size++; - } + public static final class SubjectWrapper extends Subject { + /** The wrapped subject. */ + final Subject subject; - @Override - public void complete() { - add(nl.completed()); - size++; + public SubjectWrapper(OnSubscribe func, Subject subject) { + super(func); + this.subject = subject; } @Override - public void replay(InnerProducer output) { - synchronized (output) { - if (output.emitting) { - output.missed = true; - return; - } - output.emitting = true; - } - for (;;) { - if (output.isUnsubscribed()) { - return; - } - int sourceIndex = size; - - Integer destIndexObject = output.index(); - int destIndex = destIndexObject != null ? destIndexObject.intValue() : 0; - - long r = output.get(); - long r0 = r; - long e = 0L; - - while (r != 0L && destIndex < sourceIndex) { - Object o = get(destIndex); - if (nl.accept(output.child, o)) { - return; - } - if (output.isUnsubscribed()) { - return; - } - destIndex++; - r--; - e++; - } - if (e != 0L) { - output.index = destIndex; - if (r0 != Long.MAX_VALUE) { - output.produced(e); - } - } - - synchronized (output) { - if (!output.missed) { - output.emitting = false; - return; - } - output.missed = false; - } - } - } - } - - /** - * Represents a node in a bounded replay buffer's linked list. - * - * @param the contained value type - */ - static final class Node extends AtomicReference { - /** */ - private static final long serialVersionUID = 245354315435971818L; - final Object value; - public Node(Object value) { - this.value = value; - } - } - - /** - * Base class for bounded buffering with options to specify an - * enter and leave transforms and custom truncation behavior. - * - * @param the value type - */ - static class BoundedReplayBuffer extends AtomicReference implements ReplayBuffer { - /** */ - private static final long serialVersionUID = 2346567790059478686L; - final NotificationLite nl; - - Node tail; - int size; - - public BoundedReplayBuffer() { - nl = NotificationLite.instance(); - Node n = new Node(null); - tail = n; - set(n); - } - - /** - * Add a new node to the linked list. - * @param n - */ - final void addLast(Node n) { - tail.set(n); - tail = n; - size++; - } - /** - * Remove the first node from the linked list. - */ - final void removeFirst() { - Node head = get(); - Node next = head.get(); - if (next == null) { - throw new IllegalStateException("Empty list!"); - } - size--; - // can't just move the head because it would retain the very first value - // can't null out the head's value because of late replayers would see null - setFirst(next); - } - /* test */ final void removeSome(int n) { - Node head = get(); - while (n > 0) { - head = head.get(); - n--; - size--; - } - - setFirst(head); - } - /** - * Arranges the given node is the new head from now on. - * @param n - */ - final void setFirst(Node n) { - set(n); - } - - @Override - public final void next(T value) { - Object o = enterTransform(nl.next(value)); - Node n = new Node(o); - addLast(n); - truncate(); + public void onNext(T args) { + subject.onNext(args); } @Override - public final void error(Throwable e) { - Object o = enterTransform(nl.error(e)); - Node n = new Node(o); - addLast(n); - truncateFinal(); + public void onError(Throwable e) { + subject.onError(e); } @Override - public final void complete() { - Object o = enterTransform(nl.completed()); - Node n = new Node(o); - addLast(n); - truncateFinal(); + public void onCompleted() { + subject.onCompleted(); } @Override - public final void replay(InnerProducer output) { - synchronized (output) { - if (output.emitting) { - output.missed = true; - return; - } - output.emitting = true; - } - for (;;) { - if (output.isUnsubscribed()) { - return; - } - - long r = output.get(); - long r0 = r; - long e = 0L; - - Node node = output.index(); - if (node == null) { - node = get(); - output.index = node; - } - - while (r != 0) { - Node v = node.get(); - if (v != null) { - Object o = leaveTransform(v.value); - if (nl.accept(output.child, o)) { - output.index = null; - return; - } - e++; - node = v; - } else { - break; - } - if (output.isUnsubscribed()) { - return; - } - } - - if (e != 0L) { - output.index = node; - if (r0 != Long.MAX_VALUE) { - output.produced(e); - } - } - - synchronized (output) { - if (!output.missed) { - output.emitting = false; - return; - } - output.missed = false; - } - } - - } - - /** - * Override this to wrap the NotificationLite object into a - * container to be used later by truncate. - * @param value - * @return - */ - Object enterTransform(Object value) { - return value; - } - /** - * Override this to unwrap the transformed value into a - * NotificationLite object. - * @param value - * @return - */ - Object leaveTransform(Object value) { - return value; - } - /** - * Override this method to truncate a non-terminated buffer - * based on its current properties. - */ - void truncate() { - - } - /** - * Override this method to truncate a terminated buffer - * based on its properties (i.e., truncate but the very last node). - */ - void truncateFinal() { - - } - /* test */ final void collect(Collection output) { - Node n = get(); - for (;;) { - Node next = n.get(); - if (next != null) { - Object o = next.value; - Object v = leaveTransform(o); - if (nl.isCompleted(v) || nl.isError(v)) { - break; - } - output.add(nl.getValue(v)); - n = next; - } else { - break; - } - } - } - /* test */ boolean hasError() { - return tail.value != null && nl.isError(leaveTransform(tail.value)); - } - /* test */ boolean hasCompleted() { - return tail.value != null && nl.isCompleted(leaveTransform(tail.value)); - } - } - - /** - * A bounded replay buffer implementation with size limit only. - * - * @param the value type - */ - static final class SizeBoundReplayBuffer extends BoundedReplayBuffer { - /** */ - private static final long serialVersionUID = -5898283885385201806L; - - final int limit; - public SizeBoundReplayBuffer(int limit) { - this.limit = limit; - } - - @Override - void truncate() { - // overflow can be at most one element - if (size > limit) { - removeFirst(); - } - } - - // no need for final truncation because values are truncated one by one - } - - /** - * Size and time bound replay buffer. - * - * @param the buffered value type - */ - static final class SizeAndTimeBoundReplayBuffer extends BoundedReplayBuffer { - /** */ - private static final long serialVersionUID = 3457957419649567404L; - final Scheduler scheduler; - final long maxAgeInMillis; - final int limit; - public SizeAndTimeBoundReplayBuffer(int limit, long maxAgeInMillis, Scheduler scheduler) { - this.scheduler = scheduler; - this.limit = limit; - this.maxAgeInMillis = maxAgeInMillis; - } - - @Override - Object enterTransform(Object value) { - return new Timestamped(scheduler.now(), value); - } - - @Override - Object leaveTransform(Object value) { - return ((Timestamped)value).getValue(); - } - - @Override - void truncate() { - long timeLimit = scheduler.now() - maxAgeInMillis; - - Node prev = get(); - Node next = prev.get(); - - int e = 0; - for (;;) { - if (next != null) { - if (size > limit) { - e++; - size--; - prev = next; - next = next.get(); - } else { - Timestamped v = (Timestamped)next.value; - if (v.getTimestampMillis() <= timeLimit) { - e++; - size--; - prev = next; - next = next.get(); - } else { - break; - } - } - } else { - break; - } - } - if (e != 0) { - setFirst(prev); - } - } - @Override - void truncateFinal() { - long timeLimit = scheduler.now() - maxAgeInMillis; - - Node prev = get(); - Node next = prev.get(); - - int e = 0; - for (;;) { - if (next != null && size > 1) { - Timestamped v = (Timestamped)next.value; - if (v.getTimestampMillis() <= timeLimit) { - e++; - size--; - prev = next; - next = next.get(); - } else { - break; - } - } else { - break; - } - } - if (e != 0) { - setFirst(prev); - } + public boolean hasObservers() { + return this.subject.hasObservers(); } } -} +} \ No newline at end of file diff --git a/src/main/java/rx/internal/util/CachedObservable.java b/src/main/java/rx/internal/util/CachedObservable.java deleted file mode 100644 index cda4b9d277..0000000000 --- a/src/main/java/rx/internal/util/CachedObservable.java +++ /dev/null @@ -1,432 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */package rx.internal.util; - -import java.util.concurrent.atomic.*; - -import rx.*; -import rx.internal.operators.NotificationLite; -import rx.subscriptions.SerialSubscription; - -/** - * An observable which auto-connects to another observable, caches the elements - * from that observable but allows terminating the connection and completing the cache. - * - * @param the source element type - */ -public final class CachedObservable extends Observable { - /** The cache and replay state. */ - private CacheState state; - - /** - * Creates a cached Observable with a default capacity hint of 16. - * @param source the source Observable to cache - * @return the CachedObservable instance - */ - public static CachedObservable from(Observable source) { - return from(source, 16); - } - - /** - * Creates a cached Observable with the given capacity hint. - * @param source the source Observable to cache - * @param capacityHint the hint for the internal buffer size - * @return the CachedObservable instance - */ - public static CachedObservable from(Observable source, int capacityHint) { - if (capacityHint < 1) { - throw new IllegalArgumentException("capacityHint > 0 required"); - } - CacheState state = new CacheState(source, capacityHint); - CachedSubscribe onSubscribe = new CachedSubscribe(state); - return new CachedObservable(onSubscribe, state); - } - - /** - * Private constructor because state needs to be shared between the Observable body and - * the onSubscribe function. - * @param onSubscribe - * @param state - */ - private CachedObservable(OnSubscribe onSubscribe, CacheState state) { - super(onSubscribe); - this.state = state; - } - - /** - * Check if this cached observable is connected to its source. - * @return true if already connected - */ - /* public */boolean isConnected() { - return state.isConnected; - } - - /** - * Returns true if there are observers subscribed to this observable. - * @return - */ - /* public */ boolean hasObservers() { - return state.producers.length != 0; - } - - /** - * Returns the number of events currently cached. - * @return - */ - /* public */ int cachedEventCount() { - return state.size(); - } - - /** - * Contains the active child producers and the values to replay. - * - * @param - */ - static final class CacheState extends LinkedArrayList implements Observer { - /** The source observable to connect to. */ - final Observable source; - /** Holds onto the subscriber connected to source. */ - final SerialSubscription connection; - /** Guarded by connection (not this). */ - volatile ReplayProducer[] producers; - /** The default empty array of producers. */ - static final ReplayProducer[] EMPTY = new ReplayProducer[0]; - - final NotificationLite nl; - - /** Set to true after connection. */ - volatile boolean isConnected; - /** - * Indicates that the source has completed emitting values or the - * Observable was forcefully terminated. - */ - boolean sourceDone; - - public CacheState(Observable source, int capacityHint) { - super(capacityHint); - this.source = source; - this.producers = EMPTY; - this.nl = NotificationLite.instance(); - this.connection = new SerialSubscription(); - } - /** - * Adds a ReplayProducer to the producers array atomically. - * @param p - */ - public void addProducer(ReplayProducer p) { - // guarding by connection to save on allocating another object - // thus there are two distinct locks guarding the value-addition and child come-and-go - synchronized (connection) { - ReplayProducer[] a = producers; - int n = a.length; - ReplayProducer[] b = new ReplayProducer[n + 1]; - System.arraycopy(a, 0, b, 0, n); - b[n] = p; - producers = b; - } - } - /** - * Removes the ReplayProducer (if present) from the producers array atomically. - * @param p - */ - public void removeProducer(ReplayProducer p) { - synchronized (connection) { - ReplayProducer[] a = producers; - int n = a.length; - int j = -1; - for (int i = 0; i < n; i++) { - if (a[i].equals(p)) { - j = i; - break; - } - } - if (j < 0) { - return; - } - if (n == 1) { - producers = EMPTY; - return; - } - ReplayProducer[] b = new ReplayProducer[n - 1]; - System.arraycopy(a, 0, b, 0, j); - System.arraycopy(a, j + 1, b, j, n - j - 1); - producers = b; - } - } - /** - * Connects the cache to the source. - * Make sure this is called only once. - */ - public void connect() { - connection.set(source.subscribe(this)); - isConnected = true; - } - @Override - public void onNext(T t) { - Object o = nl.next(t); - synchronized (this) { - if (!sourceDone) { - add(o); - } else { - return; - } - } - dispatch(); - } - @Override - public void onError(Throwable e) { - Object o = nl.error(e); - synchronized (this) { - if (!sourceDone) { - sourceDone = true; - add(o); - } else { - return; - } - } - connection.unsubscribe(); - dispatch(); - } - @Override - public void onCompleted() { - Object o = nl.completed(); - synchronized (this) { - if (!sourceDone) { - sourceDone = true; - add(o); - } else { - return; - } - } - connection.unsubscribe(); - dispatch(); - } - /** - * Signals all known children there is work to do. - */ - void dispatch() { - ReplayProducer[] a = producers; - for (ReplayProducer rp : a) { - rp.replay(); - } - } - } - - /** - * Manages the subscription of child subscribers by setting up a replay producer and - * performs auto-connection of the very first subscription. - * @param the value type emitted - */ - static final class CachedSubscribe extends AtomicBoolean implements OnSubscribe { - /** */ - private static final long serialVersionUID = -2817751667698696782L; - final CacheState state; - public CachedSubscribe(CacheState state) { - this.state = state; - } - @Override - public void call(Subscriber t) { - // we can connect first because we replay everything anyway - ReplayProducer rp = new ReplayProducer(t, state); - state.addProducer(rp); - - t.add(rp); - t.setProducer(rp); - - // we ensure a single connection here to save an instance field of AtomicBoolean in state. - if (!get() && compareAndSet(false, true)) { - state.connect(); - } - - // no need to call rp.replay() here because the very first request will trigger it anyway - } - } - - /** - * Keeps track of the current request amount and the replay position for a child Subscriber. - * - * @param - */ - static final class ReplayProducer extends AtomicLong implements Producer, Subscription { - /** */ - private static final long serialVersionUID = -2557562030197141021L; - /** The actual child subscriber. */ - final Subscriber child; - /** The cache state object. */ - final CacheState state; - - /** - * Contains the reference to the buffer segment in replay. - * Accessed after reading state.size() and when emitting == true. - */ - Object[] currentBuffer; - /** - * Contains the index into the currentBuffer where the next value is expected. - * Accessed after reading state.size() and when emitting == true. - */ - int currentIndexInBuffer; - /** - * Contains the absolute index up until the values have been replayed so far. - */ - int index; - - /** Indicates there is a replay going on; guarded by this. */ - boolean emitting; - /** Indicates there were some state changes/replay attempts; guarded by this. */ - boolean missed; - - public ReplayProducer(Subscriber child, CacheState state) { - this.child = child; - this.state = state; - } - @Override - public void request(long n) { - for (;;) { - long r = get(); - if (r < 0) { - return; - } - long u = r + n; - if (u < 0) { - u = Long.MAX_VALUE; - } - if (compareAndSet(r, u)) { - replay(); - return; - } - } - } - /** - * Updates the request count to reflect values have been produced. - * @param n - * @return - */ - public long produced(long n) { - return addAndGet(-n); - } - - @Override - public boolean isUnsubscribed() { - return get() < 0; - } - @Override - public void unsubscribe() { - long r = get(); - if (r >= 0) { - r = getAndSet(-1L); // unsubscribed state is negative - if (r >= 0) { - state.removeProducer(this); - } - } - } - - /** - * Continue replaying available values if there are requests for them. - */ - public void replay() { - // make sure there is only a single thread emitting - synchronized (this) { - if (emitting) { - missed = true; - return; - } - emitting = true; - } - boolean skipFinal = false; - try { - final NotificationLite nl = state.nl; - final Subscriber child = this.child; - - for (;;) { - - long r = get(); - // read the size, if it is non-zero, we can safely read the head and - // read values up to the given absolute index - int s = state.size(); - if (s != 0) { - Object[] b = currentBuffer; - - // latch onto the very first buffer now that it is available. - if (b == null) { - b = state.head(); - currentBuffer = b; - } - final int n = b.length - 1; - int j = index; - int k = currentIndexInBuffer; - // eagerly emit any terminal event - if (r == 0) { - Object o = b[k]; - if (nl.isCompleted(o)) { - child.onCompleted(); - skipFinal = true; - unsubscribe(); - return; - } else - if (nl.isError(o)) { - child.onError(nl.getError(o)); - skipFinal = true; - unsubscribe(); - return; - } - } else - if (r > 0) { - int valuesProduced = 0; - - while (j < s && r > 0 && !child.isUnsubscribed()) { - if (k == n) { - b = (Object[])b[n]; - k = 0; - } - Object o = b[k]; - - if (nl.accept(child, o)) { - skipFinal = true; - unsubscribe(); - return; - } - - k++; - j++; - r--; - valuesProduced++; - } - - index = j; - currentIndexInBuffer = k; - currentBuffer = b; - produced(valuesProduced); - } - } - - synchronized (this) { - if (!missed) { - emitting = false; - skipFinal = true; - return; - } - missed = false; - } - } - } finally { - if (!skipFinal) { - synchronized (this) { - emitting = false; - } - } - } - } - } -} diff --git a/src/main/java/rx/internal/util/LinkedArrayList.java b/src/main/java/rx/internal/util/LinkedArrayList.java deleted file mode 100644 index 57a1289640..0000000000 --- a/src/main/java/rx/internal/util/LinkedArrayList.java +++ /dev/null @@ -1,136 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.util; - -import java.util.*; - -/** - * A list implementation which combines an ArrayList with a LinkedList to - * avoid copying values when the capacity needs to be increased. - *

- * The class is non final to allow embedding it directly and thus saving on object allocation. - */ -public class LinkedArrayList { - /** The capacity of each array segment. */ - final int capacityHint; - /** - * Contains the head of the linked array list if not null. The - * length is always capacityHint + 1 and the last element is an Object[] pointing - * to the next element of the linked array list. - */ - Object[] head; - /** The tail array where new elements will be added. */ - Object[] tail; - /** - * The total size of the list; written after elements have been added (release) and - * and when read, the value indicates how many elements can be safely read (acquire). - */ - volatile int size; - /** The next available slot in the current tail. */ - int indexInTail; - /** - * Constructor with the capacity hint of each array segment. - * @param capacityHint - */ - public LinkedArrayList(int capacityHint) { - this.capacityHint = capacityHint; - } - /** - * Adds a new element to this list. - * @param o the object to add, nulls are accepted - */ - public void add(Object o) { - // if no value yet, create the first array - if (size == 0) { - head = new Object[capacityHint + 1]; - tail = head; - head[0] = o; - indexInTail = 1; - size = 1; - } else - // if the tail is full, create a new tail and link - if (indexInTail == capacityHint) { - Object[] t = new Object[capacityHint + 1]; - t[0] = o; - tail[capacityHint] = t; - tail = t; - indexInTail = 1; - size++; - } else { - tail[indexInTail] = o; - indexInTail++; - size++; - } - } - /** - * Returns the head buffer segment or null if the list is empty. - * @return - */ - public Object[] head() { - return head; - } - /** - * Returns the tail buffer segment or null if the list is empty. - * @return - */ - public Object[] tail() { - return tail; - } - /** - * Returns the total size of the list. - * @return - */ - public int size() { - return size; - } - /** - * Returns the index of the next slot in the tail buffer segment. - * @return - */ - public int indexInTail() { - return indexInTail; - } - /** - * Returns the capacity hint that indicates the capacity of each buffer segment. - * @return - */ - public int capacityHint() { - return capacityHint; - } - /* Test support */List toList() { - final int cap = capacityHint; - final int s = size; - final List list = new ArrayList(s + 1); - - Object[] h = head(); - int j = 0; - int k = 0; - while (j < s) { - list.add(h[k]); - j++; - if (++k == cap) { - k = 0; - h = (Object[])h[cap]; - } - } - - return list; - } - @Override - public String toString() { - return toList().toString(); - } -} \ No newline at end of file diff --git a/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java b/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java new file mode 100644 index 0000000000..0d74cd878b --- /dev/null +++ b/src/test/java/rx/internal/operators/OnSubscribeCacheTest.java @@ -0,0 +1,164 @@ +/** + * Copyright 2014 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.internal.operators; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.Arrays; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Subscriber; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; +import rx.functions.Func2; +import rx.observers.TestSubscriber; +import rx.schedulers.Schedulers; +import rx.subjects.AsyncSubject; +import rx.subjects.BehaviorSubject; +import rx.subjects.PublishSubject; +import rx.subjects.ReplaySubject; +import rx.subjects.Subject; + +public class OnSubscribeCacheTest { + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Observable.OnSubscribe() { + + @Override + public void call(final Subscriber observer) { + new Thread(new Runnable() { + + @Override + public void run() { + counter.incrementAndGet(); + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + } + }).start(); + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + + private void testWithCustomSubjectAndRepeat(Subject subject, Integer... expected) { + Observable source0 = Observable.just(1, 2, 3) + .subscribeOn(Schedulers.io()) + .flatMap(new Func1>() { + @Override + public Observable call(final Integer i) { + return Observable.timer(i * 20, TimeUnit.MILLISECONDS).map(new Func1() { + @Override + public Integer call(Long t1) { + return i; + } + }); + } + }); + + Observable source1 = Observable.create(new OnSubscribeCache(source0, subject)); + + Observable source2 = source1 + .repeat(4) + .zipWith(Observable.interval(0, 10, TimeUnit.MILLISECONDS, Schedulers.newThread()), new Func2() { + @Override + public Integer call(Integer t1, Long t2) { + return t1; + } + + }); + TestSubscriber ts = new TestSubscriber(); + source2.subscribe(ts); + + ts.awaitTerminalEvent(); + ts.assertNoErrors(); + System.out.println(ts.getOnNextEvents()); + ts.assertReceivedOnNext(Arrays.asList(expected)); + } + + @Test(timeout = 10000) + public void testWithAsyncSubjectAndRepeat() { + testWithCustomSubjectAndRepeat(AsyncSubject. create(), 3, 3, 3, 3); + } + + @Test(timeout = 10000) + public void testWithBehaviorSubjectAndRepeat() { + // BehaviorSubject just completes when repeated + testWithCustomSubjectAndRepeat(BehaviorSubject.create(0), 0, 1, 2, 3); + } + + @Test(timeout = 10000) + public void testWithPublishSubjectAndRepeat() { + // PublishSubject just completes when repeated + testWithCustomSubjectAndRepeat(PublishSubject. create(), 1, 2, 3); + } + + @Test + public void testWithReplaySubjectAndRepeat() { + testWithCustomSubjectAndRepeat(ReplaySubject. create(), 1, 2, 3, 1, 2, 3, 1, 2, 3, 1, 2, 3); + } + + @Test + public void testUnsubscribeSource() { + Action0 unsubscribe = mock(Action0.class); + Observable o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache(); + o.subscribe(); + o.subscribe(); + o.subscribe(); + verify(unsubscribe, times(1)).call(); + } +} diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index 5c31503da4..a5ff85864d 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -16,27 +16,33 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.notNull; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; -import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.*; +import java.util.concurrent.atomic.AtomicInteger; -import org.junit.*; +import org.junit.Test; import org.mockito.InOrder; -import rx.*; -import rx.Scheduler.Worker; import rx.Observable; import rx.Observer; -import rx.functions.*; -import rx.internal.operators.OperatorReplay.BoundedReplayBuffer; -import rx.internal.operators.OperatorReplay.Node; -import rx.internal.operators.OperatorReplay.SizeAndTimeBoundReplayBuffer; +import rx.Scheduler; +import rx.Scheduler.Worker; +import rx.Subscription; +import rx.functions.Action0; +import rx.functions.Action1; +import rx.functions.Func1; import rx.observables.ConnectableObservable; -import rx.observers.TestSubscriber; -import rx.schedulers.*; +import rx.schedulers.TestScheduler; import rx.subjects.PublishSubject; public class OperatorReplayTest { @@ -733,132 +739,4 @@ public boolean isUnsubscribed() { } } - @Test - public void testBoundedReplayBuffer() { - BoundedReplayBuffer buf = new BoundedReplayBuffer(); - buf.addLast(new Node(1)); - buf.addLast(new Node(2)); - buf.addLast(new Node(3)); - buf.addLast(new Node(4)); - buf.addLast(new Node(5)); - - List values = new ArrayList(); - buf.collect(values); - - Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values); - - buf.removeSome(2); - buf.removeFirst(); - buf.removeSome(2); - - values.clear(); - buf.collect(values); - Assert.assertTrue(values.isEmpty()); - - buf.addLast(new Node(5)); - buf.addLast(new Node(6)); - buf.collect(values); - - Assert.assertEquals(Arrays.asList(5, 6), values); - - } - - @Test - public void testTimedAndSizedTruncation() { - TestScheduler test = Schedulers.test(); - SizeAndTimeBoundReplayBuffer buf = new SizeAndTimeBoundReplayBuffer(2, 2000, test); - List values = new ArrayList(); - - buf.next(1); - test.advanceTimeBy(1, TimeUnit.SECONDS); - buf.next(2); - test.advanceTimeBy(1, TimeUnit.SECONDS); - buf.collect(values); - Assert.assertEquals(Arrays.asList(1, 2), values); - - buf.next(3); - buf.next(4); - values.clear(); - buf.collect(values); - Assert.assertEquals(Arrays.asList(3, 4), values); - - test.advanceTimeBy(2, TimeUnit.SECONDS); - buf.next(5); - - values.clear(); - buf.collect(values); - Assert.assertEquals(Arrays.asList(5), values); - - test.advanceTimeBy(2, TimeUnit.SECONDS); - buf.complete(); - - values.clear(); - buf.collect(values); - Assert.assertTrue(values.isEmpty()); - - Assert.assertEquals(1, buf.size); - Assert.assertTrue(buf.hasCompleted()); - } - - @Test - public void testBackpressure() { - final AtomicLong requested = new AtomicLong(); - Observable source = Observable.range(1, 1000) - .doOnRequest(new Action1() { - @Override - public void call(Long t) { - requested.addAndGet(t); - } - }); - ConnectableObservable co = source.replay(); - - TestSubscriber ts1 = TestSubscriber.create(10); - TestSubscriber ts2 = TestSubscriber.create(90); - - co.subscribe(ts1); - co.subscribe(ts2); - - ts2.requestMore(10); - - co.connect(); - - ts1.assertValueCount(10); - ts1.assertNoTerminalEvent(); - - ts2.assertValueCount(100); - ts2.assertNoTerminalEvent(); - - Assert.assertEquals(100, requested.get()); - } - - @Test - public void testBackpressureBounded() { - final AtomicLong requested = new AtomicLong(); - Observable source = Observable.range(1, 1000) - .doOnRequest(new Action1() { - @Override - public void call(Long t) { - requested.addAndGet(t); - } - }); - ConnectableObservable co = source.replay(50); - - TestSubscriber ts1 = TestSubscriber.create(10); - TestSubscriber ts2 = TestSubscriber.create(90); - - co.subscribe(ts1); - co.subscribe(ts2); - - ts2.requestMore(10); - - co.connect(); - - ts1.assertValueCount(10); - ts1.assertNoTerminalEvent(); - - ts2.assertValueCount(100); - ts2.assertNoTerminalEvent(); - - Assert.assertEquals(100, requested.get()); - } } \ No newline at end of file diff --git a/src/test/java/rx/internal/util/CachedObservableTest.java b/src/test/java/rx/internal/util/CachedObservableTest.java deleted file mode 100644 index c14018390f..0000000000 --- a/src/test/java/rx/internal/util/CachedObservableTest.java +++ /dev/null @@ -1,264 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.util; - -import static org.junit.Assert.*; -import static org.mockito.Mockito.*; - -import java.util.*; -import java.util.concurrent.*; -import java.util.concurrent.atomic.AtomicInteger; - -import org.junit.*; - -import rx.*; -import rx.Observable.OnSubscribe; -import rx.Observable; -import rx.exceptions.TestException; -import rx.functions.*; -import rx.observers.TestSubscriber; -import rx.schedulers.Schedulers; - -public class CachedObservableTest { - @Test - public void testColdReplayNoBackpressure() { - CachedObservable source = CachedObservable.from(Observable.range(0, 1000)); - - assertFalse("Source is connected!", source.isConnected()); - - TestSubscriber ts = new TestSubscriber(); - - source.subscribe(ts); - - assertTrue("Source is not connected!", source.isConnected()); - assertFalse("Subscribers retained!", source.hasObservers()); - - ts.assertNoErrors(); - ts.assertTerminalEvent(); - List onNextEvents = ts.getOnNextEvents(); - assertEquals(1000, onNextEvents.size()); - - for (int i = 0; i < 1000; i++) { - assertEquals((Integer)i, onNextEvents.get(i)); - } - } - @Test - public void testColdReplayBackpressure() { - CachedObservable source = CachedObservable.from(Observable.range(0, 1000)); - - assertFalse("Source is connected!", source.isConnected()); - - TestSubscriber ts = new TestSubscriber(); - ts.requestMore(10); - - source.subscribe(ts); - - assertTrue("Source is not connected!", source.isConnected()); - assertTrue("Subscribers not retained!", source.hasObservers()); - - ts.assertNoErrors(); - assertTrue(ts.getOnCompletedEvents().isEmpty()); - List onNextEvents = ts.getOnNextEvents(); - assertEquals(10, onNextEvents.size()); - - for (int i = 0; i < 10; i++) { - assertEquals((Integer)i, onNextEvents.get(i)); - } - - ts.unsubscribe(); - assertFalse("Subscribers retained!", source.hasObservers()); - } - - @Test - public void testCache() throws InterruptedException { - final AtomicInteger counter = new AtomicInteger(); - Observable o = Observable.create(new Observable.OnSubscribe() { - - @Override - public void call(final Subscriber observer) { - new Thread(new Runnable() { - - @Override - public void run() { - counter.incrementAndGet(); - System.out.println("published observable being executed"); - observer.onNext("one"); - observer.onCompleted(); - } - }).start(); - } - }).cache(); - - // we then expect the following 2 subscriptions to get that same value - final CountDownLatch latch = new CountDownLatch(2); - - // subscribe once - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - // subscribe again - o.subscribe(new Action1() { - - @Override - public void call(String v) { - assertEquals("one", v); - System.out.println("v: " + v); - latch.countDown(); - } - }); - - if (!latch.await(1000, TimeUnit.MILLISECONDS)) { - fail("subscriptions did not receive values"); - } - assertEquals(1, counter.get()); - } - - @Test - public void testUnsubscribeSource() { - Action0 unsubscribe = mock(Action0.class); - Observable o = Observable.just(1).doOnUnsubscribe(unsubscribe).cache(); - o.subscribe(); - o.subscribe(); - o.subscribe(); - verify(unsubscribe, times(1)).call(); - } - - @Test - public void testTake() { - TestSubscriber ts = new TestSubscriber(); - - CachedObservable cached = CachedObservable.from(Observable.range(1, 100)); - cached.take(10).subscribe(ts); - - ts.assertNoErrors(); - ts.assertTerminalEvent(); - ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - ts.assertUnsubscribed(); - assertFalse(cached.hasObservers()); - } - - @Test - public void testAsync() { - Observable source = Observable.range(1, 10000); - for (int i = 0; i < 100; i++) { - TestSubscriber ts1 = new TestSubscriber(); - - CachedObservable cached = CachedObservable.from(source); - - cached.observeOn(Schedulers.computation()).subscribe(ts1); - - ts1.awaitTerminalEvent(2, TimeUnit.SECONDS); - ts1.assertNoErrors(); - ts1.assertTerminalEvent(); - assertEquals(10000, ts1.getOnNextEvents().size()); - - TestSubscriber ts2 = new TestSubscriber(); - cached.observeOn(Schedulers.computation()).subscribe(ts2); - - ts2.awaitTerminalEvent(2, TimeUnit.SECONDS); - ts2.assertNoErrors(); - ts2.assertTerminalEvent(); - assertEquals(10000, ts2.getOnNextEvents().size()); - } - } - @Test - public void testAsyncComeAndGo() { - Observable source = Observable.timer(1, 1, TimeUnit.MILLISECONDS) - .take(1000) - .subscribeOn(Schedulers.io()); - CachedObservable cached = CachedObservable.from(source); - - Observable output = cached.observeOn(Schedulers.computation()); - - List> list = new ArrayList>(100); - for (int i = 0; i < 100; i++) { - TestSubscriber ts = new TestSubscriber(); - list.add(ts); - output.skip(i * 10).take(10).subscribe(ts); - } - - List expected = new ArrayList(); - for (int i = 0; i < 10; i++) { - expected.add((long)(i - 10)); - } - int j = 0; - for (TestSubscriber ts : list) { - ts.awaitTerminalEvent(3, TimeUnit.SECONDS); - ts.assertNoErrors(); - ts.assertTerminalEvent(); - - for (int i = j * 10; i < j * 10 + 10; i++) { - expected.set(i - j * 10, (long)i); - } - - ts.assertReceivedOnNext(expected); - - j++; - } - } - - @Test - public void testNoMissingBackpressureException() { - final int m = 4 * 1000 * 1000; - Observable firehose = Observable.create(new OnSubscribe() { - @Override - public void call(Subscriber t) { - for (int i = 0; i < m; i++) { - t.onNext(i); - } - t.onCompleted(); - } - }); - - TestSubscriber ts = new TestSubscriber(); - firehose.cache().observeOn(Schedulers.computation()).takeLast(100).subscribe(ts); - - ts.awaitTerminalEvent(3, TimeUnit.SECONDS); - ts.assertNoErrors(); - ts.assertTerminalEvent(); - - assertEquals(100, ts.getOnNextEvents().size()); - } - - @Test - public void testValuesAndThenError() { - Observable source = Observable.range(1, 10) - .concatWith(Observable.error(new TestException())) - .cache(); - - - TestSubscriber ts = new TestSubscriber(); - source.subscribe(ts); - - ts.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - Assert.assertTrue(ts.getOnCompletedEvents().isEmpty()); - Assert.assertEquals(1, ts.getOnErrorEvents().size()); - - TestSubscriber ts2 = new TestSubscriber(); - source.subscribe(ts2); - - ts2.assertReceivedOnNext(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)); - Assert.assertTrue(ts2.getOnCompletedEvents().isEmpty()); - Assert.assertEquals(1, ts2.getOnErrorEvents().size()); - } -} diff --git a/src/test/java/rx/internal/util/LinkedArrayListTest.java b/src/test/java/rx/internal/util/LinkedArrayListTest.java deleted file mode 100644 index af7e167c19..0000000000 --- a/src/test/java/rx/internal/util/LinkedArrayListTest.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Copyright 2014 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.internal.util; - -import java.util.*; -import static org.junit.Assert.*; - -import org.junit.Test; - -public class LinkedArrayListTest { - @Test - public void testAdd() { - LinkedArrayList list = new LinkedArrayList(16); - - List expected = new ArrayList(32); - for (int i = 0; i < 32; i++) { - list.add(i); - expected.add(i); - } - - assertEquals(expected, list.toList()); - assertEquals(32, list.size()); - } -}