diff --git a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java index 9ab9ca1c04..e62ea61494 100644 --- a/src/main/java/io/reactivex/flowables/ConnectableFlowable.java +++ b/src/main/java/io/reactivex/flowables/ConnectableFlowable.java @@ -13,16 +13,19 @@ package io.reactivex.flowables; -import io.reactivex.annotations.NonNull; +import java.util.concurrent.TimeUnit; + import org.reactivestreams.Subscriber; -import io.reactivex.Flowable; +import io.reactivex.*; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; -import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.*; import io.reactivex.internal.operators.flowable.*; import io.reactivex.internal.util.ConnectConsumer; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; /** * A {@code ConnectableFlowable} resembles an ordinary {@link Flowable}, except that it does not begin @@ -68,15 +71,154 @@ public final Disposable connect() { /** * Returns a {@code Flowable} that stays connected to this {@code ConnectableFlowable} as long as there * is at least one subscription to this {@code ConnectableFlowable}. - * + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload does not operate on any particular {@link Scheduler}.
+ *
* @return a {@link Flowable} * @see ReactiveX documentation: RefCount + * @see #refCount(int) + * @see #refCount(long, TimeUnit) + * @see #refCount(int, long, TimeUnit) */ @NonNull + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) public Flowable refCount() { return RxJavaPlugins.onAssembly(new FlowableRefCount(this)); } + /** + * Connects to the upstream {@code ConnectableFlowable} if the number of subscribed + * subscriber reaches the specified count and disconnect if all subscribers have unsubscribed. + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload does not operate on any particular {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @return the new Flowable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable refCount(int subscriberCount) { + return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); + } + + /** + * Connects to the upstream {@code ConnectableFlowable} if the number of subscribed + * subscriber reaches 1 and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.
+ *
+ * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @return the new Flowable instance + * @since 2.1.14 - experimental + * @see #refCount(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.COMPUTATION) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable refCount(long timeout, TimeUnit unit) { + return refCount(1, timeout, unit, Schedulers.computation()); + } + + /** + * Connects to the upstream {@code ConnectableFlowable} if the number of subscribed + * subscriber reaches 1 and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the specified {@link Scheduler}.
+ *
+ * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @param scheduler the target scheduler to wait on before disconnecting + * @return the new Flowable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.CUSTOM) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable refCount(long timeout, TimeUnit unit, Scheduler scheduler) { + return refCount(1, timeout, unit, scheduler); + } + + /** + * Connects to the upstream {@code ConnectableFlowable} if the number of subscribed + * subscriber reaches the specified count and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @return the new Flowable instance + * @since 2.1.14 - experimental + * @see #refCount(int, long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.COMPUTATION) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable refCount(int subscriberCount, long timeout, TimeUnit unit) { + return refCount(subscriberCount, timeout, unit, Schedulers.computation()); + } + + /** + * Connects to the upstream {@code ConnectableFlowable} if the number of subscribed + * subscriber reaches the specified count and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Backpressure:
+ *
The operator itself doesn't interfere with backpressure which is determined by the upstream + * {@code ConnectableFlowable}'s backpressure behavior.
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the specified {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @param scheduler the target scheduler to wait on before disconnecting + * @return the new Flowable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.CUSTOM) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) { + ObjectHelper.verifyPositive(subscriberCount, "subscriberCount"); + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new FlowableRefCount(this, subscriberCount, timeout, unit, scheduler)); + } + /** * Returns a Flowable that automatically connects (at most once) to this ConnectableFlowable * when the first Subscriber subscribes. diff --git a/src/main/java/io/reactivex/observables/ConnectableObservable.java b/src/main/java/io/reactivex/observables/ConnectableObservable.java index 62c76ab9a6..04d1648077 100644 --- a/src/main/java/io/reactivex/observables/ConnectableObservable.java +++ b/src/main/java/io/reactivex/observables/ConnectableObservable.java @@ -13,15 +13,17 @@ package io.reactivex.observables; -import io.reactivex.annotations.NonNull; +import java.util.concurrent.TimeUnit; import io.reactivex.*; +import io.reactivex.annotations.*; import io.reactivex.disposables.Disposable; import io.reactivex.functions.Consumer; -import io.reactivex.internal.functions.Functions; +import io.reactivex.internal.functions.*; import io.reactivex.internal.operators.observable.*; import io.reactivex.internal.util.ConnectConsumer; import io.reactivex.plugins.RxJavaPlugins; +import io.reactivex.schedulers.Schedulers; /** * A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin @@ -67,15 +69,130 @@ public final Disposable connect() { /** * Returns an {@code Observable} that stays connected to this {@code ConnectableObservable} as long as there * is at least one subscription to this {@code ConnectableObservable}. - * + *
+ *
Scheduler:
+ *
This {@code refCount} overload does not operate on any particular {@link Scheduler}.
+ *
* @return an {@link Observable} * @see ReactiveX documentation: RefCount + * @see #refCount(int) + * @see #refCount(long, TimeUnit) + * @see #refCount(int, long, TimeUnit) */ @NonNull + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) public Observable refCount() { return RxJavaPlugins.onAssembly(new ObservableRefCount(this)); } + /** + * Connects to the upstream {@code ConnectableObservable} if the number of subscribed + * subscriber reaches the specified count and disconnect if all subscribers have unsubscribed. + *
+ *
Scheduler:
+ *
This {@code refCount} overload does not operate on any particular {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @return the new Observable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable refCount(int subscriberCount) { + return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); + } + + /** + * Connects to the upstream {@code ConnectableObservable} if the number of subscribed + * subscriber reaches 1 and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.
+ *
+ * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @return the new Observable instance + * @since 2.1.14 - experimental + * @see #refCount(long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Observable refCount(long timeout, TimeUnit unit) { + return refCount(1, timeout, unit, Schedulers.computation()); + } + + /** + * Connects to the upstream {@code ConnectableObservable} if the number of subscribed + * subscriber reaches 1 and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the specified {@link Scheduler}.
+ *
+ * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @param scheduler the target scheduler to wait on before disconnecting + * @return the new Observable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Observable refCount(long timeout, TimeUnit unit, Scheduler scheduler) { + return refCount(1, timeout, unit, scheduler); + } + + /** + * Connects to the upstream {@code ConnectableObservable} if the number of subscribed + * subscriber reaches the specified count and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @return the new Observable instance + * @since 2.1.14 - experimental + * @see #refCount(int, long, TimeUnit, Scheduler) + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.COMPUTATION) + public final Observable refCount(int subscriberCount, long timeout, TimeUnit unit) { + return refCount(subscriberCount, timeout, unit, Schedulers.computation()); + } + + /** + * Connects to the upstream {@code ConnectableObservable} if the number of subscribed + * subscriber reaches the specified count and disconnect after the specified + * timeout if all subscribers have unsubscribed. + *
+ *
Scheduler:
+ *
This {@code refCount} overload operates on the specified {@link Scheduler}.
+ *
+ * @param subscriberCount the number of subscribers required to connect to the upstream + * @param timeout the time to wait before disconnecting after all subscribers unsubscribed + * @param unit the time unit of the timeout + * @param scheduler the target scheduler to wait on before disconnecting + * @return the new Observable instance + * @since 2.1.14 - experimental + */ + @CheckReturnValue + @Experimental + @SchedulerSupport(SchedulerSupport.CUSTOM) + public final Observable refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) { + ObjectHelper.verifyPositive(subscriberCount, "subscriberCount"); + ObjectHelper.requireNonNull(unit, "unit is null"); + ObjectHelper.requireNonNull(scheduler, "scheduler is null"); + return RxJavaPlugins.onAssembly(new ObservableRefCount(this, subscriberCount, timeout, unit, scheduler)); + } + /** * Returns an Observable that automatically connects (at most once) to this ConnectableObservable * when the first Observer subscribes. diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java index 01a6d9396e..d91f4706d6 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableRefCountTest.java @@ -1001,31 +1001,6 @@ public void subscribe(FlowableEmitter emitter) throws Exception { assertTrue(interrupted.get()); } - static FlowableTransformer refCount(final int n) { - return refCount(n, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); - } - - static FlowableTransformer refCount(final long time, final TimeUnit unit) { - return refCount(1, time, unit, Schedulers.computation()); - } - - static FlowableTransformer refCount(final long time, final TimeUnit unit, final Scheduler scheduler) { - return refCount(1, time, unit, scheduler); - } - - static FlowableTransformer refCount(final int n, final long time, final TimeUnit unit) { - return refCount(1, time, unit, Schedulers.computation()); - } - - static FlowableTransformer refCount(final int n, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new FlowableTransformer() { - @Override - public Publisher apply(Flowable f) { - return new FlowableRefCount((ConnectableFlowable)f, n, time, unit, scheduler); - } - }; - } - @Test public void byCount() { final int[] subscriptions = { 0 }; @@ -1038,7 +1013,7 @@ public void accept(Subscription s) throws Exception { } }) .publish() - .compose(FlowableRefCountTest.refCount(2)); + .refCount(2); for (int i = 0; i < 3; i++) { TestSubscriber ts1 = source.test(); @@ -1068,7 +1043,7 @@ public void accept(Subscription s) throws Exception { } }) .publish() - .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)); + .refCount(500, TimeUnit.MILLISECONDS); TestSubscriber ts1 = source.test(0); @@ -1111,7 +1086,7 @@ public void accept(Subscription s) throws Exception { } }) .publish() - .compose(FlowableRefCountTest.refCount(1, 100, TimeUnit.MILLISECONDS)); + .refCount(1, 100, TimeUnit.MILLISECONDS); TestSubscriber ts1 = source.test(0); @@ -1130,25 +1105,18 @@ public void accept(Subscription s) throws Exception { public void error() { Flowable.error(new IOException()) .publish() - .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)) + .refCount(500, TimeUnit.MILLISECONDS) .test() .assertFailure(IOException.class); } - @Test(expected = ClassCastException.class) - public void badUpstream() { - Flowable.range(1, 5) - .compose(FlowableRefCountTest.refCount(500, TimeUnit.MILLISECONDS, Schedulers.single())) - ; - } - @Test public void comeAndGo() { PublishProcessor pp = PublishProcessor.create(); Flowable source = pp .publish() - .compose(FlowableRefCountTest.refCount(1)); + .refCount(1); TestSubscriber ts1 = source.test(0); @@ -1171,7 +1139,7 @@ public void unsubscribeSubscribeRace() { final Flowable source = Flowable.range(1, 5) .replay() - .compose(FlowableRefCountTest.refCount(1)) + .refCount(1) ; final TestSubscriber ts1 = source.test(0); @@ -1247,6 +1215,38 @@ public void doubleOnX() { } } + @Test + public void doubleOnXCount() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadFlowableDoubleOnX() + .refCount(1) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleOnXTime() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadFlowableDoubleOnX() + .refCount(5, TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void cancelTerminateStateExclusion() { FlowableRefCount o = (FlowableRefCount)PublishProcessor.create() diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java index 7648cc69b5..d46ae8b204 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableRefCountTest.java @@ -985,31 +985,6 @@ public void subscribe(ObservableEmitter emitter) throws Exception { assertTrue(interrupted.get()); } - static ObservableTransformer refCount(final int n) { - return refCount(n, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline()); - } - - static ObservableTransformer refCount(final long time, final TimeUnit unit) { - return refCount(1, time, unit, Schedulers.computation()); - } - - static ObservableTransformer refCount(final long time, final TimeUnit unit, final Scheduler scheduler) { - return refCount(1, time, unit, scheduler); - } - - static ObservableTransformer refCount(final int n, final long time, final TimeUnit unit) { - return refCount(1, time, unit, Schedulers.computation()); - } - - static ObservableTransformer refCount(final int n, final long time, final TimeUnit unit, final Scheduler scheduler) { - return new ObservableTransformer() { - @Override - public Observable apply(Observable f) { - return new ObservableRefCount((ConnectableObservable)f, n, time, unit, scheduler); - } - }; - } - @Test public void byCount() { final int[] subscriptions = { 0 }; @@ -1022,7 +997,7 @@ public void accept(Disposable s) throws Exception { } }) .publish() - .compose(ObservableRefCountTest.refCount(2)); + .refCount(2); for (int i = 0; i < 3; i++) { TestObserver to1 = source.test(); @@ -1052,7 +1027,7 @@ public void accept(Disposable s) throws Exception { } }) .publish() - .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)); + .refCount(500, TimeUnit.MILLISECONDS); TestObserver to1 = source.test(); @@ -1095,7 +1070,7 @@ public void accept(Disposable s) throws Exception { } }) .publish() - .compose(ObservableRefCountTest.refCount(1, 100, TimeUnit.MILLISECONDS)); + .refCount(1, 100, TimeUnit.MILLISECONDS); TestObserver to1 = source.test(); @@ -1114,25 +1089,18 @@ public void accept(Disposable s) throws Exception { public void error() { Observable.error(new IOException()) .publish() - .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS)) + .refCount(500, TimeUnit.MILLISECONDS) .test() .assertFailure(IOException.class); } - @Test(expected = ClassCastException.class) - public void badUpstream() { - Observable.range(1, 5) - .compose(ObservableRefCountTest.refCount(500, TimeUnit.MILLISECONDS, Schedulers.single())) - ; - } - @Test public void comeAndGo() { PublishSubject ps = PublishSubject.create(); Observable source = ps .publish() - .compose(ObservableRefCountTest.refCount(1)); + .refCount(1); TestObserver to1 = source.test(); @@ -1155,7 +1123,7 @@ public void unsubscribeSubscribeRace() { final Observable source = Observable.range(1, 5) .replay() - .compose(ObservableRefCountTest.refCount(1)) + .refCount(1) ; final TestObserver to1 = source.test(); @@ -1231,6 +1199,38 @@ public void doubleOnX() { } } + @Test + public void doubleOnXCount() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadObservableDoubleOnX() + .refCount(1) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + + @Test + public void doubleOnXTime() { + List errors = TestHelper.trackPluginErrors(); + try { + new BadObservableDoubleOnX() + .refCount(5, TimeUnit.SECONDS, Schedulers.single()) + .test() + .assertResult(); + + TestHelper.assertError(errors, 0, ProtocolViolationException.class); + TestHelper.assertUndeliverable(errors, 1, TestException.class); + } finally { + RxJavaPlugins.reset(); + } + } + @Test public void cancelTerminateStateExclusion() { ObservableRefCount o = (ObservableRefCount)PublishSubject.create()