{@code subscribe} does not operate by default on a particular {@link Scheduler}.
*
* @return the new {@code Disposable} that can be used for disposing the subscription at any time
+ * @see #subscribe(Action, Consumer, DisposableContainer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@@ -2921,6 +2922,7 @@ public final void subscribe(@NonNull CompletableObserver observer) {
* @param onError the {@link Consumer} that is called if this {@code Completable} emits an error
* @return the new {@link Disposable} that can be used for disposing the subscription at any time
* @throws NullPointerException if {@code onComplete} or {@code onError} is {@code null}
+ * @see #subscribe(Action, Consumer, DisposableContainer)
*/
@CheckReturnValue
@NonNull
@@ -2934,6 +2936,44 @@ public final Disposable subscribe(@NonNull Action onComplete, @NonNull Consumer<
return observer;
}
+ /**
+ * Wraps the given onXXX callbacks into a {@link Disposable} {@link CompletableObserver},
+ * adds it to the given {@link DisposableContainer} and ensures, that if the upstream
+ * terminates or this particular {@code Disposable} is disposed, the {@code CompletableObserver} is removed
+ * from the given composite.
+ *
+ * The {@code CompletableObserver} will be removed after the callback for the terminal event has been invoked.
+ *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onError the callback for an upstream error
+ * @param onComplete the callback for an upstream completion
+ * @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
+ * created {@code Disposable} {@code CompletableObserver}
+ * @return the {@code Disposable} that allows disposing the particular subscription.
+ * @throws NullPointerException
+ * if {@code onComplete}, {@code onError}
+ * or {@code container} is {@code null}
+ * @since 3.1.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final Disposable subscribe(
+ @NonNull Action onComplete,
+ @NonNull Consumer super Throwable> onError,
+ @NonNull DisposableContainer container) {
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(container, "container is null");
+
+ DisposableAutoReleaseMultiObserver observer = new DisposableAutoReleaseMultiObserver<>(
+ container, Functions.emptyConsumer(), onError, onComplete);
+ container.add(observer);
+ subscribe(observer);
+ return observer;
+ }
+
/**
* Subscribes to this {@code Completable} and calls the given {@link Action} when this {@code Completable}
* completes normally.
@@ -2950,6 +2990,7 @@ public final Disposable subscribe(@NonNull Action onComplete, @NonNull Consumer<
* @param onComplete the {@code Action} called when this {@code Completable} completes normally
* @return the new {@link Disposable} that can be used for disposing the subscription at any time
* @throws NullPointerException if {@code onComplete} is {@code null}
+ * @see #subscribe(Action, Consumer, DisposableContainer)
*/
@CheckReturnValue
@NonNull
diff --git a/src/main/java/io/reactivex/rxjava3/core/Flowable.java b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
index 804a057533..d22d8dc4dc 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Flowable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Flowable.java
@@ -20,7 +20,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.flowables.*;
import io.reactivex.rxjava3.functions.*;
@@ -15699,6 +15699,7 @@ public final Flowable startWithArray(@NonNull T... items) {
*
* @return the new {@link Disposable} instance that allows cancelling the flow
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@SchedulerSupport(SchedulerSupport.NONE)
@@ -15727,6 +15728,7 @@ public final Disposable subscribe() {
* @throws NullPointerException
* if {@code onNext} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15753,9 +15755,10 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext) {
* the {@code Consumer} you have designed to accept any error notification from the
* current {@code Flowable}
* @return the new {@link Disposable} instance that allows cancelling the flow
- * @see ReactiveX operators documentation: Subscribe
* @throws NullPointerException
* if {@code onNext} or {@code onError} is {@code null}
+ * @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15788,6 +15791,7 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull
* @throws NullPointerException
* if {@code onNext}, {@code onError} or {@code onComplete} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
@@ -15806,6 +15810,51 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull
return ls;
}
+ /**
+ * Wraps the given onXXX callbacks into a {@link Disposable} {@link Subscriber},
+ * adds it to the given {@link DisposableContainer} and ensures, that if the upstream
+ * terminates or this particular {@code Disposable} is disposed, the {@code Subscriber} is removed
+ * from the given container.
+ *
+ * The {@coded Subscriber} will be removed after the callback for the terminal event has been invoked.
+ *
+ *
Backpressure:
+ *
The operator consumes the current {@code Flowable} in an unbounded manner (i.e., no
+ * backpressure is applied to it).
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onNext the callback for upstream items
+ * @param onError the callback for an upstream error if any
+ * @param onComplete the callback for the upstream completion if any
+ * @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
+ * created {@code Disposable} {@code Subscriber}
+ * @return the {@code Disposable} that allows disposing the particular subscription.
+ * @throws NullPointerException
+ * if {@code onNext}, {@code onError},
+ * {@code onComplete} or {@code container} is {@code null}
+ * @since 3.1.0
+ */
+ @BackpressureSupport(BackpressureKind.SPECIAL)
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final Disposable subscribe(
+ @NonNull Consumer super T> onNext,
+ @NonNull Consumer super Throwable> onError,
+ @NonNull Action onComplete,
+ @NonNull DisposableContainer container) {
+ Objects.requireNonNull(onNext, "onNext is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ Objects.requireNonNull(container, "container is null");
+
+ DisposableAutoReleaseSubscriber subscriber = new DisposableAutoReleaseSubscriber<>(
+ container, onNext, onError, onComplete);
+ container.add(subscriber);
+ subscribe(subscriber);
+ return subscriber;
+ }
+
@BackpressureSupport(BackpressureKind.SPECIAL)
@SchedulerSupport(SchedulerSupport.NONE)
@Override
diff --git a/src/main/java/io/reactivex/rxjava3/core/Maybe.java b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
index c0b8421fb1..917a9838a4 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Maybe.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Maybe.java
@@ -20,7 +20,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
@@ -5226,6 +5226,7 @@ public final Flowable startWith(@NonNull Publisher other) {
*
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@@ -5250,6 +5251,7 @@ public final Disposable subscribe() {
* @throws NullPointerException
* if {@code onSuccess} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -5272,10 +5274,11 @@ public final Disposable subscribe(@NonNull Consumer super T> onSuccess) {
* the {@code Consumer} you have designed to accept any error notification from the
* {@code Maybe}
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
- * @see ReactiveX operators documentation: Subscribe
* @throws NullPointerException
* if {@code onSuccess} is {@code null}, or
* if {@code onError} is {@code null}
+ * @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -5305,6 +5308,7 @@ public final Disposable subscribe(@NonNull Consumer super T> onSuccess, @NonNu
* if {@code onSuccess}, {@code onError} or
* {@code onComplete} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@NonNull
@@ -5317,6 +5321,47 @@ public final Disposable subscribe(@NonNull Consumer super T> onSuccess, @NonNu
return subscribeWith(new MaybeCallbackObserver<>(onSuccess, onError, onComplete));
}
+ /**
+ * Wraps the given onXXX callbacks into a {@link Disposable} {@link MaybeObserver},
+ * adds it to the given {@link DisposableContainer} and ensures, that if the upstream
+ * terminates or this particular {@code Disposable} is disposed, the {@code MaybeObserver} is removed
+ * from the given composite.
+ *
+ * The {@code MaybeObserver} will be removed after the callback for the terminal event has been invoked.
+ *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onSuccess the callback for upstream items
+ * @param onError the callback for an upstream error
+ * @param onComplete the callback for an upstream completion without any value or error
+ * @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
+ * created {@code Disposable} {@code MaybeObserver}
+ * @return the {@code Disposable} that allows disposing the particular subscription.
+ * @throws NullPointerException
+ * if {@code onSuccess}, {@code onError},
+ * {@code onComplete} or {@code container} is {@code null}
+ * @since 3.1.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final Disposable subscribe(
+ @NonNull Consumer super T> onSuccess,
+ @NonNull Consumer super Throwable> onError,
+ @NonNull Action onComplete,
+ @NonNull DisposableContainer container) {
+ Objects.requireNonNull(onSuccess, "onSuccess is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ Objects.requireNonNull(container, "container is null");
+
+ DisposableAutoReleaseMultiObserver observer = new DisposableAutoReleaseMultiObserver<>(
+ container, onSuccess, onError, onComplete);
+ container.add(observer);
+ subscribe(observer);
+ return observer;
+ }
+
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull MaybeObserver super T> observer) {
diff --git a/src/main/java/io/reactivex/rxjava3/core/Observable.java b/src/main/java/io/reactivex/rxjava3/core/Observable.java
index 27d46ad8c5..18534eecc7 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Observable.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Observable.java
@@ -20,7 +20,7 @@
import org.reactivestreams.Publisher;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
@@ -13025,6 +13025,7 @@ public final Observable startWithArray(@NonNull T... items) {
*
* @return the new {@link Disposable} instance that can be used to dispose the subscription at any time
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@@ -13049,6 +13050,7 @@ public final Disposable subscribe() {
* @throws NullPointerException
* if {@code onNext} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13071,9 +13073,10 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext) {
* the {@code Consumer} you have designed to accept any error notification from the current
* {@code Observable}
* @return the new {@link Disposable} instance that can be used to dispose the subscription at any time
- * @see ReactiveX operators documentation: Subscribe
* @throws NullPointerException
* if {@code onNext} or {@code onError} is {@code null}
+ * @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13102,6 +13105,7 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull
* @throws NullPointerException
* if {@code onNext}, {@code onError} or {@code onComplete} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, Action, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -13119,6 +13123,47 @@ public final Disposable subscribe(@NonNull Consumer super T> onNext, @NonNull
return ls;
}
+ /**
+ * Wraps the given onXXX callbacks into a {@link Disposable} {@link Observer},
+ * adds it to the given {@code DisposableContainer} and ensures, that if the upstream
+ * terminates or this particular {@code Disposable} is disposed, the {@code Observer} is removed
+ * from the given container.
+ *
+ * The {@code Observer} will be removed after the callback for the terminal event has been invoked.
+ *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onNext the callback for upstream items
+ * @param onError the callback for an upstream error if any
+ * @param onComplete the callback for the upstream completion if any
+ * @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
+ * created {@code Disposable} {@code Observer}
+ * @return the {@code Disposable} that allows disposing the particular subscription.
+ * @throws NullPointerException
+ * if {@code onNext}, {@code onError},
+ * {@code onComplete} or {@code container} is {@code null}
+ * @since 3.1.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final Disposable subscribe(
+ @NonNull Consumer super T> onNext,
+ @NonNull Consumer super Throwable> onError,
+ @NonNull Action onComplete,
+ @NonNull DisposableContainer container) {
+ Objects.requireNonNull(onNext, "onNext is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(onComplete, "onComplete is null");
+ Objects.requireNonNull(container, "container is null");
+
+ DisposableAutoReleaseObserver observer = new DisposableAutoReleaseObserver<>(
+ container, onNext, onError, onComplete);
+ container.add(observer);
+ subscribe(observer);
+ return observer;
+ }
+
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer super T> observer) {
diff --git a/src/main/java/io/reactivex/rxjava3/core/Single.java b/src/main/java/io/reactivex/rxjava3/core/Single.java
index dbe3694913..66e1ffd3d5 100644
--- a/src/main/java/io/reactivex/rxjava3/core/Single.java
+++ b/src/main/java/io/reactivex/rxjava3/core/Single.java
@@ -20,7 +20,7 @@
import org.reactivestreams.*;
import io.reactivex.rxjava3.annotations.*;
-import io.reactivex.rxjava3.disposables.Disposable;
+import io.reactivex.rxjava3.disposables.*;
import io.reactivex.rxjava3.exceptions.*;
import io.reactivex.rxjava3.functions.*;
import io.reactivex.rxjava3.internal.functions.*;
@@ -4705,6 +4705,7 @@ public final Flowable startWith(@NonNull Publisher other) {
*
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, DisposableContainer)
*/
@SchedulerSupport(SchedulerSupport.NONE)
@NonNull
@@ -4726,9 +4727,10 @@ public final Disposable subscribe() {
* the callback that receives either the success value or the failure {@link Throwable}
* (whichever is not {@code null})
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
- * @see ReactiveX operators documentation: Subscribe
* @throws NullPointerException
* if {@code onCallback} is {@code null}
+ * @see #subscribe(Consumer, Consumer, DisposableContainer)
+ * @see ReactiveX operators documentation: Subscribe
*/
@CheckReturnValue
@NonNull
@@ -4760,6 +4762,7 @@ public final Disposable subscribe(@NonNull BiConsumer super T, ? super Throwab
* @throws NullPointerException
* if {@code onSuccess} is {@code null}
* @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, DisposableContainer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
@@ -4784,9 +4787,10 @@ public final Disposable subscribe(@NonNull Consumer super T> onSuccess) {
* the {@code Consumer} you have designed to accept any error notification from the
* {@code Single}
* @return the new {@link Disposable} instance that can be used for disposing the subscription at any time
- * @see ReactiveX operators documentation: Subscribe
* @throws NullPointerException
* if {@code onSuccess} or {@code onError} is {@code null}
+ * @see ReactiveX operators documentation: Subscribe
+ * @see #subscribe(Consumer, Consumer, DisposableContainer)
*/
@CheckReturnValue
@NonNull
@@ -4800,6 +4804,44 @@ public final Disposable subscribe(@NonNull Consumer super T> onSuccess, @NonNu
return observer;
}
+ /**
+ * Wraps the given onXXX callbacks into a {@link Disposable} {@link SingleObserver},
+ * adds it to the given {@link DisposableContainer} and ensures, that if the upstream
+ * terminates or this particular {@code Disposable} is disposed, the {@code SingleObserver} is removed
+ * from the given container.
+ *
+ * The {@code SingleObserver} will be removed after the callback for the terminal event has been invoked.
+ *
+ *
Scheduler:
+ *
{@code subscribe} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param onSuccess the callback for upstream items
+ * @param onError the callback for an upstream error if any
+ * @param container the {@code DisposableContainer} (such as {@link CompositeDisposable}) to add and remove the
+ * created {@code Disposable} {@code SingleObserver}
+ * @return the {@code Disposable} that allows disposing the particular subscription.
+ * @throws NullPointerException
+ * if {@code onSuccess}, {@code onError}
+ * or {@code container} is {@code null}
+ * @since 3.1.0
+ */
+ @SchedulerSupport(SchedulerSupport.NONE)
+ @NonNull
+ public final Disposable subscribe(
+ @NonNull Consumer super T> onSuccess,
+ @NonNull Consumer super Throwable> onError,
+ @NonNull DisposableContainer container) {
+ Objects.requireNonNull(onSuccess, "onSuccess is null");
+ Objects.requireNonNull(onError, "onError is null");
+ Objects.requireNonNull(container, "container is null");
+
+ DisposableAutoReleaseMultiObserver observer = new DisposableAutoReleaseMultiObserver<>(
+ container, onSuccess, onError, Functions.EMPTY_ACTION);
+ container.add(observer);
+ subscribe(observer);
+ return observer;
+ }
+
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull SingleObserver super T> observer) {
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/AbstractDisposableAutoRelease.java b/src/main/java/io/reactivex/rxjava3/internal/observers/AbstractDisposableAutoRelease.java
new file mode 100644
index 0000000000..e5d89a4502
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/AbstractDisposableAutoRelease.java
@@ -0,0 +1,125 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2016-2019 David Karnok
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import io.reactivex.rxjava3.disposables.*;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.internal.functions.Functions;
+import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps lambda callbacks and when the upstream terminates or the observer gets disposed,
+ * removes itself from a {@link io.reactivex.rxjava3.disposables.CompositeDisposable}.
+ *
History: 0.18.0 @ RxJavaExtensions
+ * @since 3.1.0
+ */
+abstract class AbstractDisposableAutoRelease
+extends AtomicReference
+implements Disposable, LambdaConsumerIntrospection {
+
+ private static final long serialVersionUID = 8924480688481408726L;
+
+ final AtomicReference composite;
+
+ final Consumer super Throwable> onError;
+
+ final Action onComplete;
+
+ AbstractDisposableAutoRelease(
+ DisposableContainer composite,
+ Consumer super Throwable> onError,
+ Action onComplete
+ ) {
+ this.onError = onError;
+ this.onComplete = onComplete;
+ this.composite = new AtomicReference<>(composite);
+ }
+
+ public final void onError(Throwable t) {
+ if (get() != DisposableHelper.DISPOSED) {
+ lazySet(DisposableHelper.DISPOSED);
+ try {
+ onError.accept(t);
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ RxJavaPlugins.onError(new CompositeException(t, e));
+ }
+ } else {
+ RxJavaPlugins.onError(t);
+ }
+ removeSelf();
+ }
+
+ public final void onComplete() {
+ if (get() != DisposableHelper.DISPOSED) {
+ lazySet(DisposableHelper.DISPOSED);
+ try {
+ onComplete.run();
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ RxJavaPlugins.onError(e);
+ }
+ }
+ removeSelf();
+ }
+
+ @Override
+ public final void dispose() {
+ DisposableHelper.dispose(this);
+ removeSelf();
+ }
+
+ final void removeSelf() {
+ DisposableContainer c = composite.getAndSet(null);
+ if (c != null) {
+ c.delete(this);
+ }
+ }
+
+ @Override
+ public final boolean isDisposed() {
+ return DisposableHelper.isDisposed(get());
+ }
+
+ public final void onSubscribe(Disposable d) {
+ DisposableHelper.setOnce(this, d);
+ }
+
+ @Override
+ public final boolean hasCustomOnError() {
+ return onError != Functions.ON_ERROR_MISSING;
+ }
+
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseMultiObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseMultiObserver.java
new file mode 100644
index 0000000000..4779ff5723
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseMultiObserver.java
@@ -0,0 +1,78 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2016-2019 David Karnok
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.DisposableContainer;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps lambda callbacks and when the upstream terminates or this (Single | Maybe | Completable)
+ * observer gets disposed, removes itself from a {@link io.reactivex.rxjava3.disposables.CompositeDisposable}.
+ *
History: 0.18.0 @ RxJavaExtensions
+ * @param the element type consumed
+ * @since 3.1.0
+ */
+public final class DisposableAutoReleaseMultiObserver
+extends AbstractDisposableAutoRelease
+implements SingleObserver, MaybeObserver, CompletableObserver {
+
+ private static final long serialVersionUID = 8924480688481408726L;
+
+ final Consumer super T> onSuccess;
+
+ public DisposableAutoReleaseMultiObserver(
+ DisposableContainer composite,
+ Consumer super T> onSuccess,
+ Consumer super Throwable> onError,
+ Action onComplete
+ ) {
+ super(composite, onError, onComplete);
+ this.onSuccess = onSuccess;
+ }
+
+ @Override
+ public void onSuccess(T t) {
+ if (get() != DisposableHelper.DISPOSED) {
+ lazySet(DisposableHelper.DISPOSED);
+ try {
+ onSuccess.accept(t);
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ RxJavaPlugins.onError(e);
+ }
+ }
+ removeSelf();
+ }
+
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseObserver.java b/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseObserver.java
new file mode 100644
index 0000000000..89435c96ed
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/observers/DisposableAutoReleaseObserver.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2016-2019 David Karnok
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import io.reactivex.rxjava3.core.Observer;
+import io.reactivex.rxjava3.disposables.DisposableContainer;
+import io.reactivex.rxjava3.exceptions.Exceptions;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
+
+/**
+ * Wraps lambda callbacks and when the upstream terminates or this observer gets disposed,
+ * removes itself from a {@link io.reactivex.rxjava3.disposables.CompositeDisposable}.
+ *
History: 0.18.0 @ RxJavaExtensions
+ * @param the element type consumed
+ * @since 3.1.0
+ */
+public final class DisposableAutoReleaseObserver
+extends AbstractDisposableAutoRelease
+implements Observer {
+
+ private static final long serialVersionUID = 8924480688481408726L;
+
+ final Consumer super T> onNext;
+
+ public DisposableAutoReleaseObserver(
+ DisposableContainer composite,
+ Consumer super T> onNext,
+ Consumer super Throwable> onError,
+ Action onComplete
+ ) {
+ super(composite, onError, onComplete);
+ this.onNext = onNext;
+ }
+
+ @Override
+ public void onNext(T t) {
+ if (get() != DisposableHelper.DISPOSED) {
+ try {
+ onNext.accept(t);
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ get().dispose();
+ onError(e);
+ }
+ }
+ }
+
+}
diff --git a/src/main/java/io/reactivex/rxjava3/internal/subscribers/DisposableAutoReleaseSubscriber.java b/src/main/java/io/reactivex/rxjava3/internal/subscribers/DisposableAutoReleaseSubscriber.java
new file mode 100644
index 0000000000..3b9d2d99b3
--- /dev/null
+++ b/src/main/java/io/reactivex/rxjava3/internal/subscribers/DisposableAutoReleaseSubscriber.java
@@ -0,0 +1,151 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2016-2019 David Karnok
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.subscribers;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+import org.reactivestreams.Subscription;
+
+import io.reactivex.rxjava3.core.FlowableSubscriber;
+import io.reactivex.rxjava3.disposables.*;
+import io.reactivex.rxjava3.exceptions.*;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.internal.functions.Functions;
+import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
+import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+
+/**
+ * Wraps lambda callbacks and when the upstream terminates or this subscriber gets disposed,
+ * removes itself from a {@link io.reactivex.rxjava3.disposables.CompositeDisposable}.
+ *
History: 0.18.0 @ RxJavaExtensions
+ * @param the element type consumed
+ * @since 3.1.0
+ */
+public final class DisposableAutoReleaseSubscriber
+extends AtomicReference
+implements FlowableSubscriber, Disposable, LambdaConsumerIntrospection {
+
+ private static final long serialVersionUID = 8924480688481408726L;
+
+ final AtomicReference composite;
+
+ final Consumer super T> onNext;
+
+ final Consumer super Throwable> onError;
+
+ final Action onComplete;
+
+ public DisposableAutoReleaseSubscriber(
+ DisposableContainer composite,
+ Consumer super T> onNext,
+ Consumer super Throwable> onError,
+ Action onComplete
+ ) {
+ this.onNext = onNext;
+ this.onError = onError;
+ this.onComplete = onComplete;
+ this.composite = new AtomicReference<>(composite);
+ }
+
+ @Override
+ public void onNext(T t) {
+ if (get() != SubscriptionHelper.CANCELLED) {
+ try {
+ onNext.accept(t);
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ get().cancel();
+ onError(e);
+ }
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ if (get() != SubscriptionHelper.CANCELLED) {
+ lazySet(SubscriptionHelper.CANCELLED);
+ try {
+ onError.accept(t);
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ RxJavaPlugins.onError(new CompositeException(t, e));
+ }
+ } else {
+ RxJavaPlugins.onError(t);
+ }
+ removeSelf();
+ }
+
+ @Override
+ public void onComplete() {
+ if (get() != SubscriptionHelper.CANCELLED) {
+ lazySet(SubscriptionHelper.CANCELLED);
+ try {
+ onComplete.run();
+ } catch (Throwable e) {
+ Exceptions.throwIfFatal(e);
+ RxJavaPlugins.onError(e);
+ }
+ }
+ removeSelf();
+ }
+
+ @Override
+ public void dispose() {
+ SubscriptionHelper.cancel(this);
+ removeSelf();
+ }
+
+ void removeSelf() {
+ DisposableContainer c = composite.getAndSet(null);
+ if (c != null) {
+ c.delete(this);
+ }
+ }
+
+ @Override
+ public boolean isDisposed() {
+ return SubscriptionHelper.CANCELLED == get();
+ }
+
+ @Override
+ public void onSubscribe(Subscription s) {
+ if (SubscriptionHelper.setOnce(this, s)) {
+ s.request(Long.MAX_VALUE);
+ }
+ }
+
+ @Override
+ public boolean hasCustomOnError() {
+ return onError != Functions.ON_ERROR_MISSING;
+ }
+
+}
diff --git a/src/test/java/io/reactivex/rxjava3/internal/observers/CompletableConsumersTest.java b/src/test/java/io/reactivex/rxjava3/internal/observers/CompletableConsumersTest.java
new file mode 100644
index 0000000000..eb17ead3db
--- /dev/null
+++ b/src/test/java/io/reactivex/rxjava3/internal/observers/CompletableConsumersTest.java
@@ -0,0 +1,221 @@
+/*
+ * Copyright (c) 2016-present, RxJava Contributors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+/*
+ * Copyright 2016-2019 David Karnok
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.reactivex.rxjava3.internal.observers;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.util.*;
+
+import org.junit.Test;
+
+import io.reactivex.rxjava3.core.*;
+import io.reactivex.rxjava3.disposables.*;
+import io.reactivex.rxjava3.exceptions.CompositeException;
+import io.reactivex.rxjava3.functions.*;
+import io.reactivex.rxjava3.observers.LambdaConsumerIntrospection;
+import io.reactivex.rxjava3.plugins.RxJavaPlugins;
+import io.reactivex.rxjava3.subjects.CompletableSubject;
+import io.reactivex.rxjava3.testsupport.TestHelper;
+
+public class CompletableConsumersTest implements Consumer