diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 776026fdb4..c328582800 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -25,7 +25,6 @@ import rx.observers.SafeSubscriber; import rx.plugins.*; import rx.schedulers.*; -import rx.subjects.*; import rx.subscriptions.Subscriptions; /** @@ -5883,9 +5882,9 @@ public Void call(Notification notification) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -5895,14 +5894,7 @@ public Void call(Notification notification) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay() { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject. create(); - } - - }); + return OperatorReplay.create(this); } /** @@ -5912,9 +5904,9 @@ public final ConnectableObservable replay() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -5929,12 +5921,12 @@ public final ConnectableObservable replay() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.create(); + public ConnectableObservable call() { + return Observable.this.replay(); } - }, selector)); + }, selector); } /** @@ -5945,9 +5937,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -5965,12 +5957,12 @@ public final Subject call() { * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithSize(bufferSize); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize); } - }, selector)); + }, selector); } /** @@ -5981,9 +5973,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6017,9 +6009,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6049,12 +6041,12 @@ public final Observable replay(Func1, ? extends Obs if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize, time, unit, scheduler); } - }, selector)); + }, selector); } /** @@ -6065,9 +6057,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6086,13 +6078,18 @@ public final Subject call() { * replaying no more than {@code bufferSize} notifications * @see ReactiveX operators documentation: Replay */ - public final Observable replay(Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + public final Observable replay(final Func1, ? extends Observable> selector, final int bufferSize, final Scheduler scheduler) { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return OperatorReplay. createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); + public ConnectableObservable call() { + return Observable.this.replay(bufferSize); } - }, selector)); + }, new Func1, Observable>() { + @Override + public Observable call(Observable t) { + return selector.call(t).observeOn(scheduler); + } + }); } /** @@ -6103,9 +6100,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6136,9 +6133,9 @@ public final Observable replay(Func1, ? extends Obs * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6160,12 +6157,12 @@ public final Observable replay(Func1, ? extends Obs * @see ReactiveX operators documentation: Replay */ public final Observable replay(Func1, ? extends Observable> selector, final long time, final TimeUnit unit, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return ReplaySubject.createWithTime(time, unit, scheduler); + public ConnectableObservable call() { + return Observable.this.replay(time, unit, scheduler); } - }, selector)); + }, selector); } /** @@ -6175,9 +6172,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6194,13 +6191,18 @@ public final Subject call() { * replaying all items * @see ReactiveX operators documentation: Replay */ - public final Observable replay(Func1, ? extends Observable> selector, final Scheduler scheduler) { - return create(new OnSubscribeMulticastSelector(this, new Func0>() { + public final Observable replay(final Func1, ? extends Observable> selector, final Scheduler scheduler) { + return OperatorReplay.multicastSelector(new Func0>() { @Override - public final Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); + public ConnectableObservable call() { + return Observable.this.replay(); } - }, selector)); + }, new Func1, Observable>() { + @Override + public Observable call(Observable t) { + return selector.call(t).observeOn(scheduler); + } + }); } /** @@ -6212,9 +6214,9 @@ public final Subject call() { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} does not operate by default on a particular {@link Scheduler}.
*
@@ -6226,14 +6228,7 @@ public final Subject call() { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithSize(bufferSize); - } - - }); + return OperatorReplay.create(this, bufferSize); } /** @@ -6245,9 +6240,9 @@ public final ConnectableObservable replay(final int bufferSize) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6276,9 +6271,9 @@ public final ConnectableObservable replay(int bufferSize, long time, TimeUnit * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6302,14 +6297,7 @@ public final ConnectableObservable replay(final int bufferSize, final long ti if (bufferSize < 0) { throw new IllegalArgumentException("bufferSize < 0"); } - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithTimeAndSize(time, unit, bufferSize, scheduler); - } - - }); + return OperatorReplay.create(this, time, unit, scheduler, bufferSize); } /** @@ -6321,9 +6309,9 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6337,14 +6325,7 @@ public final ConnectableObservable replay(final int bufferSize, final long ti * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final int bufferSize, final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject.createWithSize(bufferSize), scheduler); - } - - }); + return OperatorReplay.observeOn(replay(bufferSize), scheduler); } /** @@ -6356,9 +6337,9 @@ public final ConnectableObservable replay(final int bufferSize, final Schedul * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
This version of {@code replay} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -6384,9 +6365,9 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6402,14 +6383,7 @@ public final ConnectableObservable replay(long time, TimeUnit unit) { * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final long time, final TimeUnit unit, final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return ReplaySubject.createWithTime(time, unit, scheduler); - } - - }); + return OperatorReplay.create(this, time, unit, scheduler); } /** @@ -6421,9 +6395,9 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * *
*
Backpressure Support:
- *
This operator does not support backpressure because multicasting means the stream is "hot" with - * multiple subscribers. Each child will need to manage backpressure independently using operators such - * as {@link #onBackpressureDrop} and {@link #onBackpressureBuffer}.
+ *
This operator supports backpressure. Note that the upstream requests are determined by the child + * Subscriber which requests the largest amount: i.e., two child Subscribers with requests of 10 and 100 will + * request 100 elements from the underlying Observable sequence.
*
Scheduler:
*
you specify which {@link Scheduler} this operator will use
*
@@ -6436,14 +6410,7 @@ public final ConnectableObservable replay(final long time, final TimeUnit uni * @see ReactiveX operators documentation: Replay */ public final ConnectableObservable replay(final Scheduler scheduler) { - return new OperatorMulticast(this, new Func0>() { - - @Override - public Subject call() { - return OperatorReplay.createScheduledSubject(ReplaySubject. create(), scheduler); - } - - }); + return OperatorReplay.observeOn(replay(), scheduler); } /** diff --git a/src/main/java/rx/internal/operators/OperatorReplay.java b/src/main/java/rx/internal/operators/OperatorReplay.java index 83c76dfe39..2989f50b9e 100644 --- a/src/main/java/rx/internal/operators/OperatorReplay.java +++ b/src/main/java/rx/internal/operators/OperatorReplay.java @@ -15,93 +15,1161 @@ */ package rx.internal.operators; +import java.util.*; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.*; +import rx.*; import rx.Observable; -import rx.Observable.OnSubscribe; -import rx.Scheduler; -import rx.Subscriber; -import rx.subjects.Subject; +import rx.exceptions.Exceptions; +import rx.functions.*; +import rx.observables.ConnectableObservable; +import rx.schedulers.Timestamped; +import rx.subscriptions.Subscriptions; -/** - * Replay with limited buffer and/or time constraints. - * - * - * @see MSDN: Observable.Replay overloads - */ -public final class OperatorReplay { - /** Utility class. */ - private OperatorReplay() { - throw new IllegalStateException("No instances!"); +public final class OperatorReplay extends ConnectableObservable { + /** The source observable. */ + final Observable source; + /** Holds the current subscriber that is, will be or just was subscribed to the source observable. */ + final AtomicReference> current; + /** A factory that creates the appropriate buffer for the ReplaySubscriber. */ + final Func0> bufferFactory; + + @SuppressWarnings("rawtypes") + static final Func0 DEFAULT_UNBOUNDED_FACTORY = new Func0() { + @Override + public Object call() { + return new UnboundedReplayBuffer(16); + } + }; + + /** + * Given a connectable observable factory, it multicasts over the generated + * ConnectableObservable via a selector function. + * @param connectableFactory + * @param selector + * @return + */ + public static Observable multicastSelector( + final Func0> connectableFactory, + final Func1, ? extends Observable> selector) { + return Observable.create(new OnSubscribe() { + @Override + public void call(final Subscriber child) { + ConnectableObservable co; + Observable observable; + try { + co = connectableFactory.call(); + observable = selector.call(co); + } catch (Throwable e) { + Exceptions.throwIfFatal(e); + child.onError(e); + return; + } + + observable.subscribe(child); + + co.connect(new Action1() { + @Override + public void call(Subscription t) { + child.add(t); + } + }); + } + }); + } + + /** + * Child Subscribers will observe the events of the ConnectableObservable on the + * specified scheduler. + * @param co + * @param scheduler + * @return + */ + public static ConnectableObservable observeOn(final ConnectableObservable co, final Scheduler scheduler) { + final Observable observable = co.observeOn(scheduler); + OnSubscribe onSubscribe = new OnSubscribe() { + @Override + public void call(final Subscriber child) { + // apply observeOn and prevent calling onStart() again + observable.unsafeSubscribe(new Subscriber(child) { + @Override + public void onNext(T t) { + child.onNext(t); + } + @Override + public void onError(Throwable e) { + child.onError(e); + } + @Override + public void onCompleted() { + child.onCompleted(); + } + }); + } + }; + return new ConnectableObservable(onSubscribe) { + @Override + public void connect(Action1 connection) { + co.connect(connection); + } + }; + } + + /** + * Creates a replaying ConnectableObservable with an unbounded buffer. + * @param source + * @return + */ + @SuppressWarnings("unchecked") + public static ConnectableObservable create(Observable source) { + return create(source, DEFAULT_UNBOUNDED_FACTORY); + } + + /** + * Creates a replaying ConnectableObservable with a size bound buffer. + * @param source + * @param bufferSize + * @return + */ + public static ConnectableObservable create(Observable source, + final int bufferSize) { + if (bufferSize == Integer.MAX_VALUE) { + return create(source); + } + return create(source, new Func0>() { + @Override + public ReplayBuffer call() { + return new SizeBoundReplayBuffer(bufferSize); + } + }); } /** - * Creates a subject whose client observers will observe events - * propagated through the given wrapped subject. - * @param the element type - * @param subject the subject to wrap - * @param scheduler the target scheduler - * @return the created subject + * Creates a replaying ConnectableObservable with a time bound buffer. + * @param source + * @param maxAge + * @param unit + * @param scheduler + * @return */ - public static Subject createScheduledSubject(Subject subject, Scheduler scheduler) { - final Observable observedOn = subject.observeOn(scheduler); - SubjectWrapper s = new SubjectWrapper(new OnSubscribe() { + public static ConnectableObservable create(Observable source, + long maxAge, TimeUnit unit, Scheduler scheduler) { + return create(source, maxAge, unit, scheduler, Integer.MAX_VALUE); + } + /** + * Creates a replaying ConnectableObservable with a size and time bound buffer. + * @param source + * @param maxAge + * @param unit + * @param scheduler + * @param bufferSize + * @return + */ + public static ConnectableObservable create(Observable source, + long maxAge, TimeUnit unit, final Scheduler scheduler, final int bufferSize) { + final long maxAgeInMillis = unit.toMillis(maxAge); + return create(source, new Func0>() { @Override - public void call(Subscriber o) { - subscriberOf(observedOn).call(o); + public ReplayBuffer call() { + return new SizeAndTimeBoundReplayBuffer(bufferSize, maxAgeInMillis, scheduler); } - - }, subject); - return s; + }); } /** - * Return an OnSubscribeFunc which delegates the subscription to the given observable. - * - * @param the value type - * @param target the target observable - * @return the function that delegates the subscription to the target + * Creates a OperatorReplay instance to replay values of the given source observable. + * @param source the source observable + * @param bufferFactory the factory to instantiate the appropriate buffer when the observable becomes active + * @return the connectable observable */ - public static OnSubscribe subscriberOf(final Observable target) { - return new OnSubscribe() { + static ConnectableObservable create(Observable source, + final Func0> bufferFactory) { + // the current connection to source needs to be shared between the operator and its onSubscribe call + final AtomicReference> curr = new AtomicReference>(); + OnSubscribe onSubscribe = new OnSubscribe() { @Override - public void call(Subscriber t1) { - target.unsafeSubscribe(t1); + public void call(Subscriber child) { + // concurrent connection/disconnection may change the state, + // we loop to be atomic while the child subscribes + for (;;) { + // get the current subscriber-to-source + ReplaySubscriber r = curr.get(); + // if there isn't one + if (r == null) { + // create a new subscriber to source + ReplaySubscriber u = new ReplaySubscriber(curr, bufferFactory.call()); + // perform extra initialization to avoid 'this' to escape during construction + u.init(); + // let's try setting it as the current subscriber-to-source + if (!curr.compareAndSet(r, u)) { + // didn't work, maybe someone else did it or the current subscriber + // to source has just finished + continue; + } + // we won, let's use it going onwards + r = u; + } + + // create the backpressure-managing producer for this child + InnerProducer inner = new InnerProducer(r, child); + // we try to add it to the array of producers + // if it fails, no worries because we will still have its buffer + // so it is going to replay it for us + r.add(inner); + // the producer has been registered with the current subscriber-to-source so + // at least it will receive the next terminal event + child.add(inner); + // setting the producer will trigger the first request to be considered by + // the subscriber-to-source. + child.setProducer(inner); + break; + } } }; + return new OperatorReplay(onSubscribe, source, curr, bufferFactory); + } + private OperatorReplay(OnSubscribe onSubscribe, Observable source, + final AtomicReference> current, + final Func0> bufferFactory) { + super(onSubscribe); + this.source = source; + this.current = current; + this.bufferFactory = bufferFactory; + } + + @Override + public void connect(Action1 connection) { + boolean doConnect = false; + ReplaySubscriber ps; + // we loop because concurrent connect/disconnect and termination may change the state + for (;;) { + // retrieve the current subscriber-to-source instance + ps = current.get(); + // if there is none yet or the current has unsubscribed + if (ps == null || ps.isUnsubscribed()) { + // create a new subscriber-to-source + ReplaySubscriber u = new ReplaySubscriber(current, bufferFactory.call()); + // initialize out the constructor to avoid 'this' to escape + u.init(); + // try setting it as the current subscriber-to-source + if (!current.compareAndSet(ps, u)) { + // did not work, perhaps a new subscriber arrived + // and created a new subscriber-to-source as well, retry + continue; + } + ps = u; + } + // if connect() was called concurrently, only one of them should actually + // connect to the source + doConnect = !ps.shouldConnect.get() && ps.shouldConnect.compareAndSet(false, true); + break; + } + /* + * Notify the callback that we have a (new) connection which it can unsubscribe + * but since ps is unique to a connection, multiple calls to connect() will return the + * same Subscription and even if there was a connect-disconnect-connect pair, the older + * references won't disconnect the newer connection. + * Synchronous source consumers have the opportunity to disconnect via unsubscribe on the + * Subscription as unsafeSubscribe may never return in its own. + * + * Note however, that asynchronously disconnecting a running source might leave + * child-subscribers without any terminal event; ReplaySubject does not have this + * issue because the unsubscription was always triggered by the child-subscribers + * themselves. + */ + connection.call(ps); + if (doConnect) { + source.unsafeSubscribe(ps); + } } + + @SuppressWarnings("rawtypes") + static final class ReplaySubscriber extends Subscriber implements Subscription { + /** Holds notifications from upstream. */ + final ReplayBuffer buffer; + /** The notification-lite factory. */ + final NotificationLite nl; + /** Contains either an onCompleted or an onError token from upstream. */ + boolean done; + + /** Indicates an empty array of inner producers. */ + static final InnerProducer[] EMPTY = new InnerProducer[0]; + /** Indicates a terminated ReplaySubscriber. */ + static final InnerProducer[] TERMINATED = new InnerProducer[0]; + + /** Tracks the subscribed producers. */ + final AtomicReference producers; + /** + * Atomically changed from false to true by connect to make sure the + * connection is only performed by one thread. + */ + final AtomicBoolean shouldConnect; + + /** Guarded by this. */ + boolean emitting; + /** Guarded by this. */ + boolean missed; + + + /** Contains the maximum element index the child Subscribers requested so far. Accessed while emitting is true. */ + long maxChildRequested; + /** Counts the outstanding upstream requests until the producer arrives. */ + long maxUpstreamRequested; + /** The upstream producer. */ + volatile Producer producer; + + public ReplaySubscriber(AtomicReference> current, + ReplayBuffer buffer) { + this.buffer = buffer; + + this.nl = NotificationLite.instance(); + this.producers = new AtomicReference(EMPTY); + this.shouldConnect = new AtomicBoolean(); + // make sure the source doesn't produce values until the child subscribers + // expressed their request amounts + this.request(0); + } + /** Should be called after the constructor finished to setup nulling-out the current reference. */ + void init() { + add(Subscriptions.create(new Action0() { + @Override + public void call() { + ReplaySubscriber.this.producers.getAndSet(TERMINATED); + // unlike OperatorPublish, we can't null out the terminated so + // late subscribers can still get replay + // current.compareAndSet(ReplaySubscriber.this, null); + // we don't care if it fails because it means the current has + // been replaced in the meantime + } + })); + } + /** + * Atomically try adding a new InnerProducer to this Subscriber or return false if this + * Subscriber was terminated. + * @param producer the producer to add + * @return true if succeeded, false otherwise + */ + boolean add(InnerProducer producer) { + if (producer == null) { + throw new NullPointerException(); + } + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // get the current producer array + InnerProducer[] c = producers.get(); + // if this subscriber-to-source reached a terminal state by receiving + // an onError or onCompleted, just refuse to add the new producer + if (c == TERMINATED) { + return false; + } + // we perform a copy-on-write logic + int len = c.length; + InnerProducer[] u = new InnerProducer[len + 1]; + System.arraycopy(c, 0, u, 0, len); + u[len] = producer; + // try setting the producers array + if (producers.compareAndSet(c, u)) { + return true; + } + // if failed, some other operation succeded (another add, remove or termination) + // so retry + } + } + + /** + * Atomically removes the given producer from the producers array. + * @param producer the producer to remove + */ + void remove(InnerProducer producer) { + // the state can change so we do a CAS loop to achieve atomicity + for (;;) { + // let's read the current producers array + InnerProducer[] c = producers.get(); + // if it is either empty or terminated, there is nothing to remove so we quit + if (c == EMPTY || c == TERMINATED) { + return; + } + // let's find the supplied producer in the array + // although this is O(n), we don't expect too many child subscribers in general + int j = -1; + int len = c.length; + for (int i = 0; i < len; i++) { + if (c[i].equals(producer)) { + j = i; + break; + } + } + // we didn't find it so just quit + if (j < 0) { + return; + } + // we do copy-on-write logic here + InnerProducer[] u; + // we don't create a new empty array if producer was the single inhabitant + // but rather reuse an empty array + if (len == 1) { + u = EMPTY; + } else { + // otherwise, create a new array one less in size + u = new InnerProducer[len - 1]; + // copy elements being before the given producer + System.arraycopy(c, 0, u, 0, j); + // copy elements being after the given producer + System.arraycopy(c, j + 1, u, j, len - j - 1); + } + // try setting this new array as + if (producers.compareAndSet(c, u)) { + return; + } + // if we failed, it means something else happened + // (a concurrent add/remove or termination), we need to retry + } + } + + @Override + public void setProducer(Producer p) { + Producer p0 = producer; + if (p0 != null) { + throw new IllegalStateException("Only a single producer can be set on a Subscriber."); + } + producer = p; + manageRequests(); + replay(); + } + + @Override + public void onNext(T t) { + if (!done) { + buffer.next(t); + replay(); + } + } + @Override + public void onError(Throwable e) { + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (!done) { + done = true; + try { + buffer.error(e); + replay(); + } finally { + unsubscribe(); // expectation of testIssue2191 + } + } + } + @Override + public void onCompleted() { + // The observer front is accessed serially as required by spec so + // no need to CAS in the terminal value + if (!done) { + done = true; + try { + buffer.complete(); + replay(); + } finally { + unsubscribe(); + } + } + } + + /** + * Coordinates the request amounts of various child Subscribers. + */ + void manageRequests() { + // if the upstream has completed, no more requesting is possible + if (isUnsubscribed()) { + return; + } + synchronized (this) { + if (emitting) { + missed = true; + return; + } + emitting = true; + } + for (;;) { + // if the upstream has completed, no more requesting is possible + if (isUnsubscribed()) { + return; + } + + @SuppressWarnings("unchecked") + InnerProducer[] a = producers.get(); + + long ri = maxChildRequested; + long maxTotalRequests = 0; + + for (InnerProducer rp : a) { + maxTotalRequests = Math.max(maxTotalRequests, rp.totalRequested.get()); + } + + long ur = maxUpstreamRequested; + Producer p = producer; + long diff = maxTotalRequests - ri; + if (diff != 0) { + maxChildRequested = maxTotalRequests; + if (p != null) { + if (ur != 0L) { + maxUpstreamRequested = 0L; + p.request(ur + diff); + } else { + p.request(diff); + } + } else { + // collect upstream request amounts until there is a producer for them + long u = ur + diff; + if (u < 0) { + u = Long.MAX_VALUE; + } + maxUpstreamRequested = u; + } + } else + // if there were outstanding upstream requests and we have a producer + if (ur != 0L && p != null) { + maxUpstreamRequested = 0L; + // fire the accumulated requests + p.request(ur); + } + + synchronized (this) { + if (!missed) { + emitting = false; + return; + } + missed = false; + } + } + } + + /** + * Tries to replay the buffer contents to all known subscribers. + */ + void replay() { + @SuppressWarnings("unchecked") + InnerProducer[] a = producers.get(); + for (InnerProducer rp : a) { + buffer.replay(rp); + } + } + } /** - * A subject that wraps another subject. + * A Producer and Subscription that manages the request and unsubscription state of a + * child subscriber in thread-safe manner. + * We use AtomicLong as a base class to save on extra allocation of an AtomicLong and also + * save the overhead of the AtomicIntegerFieldUpdater. * @param the value type */ - public static final class SubjectWrapper extends Subject { - /** The wrapped subject. */ - final Subject subject; + static final class InnerProducer extends AtomicLong implements Producer, Subscription { + /** */ + private static final long serialVersionUID = -4453897557930727610L; + /** + * The parent subscriber-to-source used to allow removing the child in case of + * child unsubscription. + */ + final ReplaySubscriber parent; + /** The actual child subscriber. */ + final Subscriber child; + /** + * Holds an object that represents the current location in the buffer. + * Guarded by the emitter loop. + */ + Object index; + /** + * Keeps the sum of all requested amounts. + */ + final AtomicLong totalRequested; + /** Indicates an emission state. Guarded by this. */ + boolean emitting; + /** Indicates a missed update. Guarded by this. */ + boolean missed; + /** + * Indicates this child has been unsubscribed: the state is swapped in atomically and + * will prevent the dispatch() to emit (too many) values to a terminated child subscriber. + */ + static final long UNSUBSCRIBED = Long.MIN_VALUE; + + public InnerProducer(ReplaySubscriber parent, Subscriber child) { + this.parent = parent; + this.child = child; + this.totalRequested = new AtomicLong(); + } + + @Override + public void request(long n) { + // ignore negative requests + if (n < 0) { + return; + } + // In general, RxJava doesn't prevent concurrent requests (with each other or with + // an unsubscribe) so we need a CAS-loop, but we need to handle + // request overflow and unsubscribed/not requested state as well. + for (;;) { + // get the current request amount + long r = get(); + // if child called unsubscribe() do nothing + if (r == UNSUBSCRIBED) { + return; + } + // ignore zero requests except any first that sets in zero + if (r >= 0L && n == 0) { + return; + } + // otherwise, increase the request count + long u = r + n; + // and check for long overflow + if (u < 0) { + // cap at max value, which is essentially unlimited + u = Long.MAX_VALUE; + } + // try setting the new request value + if (compareAndSet(r, u)) { + // increment the total request counter + addTotalRequested(n); + // if successful, notify the parent dispacher this child can receive more + // elements + parent.manageRequests(); + + parent.buffer.replay(this); + return; + } + // otherwise, someone else changed the state (perhaps a concurrent + // request or unsubscription so retry + } + } + + /** + * Increments the total requested amount. + * @param n the additional request amount + */ + void addTotalRequested(long n) { + for (;;) { + long r = totalRequested.get(); + long u = r + n; + if (u < 0) { + u = Long.MAX_VALUE; + } + if (totalRequested.compareAndSet(r, u)) { + return; + } + } + } + + /** + * Indicate that values have been emitted to this child subscriber by the dispatch() method. + * @param n the number of items emitted + * @return the updated request value (may indicate how much can be produced or a terminal state) + */ + public long produced(long n) { + // we don't allow producing zero or less: it would be a bug in the operator + if (n <= 0) { + throw new IllegalArgumentException("Cant produce zero or less"); + } + for (;;) { + // get the current request value + long r = get(); + // if the child has unsubscribed, simply return and indicate this + if (r == UNSUBSCRIBED) { + return UNSUBSCRIBED; + } + // reduce the requested amount + long u = r - n; + // if the new amount is less than zero, we have a bug in this operator + if (u < 0) { + throw new IllegalStateException("More produced (" + n + ") than requested (" + r + ")"); + } + // try updating the request value + if (compareAndSet(r, u)) { + // and return the udpated value + return u; + } + // otherwise, some concurrent activity happened and we need to retry + } + } + + @Override + public boolean isUnsubscribed() { + return get() == UNSUBSCRIBED; + } + @Override + public void unsubscribe() { + long r = get(); + // let's see if we are unsubscribed + if (r != UNSUBSCRIBED) { + // if not, swap in the terminal state, this is idempotent + // because other methods using CAS won't overwrite this value, + // concurrent calls to unsubscribe will atomically swap in the same + // terminal value + r = getAndSet(UNSUBSCRIBED); + // and only one of them will see a non-terminated value before the swap + if (r != UNSUBSCRIBED) { + // remove this from the parent + parent.remove(this); + // After removal, we might have unblocked the other child subscribers: + // let's assume this child had 0 requested before the unsubscription while + // the others had non-zero. By removing this 'blocking' child, the others + // are now free to receive events + parent.manageRequests(); + } + } + } + /** + * Convenience method to auto-cast the index object. + * @return + */ + @SuppressWarnings("unchecked") + U index() { + return (U)index; + } + } + /** + * The interface for interacting with various buffering logic. + * + * @param the value type + */ + interface ReplayBuffer { + /** + * Adds a regular value to the buffer. + * @param value + */ + void next(T value); + /** + * Adds a terminal exception to the buffer + * @param e + */ + void error(Throwable e); + /** + * Adds a completion event to the buffer + */ + void complete(); + /** + * Tries to replay the buffered values to the + * subscriber inside the output if there + * is new value and requests available at the + * same time. + * @param output + */ + void replay(InnerProducer output); + } + + /** + * Holds an unbounded list of events. + * + * @param the value type + */ + static final class UnboundedReplayBuffer extends ArrayList implements ReplayBuffer { + /** */ + private static final long serialVersionUID = 7063189396499112664L; + final NotificationLite nl; + /** The total number of events in the buffer. */ + volatile int size; + + public UnboundedReplayBuffer(int capacityHint) { + super(capacityHint); + nl = NotificationLite.instance(); + } + @Override + public void next(T value) { + add(nl.next(value)); + size++; + } - public SubjectWrapper(OnSubscribe func, Subject subject) { - super(func); - this.subject = subject; + @Override + public void error(Throwable e) { + add(nl.error(e)); + size++; } @Override - public void onNext(T args) { - subject.onNext(args); + public void complete() { + add(nl.completed()); + size++; } @Override - public void onError(Throwable e) { - subject.onError(e); + public void replay(InnerProducer output) { + synchronized (output) { + if (output.emitting) { + output.missed = true; + return; + } + output.emitting = true; + } + for (;;) { + if (output.isUnsubscribed()) { + return; + } + int sourceIndex = size; + + Integer destIndexObject = output.index(); + int destIndex = destIndexObject != null ? destIndexObject.intValue() : 0; + + long r = output.get(); + long r0 = r; + long e = 0L; + + while (r != 0L && destIndex < sourceIndex) { + Object o = get(destIndex); + if (nl.accept(output.child, o)) { + return; + } + if (output.isUnsubscribed()) { + return; + } + destIndex++; + r--; + e++; + } + if (e != 0L) { + output.index = destIndex; + if (r0 != Long.MAX_VALUE) { + output.produced(e); + } + } + + synchronized (output) { + if (!output.missed) { + output.emitting = false; + return; + } + output.missed = false; + } + } + } + } + + /** + * Represents a node in a bounded replay buffer's linked list. + * + * @param the contained value type + */ + static final class Node extends AtomicReference { + /** */ + private static final long serialVersionUID = 245354315435971818L; + final Object value; + public Node(Object value) { + this.value = value; + } + } + + /** + * Base class for bounded buffering with options to specify an + * enter and leave transforms and custom truncation behavior. + * + * @param the value type + */ + static class BoundedReplayBuffer extends AtomicReference implements ReplayBuffer { + /** */ + private static final long serialVersionUID = 2346567790059478686L; + final NotificationLite nl; + + Node tail; + int size; + + public BoundedReplayBuffer() { + nl = NotificationLite.instance(); + Node n = new Node(null); + tail = n; + set(n); + } + + /** + * Add a new node to the linked list. + * @param n + */ + final void addLast(Node n) { + tail.set(n); + tail = n; + size++; + } + /** + * Remove the first node from the linked list. + */ + final void removeFirst() { + Node head = get(); + Node next = head.get(); + if (next == null) { + throw new IllegalStateException("Empty list!"); + } + size--; + // can't just move the head because it would retain the very first value + // can't null out the head's value because of late replayers would see null + setFirst(next); + } + /* test */ final void removeSome(int n) { + Node head = get(); + while (n > 0) { + head = head.get(); + n--; + size--; + } + + setFirst(head); + } + /** + * Arranges the given node is the new head from now on. + * @param n + */ + final void setFirst(Node n) { + set(n); + } + + @Override + public final void next(T value) { + Object o = enterTransform(nl.next(value)); + Node n = new Node(o); + addLast(n); + truncate(); } @Override - public void onCompleted() { - subject.onCompleted(); + public final void error(Throwable e) { + Object o = enterTransform(nl.error(e)); + Node n = new Node(o); + addLast(n); + truncateFinal(); } @Override - public boolean hasObservers() { - return this.subject.hasObservers(); + public final void complete() { + Object o = enterTransform(nl.completed()); + Node n = new Node(o); + addLast(n); + truncateFinal(); + } + + @Override + public final void replay(InnerProducer output) { + synchronized (output) { + if (output.emitting) { + output.missed = true; + return; + } + output.emitting = true; + } + for (;;) { + if (output.isUnsubscribed()) { + return; + } + + long r = output.get(); + long r0 = r; + long e = 0L; + + Node node = output.index(); + if (node == null) { + node = get(); + output.index = node; + } + + while (r != 0) { + Node v = node.get(); + if (v != null) { + Object o = leaveTransform(v.value); + if (nl.accept(output.child, o)) { + output.index = null; + return; + } + e++; + node = v; + } else { + break; + } + if (output.isUnsubscribed()) { + return; + } + } + + if (e != 0L) { + output.index = node; + if (r0 != Long.MAX_VALUE) { + output.produced(e); + } + } + + synchronized (output) { + if (!output.missed) { + output.emitting = false; + return; + } + output.missed = false; + } + } + + } + + /** + * Override this to wrap the NotificationLite object into a + * container to be used later by truncate. + * @param value + * @return + */ + Object enterTransform(Object value) { + return value; + } + /** + * Override this to unwrap the transformed value into a + * NotificationLite object. + * @param value + * @return + */ + Object leaveTransform(Object value) { + return value; + } + /** + * Override this method to truncate a non-terminated buffer + * based on its current properties. + */ + void truncate() { + + } + /** + * Override this method to truncate a terminated buffer + * based on its properties (i.e., truncate but the very last node). + */ + void truncateFinal() { + + } + /* test */ final void collect(Collection output) { + Node n = get(); + for (;;) { + Node next = n.get(); + if (next != null) { + Object o = next.value; + Object v = leaveTransform(o); + if (nl.isCompleted(v) || nl.isError(v)) { + break; + } + output.add(nl.getValue(v)); + n = next; + } else { + break; + } + } + } + /* test */ boolean hasError() { + return tail.value != null && nl.isError(leaveTransform(tail.value)); + } + /* test */ boolean hasCompleted() { + return tail.value != null && nl.isCompleted(leaveTransform(tail.value)); + } + } + + /** + * A bounded replay buffer implementation with size limit only. + * + * @param the value type + */ + static final class SizeBoundReplayBuffer extends BoundedReplayBuffer { + /** */ + private static final long serialVersionUID = -5898283885385201806L; + + final int limit; + public SizeBoundReplayBuffer(int limit) { + this.limit = limit; + } + + @Override + void truncate() { + // overflow can be at most one element + if (size > limit) { + removeFirst(); + } + } + + // no need for final truncation because values are truncated one by one + } + + /** + * Size and time bound replay buffer. + * + * @param the buffered value type + */ + static final class SizeAndTimeBoundReplayBuffer extends BoundedReplayBuffer { + /** */ + private static final long serialVersionUID = 3457957419649567404L; + final Scheduler scheduler; + final long maxAgeInMillis; + final int limit; + public SizeAndTimeBoundReplayBuffer(int limit, long maxAgeInMillis, Scheduler scheduler) { + this.scheduler = scheduler; + this.limit = limit; + this.maxAgeInMillis = maxAgeInMillis; + } + + @Override + Object enterTransform(Object value) { + return new Timestamped(scheduler.now(), value); + } + + @Override + Object leaveTransform(Object value) { + return ((Timestamped)value).getValue(); + } + + @Override + void truncate() { + long timeLimit = scheduler.now() - maxAgeInMillis; + + Node prev = get(); + Node next = prev.get(); + + int e = 0; + for (;;) { + if (next != null) { + if (size > limit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + Timestamped v = (Timestamped)next.value; + if (v.getTimestampMillis() <= timeLimit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + break; + } + } + } else { + break; + } + } + if (e != 0) { + setFirst(prev); + } + } + @Override + void truncateFinal() { + long timeLimit = scheduler.now() - maxAgeInMillis; + + Node prev = get(); + Node next = prev.get(); + + int e = 0; + for (;;) { + if (next != null && size > 1) { + Timestamped v = (Timestamped)next.value; + if (v.getTimestampMillis() <= timeLimit) { + e++; + size--; + prev = next; + next = next.get(); + } else { + break; + } + } else { + break; + } + } + if (e != 0) { + setFirst(prev); + } } } -} \ No newline at end of file +} diff --git a/src/test/java/rx/internal/operators/OperatorReplayTest.java b/src/test/java/rx/internal/operators/OperatorReplayTest.java index a5ff85864d..5c31503da4 100644 --- a/src/test/java/rx/internal/operators/OperatorReplayTest.java +++ b/src/test/java/rx/internal/operators/OperatorReplayTest.java @@ -16,33 +16,27 @@ package rx.internal.operators; import static org.junit.Assert.assertEquals; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.notNull; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.spy; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; -import static org.mockito.Mockito.when; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; +import java.util.*; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.*; -import org.junit.Test; +import org.junit.*; import org.mockito.InOrder; +import rx.*; +import rx.Scheduler.Worker; import rx.Observable; import rx.Observer; -import rx.Scheduler; -import rx.Scheduler.Worker; -import rx.Subscription; -import rx.functions.Action0; -import rx.functions.Action1; -import rx.functions.Func1; +import rx.functions.*; +import rx.internal.operators.OperatorReplay.BoundedReplayBuffer; +import rx.internal.operators.OperatorReplay.Node; +import rx.internal.operators.OperatorReplay.SizeAndTimeBoundReplayBuffer; import rx.observables.ConnectableObservable; -import rx.schedulers.TestScheduler; +import rx.observers.TestSubscriber; +import rx.schedulers.*; import rx.subjects.PublishSubject; public class OperatorReplayTest { @@ -739,4 +733,132 @@ public boolean isUnsubscribed() { } } + @Test + public void testBoundedReplayBuffer() { + BoundedReplayBuffer buf = new BoundedReplayBuffer(); + buf.addLast(new Node(1)); + buf.addLast(new Node(2)); + buf.addLast(new Node(3)); + buf.addLast(new Node(4)); + buf.addLast(new Node(5)); + + List values = new ArrayList(); + buf.collect(values); + + Assert.assertEquals(Arrays.asList(1, 2, 3, 4, 5), values); + + buf.removeSome(2); + buf.removeFirst(); + buf.removeSome(2); + + values.clear(); + buf.collect(values); + Assert.assertTrue(values.isEmpty()); + + buf.addLast(new Node(5)); + buf.addLast(new Node(6)); + buf.collect(values); + + Assert.assertEquals(Arrays.asList(5, 6), values); + + } + + @Test + public void testTimedAndSizedTruncation() { + TestScheduler test = Schedulers.test(); + SizeAndTimeBoundReplayBuffer buf = new SizeAndTimeBoundReplayBuffer(2, 2000, test); + List values = new ArrayList(); + + buf.next(1); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.next(2); + test.advanceTimeBy(1, TimeUnit.SECONDS); + buf.collect(values); + Assert.assertEquals(Arrays.asList(1, 2), values); + + buf.next(3); + buf.next(4); + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(3, 4), values); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.next(5); + + values.clear(); + buf.collect(values); + Assert.assertEquals(Arrays.asList(5), values); + + test.advanceTimeBy(2, TimeUnit.SECONDS); + buf.complete(); + + values.clear(); + buf.collect(values); + Assert.assertTrue(values.isEmpty()); + + Assert.assertEquals(1, buf.size); + Assert.assertTrue(buf.hasCompleted()); + } + + @Test + public void testBackpressure() { + final AtomicLong requested = new AtomicLong(); + Observable source = Observable.range(1, 1000) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested.addAndGet(t); + } + }); + ConnectableObservable co = source.replay(); + + TestSubscriber ts1 = TestSubscriber.create(10); + TestSubscriber ts2 = TestSubscriber.create(90); + + co.subscribe(ts1); + co.subscribe(ts2); + + ts2.requestMore(10); + + co.connect(); + + ts1.assertValueCount(10); + ts1.assertNoTerminalEvent(); + + ts2.assertValueCount(100); + ts2.assertNoTerminalEvent(); + + Assert.assertEquals(100, requested.get()); + } + + @Test + public void testBackpressureBounded() { + final AtomicLong requested = new AtomicLong(); + Observable source = Observable.range(1, 1000) + .doOnRequest(new Action1() { + @Override + public void call(Long t) { + requested.addAndGet(t); + } + }); + ConnectableObservable co = source.replay(50); + + TestSubscriber ts1 = TestSubscriber.create(10); + TestSubscriber ts2 = TestSubscriber.create(90); + + co.subscribe(ts1); + co.subscribe(ts2); + + ts2.requestMore(10); + + co.connect(); + + ts1.assertValueCount(10); + ts1.assertNoTerminalEvent(); + + ts2.assertValueCount(100); + ts2.assertNoTerminalEvent(); + + Assert.assertEquals(100, requested.get()); + } } \ No newline at end of file