@@ -10063,34 +10063,150 @@ public final Single<T> lastOrError() {
10063
10063
}
10064
10064
10065
10065
/**
10066
- * <strong>This method requires advanced knowledge about building operators; please consider
10066
+ * <strong>This method requires advanced knowledge about building operators, please consider
10067
10067
* other standard composition methods first;</strong>
10068
- * Lifts a function to the current Publisher and returns a new Publisher that when subscribed to will pass
10069
- * the values of the current Publisher through the Operator function.
10068
+ * Returns a {@code Flowable} which, when subscribed to, invokes the {@link FlowableOperator#apply(Subscriber) apply(Subscriber)} method
10069
+ * of the provided {@link FlowableOperator} for each individual downstream {@link Subscriber} and allows the
10070
+ * insertion of a custom operator by accessing the downstream's {@link Subscriber} during this subscription phase
10071
+ * and providing a new {@code Subscriber}, containing the custom operator's intended business logic, that will be
10072
+ * used in the subscription process going further upstream.
10073
+ * <p>
10074
+ * Generally, such a new {@code Subscriber} will wrap the downstream's {@code Subscriber} and forwards the
10075
+ * {@code onNext}, {@code onError} and {@code onComplete} events from the upstream directly or according to the
10076
+ * emission pattern the custom operator's business logic requires. In addition, such operator can intercept the
10077
+ * flow control calls of {@code cancel} and {@code request} that would have travelled upstream and perform
10078
+ * additional actions depending on the same business logic requirements.
10070
10079
* <p>
10071
- * In other words, this allows chaining Subscribers together on a Publisher for acting on the values within
10072
- * the Publisher.
10073
- * <p> {@code
10074
- * Publisher.map(...).filter(...).take(5).lift(new OperatorA()).lift(new OperatorB(...)).subscribe()
10080
+ * Example:
10081
+ * <pre><code>
10082
+ * // Step 1: Create the consumer type that will be returned by the FlowableOperator.apply():
10083
+ *
10084
+ * public final class CustomSubscriber<T> implements FlowableSubscriber<T>, Subscription {
10085
+ *
10086
+ * // The donstream's Subscriber that will receive the onXXX events
10087
+ * final Subscriber<? super String> downstream;
10088
+ *
10089
+ * // The connection to the upstream source that will call this class' onXXX methods
10090
+ * Subscription upstream;
10091
+ *
10092
+ * // The constructor takes the downstream subscriber and usually any other parameters
10093
+ * public CustomSubscriber(Subscriber<? super String> downstream) {
10094
+ * this.downstream = downstream;
10095
+ * }
10096
+ *
10097
+ * // In the subscription phase, the upstream sends a Subscription to this class
10098
+ * // and subsequently this class has to send a Subscription to the downstream.
10099
+ * // Note that relaying the upstream's Subscription directly is not allowed in RxJava
10100
+ * @Override
10101
+ * public void onSubscribe(Subscription s) {
10102
+ * if (upstream != null) {
10103
+ * s.cancel();
10104
+ * } else {
10105
+ * upstream = s;
10106
+ * downstream.onSubscribe(this);
10107
+ * }
10108
+ * }
10109
+ *
10110
+ * // The upstream calls this with the next item and the implementation's
10111
+ * // responsibility is to emit an item to the downstream based on the intended
10112
+ * // business logic, or if it can't do so for the particular item,
10113
+ * // request more from the upstream
10114
+ * @Override
10115
+ * public void onNext(T item) {
10116
+ * String str = item.toString();
10117
+ * if (str.length() < 2) {
10118
+ * downstream.onNext(str);
10119
+ * } else {
10120
+ * upstream.request(1);
10121
+ * }
10122
+ * }
10123
+ *
10124
+ * // Some operators may handle the upstream's error while others
10125
+ * // could just forward it to the downstream.
10126
+ * @Override
10127
+ * public void onError(Throwable throwable) {
10128
+ * downstream.onError(throwable);
10129
+ * }
10130
+ *
10131
+ * // When the upstream completes, usually the downstream should complete as well.
10132
+ * @Override
10133
+ * public void onComplete() {
10134
+ * downstream.onComplete();
10135
+ * }
10136
+ *
10137
+ * // Some operators have to intercept the downstream's request calls to trigger
10138
+ * // the emission of queued items while others can simply forward the request
10139
+ * // amount as is.
10140
+ * @Override
10141
+ * public void request(long n) {
10142
+ * upstream.request(n);
10143
+ * }
10144
+ *
10145
+ * // Some operators may use their own resources which should be cleaned up if
10146
+ * // the downstream cancels the flow before it completed. Operators without
10147
+ * // resources can simply forward the cancellation to the upstream.
10148
+ * // In some cases, a cancelled flag may be set by this method so that other parts
10149
+ * // of this class may detect the cancellation and stop sending events
10150
+ * // to the downstream.
10151
+ * @Override
10152
+ * public void cancel() {
10153
+ * upstream.cancel();
10154
+ * }
10075
10155
* }
10156
+ *
10157
+ * // Step 2: Create a class that implements the FlowableOperator interface and
10158
+ * // returns the custom consumer type from above in its apply() method.
10159
+ * // Such class may define additional parameters to be submitted to
10160
+ * // the custom consumer type.
10161
+ *
10162
+ * final class CustomOperator<T> implements FlowableOperator<String> {
10163
+ * @Override
10164
+ * public Subscriber<? super String> apply(Subscriber<? super T> upstream) {
10165
+ * return new CustomSubscriber<T>(upstream);
10166
+ * }
10167
+ * }
10168
+ *
10169
+ * // Step 3: Apply the custom operator via lift() in a flow by creating an instance of it
10170
+ * // or reusing an existing one.
10171
+ *
10172
+ * Flowable.range(5, 10)
10173
+ * .lift(new CustomOperator<Integer>())
10174
+ * .test()
10175
+ * .assertResult("5", "6", "7", "8", "9");
10176
+ * </code></pre>
10076
10177
* <p>
10077
- * If the operator you are creating is designed to act on the individual items emitted by a source
10078
- * Publisher, use {@code lift}. If your operator is designed to transform the source Publisher as a whole
10079
- * (for instance, by applying a particular set of existing RxJava operators to it) use {@link #compose}.
10178
+ * Creating custom operators can be complicated and it is recommended one consults the
10179
+ * <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a> page about
10180
+ * the tools, requirements, rules, considerations and pitfalls of implementing them.
10181
+ * <p>
10182
+ * Note that implementing custom operators via this {@code lift()} method adds slightly more overhead by requiring
10183
+ * an additional allocation and indirection per assembled flows. Instead, extending the abstract {@code Flowable}
10184
+ * class and creating a {@link FlowableTransformer} with it is recommended.
10185
+ * <p>
10186
+ * Note also that it is not possible to stop the subscription phase in {@code lift()} as the {@code apply()} method
10187
+ * requires a non-null {@code Subscriber} instance to be returned, which is then unconditionally subscribed to
10188
+ * the upstream {@code Flowable}. For example, if the operator decided there is no reason to subscribe to the
10189
+ * upstream source because of some optimization possibility or a failure to prepare the operator, it still has to
10190
+ * return a {@code Subscriber} that should immediately cancel the upstream's {@code Subscription} in its
10191
+ * {@code onSubscribe} method. Again, using a {@code FlowableTransformer} and extending the {@code Flowable} is
10192
+ * a better option as {@link #subscribeActual} can decide to not subscribe to its upstream after all.
10080
10193
* <dl>
10081
10194
* <dt><b>Backpressure:</b></dt>
10082
- * <dd>The {@code Operator } instance provided is responsible to be backpressure-aware or
10083
- * document the fact that the consumer of the returned {@code Publisher} has to apply one of
10195
+ * <dd>The {@code Subscriber } instance returned by the {@link FlowableOperator} is responsible to be
10196
+ * backpressure-aware or document the fact that the consumer of the returned {@code Publisher} has to apply one of
10084
10197
* the {@code onBackpressureXXX} operators.</dd>
10085
10198
* <dt><b>Scheduler:</b></dt>
10086
- * <dd>{@code lift} does not operate by default on a particular {@link Scheduler}.</dd>
10199
+ * <dd>{@code lift} does not operate by default on a particular {@link Scheduler}, however, the
10200
+ * {@link FlowableOperator} may use a {@code Scheduler} to support its own asynchronous behavior.</dd>
10087
10201
* </dl>
10088
10202
*
10089
10203
* @param <R> the output value type
10090
- * @param lifter the Operator that implements the Publisher-operating function to be applied to the source
10091
- * Publisher
10092
- * @return a Flowable that is the result of applying the lifted Operator to the source Publisher
10093
- * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Implementing-Your-Own-Operators">RxJava wiki: Implementing Your Own Operators</a>
10204
+ * @param lifter the {@link FlowableOperator} that receives the downstream's {@code Subscriber} and should return
10205
+ * a {@code Subscriber} with custom behavior to be used as the consumer for the current
10206
+ * {@code Flowable}.
10207
+ * @return the new Flowable instance
10208
+ * @see <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">RxJava wiki: Writing operators</a>
10209
+ * @see #compose(FlowableTransformer)
10094
10210
*/
10095
10211
@CheckReturnValue
10096
10212
@BackpressureSupport(BackpressureKind.SPECIAL)
0 commit comments