Skip to content

Commit cedfc53

Browse files
authored
2.x: Add refCount with count & disconnect timeout (#5986)
* 2.x: Add refCount with count & disconnect timeout * Ensure coverage
1 parent a957c78 commit cedfc53

File tree

4 files changed

+342
-83
lines changed

4 files changed

+342
-83
lines changed

src/main/java/io/reactivex/flowables/ConnectableFlowable.java

Lines changed: 146 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,16 +13,19 @@
1313

1414
package io.reactivex.flowables;
1515

16-
import io.reactivex.annotations.NonNull;
16+
import java.util.concurrent.TimeUnit;
17+
1718
import org.reactivestreams.Subscriber;
1819

19-
import io.reactivex.Flowable;
20+
import io.reactivex.*;
21+
import io.reactivex.annotations.*;
2022
import io.reactivex.disposables.Disposable;
2123
import io.reactivex.functions.Consumer;
22-
import io.reactivex.internal.functions.Functions;
24+
import io.reactivex.internal.functions.*;
2325
import io.reactivex.internal.operators.flowable.*;
2426
import io.reactivex.internal.util.ConnectConsumer;
2527
import io.reactivex.plugins.RxJavaPlugins;
28+
import io.reactivex.schedulers.Schedulers;
2629

2730
/**
2831
* A {@code ConnectableFlowable} resembles an ordinary {@link Flowable}, except that it does not begin
@@ -68,15 +71,154 @@ public final Disposable connect() {
6871
/**
6972
* Returns a {@code Flowable} that stays connected to this {@code ConnectableFlowable} as long as there
7073
* is at least one subscription to this {@code ConnectableFlowable}.
71-
*
74+
* <dl>
75+
* <dt><b>Backpressure:</b></dt>
76+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
77+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
78+
* <dt><b>Scheduler:</b></dt>
79+
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
80+
* </dl>
7281
* @return a {@link Flowable}
7382
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
83+
* @see #refCount(int)
84+
* @see #refCount(long, TimeUnit)
85+
* @see #refCount(int, long, TimeUnit)
7486
*/
7587
@NonNull
88+
@CheckReturnValue
89+
@SchedulerSupport(SchedulerSupport.NONE)
90+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
7691
public Flowable<T> refCount() {
7792
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this));
7893
}
7994

95+
/**
96+
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
97+
* subscriber reaches the specified count and disconnect if all subscribers have unsubscribed.
98+
* <dl>
99+
* <dt><b>Backpressure:</b></dt>
100+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
101+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
102+
* <dt><b>Scheduler:</b></dt>
103+
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
104+
* </dl>
105+
* @param subscriberCount the number of subscribers required to connect to the upstream
106+
* @return the new Flowable instance
107+
* @since 2.1.14 - experimental
108+
*/
109+
@CheckReturnValue
110+
@Experimental
111+
@SchedulerSupport(SchedulerSupport.NONE)
112+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
113+
public final Flowable<T> refCount(int subscriberCount) {
114+
return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline());
115+
}
116+
117+
/**
118+
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
119+
* subscriber reaches 1 and disconnect after the specified
120+
* timeout if all subscribers have unsubscribed.
121+
* <dl>
122+
* <dt><b>Backpressure:</b></dt>
123+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
124+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
125+
* <dt><b>Scheduler:</b></dt>
126+
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
127+
* </dl>
128+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
129+
* @param unit the time unit of the timeout
130+
* @return the new Flowable instance
131+
* @since 2.1.14 - experimental
132+
* @see #refCount(long, TimeUnit, Scheduler)
133+
*/
134+
@CheckReturnValue
135+
@Experimental
136+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
137+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
138+
public final Flowable<T> refCount(long timeout, TimeUnit unit) {
139+
return refCount(1, timeout, unit, Schedulers.computation());
140+
}
141+
142+
/**
143+
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
144+
* subscriber reaches 1 and disconnect after the specified
145+
* timeout if all subscribers have unsubscribed.
146+
* <dl>
147+
* <dt><b>Backpressure:</b></dt>
148+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
149+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
150+
* <dt><b>Scheduler:</b></dt>
151+
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
152+
* </dl>
153+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
154+
* @param unit the time unit of the timeout
155+
* @param scheduler the target scheduler to wait on before disconnecting
156+
* @return the new Flowable instance
157+
* @since 2.1.14 - experimental
158+
*/
159+
@CheckReturnValue
160+
@Experimental
161+
@SchedulerSupport(SchedulerSupport.CUSTOM)
162+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
163+
public final Flowable<T> refCount(long timeout, TimeUnit unit, Scheduler scheduler) {
164+
return refCount(1, timeout, unit, scheduler);
165+
}
166+
167+
/**
168+
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
169+
* subscriber reaches the specified count and disconnect after the specified
170+
* timeout if all subscribers have unsubscribed.
171+
* <dl>
172+
* <dt><b>Backpressure:</b></dt>
173+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
174+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
175+
* <dt><b>Scheduler:</b></dt>
176+
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
177+
* </dl>
178+
* @param subscriberCount the number of subscribers required to connect to the upstream
179+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
180+
* @param unit the time unit of the timeout
181+
* @return the new Flowable instance
182+
* @since 2.1.14 - experimental
183+
* @see #refCount(int, long, TimeUnit, Scheduler)
184+
*/
185+
@CheckReturnValue
186+
@Experimental
187+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
188+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
189+
public final Flowable<T> refCount(int subscriberCount, long timeout, TimeUnit unit) {
190+
return refCount(subscriberCount, timeout, unit, Schedulers.computation());
191+
}
192+
193+
/**
194+
* Connects to the upstream {@code ConnectableFlowable} if the number of subscribed
195+
* subscriber reaches the specified count and disconnect after the specified
196+
* timeout if all subscribers have unsubscribed.
197+
* <dl>
198+
* <dt><b>Backpressure:</b></dt>
199+
* <dd>The operator itself doesn't interfere with backpressure which is determined by the upstream
200+
* {@code ConnectableFlowable}'s backpressure behavior.</dd>
201+
* <dt><b>Scheduler:</b></dt>
202+
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
203+
* </dl>
204+
* @param subscriberCount the number of subscribers required to connect to the upstream
205+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
206+
* @param unit the time unit of the timeout
207+
* @param scheduler the target scheduler to wait on before disconnecting
208+
* @return the new Flowable instance
209+
* @since 2.1.14 - experimental
210+
*/
211+
@CheckReturnValue
212+
@Experimental
213+
@SchedulerSupport(SchedulerSupport.CUSTOM)
214+
@BackpressureSupport(BackpressureKind.PASS_THROUGH)
215+
public final Flowable<T> refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) {
216+
ObjectHelper.verifyPositive(subscriberCount, "subscriberCount");
217+
ObjectHelper.requireNonNull(unit, "unit is null");
218+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
219+
return RxJavaPlugins.onAssembly(new FlowableRefCount<T>(this, subscriberCount, timeout, unit, scheduler));
220+
}
221+
80222
/**
81223
* Returns a Flowable that automatically connects (at most once) to this ConnectableFlowable
82224
* when the first Subscriber subscribes.

src/main/java/io/reactivex/observables/ConnectableObservable.java

Lines changed: 120 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,15 +13,17 @@
1313

1414
package io.reactivex.observables;
1515

16-
import io.reactivex.annotations.NonNull;
16+
import java.util.concurrent.TimeUnit;
1717

1818
import io.reactivex.*;
19+
import io.reactivex.annotations.*;
1920
import io.reactivex.disposables.Disposable;
2021
import io.reactivex.functions.Consumer;
21-
import io.reactivex.internal.functions.Functions;
22+
import io.reactivex.internal.functions.*;
2223
import io.reactivex.internal.operators.observable.*;
2324
import io.reactivex.internal.util.ConnectConsumer;
2425
import io.reactivex.plugins.RxJavaPlugins;
26+
import io.reactivex.schedulers.Schedulers;
2527

2628
/**
2729
* A {@code ConnectableObservable} resembles an ordinary {@link Observable}, except that it does not begin
@@ -67,15 +69,130 @@ public final Disposable connect() {
6769
/**
6870
* Returns an {@code Observable} that stays connected to this {@code ConnectableObservable} as long as there
6971
* is at least one subscription to this {@code ConnectableObservable}.
70-
*
72+
* <dl>
73+
* <dt><b>Scheduler:</b></dt>
74+
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
75+
* </dl>
7176
* @return an {@link Observable}
7277
* @see <a href="http://reactivex.io/documentation/operators/refcount.html">ReactiveX documentation: RefCount</a>
78+
* @see #refCount(int)
79+
* @see #refCount(long, TimeUnit)
80+
* @see #refCount(int, long, TimeUnit)
7381
*/
7482
@NonNull
83+
@CheckReturnValue
84+
@SchedulerSupport(SchedulerSupport.NONE)
7585
public Observable<T> refCount() {
7686
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(this));
7787
}
7888

89+
/**
90+
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
91+
* subscriber reaches the specified count and disconnect if all subscribers have unsubscribed.
92+
* <dl>
93+
* <dt><b>Scheduler:</b></dt>
94+
* <dd>This {@code refCount} overload does not operate on any particular {@link Scheduler}.</dd>
95+
* </dl>
96+
* @param subscriberCount the number of subscribers required to connect to the upstream
97+
* @return the new Observable instance
98+
* @since 2.1.14 - experimental
99+
*/
100+
@CheckReturnValue
101+
@Experimental
102+
@SchedulerSupport(SchedulerSupport.NONE)
103+
public final Observable<T> refCount(int subscriberCount) {
104+
return refCount(subscriberCount, 0, TimeUnit.NANOSECONDS, Schedulers.trampoline());
105+
}
106+
107+
/**
108+
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
109+
* subscriber reaches 1 and disconnect after the specified
110+
* timeout if all subscribers have unsubscribed.
111+
* <dl>
112+
* <dt><b>Scheduler:</b></dt>
113+
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
114+
* </dl>
115+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
116+
* @param unit the time unit of the timeout
117+
* @return the new Observable instance
118+
* @since 2.1.14 - experimental
119+
* @see #refCount(long, TimeUnit, Scheduler)
120+
*/
121+
@CheckReturnValue
122+
@Experimental
123+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
124+
public final Observable<T> refCount(long timeout, TimeUnit unit) {
125+
return refCount(1, timeout, unit, Schedulers.computation());
126+
}
127+
128+
/**
129+
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
130+
* subscriber reaches 1 and disconnect after the specified
131+
* timeout if all subscribers have unsubscribed.
132+
* <dl>
133+
* <dt><b>Scheduler:</b></dt>
134+
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
135+
* </dl>
136+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
137+
* @param unit the time unit of the timeout
138+
* @param scheduler the target scheduler to wait on before disconnecting
139+
* @return the new Observable instance
140+
* @since 2.1.14 - experimental
141+
*/
142+
@CheckReturnValue
143+
@Experimental
144+
@SchedulerSupport(SchedulerSupport.CUSTOM)
145+
public final Observable<T> refCount(long timeout, TimeUnit unit, Scheduler scheduler) {
146+
return refCount(1, timeout, unit, scheduler);
147+
}
148+
149+
/**
150+
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
151+
* subscriber reaches the specified count and disconnect after the specified
152+
* timeout if all subscribers have unsubscribed.
153+
* <dl>
154+
* <dt><b>Scheduler:</b></dt>
155+
* <dd>This {@code refCount} overload operates on the {@code computation} {@link Scheduler}.</dd>
156+
* </dl>
157+
* @param subscriberCount the number of subscribers required to connect to the upstream
158+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
159+
* @param unit the time unit of the timeout
160+
* @return the new Observable instance
161+
* @since 2.1.14 - experimental
162+
* @see #refCount(int, long, TimeUnit, Scheduler)
163+
*/
164+
@CheckReturnValue
165+
@Experimental
166+
@SchedulerSupport(SchedulerSupport.COMPUTATION)
167+
public final Observable<T> refCount(int subscriberCount, long timeout, TimeUnit unit) {
168+
return refCount(subscriberCount, timeout, unit, Schedulers.computation());
169+
}
170+
171+
/**
172+
* Connects to the upstream {@code ConnectableObservable} if the number of subscribed
173+
* subscriber reaches the specified count and disconnect after the specified
174+
* timeout if all subscribers have unsubscribed.
175+
* <dl>
176+
* <dt><b>Scheduler:</b></dt>
177+
* <dd>This {@code refCount} overload operates on the specified {@link Scheduler}.</dd>
178+
* </dl>
179+
* @param subscriberCount the number of subscribers required to connect to the upstream
180+
* @param timeout the time to wait before disconnecting after all subscribers unsubscribed
181+
* @param unit the time unit of the timeout
182+
* @param scheduler the target scheduler to wait on before disconnecting
183+
* @return the new Observable instance
184+
* @since 2.1.14 - experimental
185+
*/
186+
@CheckReturnValue
187+
@Experimental
188+
@SchedulerSupport(SchedulerSupport.CUSTOM)
189+
public final Observable<T> refCount(int subscriberCount, long timeout, TimeUnit unit, Scheduler scheduler) {
190+
ObjectHelper.verifyPositive(subscriberCount, "subscriberCount");
191+
ObjectHelper.requireNonNull(unit, "unit is null");
192+
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
193+
return RxJavaPlugins.onAssembly(new ObservableRefCount<T>(this, subscriberCount, timeout, unit, scheduler));
194+
}
195+
79196
/**
80197
* Returns an Observable that automatically connects (at most once) to this ConnectableObservable
81198
* when the first Observer subscribes.

0 commit comments

Comments
 (0)