Skip to content

Commit 66fd701

Browse files
authored
3.x: Add subscribe with disposable container (ReactiveX#7298)
1 parent a9e0a8a commit 66fd701

18 files changed

+2009
-14
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.rxjava3.annotations.*;
22-
import io.reactivex.rxjava3.disposables.Disposable;
22+
import io.reactivex.rxjava3.disposables.*;
2323
import io.reactivex.rxjava3.exceptions.*;
2424
import io.reactivex.rxjava3.functions.*;
2525
import io.reactivex.rxjava3.internal.functions.*;
@@ -29,7 +29,7 @@
2929
import io.reactivex.rxjava3.internal.operators.completable.*;
3030
import io.reactivex.rxjava3.internal.operators.maybe.*;
3131
import io.reactivex.rxjava3.internal.operators.mixed.*;
32-
import io.reactivex.rxjava3.internal.operators.single.*;
32+
import io.reactivex.rxjava3.internal.operators.single.SingleDelayWithCompletable;
3333
import io.reactivex.rxjava3.observers.TestObserver;
3434
import io.reactivex.rxjava3.plugins.RxJavaPlugins;
3535
import io.reactivex.rxjava3.schedulers.Schedulers;
@@ -2836,6 +2836,7 @@ public final Completable hide() {
28362836
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
28372837
* </dl>
28382838
* @return the new {@code Disposable} that can be used for disposing the subscription at any time
2839+
* @see #subscribe(Action, Consumer, DisposableContainer)
28392840
*/
28402841
@SchedulerSupport(SchedulerSupport.NONE)
28412842
@NonNull
@@ -2921,6 +2922,7 @@ public final void subscribe(@NonNull CompletableObserver observer) {
29212922
* @param onError the {@link Consumer} that is called if this {@code Completable} emits an error
29222923
* @return the new {@link Disposable} that can be used for disposing the subscription at any time
29232924
* @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
2925+
* @see #subscribe(Action, Consumer, DisposableContainer)
29242926
*/
29252927
@CheckReturnValue
29262928
@NonNull
@@ -2934,6 +2936,44 @@ public final Disposable subscribe(@NonNull Action onComplete, @NonNull Consumer<
29342936
return observer;
29352937
}
29362938

2939+
/**
2940+
* Wraps the given onXXX callbacks into a {@link Disposable} {@link CompletableObserver},
2941+
* adds it to the given {@link DisposableContainer} and ensures, that if the upstream
2942+
* terminates or this particular {@code Disposable} is disposed, the {@code CompletableObserver} is removed
2943+
* from the given composite.
2944+
* <p>
2945+
* The {@code CompletableObserver} will be removed after the callback for the terminal event has been invoked.
2946+
* <dl>
2947+
* <dt><b>Scheduler:</b></dt>
2948+
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
2949+
* </dl>
2950+
* @param onError the callback for an upstream error
2951+
* @param onComplete the callback for an upstream completion
2952+
* @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
2953+
* created {@code Disposable} {@code CompletableObserver}
2954+
* @return the {@code Disposable} that allows disposing the particular subscription.
2955+
* @throws NullPointerException
2956+
* if {@code onComplete}, {@code onError}
2957+
* or {@code container} is {@code null}
2958+
* @since 3.1.0
2959+
*/
2960+
@SchedulerSupport(SchedulerSupport.NONE)
2961+
@NonNull
2962+
public final Disposable subscribe(
2963+
@NonNull Action onComplete,
2964+
@NonNull Consumer<? super Throwable> onError,
2965+
@NonNull DisposableContainer container) {
2966+
Objects.requireNonNull(onComplete, "onComplete is null");
2967+
Objects.requireNonNull(onError, "onError is null");
2968+
Objects.requireNonNull(container, "container is null");
2969+
2970+
DisposableAutoReleaseMultiObserver<Void> observer = new DisposableAutoReleaseMultiObserver<>(
2971+
container, Functions.emptyConsumer(), onError, onComplete);
2972+
container.add(observer);
2973+
subscribe(observer);
2974+
return observer;
2975+
}
2976+
29372977
/**
29382978
* Subscribes to this {@code Completable} and calls the given {@link Action} when this {@code Completable}
29392979
* completes normally.
@@ -2950,6 +2990,7 @@ public final Disposable subscribe(@NonNull Action onComplete, @NonNull Consumer<
29502990
* @param onComplete the {@code Action} called when this {@code Completable} completes normally
29512991
* @return the new {@link Disposable} that can be used for disposing the subscription at any time
29522992
* @throws NullPointerException if {@code onComplete} is {@code null}
2993+
* @see #subscribe(Action, Consumer, DisposableContainer)
29532994
*/
29542995
@CheckReturnValue
29552996
@NonNull

src/main/java/io/reactivex/rxjava3/core/Flowable.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.*;
2121

2222
import io.reactivex.rxjava3.annotations.*;
23-
import io.reactivex.rxjava3.disposables.Disposable;
23+
import io.reactivex.rxjava3.disposables.*;
2424
import io.reactivex.rxjava3.exceptions.*;
2525
import io.reactivex.rxjava3.flowables.*;
2626
import io.reactivex.rxjava3.functions.*;
@@ -15699,6 +15699,7 @@ public final Flowable<T> startWithArray(@NonNull T... items) {
1569915699
*
1570015700
* @return the new {@link Disposable} instance that allows cancelling the flow
1570115701
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15702+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1570215703
*/
1570315704
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
1570415705
@SchedulerSupport(SchedulerSupport.NONE)
@@ -15727,6 +15728,7 @@ public final Disposable subscribe() {
1572715728
* @throws NullPointerException
1572815729
* if {@code onNext} is {@code null}
1572915730
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15731+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1573015732
*/
1573115733
@CheckReturnValue
1573215734
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15753,9 +15755,10 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
1575315755
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
1575415756
* current {@code Flowable}
1575515757
* @return the new {@link Disposable} instance that allows cancelling the flow
15756-
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
1575715758
* @throws NullPointerException
1575815759
* if {@code onNext} or {@code onError} is {@code null}
15760+
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15761+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1575915762
*/
1576015763
@CheckReturnValue
1576115764
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15788,6 +15791,7 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull
1578815791
* @throws NullPointerException
1578915792
* if {@code onNext}, {@code onError} or {@code onComplete} is {@code null}
1579015793
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
15794+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1579115795
*/
1579215796
@CheckReturnValue
1579315797
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15806,6 +15810,51 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull
1580615810
return ls;
1580715811
}
1580815812

15813+
/**
15814+
* Wraps the given onXXX callbacks into a {@link Disposable} {@link Subscriber},
15815+
* adds it to the given {@link DisposableContainer} and ensures, that if the upstream
15816+
* terminates or this particular {@code Disposable} is disposed, the {@code Subscriber} is removed
15817+
* from the given container.
15818+
* <p>
15819+
* The {@coded Subscriber} will be removed after the callback for the terminal event has been invoked.
15820+
* <dl>
15821+
* <dt><b>Backpressure:</b></dt>
15822+
* <dd>The operator consumes the current {@code Flowable} in an unbounded manner (i.e., no
15823+
* backpressure is applied to it).</dd>
15824+
* <dt><b>Scheduler:</b></dt>
15825+
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
15826+
* </dl>
15827+
* @param onNext the callback for upstream items
15828+
* @param onError the callback for an upstream error if any
15829+
* @param onComplete the callback for the upstream completion if any
15830+
* @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
15831+
* created {@code Disposable} {@code Subscriber}
15832+
* @return the {@code Disposable} that allows disposing the particular subscription.
15833+
* @throws NullPointerException
15834+
* if {@code onNext}, {@code onError},
15835+
* {@code onComplete} or {@code container} is {@code null}
15836+
* @since 3.1.0
15837+
*/
15838+
@BackpressureSupport(BackpressureKind.SPECIAL)
15839+
@SchedulerSupport(SchedulerSupport.NONE)
15840+
@NonNull
15841+
public final Disposable subscribe(
15842+
@NonNull Consumer<? super T> onNext,
15843+
@NonNull Consumer<? super Throwable> onError,
15844+
@NonNull Action onComplete,
15845+
@NonNull DisposableContainer container) {
15846+
Objects.requireNonNull(onNext, "onNext is null");
15847+
Objects.requireNonNull(onError, "onError is null");
15848+
Objects.requireNonNull(onComplete, "onComplete is null");
15849+
Objects.requireNonNull(container, "container is null");
15850+
15851+
DisposableAutoReleaseSubscriber<T> subscriber = new DisposableAutoReleaseSubscriber<>(
15852+
container, onNext, onError, onComplete);
15853+
container.add(subscriber);
15854+
subscribe(subscriber);
15855+
return subscriber;
15856+
}
15857+
1580915858
@BackpressureSupport(BackpressureKind.SPECIAL)
1581015859
@SchedulerSupport(SchedulerSupport.NONE)
1581115860
@Override

src/main/java/io/reactivex/rxjava3/core/Maybe.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.*;
2121

2222
import io.reactivex.rxjava3.annotations.*;
23-
import io.reactivex.rxjava3.disposables.Disposable;
23+
import io.reactivex.rxjava3.disposables.*;
2424
import io.reactivex.rxjava3.exceptions.*;
2525
import io.reactivex.rxjava3.functions.*;
2626
import io.reactivex.rxjava3.internal.functions.*;
@@ -5226,6 +5226,7 @@ public final Flowable<T> startWith(@NonNull Publisher<T> other) {
52265226
*
52275227
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
52285228
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
5229+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
52295230
*/
52305231
@SchedulerSupport(SchedulerSupport.NONE)
52315232
@NonNull
@@ -5250,6 +5251,7 @@ public final Disposable subscribe() {
52505251
* @throws NullPointerException
52515252
* if {@code onSuccess} is {@code null}
52525253
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
5254+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
52535255
*/
52545256
@CheckReturnValue
52555257
@SchedulerSupport(SchedulerSupport.NONE)
@@ -5272,10 +5274,11 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess) {
52725274
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the
52735275
* {@code Maybe}
52745276
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
5275-
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
52765277
* @throws NullPointerException
52775278
* if {@code onSuccess} is {@code null}, or
52785279
* if {@code onError} is {@code null}
5280+
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
5281+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
52795282
*/
52805283
@CheckReturnValue
52815284
@SchedulerSupport(SchedulerSupport.NONE)
@@ -5305,6 +5308,7 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess, @NonNu
53055308
* if {@code onSuccess}, {@code onError} or
53065309
* {@code onComplete} is {@code null}
53075310
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
5311+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
53085312
*/
53095313
@CheckReturnValue
53105314
@NonNull
@@ -5317,6 +5321,47 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onSuccess, @NonNu
53175321
return subscribeWith(new MaybeCallbackObserver<>(onSuccess, onError, onComplete));
53185322
}
53195323

5324+
/**
5325+
* Wraps the given onXXX callbacks into a {@link Disposable} {@link MaybeObserver},
5326+
* adds it to the given {@link DisposableContainer} and ensures, that if the upstream
5327+
* terminates or this particular {@code Disposable} is disposed, the {@code MaybeObserver} is removed
5328+
* from the given composite.
5329+
* <p>
5330+
* The {@code MaybeObserver} will be removed after the callback for the terminal event has been invoked.
5331+
* <dl>
5332+
* <dt><b>Scheduler:</b></dt>
5333+
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
5334+
* </dl>
5335+
* @param onSuccess the callback for upstream items
5336+
* @param onError the callback for an upstream error
5337+
* @param onComplete the callback for an upstream completion without any value or error
5338+
* @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
5339+
* created {@code Disposable} {@code MaybeObserver}
5340+
* @return the {@code Disposable} that allows disposing the particular subscription.
5341+
* @throws NullPointerException
5342+
* if {@code onSuccess}, {@code onError},
5343+
* {@code onComplete} or {@code container} is {@code null}
5344+
* @since 3.1.0
5345+
*/
5346+
@SchedulerSupport(SchedulerSupport.NONE)
5347+
@NonNull
5348+
public final Disposable subscribe(
5349+
@NonNull Consumer<? super T> onSuccess,
5350+
@NonNull Consumer<? super Throwable> onError,
5351+
@NonNull Action onComplete,
5352+
@NonNull DisposableContainer container) {
5353+
Objects.requireNonNull(onSuccess, "onSuccess is null");
5354+
Objects.requireNonNull(onError, "onError is null");
5355+
Objects.requireNonNull(onComplete, "onComplete is null");
5356+
Objects.requireNonNull(container, "container is null");
5357+
5358+
DisposableAutoReleaseMultiObserver<T> observer = new DisposableAutoReleaseMultiObserver<>(
5359+
container, onSuccess, onError, onComplete);
5360+
container.add(observer);
5361+
subscribe(observer);
5362+
return observer;
5363+
}
5364+
53205365
@SchedulerSupport(SchedulerSupport.NONE)
53215366
@Override
53225367
public final void subscribe(@NonNull MaybeObserver<? super T> observer) {

src/main/java/io/reactivex/rxjava3/core/Observable.java

Lines changed: 47 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
import org.reactivestreams.Publisher;
2121

2222
import io.reactivex.rxjava3.annotations.*;
23-
import io.reactivex.rxjava3.disposables.Disposable;
23+
import io.reactivex.rxjava3.disposables.*;
2424
import io.reactivex.rxjava3.exceptions.*;
2525
import io.reactivex.rxjava3.functions.*;
2626
import io.reactivex.rxjava3.internal.functions.*;
@@ -13025,6 +13025,7 @@ public final Observable<T> startWithArray(@NonNull T... items) {
1302513025
*
1302613026
* @return the new {@link Disposable} instance that can be used to dispose the subscription at any time
1302713027
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
13028+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1302813029
*/
1302913030
@SchedulerSupport(SchedulerSupport.NONE)
1303013031
@NonNull
@@ -13049,6 +13050,7 @@ public final Disposable subscribe() {
1304913050
* @throws NullPointerException
1305013051
* if {@code onNext} is {@code null}
1305113052
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
13053+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1305213054
*/
1305313055
@CheckReturnValue
1305413056
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13071,9 +13073,10 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext) {
1307113073
* the {@code Consumer<Throwable>} you have designed to accept any error notification from the current
1307213074
* {@code Observable}
1307313075
* @return the new {@link Disposable} instance that can be used to dispose the subscription at any time
13074-
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
1307513076
* @throws NullPointerException
1307613077
* if {@code onNext} or {@code onError} is {@code null}
13078+
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
13079+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1307713080
*/
1307813081
@CheckReturnValue
1307913082
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13102,6 +13105,7 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull
1310213105
* @throws NullPointerException
1310313106
* if {@code onNext}, {@code onError} or {@code onComplete} is {@code null}
1310413107
* @see <a href="http://reactivex.io/documentation/operators/subscribe.html">ReactiveX operators documentation: Subscribe</a>
13108+
* @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
1310513109
*/
1310613110
@CheckReturnValue
1310713111
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13119,6 +13123,47 @@ public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull
1311913123
return ls;
1312013124
}
1312113125

13126+
/**
13127+
* Wraps the given onXXX callbacks into a {@link Disposable} {@link Observer},
13128+
* adds it to the given {@code DisposableContainer} and ensures, that if the upstream
13129+
* terminates or this particular {@code Disposable} is disposed, the {@code Observer} is removed
13130+
* from the given container.
13131+
* <p>
13132+
* The {@code Observer} will be removed after the callback for the terminal event has been invoked.
13133+
* <dl>
13134+
* <dt><b>Scheduler:</b></dt>
13135+
* <dd>{@code subscribe} does not operate by default on a particular {@link Scheduler}.</dd>
13136+
* </dl>
13137+
* @param onNext the callback for upstream items
13138+
* @param onError the callback for an upstream error if any
13139+
* @param onComplete the callback for the upstream completion if any
13140+
* @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
13141+
* created {@code Disposable} {@code Observer}
13142+
* @return the {@code Disposable} that allows disposing the particular subscription.
13143+
* @throws NullPointerException
13144+
* if {@code onNext}, {@code onError},
13145+
* {@code onComplete} or {@code container} is {@code null}
13146+
* @since 3.1.0
13147+
*/
13148+
@SchedulerSupport(SchedulerSupport.NONE)
13149+
@NonNull
13150+
public final Disposable subscribe(
13151+
@NonNull Consumer<? super T> onNext,
13152+
@NonNull Consumer<? super Throwable> onError,
13153+
@NonNull Action onComplete,
13154+
@NonNull DisposableContainer container) {
13155+
Objects.requireNonNull(onNext, "onNext is null");
13156+
Objects.requireNonNull(onError, "onError is null");
13157+
Objects.requireNonNull(onComplete, "onComplete is null");
13158+
Objects.requireNonNull(container, "container is null");
13159+
13160+
DisposableAutoReleaseObserver<T> observer = new DisposableAutoReleaseObserver<>(
13161+
container, onNext, onError, onComplete);
13162+
container.add(observer);
13163+
subscribe(observer);
13164+
return observer;
13165+
}
13166+
1312213167
@SchedulerSupport(SchedulerSupport.NONE)
1312313168
@Override
1312413169
public final void subscribe(@NonNull Observer<? super T> observer) {

0 commit comments

Comments
 (0)