diff --git a/src/main/java/io/reactivex/observers/DefaultObserver.java b/src/main/java/io/reactivex/observers/DefaultObserver.java index ec79fb41e6..096388c2e9 100644 --- a/src/main/java/io/reactivex/observers/DefaultObserver.java +++ b/src/main/java/io/reactivex/observers/DefaultObserver.java @@ -18,10 +18,48 @@ import io.reactivex.internal.disposables.DisposableHelper; /** - * Abstract base implementation of an Observer with support for cancelling a + * Abstract base implementation of an {@link io.reactivex.Observer Observer} with support for cancelling a * subscription via {@link #cancel()} (synchronously) and calls {@link #onStart()} * when the subscription happens. * + *

All pre-implemented final methods are thread-safe. + * + *

Use the protected {@link #cancel()} to dispose the sequence from within an + * {@code onNext} implementation. + * + *

Like all other consumers, {@code DefaultObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)} + * instead of the standard {@code subscribe()} method. + * + *

Example

+ * Disposable d =
+ *     Observable.range(1, 5)
+ *     .subscribeWith(new DefaultObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 cancel();
+ *             }
+ *             System.out.println(t);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the value type */ public abstract class DefaultObserver implements Observer { diff --git a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java index b9ab7bc32f..b0a1c18914 100644 --- a/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableCompletableObserver.java @@ -21,6 +21,33 @@ /** * An abstract {@link CompletableObserver} that allows asynchronous cancellation by implementing Disposable. + * + *

All pre-implemented final methods are thread-safe. + * + *

Like all other consumers, {@code DisposableCompletableObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onError(Throwable)} and + * {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Completable.complete().delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new DisposableMaybeObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
*/ public abstract class DisposableCompletableObserver implements CompletableObserver, Disposable { final AtomicReference s = new AtomicReference(); diff --git a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java index 632d8139c1..f4d980f060 100644 --- a/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableMaybeObserver.java @@ -22,6 +22,41 @@ /** * An abstract {@link MaybeObserver} that allows asynchronous cancellation by implementing Disposable. * + *

All pre-implemented final methods are thread-safe. + * + *

Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are + * exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and + * {@code onComplete()} is never called after an {@code onSuccess()}. + * + *

Like all other consumers, {@code DisposableMaybeObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} and + * {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Maybe.just(1).delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new DisposableMaybeObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *         }
+ *         @Override public void onSuccess(Integer t) {
+ *             System.out.println(t);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * + * * @param the received value type */ public abstract class DisposableMaybeObserver implements MaybeObserver, Disposable { diff --git a/src/main/java/io/reactivex/observers/DisposableObserver.java b/src/main/java/io/reactivex/observers/DisposableObserver.java index fbeb30515a..40a18fee73 100644 --- a/src/main/java/io/reactivex/observers/DisposableObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableObserver.java @@ -22,6 +22,44 @@ /** * An abstract {@link Observer} that allows asynchronous cancellation by implementing Disposable. * + *

All pre-implemented final methods are thread-safe. + * + *

Use the protected {@link #dispose()} to dispose the sequence from within an + * {@code onNext} implementation. + * + *

Like all other consumers, {@code DefaultObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)} + * instead of the standard {@code subscribe()} method. + * + *

Example

+ * Disposable d =
+ *     Observable.range(1, 5)
+ *     .subscribeWith(new DisposableObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 dispose();
+ *             }
+ *             System.out.println(t);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the received value type */ public abstract class DisposableObserver implements Observer, Disposable { diff --git a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java index 2f31fe6d8e..dd5f0aab09 100644 --- a/src/main/java/io/reactivex/observers/DisposableSingleObserver.java +++ b/src/main/java/io/reactivex/observers/DisposableSingleObserver.java @@ -22,6 +22,33 @@ /** * An abstract {@link SingleObserver} that allows asynchronous cancellation by implementing Disposable. * + *

All pre-implemented final methods are thread-safe. + * + *

Like all other consumers, {@code DisposableSingleObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)} + * are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Single.just(1).delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new DisposableSingleObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *         }
+ *         @Override public void onSuccess(Integer t) {
+ *             System.out.println(t);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the received value type */ public abstract class DisposableSingleObserver implements SingleObserver, Disposable { diff --git a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java index 91f3f58c13..26740c70b0 100644 --- a/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceCompletableObserver.java @@ -25,6 +25,51 @@ * An abstract {@link CompletableObserver} that allows asynchronous cancellation of its subscription and associated resources. * *

All pre-implemented final methods are thread-safe. + * + *

Override the protected {@link #onStart()} to perform initialization when this + * {@code ResourceCompletableObserver} is subscribed to a source. + * + *

Use the protected {@link #dispose()} to dispose the sequence externally and release + * all resources. + * + *

To release the associated resources, one has to call {@link #dispose()} + * in {@code onError()} and {@code onComplete()} explicitly. + * + *

Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s) + * with this {@code ResourceCompletableObserver} that will be cleaned up when {@link #dispose()} is called. + * Removing previously associated resources is not possible but one can create a + * {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this + * {@code ResourceCompletableObserver} and then add/remove resources to/from the {@code CompositeDisposable} + * freely. + * + *

Like all other consumers, {@code ResourceCompletableObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Completable.complete().delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new ResourceCompletableObserver() {
+ *         @Override public void onStart() {
+ *             add(Schedulers.single()
+ *                 .scheduleDirect(() -> System.out.println("Time!"),
+ *                     2, TimeUnit.SECONDS));
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *             dispose();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *             dispose();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
*/ public abstract class ResourceCompletableObserver implements CompletableObserver, Disposable { /** The active subscription. */ diff --git a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java index c116ab1e57..9cb97a689a 100644 --- a/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceMaybeObserver.java @@ -26,6 +26,59 @@ * *

All pre-implemented final methods are thread-safe. * + *

Note that {@link #onSuccess(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are + * exclusive to each other, unlike a regular {@link io.reactivex.Observer Observer}, and + * {@code onComplete()} is never called after an {@code onSuccess()}. + * + *

Override the protected {@link #onStart()} to perform initialization when this + * {@code ResourceMaybeObserver} is subscribed to a source. + * + *

Use the protected {@link #dispose()} to dispose the sequence externally and release + * all resources. + * + *

To release the associated resources, one has to call {@link #dispose()} + * in {@code onSuccess()}, {@code onError()} and {@code onComplete()} explicitly. + * + *

Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s) + * with this {@code ResourceMaybeObserver} that will be cleaned up when {@link #dispose()} is called. + * Removing previously associated resources is not possible but one can create a + * {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this + * {@code ResourceMaybeObserver} and then add/remove resources to/from the {@code CompositeDisposable} + * freely. + * + *

Like all other consumers, {@code ResourceMaybeObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Maybe.just(1).delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new ResourceMaybeObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             add(Schedulers.single()
+ *                 .scheduleDirect(() -> System.out.println("Time!"),
+ *                     2, TimeUnit.SECONDS));
+ *         }
+ *         @Override public void onSuccess(Integer t) {
+ *             System.out.println(t);
+ *             dispose();
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *             dispose();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *             dispose();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the value type */ public abstract class ResourceMaybeObserver implements MaybeObserver, Disposable { diff --git a/src/main/java/io/reactivex/observers/ResourceObserver.java b/src/main/java/io/reactivex/observers/ResourceObserver.java index b05279f70b..ef7fd703fd 100644 --- a/src/main/java/io/reactivex/observers/ResourceObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceObserver.java @@ -25,6 +25,57 @@ * *

All pre-implemented final methods are thread-safe. * + *

To release the associated resources, one has to call {@link #dispose()} + * in {@code onError()} and {@code onComplete()} explicitly. + * + *

Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s) + * with this {@code ResourceObserver} that will be cleaned up when {@link #dispose()} is called. + * Removing previously associated resources is not possible but one can create a + * {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this + * {@code ResourceObserver} and then add/remove resources to/from the {@code CompositeDisposable} + * freely. + * + *

Use the {@link #dispose()} to dispose the sequence from within an + * {@code onNext} implementation. + * + *

Like all other consumers, {@code ResourceObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Observable#safeSubscribe(io.reactivex.Observer)} + * instead of the standard {@code subscribe()} method. + * + *

Example

+ * Disposable d =
+ *     Observable.range(1, 5)
+ *     .subscribeWith(new ResourceObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             add(Schedulers.single()
+ *                 .scheduleDirect(() -> System.out.println("Time!"),
+ *                     2, TimeUnit.SECONDS));
+ *             request(1);
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 dispose();
+ *             }
+ *             System.out.println(t);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *             dispose();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *             dispose();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the value type */ public abstract class ResourceObserver implements Observer, Disposable { diff --git a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java index 2aa950a052..a0182adba9 100644 --- a/src/main/java/io/reactivex/observers/ResourceSingleObserver.java +++ b/src/main/java/io/reactivex/observers/ResourceSingleObserver.java @@ -22,10 +22,56 @@ import io.reactivex.internal.functions.ObjectHelper; /** - * An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription and associated resources. + * An abstract {@link SingleObserver} that allows asynchronous cancellation of its subscription + * and the associated resources. * *

All pre-implemented final methods are thread-safe. * + *

Override the protected {@link #onStart()} to perform initialization when this + * {@code ResourceSingleObserver} is subscribed to a source. + * + *

Use the protected {@link #dispose()} to dispose the sequence externally and release + * all resources. + * + *

To release the associated resources, one has to call {@link #dispose()} + * in {@code onSuccess()} and {@code onError()} explicitly. + * + *

Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s) + * with this {@code ResourceSingleObserver} that will be cleaned up when {@link #dispose()} is called. + * Removing previously associated resources is not possible but one can create a + * {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this + * {@code ResourceSingleObserver} and then add/remove resources to/from the {@code CompositeDisposable} + * freely. + * + *

Like all other consumers, {@code ResourceSingleObserver} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Disposable already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onSuccess(Object)} and {@link #onError(Throwable)} + * are not allowed to throw any unchecked exceptions. + * + *

Example

+ * Disposable d =
+ *     Single.just(1).delay(1, TimeUnit.SECONDS)
+ *     .subscribeWith(new ResourceSingleObserver<Integer>() {
+ *         @Override public void onStart() {
+ *             add(Schedulers.single()
+ *                 .scheduleDirect(() -> System.out.println("Time!"),
+ *                     2, TimeUnit.SECONDS));
+ *         }
+ *         @Override public void onSuccess(Integer t) {
+ *             System.out.println(t);
+ *             dispose();
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *             dispose();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the value type */ public abstract class ResourceSingleObserver implements SingleObserver, Disposable { diff --git a/src/main/java/io/reactivex/observers/SerializedObserver.java b/src/main/java/io/reactivex/observers/SerializedObserver.java index 79c0bb6cea..7b429415af 100644 --- a/src/main/java/io/reactivex/observers/SerializedObserver.java +++ b/src/main/java/io/reactivex/observers/SerializedObserver.java @@ -19,12 +19,13 @@ import io.reactivex.plugins.RxJavaPlugins; /** - * Serializes access to the onNext, onError and onComplete methods of another Subscriber. + * Serializes access to the onNext, onError and onComplete methods of another Observer. * - *

Note that onSubscribe is not serialized in respect of the other methods so - * make sure the Subscription is set before any of the other methods are called. + *

Note that {@link #onSubscribe(Disposable)} is not serialized in respect of the other methods so + * make sure the {@code onSubscribe()} is called with a non-null {@code Disposable} + * before any of the other methods are called. * - *

The implementation assumes that the actual Subscriber's methods don't throw. + *

The implementation assumes that the actual Observer's methods don't throw. * * @param the value type */ diff --git a/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java b/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java index dbe04d6723..5a5dff485f 100644 --- a/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/DefaultSubscriber.java @@ -19,12 +19,60 @@ import io.reactivex.internal.subscriptions.SubscriptionHelper; /** - * Abstract base implementation of a Subscriber with support for requesting via - * {@link #request(long)}, cancelling via + * Abstract base implementation of a {@link org.reactivestreams.Subscriber Subscriber} with + * support for requesting via {@link #request(long)}, cancelling via * via {@link #cancel()} (both synchronously) and calls {@link #onStart()} * when the subscription happens. * + *

All pre-implemented final methods are thread-safe. + * + *

The default {@link #onStart()} requests Long.MAX_VALUE by default. Override + * the method to request a custom positive amount. + * + *

Note that calling {@link #request(long)} from {@link #onStart()} may trigger + * an immediate, asynchronous emission of data to {@link #onNext(Object)}. Make sure + * all initialization happens before the call to {@code request()} in {@code onStart()}. + * Calling {@link #request(long)} inside {@link #onNext(Object)} can happen at any time + * because by design, {@code onNext} calls from upstream are non-reentrant and non-overlapping. + * + *

Use the protected {@link #cancel()} to cancel the sequence from within an + * {@code onNext} implementation. + * + *

Like all other consumers, {@code DefaultSubscriber} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Flowable#safeSubscribe(org.reactivestreams.Subscriber)} + * instead of the standard {@code subscribe()} method. * @param the value type + * + *

Example

+ * Disposable d =
+ *     Flowable.range(1, 5)
+ *     .subscribeWith(new DefaultSubscriber<Integer>() {
+ *         @Override public void onStart() {
+ *             System.out.println("Start!");
+ *             request(1);
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 cancel();
+ *             }
+ *             System.out.println(t);
+ *             request(1);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
*/ public abstract class DefaultSubscriber implements FlowableSubscriber { private Subscription s; diff --git a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java index 2ccb433b89..8ad8cc96af 100644 --- a/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/DisposableSubscriber.java @@ -22,8 +22,54 @@ import io.reactivex.internal.subscriptions.SubscriptionHelper; /** - * An abstract Subscriber that allows asynchronous cancellation by implementing Disposable. + * An abstract Subscriber that allows asynchronous, external cancellation by implementing Disposable. * + *

All pre-implemented final methods are thread-safe. + * + *

The default {@link #onStart()} requests Long.MAX_VALUE by default. Override + * the method to request a custom positive amount. Use the protected {@link #request(long)} + * to request more items and {@link #cancel()} to cancel the sequence from within an + * {@code onNext} implementation. + * + *

Note that calling {@link #request(long)} from {@link #onStart()} may trigger + * an immediate, asynchronous emission of data to {@link #onNext(Object)}. Make sure + * all initialization happens before the call to {@code request()} in {@code onStart()}. + * Calling {@link #request(long)} inside {@link #onNext(Object)} can happen at any time + * because by design, {@code onNext} calls from upstream are non-reentrant and non-overlapping. + * + *

Like all other consumers, {@code DefaultSubscriber} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Flowable#safeSubscribe(org.reactivestreams.Subscriber)} + * instead of the standard {@code subscribe()} method. + * + *

Example

+ * Disposable d =
+ *     Flowable.range(1, 5)
+ *     .subscribeWith(new DisposableSubscriber<Integer>() {
+ *         @Override public void onStart() {
+ *             request(1);
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 cancel();
+ *             }
+ *             System.out.println(t);
+ *             request(1);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
* @param the received value type. */ public abstract class DisposableSubscriber implements FlowableSubscriber, Disposable { diff --git a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java index 0f960bc523..986c0d5ce2 100644 --- a/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/ResourceSubscriber.java @@ -24,13 +24,71 @@ import io.reactivex.internal.subscriptions.SubscriptionHelper; /** - * An abstract Subscriber that allows asynchronous cancellation of its subscription and associated resources. - * - *

This implementation let's you chose if the AsyncObserver manages resources or not, - * thus saving memory on cases where there is no need for that. + * An abstract Subscriber that allows asynchronous cancellation of its + * subscription and associated resources. * *

All pre-implemented final methods are thread-safe. * + *

To release the associated resources, one has to call {@link #dispose()} + * in {@code onError()} and {@code onComplete()} explicitly. + * + *

Use {@link #add(Disposable)} to associate resources (as {@link io.reactivex.disposables.Disposable Disposable}s) + * with this {@code ResourceSubscriber} that will be cleaned up when {@link #dispose()} is called. + * Removing previously associated resources is not possible but one can create a + * {@link io.reactivex.disposables.CompositeDisposable CompositeDisposable}, associate it with this + * {@code ResourceSubscriber} and then add/remove resources to/from the {@code CompositeDisposable} + * freely. + * + *

The default {@link #onStart()} requests Long.MAX_VALUE by default. Override + * the method to request a custom positive amount. Use the protected {@link #request(long)} + * to request more items and {@link #dispose()} to cancel the sequence from within an + * {@code onNext} implementation. + * + *

Note that calling {@link #request(long)} from {@link #onStart()} may trigger + * an immediate, asynchronous emission of data to {@link #onNext(Object)}. Make sure + * all initialization happens before the call to {@code request()} in {@code onStart()}. + * Calling {@link #request(long)} inside {@link #onNext(Object)} can happen at any time + * because by design, {@code onNext} calls from upstream are non-reentrant and non-overlapping. + * + *

Like all other consumers, {@code ResourceSubscriber} can be subscribed only once. + * Any subsequent attempt to subscribe it to a new source will yield an + * {@link IllegalStateException} with message {@code "Subscription already set!"}. + * + *

Implementation of {@link #onStart()}, {@link #onNext(Object)}, {@link #onError(Throwable)} + * and {@link #onComplete()} are not allowed to throw any unchecked exceptions. + * If for some reason this can't be avoided, use {@link io.reactivex.Flowable#safeSubscribe(org.reactivestreams.Subscriber)} + * instead of the standard {@code subscribe()} method. + * + *

Example

+ * Disposable d =
+ *     Flowable.range(1, 5)
+ *     .subscribeWith(new ResourceSubscriber<Integer>() {
+ *         @Override public void onStart() {
+ *             add(Schedulers.single()
+ *                 .scheduleDirect(() -> System.out.println("Time!"),
+ *                     2, TimeUnit.SECONDS));
+ *             request(1);
+ *         }
+ *         @Override public void onNext(Integer t) {
+ *             if (t == 3) {
+ *                 dispose();
+ *             }
+ *             System.out.println(t);
+ *             request(1);
+ *         }
+ *         @Override public void onError(Throwable t) {
+ *             t.printStackTrace();
+ *             dispose();
+ *         }
+ *         @Override public void onComplete() {
+ *             System.out.println("Done!");
+ *             dispose();
+ *         }
+ *     });
+ * // ...
+ * d.dispose();
+ * 
+ * * @param the value type */ public abstract class ResourceSubscriber implements FlowableSubscriber, Disposable { diff --git a/src/main/java/io/reactivex/subscribers/SerializedSubscriber.java b/src/main/java/io/reactivex/subscribers/SerializedSubscriber.java index dd9f4d78c0..9743518eea 100644 --- a/src/main/java/io/reactivex/subscribers/SerializedSubscriber.java +++ b/src/main/java/io/reactivex/subscribers/SerializedSubscriber.java @@ -22,8 +22,9 @@ /** * Serializes access to the onNext, onError and onComplete methods of another Subscriber. * - *

Note that onSubscribe is not serialized in respect of the other methods so - * make sure the Subscription is set before any of the other methods are called. + *

Note that {@link #onSubscribe(Subscription)} is not serialized in respect of the other methods so + * make sure the {@code onSubscribe} is called with a non-null {@code Subscription} + * before any of the other methods are called. * *

The implementation assumes that the actual Subscriber's methods don't throw. *