Skip to content

2.x: Expand the documentation of the Flowable.lift() operator #5863

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 26, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
150 changes: 133 additions & 17 deletions src/main/java/io/reactivex/Flowable.java
Original file line number Diff line number Diff line change
Expand Up @@ -10063,34 +10063,150 @@ public final Single<T> lastOrError() {
}

/**
* <strong>This method requires advanced knowledge about building operators; please consider
* <strong>This method requires advanced knowledge about building operators, please consider
* other standard composition methods first;</strong>
* Lifts a function to the current Publisher and returns a new Publisher that when subscribed to will pass
* the values of the current Publisher through the Operator function.
* Returns a {@code Flowable} which, when subscribed to, invokes the {@link FlowableOperator#apply(Subscriber) apply(Subscriber)} method
* of the provided {@link FlowableOperator} for each individual downstream {@link Subscriber} and allows the
* insertion of a custom operator by accessing the downstream's {@link Subscriber} during this subscription phase
* and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be
* used in the subscription process going further upstream.
* <p>
* Generally, such a new {@code Subscriber} will wrap the downstream's {@code Subscriber} and forwards the
* {@code onNext}, {@code onError} and {@code onComplete} events from the upstream directly or according to the
* emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
* flow control calls of {@code cancel} and {@code request} that would have travelled upstream and perform
* additional actions depending on the same business logic requirements.
* <p>
* In other words, this allows chaining Subscribers together on a Publisher for acting on the values within
* the Publisher.
* <p> {@code
* Publisher.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
* Example:
* <pre><code>
* // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
*
* public final class CustomSubscriber&lt;T&gt; implements FlowableSubscriber&lt;T&gt;, Subscription {
*
* // The donstream's Subscriber that will receive the onXXX events
* final Subscriber&lt;? super String&gt; downstream;
*
* // The connection to the upstream source that will call this class' onXXX methods
* Subscription upstream;
*
* // The constructor takes the downstream subscriber and usually any other parameters
* public CustomSubscriber(Subscriber&lt;? super String&gt; downstream) {
* this.downstream = downstream;
* }
*
* // In the subscription phase, the upstream sends a Subscription to this class
* // and subsequently this class has to send a Subscription to the downstream.
* // Note that relaying the upstream's Subscription directly is not allowed in RxJava
* &#64;Override
* public void onSubscribe(Subscription s) {
* if (upstream != null) {
* s.cancel();
* } else {
* upstream = s;
* downstream.onSubscribe(this);
* }
* }
*
* // The upstream calls this with the next item and the implementation's
* // responsibility is to emit an item to the downstream based on the intended
* // business logic, or if it can't do so for the particular item,
* // request more from the upstream
* &#64;Override
* public void onNext(T item) {
* String str = item.toString();
* if (str.length() &lt; 2) {
* downstream.onNext(str);
* } else {
* upstream.request(1);
* }
* }
*
* // Some operators may handle the upstream's error while others
* // could just forward it to the downstream.
* &#64;Override
* public void onError(Throwable throwable) {
* downstream.onError(throwable);
* }
*
* // When the upstream completes, usually the downstream should complete as well.
* &#64;Override
* public void onComplete() {
* downstream.onComplete();
* }
*
* // Some operators have to intercept the downstream's request calls to trigger
* // the emission of queued items while others can simply forward the request
* // amount as is.
* &#64;Override
* public void request(long n) {
* upstream.request(n);
* }
*
* // Some operators may use their own resources which should be cleaned up if
* // the downstream cancels the flow before it completed. Operators without
* // resources can simply forward the cancellation to the upstream.
* // In some cases, a cancelled flag may be set by this method so that other parts
* // of this class may detect the cancellation and stop sending events
* // to the downstream.
* &#64;Override
* public void cancel() {
* upstream.cancel();
* }
* }
*
* // Step 2: Create a class that implements the FlowableOperator interface and
* // returns the custom consumer type from above in its apply() method.
* // Such class may define additional parameters to be submitted to
* // the custom consumer type.
*
* final class CustomOperator&lt;T&gt; implements FlowableOperator&lt;String&gt; {
* &#64;Override
* public Subscriber&lt;? super String&gt; apply(Subscriber&lt;? super T&gt; upstream) {
* return new CustomSubscriber&lt;T&gt;(upstream);
* }
* }
*
* // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
* // or reusing an existing one.
*
* Flowable.range(5, 10)
* .lift(new CustomOperator&lt;Integer&gt;())
* .test()
* .assertResult("5", "6", "7", "8", "9");
* </code></pre>
* <p>
* If the operator you are creating is designed to act on the individual items emitted by a source
* Publisher, use {@code lift}. If your operator is designed to transform the source Publisher as a whole
* (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.
* Creating custom operators can be complicated and it is recommended one consults the
* <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
* the tools, requirements, rules, considerations and pitfalls of implementing them.
* <p>
* Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
* an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Flowable}
* class and creating a {@link FlowableTransformer} with it is recommended.
* <p>
* Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
* requires a non-null {@code Subscriber} instance to be returned, which is then unconditionally subscribed to
* the upstream {@code Flowable}. For example, if the operator decided there is no reason to subscribe to the
* upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
* return a {@code Subscriber} that should immediately cancel the upstream's {@code Subscription} in its
* {@code onSubscribe} method. Again, using a {@code FlowableTransformer} and extending the {@code Flowable} is
* a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
* <dl>
* <dt><b>Backpressure:</b></dt>
* <dd>The {@code Operator} instance provided is responsible to be backpressure-aware or
* document the fact that the consumer of the returned {@code Publisher} has to apply one of
* <dd>The {@code Subscriber} instance returned by the {@link FlowableOperator} is responsible to be
* backpressure-aware or document the fact that the consumer of the returned {@code Publisher} has to apply one of
* the {@code onBackpressureXXX} operators.</dd>
* <dt><b>Scheduler:</b></dt>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
* <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
* {@link FlowableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
* </dl>
*
* @param <R> the output value type
* @param lifter the Operator that implements the Publisher-operating function to be applied to the source
* Publisher
* @return a Flowable that is the result of applying the lifted Operator to the source Publisher
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
* @param lifter the {@link FlowableOperator} that receives the downstream's {@code Subscriber} and should return
* a {@code Subscriber} with custom behavior to be used as the consumer for the current
* {@code Flowable}.
* @return the new Flowable instance
* @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
* @see #compose(FlowableTransformer)
*/
@CheckReturnValue
@BackpressureSupport(BackpressureKind.SPECIAL)
Expand Down