Skip to content

2.x: Improve the JavaDoc of the other lift() operators #5865

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 28, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 122 additions & 5 deletions src/main/java/io/reactivex/Completable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1369,15 +1369,132 @@ public final Completable doFinally(Action onFinally) {
}

/**
* <strong>Advanced use without safeguards:</strong> lifts a CompletableOperator
* transformation into the chain of Completables.
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Returns a {@code Completable} which, when subscribed to, invokes the {@link CompletableOperator#apply(CompletableObserver) apply(CompletableObserver)} method
* of the provided {@link CompletableOperator} for each individual downstream {@link Completable} and allows the
* insertion of a custom operator by accessing the downstream's {@link CompletableObserver} during this subscription phase
* and providing a new {@code CompletableObserver}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code CompletableObserver} will wrap the downstream's {@code CompletableObserver} and forwards the
* {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code dispose} and {@code isDisposed} that would have traveled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the CompletableOperator.apply():
*
* public final class CustomCompletableObserver implements CompletableObserver, Disposable {
*
* // The donstream's CompletableObserver that will receive the onXXX events
* final CompletableObserver downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Disposable upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomCompletableObserver(CompletableObserver downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Disposable to this class
* // and subsequently this class has to send a Disposable to the downstream.
* // Note that relaying the upstream's Disposable directly is not allowed in RxJava
* &#64;Override
* public void onSubscribe(Disposable s) {
* if (upstream != null) {
* s.cancel();
* } else {
* upstream = s;
* downstream.onSubscribe(this);
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* &#64;Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // When the upstream completes, usually the downstream should complete as well.
* // In completable, this could also mean doing some side-effects
* &#64;Override
* public void onComplete() {
* System.out.println("Sequence completed");
* downstream.onComplete();
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream disposes the flow before it completed. Operators without
* // resources can simply forward the dispose to the upstream.
* // In some cases, a disposed flag may be set by this method so that other parts
* // of this class may detect the dispose and stop sending events
* // to the downstream.
* &#64;Override
* public void dispose() {
* upstream.dispose();
* }
*
* // Some operators may simply forward the call to the upstream while others
* // can return the disposed flag set in dispose().
* &#64;Override
* public boolean isDisposed() {
* return upstream.isDisposed();
* }
* }
*
* // Step 2: Create a class that implements the CompletableOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomCompletableOperator implements CompletableOperator {
* &#64;Override
* public CompletableObserver apply(CompletableObserver upstream) {
* return new CustomCompletableObserver(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Completable.complete()
* .lift(new CustomCompletableOperator())
* .test()
* .assertResult();
* </code></pre>
* <p>
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Completable}
* class and creating a {@link CompletableTransformer} with it is recommended.
* <p>
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code CompletableObserver} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Completable}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code CompletableObserver} that should immediately dispose the upstream's {@code Disposable} in its
* {@code onSubscribe} method. Again, using a {@code CompletableTransformer} and extending the {@code Completable} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link CompletableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
* @param onLift the lifting function that transforms the child subscriber with a parent subscriber.
*
* @param onLift the {@link CompletableOperator} that receives the downstream's {@code CompletableObserver} and should return
* a {@code CompletableObserver} with custom behavior to be used as the consumer for the current
* {@code Completable}.
* @return the new Completable instance
* @throws NullPointerException if onLift is null
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(CompletableTransformer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
8 changes: 4 additions & 4 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10068,13 +10068,13 @@ public final Single<T> lastOrError() {
* Returns a {@code Flowable} which, when subscribed to, invokes the {@link FlowableOperator#apply(Subscriber) apply(Subscriber)} method
* of the provided {@link FlowableOperator} for each individual downstream {@link Subscriber} and allows the
* insertion of a custom operator by accessing the downstream's {@link Subscriber} during this subscription phase
* and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be
* and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code Subscriber} will wrap the downstream's {@code Subscriber} and forwards the
* {@code onNext}, {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code cancel} and {@code request} that would have travelled upstream and perform
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code cancel} and {@code request} that would have traveled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* Example:
Expand Down Expand Up @@ -10192,7 +10192,7 @@ public final Single<T> lastOrError() {
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code Subscriber} instance returned by the {@link FlowableOperator} is responsible to be
* <dd>The {@code Subscriber} instance returned by the {@link FlowableOperator} is responsible to be
* backpressure-aware or document the fact that the consumer of the returned {@code Publisher} has to apply one of
* the {@code onBackpressureXXX} operators.</dd>
* <dt><b>Scheduler:</b></dt>
Expand Down
154 changes: 139 additions & 15 deletions src/main/java/io/reactivex/Maybe.java
Original file line number Diff line number Diff line change
Expand Up @@ -3110,27 +3110,151 @@ public final Single<Boolean> isEmpty() {
}

/**
* Lifts a function to the current Maybe and returns a new Maybe that when subscribed to will pass the
* values of the current Maybe through the MaybeOperator function.
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Returns a {@code Maybe} which, when subscribed to, invokes the {@link MaybeOperator#apply(MaybeObserver) apply(MaybeObserver)} method
* of the provided {@link MaybeOperator} for each individual downstream {@link Maybe} and allows the
* insertion of a custom operator by accessing the downstream's {@link MaybeObserver} during this subscription phase
* and providing a new {@code MaybeObserver}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code MaybeObserver} will wrap the downstream's {@code MaybeObserver} and forwards the
* {@code onSuccess}, {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code dispose} and {@code isDisposed} that would have traveled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* In other words, this allows chaining TaskExecutors together on a Maybe for acting on the values within
* the Maybe.
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the MaybeOperator.apply():
*
* public final class CustomMaybeObserver&lt;T&gt; implements MaybeObserver&lt;T&gt;, Disposable {
*
* // The donstream's MaybeObserver that will receive the onXXX events
* final MaybeObserver&lt;? super String&gt; downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Disposable upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomMaybeObserver(MaybeObserver&lt;? super String&gt; downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Disposable to this class
* // and subsequently this class has to send a Disposable to the downstream.
* // Note that relaying the upstream's Disposable directly is not allowed in RxJava
* &#64;Override
* public void onSubscribe(Disposable s) {
* if (upstream != null) {
* s.cancel();
* } else {
* upstream = s;
* downstream.onSubscribe(this);
* }
* }
*
* // The upstream calls this with the next item and the implementation's
* // responsibility is to emit an item to the downstream based on the intended
* // business logic, or if it can't do so for the particular item,
* // request more from the upstream
* &#64;Override
* public void onSuccess(T item) {
* String str = item.toString();
* if (str.length() &lt; 2) {
* downstream.onSuccess(str);
* } else {
* // Maybe is usually expected to produce one of the onXXX events
* downstream.onComplete();
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* &#64;Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // When the upstream completes, usually the downstream should complete as well.
* &#64;Override
* public void onComplete() {
* downstream.onComplete();
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream disposes the flow before it completed. Operators without
* // resources can simply forward the dispose to the upstream.
* // In some cases, a disposed flag may be set by this method so that other parts
* // of this class may detect the dispose and stop sending events
* // to the downstream.
* &#64;Override
* public void dispose() {
* upstream.dispose();
* }
*
* // Some operators may simply forward the call to the upstream while others
* // can return the disposed flag set in dispose().
* &#64;Override
* public boolean isDisposed() {
* return upstream.isDisposed();
* }
* }
*
* // Step 2: Create a class that implements the MaybeOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomMaybeOperator&lt;T&gt; implements MaybeOperator&lt;String&gt; {
* &#64;Override
* public MaybeObserver&lt;? super String&gt; apply(MaybeObserver&lt;? super T&gt; upstream) {
* return new CustomMaybeObserver&lt;T&gt;(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Maybe.just(5)
* .lift(new CustomMaybeOperator&lt;Integer&gt;())
* .test()
* .assertResult("5");
*
* Maybe.just(15)
* .lift(new CustomMaybeOperator&lt;Integer&gt;())
* .test()
* .assertResult();
* </code></pre>
* <p>
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* {@code task.map(...).filter(...).lift(new OperatorA()).lift(new OperatorB(...)).subscribe() }
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Maybe}
* class and creating a {@link MaybeTransformer} with it is recommended.
* <p>
* If the operator you are creating is designed to act on the item emitted by a source Maybe, use
* {@code lift}. If your operator is designed to transform the source Maybe as a whole (for instance, by
* applying a particular set of existing RxJava operators to it) use {@link #compose}.
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code MaybeObserver} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Maybe}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code MaybeObserver} that should immediately dispose the upstream's {@code Disposable} in its
* {@code onSubscribe} method. Again, using a {@code MaybeTransformer} and extending the {@code Maybe} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link MaybeOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
*
* @param <R> the downstream's value type (output)
* @param lift
* the MaybeOperator that implements the Maybe-operating function to be applied to the source Maybe
* @return a Maybe that is the result of applying the lifted Operator to the source Maybe
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
* @param <R> the output value type
* @param lift the {@link MaybeOperator} that receives the downstream's {@code MaybeObserver} and should return
* a {@code MaybeObserver} with custom behavior to be used as the consumer for the current
* {@code Maybe}.
* @return the new Maybe instance
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(MaybeTransformer)
*/
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
Expand Down
Loading