diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index e55c8e382c..d86b6ec628 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -136,6 +136,9 @@ public static Completable concat(Iterable sources) /** * Returns a Completable which completes only when all sources complete, one after another. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -152,6 +155,9 @@ public static Completable concat(Publisher sources) /** * Returns a Completable which completes only when all sources complete, one after another. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -197,9 +203,9 @@ public static Completable concat(Publisher sources, *
Scheduler:
*
{@code create} does not operate by default on a particular {@link Scheduler}.
* - * @param source the emitter that is called when a Subscriber subscribes to the returned {@code Flowable} + * @param source the emitter that is called when a CompletableObserver subscribes to the returned {@code Completable} * @return the new Completable instance - * @see FlowableOnSubscribe + * @see CompletableOnSubscribe * @see Cancellable */ @SchedulerSupport(SchedulerSupport.NONE) @@ -249,7 +255,7 @@ public static Completable defer(final Callable comp * Creates a Completable which calls the given error supplier for each subscriber * and emits its returned Throwable. *

- * If the errorSupplier returns null, the child CompletableSubscribers will receive a + * If the errorSupplier returns null, the child CompletableObservers will receive a * NullPointerException. *

*
Scheduler:
@@ -354,6 +360,9 @@ public static Completable fromObservable(final ObservableSource observabl * Returns a Completable instance that subscribes to the given publisher, ignores all values and * emits only the terminal event. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code fromPublisher} does not operate by default on a particular {@link Scheduler}.
*
@@ -431,6 +440,9 @@ public static Completable merge(Iterable sources) { * Returns a Completable instance that subscribes to all sources at once and * completes only when all source Completables complete or one of them emits an error. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -448,6 +460,9 @@ public static Completable merge(Publisher sources) * Returns a Completable instance that keeps subscriptions to a limited number of sources at once and * completes only when all source Completables complete or one of them emits an error. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -468,6 +483,9 @@ public static Completable merge(Publisher sources, * completes only when all source Completables terminate in one way or another, combining any exceptions * thrown by either the sources Observable or the inner Completable instances. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well. *
Scheduler:
*
{@code merge0} does not operate by default on a particular {@link Scheduler}.
*
@@ -528,6 +546,9 @@ public static Completable mergeDelayError(Iterable * any error emitted by either the sources observable or any of the inner Completables until all of * them terminate in a way or another. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
*
@@ -547,6 +568,9 @@ public static Completable mergeDelayError(Publisher * observable or any of the inner Completables until all of * them terminate in a way or another. *
+ *
Backpressure:
+ *
The returned {@code Completable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well.
*
Scheduler:
*
{@code mergeDelayError} does not operate by default on a particular {@link Scheduler}.
*
@@ -732,11 +756,14 @@ public final Observable andThen(ObservableSource next) { } /** - * Returns an Flowable which will subscribe to this Completable and once that is completed then + * Returns a Flowable which will subscribe to this Completable and once that is completed then * will subscribe to the {@code next} Flowable. An error event from this Completable will be * propagated to the downstream subscriber and will result in skipping the subscription of the * Observable. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well. *
Scheduler:
*
{@code andThen} does not operate by default on a particular {@link Scheduler}.
*
@@ -1387,6 +1414,9 @@ public final Observable startWith(Observable other) { * Returns an Observable which first delivers the events * of the other Observable then runs this Completable. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and expects the other {@code Publisher} to honor it as well. *
Scheduler:
*
{@code startWith} does not operate by default on a particular {@link Scheduler}.
*
@@ -1659,6 +1689,8 @@ public final U to(Function converter) { * Returns an Observable which when subscribed to subscribes to this Completable and * relays the terminal events to the subscriber. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code toFlowable} does not operate by default on a particular {@link Scheduler}.
*
diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 3f2931d909..de24cf696b 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -408,7 +408,7 @@ public static Flowable combineLatestDelayError(Function @@ -1274,7 +1274,7 @@ public static Flowable concatArrayDelayError(Publisher... so * in order, each one after the previous one completes. *
*
Backpressure:
- *
The operator honors backpressure from downstream. The {@code Publisher} + *
The operator honors backpressure from downstream. The {@code Publisher} * sources are expected to honor backpressure as well. * If any of the source {@code Publisher}s violate this, the operator will signal a * {@code MissingBackpressureException}.
@@ -1300,7 +1300,7 @@ public static Flowable concatArrayEager(Publisher... sources * in order, each one after the previous one completes. *
*
Backpressure:
- *
The operator honors backpressure from downstream. The {@code Publisher} + *
The operator honors backpressure from downstream. The {@code Publisher} * sources are expected to honor backpressure as well. * If any of the source {@code Publisher}s violate this, the operator will signal a * {@code MissingBackpressureException}.
@@ -1564,7 +1564,7 @@ public static Flowable create(FlowableOnSubscribe source, FlowableEmit * *

* The defer Subscriber allows you to defer or delay emitting items from a Publisher until such time as an - * Subscriber subscribes to the Publisher. This allows an {@link Subscriber} to easily obtain updates or a + * Subscriber subscribes to the Publisher. This allows a {@link Subscriber} to easily obtain updates or a * refreshed version of the sequence. *

*
Backpressure:
@@ -1616,7 +1616,7 @@ public static Flowable empty() { } /** - * Returns a Flowable that invokes an {@link Subscriber}'s {@link Subscriber#onError onError} method when the + * Returns a Flowable that invokes a {@link Subscriber}'s {@link Subscriber#onError onError} method when the * Subscriber subscribes to it. *

* @@ -1643,7 +1643,7 @@ public static Flowable error(Callable supplier) { } /** - * Returns a Flowable that invokes an {@link Subscriber}'s {@link Subscriber#onError onError} method when the + * Returns a Flowable that invokes a {@link Subscriber}'s {@link Subscriber#onError onError} method when the * Subscriber subscribes to it. *

* @@ -1702,12 +1702,12 @@ public static Flowable fromArray(T... items) { } /** - * Returns a Flowable that, when an Subscriber subscribes to it, invokes a function you specify and then + * Returns a Flowable that, when a Subscriber subscribes to it, invokes a function you specify and then * emits the value returned from that function. *

* *

- * This allows you to defer the execution of the function you specify until an Subscriber subscribes to the + * This allows you to defer the execution of the function you specify until a Subscriber subscribes to the * Publisher. That is to say, it makes the function "lazy." *

*
Backpressure:
@@ -1718,7 +1718,7 @@ public static Flowable fromArray(T... items) { * * @param supplier * a function, the execution of which should be deferred; {@code fromCallable} will invoke this - * function only when an Subscriber subscribes to the Publisher that {@code fromCallable} returns + * function only when a Subscriber subscribes to the Publisher that {@code fromCallable} returns * @param * the type of the item emitted by the Publisher * @return a Flowable whose {@link Subscriber}s' subscriptions trigger an invocation of the given function @@ -2151,7 +2151,7 @@ public static Flowable interval(long initialDelay, long period, TimeUnit u *
*
Backpressure:
*
The operator signals a {@code MissingBackpressureException} if the downstream - * is not ready to receive the next value. + * is not ready to receive the next value.
*
Scheduler:
*
{@code interval} operates by default on the {@code computation} {@link Scheduler}.
*
@@ -3007,7 +3007,7 @@ public static Flowable merge( } /** - * Flattens an Iterable of Publishers into one Publisher, in a way that allows an Subscriber to receive all + * Flattens an Iterable of Publishers into one Publisher, in a way that allows a Subscriber to receive all * successfully emitted items from each of the source Publishers without being interrupted by an error * notification from one of them. *

@@ -3043,7 +3043,7 @@ public static Flowable mergeDelayError(Iterable @@ -3082,7 +3082,7 @@ public static Flowable mergeDelayError(Iterable @@ -3121,7 +3121,7 @@ public static Flowable mergeArrayDelayError(int maxConcurrency, int buffe } /** - * Flattens an Iterable of Publishers into one Publisher, in a way that allows an Subscriber to receive all + * Flattens an Iterable of Publishers into one Publisher, in a way that allows a Subscriber to receive all * successfully emitted items from each of the source Publishers without being interrupted by an error * notification from one of them, while limiting the number of concurrent subscriptions to these Publishers. *

@@ -3158,7 +3158,7 @@ public static Flowable mergeDelayError(Iterable @@ -3193,7 +3193,7 @@ public static Flowable mergeDelayError(Publisher Flowable mergeDelayError(Publisher @@ -3267,7 +3267,7 @@ public static Flowable mergeArrayDelayError(Publisher... sou } /** - * Flattens two Publishers into one Publisher, in a way that allows an Subscriber to receive all + * Flattens two Publishers into one Publisher, in a way that allows a Subscriber to receive all * successfully emitted items from each of the source Publishers without being interrupted by an error * notification from one of them. *

@@ -3305,7 +3305,7 @@ public static Flowable mergeDelayError(Publisher source1, Pu } /** - * Flattens three Publishers into one Publisher, in a way that allows an Subscriber to receive all + * Flattens three Publishers into one Publisher, in a way that allows a Subscriber to receive all * successfully emitted items from all of the source Publishers without being interrupted by an error * notification from one of them. *

@@ -3348,7 +3348,7 @@ public static Flowable mergeDelayError(Publisher source1, Pu /** - * Flattens four Publishers into one Publisher, in a way that allows an Subscriber to receive all + * Flattens four Publishers into one Publisher, in a way that allows a Subscriber to receive all * successfully emitted items from all of the source Publishers without being interrupted by an error * notification from one of them. *

@@ -3395,7 +3395,7 @@ public static Flowable mergeDelayError( } /** - * Returns a Flowable that never sends any items or notifications to an {@link Subscriber}. + * Returns a Flowable that never sends any items or notifications to a {@link Subscriber}. *

* *

@@ -3409,7 +3409,7 @@ public static Flowable mergeDelayError( * * @param * the type of items (not) emitted by the Publisher - * @return a Flowable that never emits any items or sends any notifications to an {@link Subscriber} + * @return a Flowable that never emits any items or sends any notifications to a {@link Subscriber} * @see ReactiveX operators documentation: Never */ @BackpressureSupport(BackpressureKind.PASS_THROUGH) diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index bf9eae10b7..785a6a4d98 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -86,6 +86,8 @@ public static Maybe ambArray(final MaybeSource... sources) { * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources provided by * an Iterable sequence. *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -103,8 +105,10 @@ public static Flowable concat(Iterable /** * Returns a Flowable that emits the items emitted by two MaybeSources, one after the other. *

- * + * *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -129,8 +133,10 @@ public static Flowable concat(MaybeSource source1, MaybeSour /** * Returns a Flowable that emits the items emitted by three MaybeSources, one after the other. *

- * + * *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -159,8 +165,10 @@ public static Flowable concat( /** * Returns a Flowable that emits the items emitted by four MaybeSources, one after the other. *

- * + * *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -193,8 +201,12 @@ public static Flowable concat( * Concatenate the single values, in a non-overlapping fashion, of the MaybeSource sources provided by * a Publisher sequence. *
- *
Scheduler:
- *
{@code concat} does not operate by default on a particular {@link Scheduler}.
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer and + * expects the {@code Publisher} to honor backpressure as well. If the sources {@code Publisher} + * violates this, a {@code MissingBackpressurException} is signalled.
+ *
Scheduler:
+ *
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
* @param the value type * @param sources the Publisher of MaybeSource instances @@ -210,6 +222,10 @@ public static Flowable concat(Publisher + *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer and + * expects the {@code Publisher} to honor backpressure as well. If the sources {@code Publisher} + * violates this, a {@code MissingBackpressurException} is signalled.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -230,11 +246,13 @@ public static Flowable concat(Publisher - *
Scheduler:
- *
{@code concatArray} does not operate by default on a particular {@link Scheduler}.
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
+ *
Scheduler:
+ *
{@code concatArray} does not operate by default on a particular {@link Scheduler}.
*
* @param the value type - * @param sources the Publisher of MaybeSource instances + * @param sources the array of MaybeSource instances * @return the new Flowable instance */ @BackpressureSupport(BackpressureKind.FULL) @@ -258,9 +276,7 @@ public static Flowable concatArray(MaybeSource... sources) { * *
*
Backpressure:
- *
The operator honors backpressure from downstream. - * If the {@code Publisher} violate this, it may throw an - * {@code IllegalStateException} when the source {@code Publisher} completes.
+ *
The operator honors backpressure from downstream.
*
Scheduler:
*
{@code concatArrayDelayError} does not operate by default on a particular {@link Scheduler}.
*
@@ -290,7 +306,7 @@ public static Flowable concatArrayDelayError(MaybeSource... * in order, each one after the previous one completes. *
*
Backpressure:
- *
The operator honors backpressure from downstream.
+ *
The operator honors backpressure from downstream.
*
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
*
@@ -306,22 +322,19 @@ public static Flowable concatArrayEager(MaybeSource... sourc } /** - * Concatenates the Iterable sequence of Publishers into a single sequence by subscribing to each Publisher, - * one after the other, one at a time and delays any errors till the all inner Publishers terminate. + * Concatenates the Iterable sequence of MaybeSources into a single sequence by subscribing to each MaybeSource, + * one after the other, one at a time and delays any errors till the all inner MaybeSources terminate. * *
*
Backpressure:
- *
The operator honors backpressure from downstream. The inner {@code Publisher} - * sources are expected to honor backpressure. If any of the inner {@code Publisher}s violates - * this, it may throw an {@code IllegalStateException} when an - * inner {@code Publisher} completes.
+ *
The operator honors backpressure from downstream.
*
Scheduler:
*
{@code concatDelayError} does not operate by default on a particular {@link Scheduler}.
*
* * @param the common element base type * @param sources the Iterable sequence of MaybeSources - * @return the new Publisher with the concatenating behavior + * @return the new Flowable with the concatenating behavior */ @SuppressWarnings({ "unchecked", "rawtypes" }) @BackpressureSupport(BackpressureKind.FULL) @@ -361,9 +374,7 @@ public static Flowable concatDelayError(Publisher *
Backpressure:
- *
Backpressure is honored towards the downstream and outer Publisher is expected - * to honor backpressure. Violating this assumption, the operator will - * signal {@code MissingBackpressureException}.
+ *
Backpressure is honored towards the downstream.
*
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
*
@@ -437,7 +448,7 @@ public static Flowable concatEager(Publisher{@code create} does not operate by default on a particular {@link Scheduler}. *
* @param the value type - * @param onSubscribe the emitter that is called when a MaybeObserver subscribes to the returned {@code Flowable} + * @param onSubscribe the emitter that is called when a MaybeObserver subscribes to the returned {@code Maybe} * @return the new Maybe instance * @see MaybeOnSubscribe * @see Cancellable @@ -488,7 +499,7 @@ public static Maybe empty() { * Returns a Maybe that invokes a subscriber's {@link MaybeObserver#onError onError} method when the * subscriber subscribes to it. *

- * + * *

*
Scheduler:
*
{@code error} does not operate by default on a particular {@link Scheduler}.
@@ -509,8 +520,8 @@ public static Maybe error(Throwable exception) { } /** - * Returns a Maybe that invokes an {@link Observer}'s {@link MaybeObserver#onError onError} method when the - * Observer subscribes to it. + * Returns a Maybe that invokes a {@link MaybeObserver}'s {@link MaybeObserver#onError onError} method when the + * MaybeObserver subscribes to it. *

* *

@@ -519,7 +530,7 @@ public static Maybe error(Throwable exception) { *
* * @param supplier - * a Callable factory to return a Throwable for each individual Subscriber + * a Callable factory to return a Throwable for each individual MaybeObserver * @param * the type of the items (ostensibly) emitted by the Maybe * @return a Maybe that invokes the {@link MaybeObserver}'s {@link MaybeObserver#onError onError} method when @@ -596,7 +607,7 @@ public static Maybe fromCallable(final Callable callable) { * the source {@link Future} * @param * the type of object that the {@link Future} returns, and also the type of item to be emitted by - * the resulting Publisher + * the resulting Maybe * @return a Maybe that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From */ @@ -615,13 +626,11 @@ public static Maybe fromFuture(Future future) { * return value of the {@link Future#get} method of that object, by passing the object into the {@code fromFuture} * method. *

- * Unlike 1.x, cancelling the Flowable won't cancel the future. If necessary, one can use composition to achieve the + * Unlike 1.x, cancelling the Maybe won't cancel the future. If necessary, one can use composition to achieve the * cancellation effect: {@code futureMaybe.doOnCancel(() -> future.cancel(true));}. *

* Important note: This Maybe is blocking on the thread it gets subscribed on; you cannot unsubscribe from it. *

- *
Backpressure:
- *
The operator honors backpressure from downstream.
*
Scheduler:
*
{@code fromFuture} does not operate by default on a particular {@link Scheduler}.
*
@@ -634,7 +643,7 @@ public static Maybe fromFuture(Future future) { * the {@link TimeUnit} of the {@code timeout} argument * @param * the type of object that the {@link Future} returns, and also the type of item to be emitted by - * the resulting Publisher + * the resulting Maybe * @return a Maybe that emits the item from the source {@link Future} * @see ReactiveX operators documentation: From */ @@ -667,9 +676,9 @@ public static Maybe fromRunnable(final Runnable run) { /** * Returns a {@code Maybe} that emits a specified item. *

- * + * *

- * To convert any object into a {@code Single} that emits that object, pass that object into the + * To convert any object into a {@code Maybe} that emits that object, pass that object into the * {@code just} method. *

*
Scheduler:
@@ -749,10 +758,10 @@ public static Flowable merge(Publisher } /** - * Flattens a {@code Single} that emits a {@code Single} into a single {@code Single} that emits the item - * emitted by the nested {@code Single}, without any transformation. + * Flattens a {@code MaybeSource} that emits a {@code MaybeSource} into a single {@code MaybeSource} that emits the item + * emitted by the nested {@code MaybeSource}, without any transformation. *

- * + * *

*

*
Scheduler:
@@ -761,8 +770,8 @@ public static Flowable merge(Publisher * * @param the value type of the sources and the output * @param source - * a {@code Single} that emits a {@code Single} - * @return a {@code Single} that emits the item that is the result of flattening the {@code Single} emitted + * a {@code MaybeSource} that emits a {@code MaybeSource} + * @return a {@code Maybe} that emits the item that is the result of flattening the {@code MaybeSource} emitted * by {@code source} * @see ReactiveX operators documentation: Merge */ @@ -773,11 +782,11 @@ public static Maybe merge(MaybeSource> } /** - * Flattens two Singles into a single Observable, without any transformation. + * Flattens two MaybeSources into a single Flowable, without any transformation. *

- * + * *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by + * You can combine items emitted by multiple MaybeSources so that they appear as a single Flowable, by * using the {@code merge} method. *

*
Backpressure:
@@ -788,10 +797,10 @@ public static Maybe merge(MaybeSource> * * @param the common value type * @param source1 - * a Single to be merged + * a MaybeSource to be merged * @param source2 - * a Single to be merged - * @return a Flowable that emits all of the items emitted by the source Singles + * a MaybeSource to be merged + * @return a Flowable that emits all of the items emitted by the source MaybeSources * @see ReactiveX operators documentation: Merge */ @BackpressureSupport(BackpressureKind.FULL) @@ -806,11 +815,11 @@ public static Flowable merge( } /** - * Flattens three Singles into a single Observable, without any transformation. + * Flattens three MaybeSources into a single Flowable, without any transformation. *

- * + * *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * You can combine items emitted by multiple MaybeSources so that they appear as a single Flowable, by using * the {@code merge} method. *

*
Backpressure:
@@ -821,12 +830,12 @@ public static Flowable merge( * * @param the common value type * @param source1 - * a Single to be merged + * a MaybeSource to be merged * @param source2 - * a Single to be merged + * a MaybeSource to be merged * @param source3 - * a Single to be merged - * @return a Flowable that emits all of the items emitted by the source Singles + * a MaybeSource to be merged + * @return a Flowable that emits all of the items emitted by the source MaybeSources * @see ReactiveX operators documentation: Merge */ @BackpressureSupport(BackpressureKind.FULL) @@ -843,11 +852,11 @@ public static Flowable merge( } /** - * Flattens four Singles into a single Observable, without any transformation. + * Flattens four MaybeSources into a single Flowable, without any transformation. *

- * + * *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * You can combine items emitted by multiple MaybeSources so that they appear as a single Flowable, by using * the {@code merge} method. *

*
Backpressure:
@@ -858,14 +867,14 @@ public static Flowable merge( * * @param the common value type * @param source1 - * a Single to be merged + * a MaybeSource to be merged * @param source2 - * a Single to be merged + * a MaybeSource to be merged * @param source3 - * a Single to be merged + * a MaybeSource to be merged * @param source4 - * a Single to be merged - * @return a Flowable that emits all of the items emitted by the source Singles + * a MaybeSource to be merged + * @return a Flowable that emits all of the items emitted by the source MaybeSources * @see ReactiveX operators documentation: Merge */ @BackpressureSupport(BackpressureKind.FULL) @@ -910,17 +919,17 @@ public static Flowable mergeArray(MaybeSource... sources) { } /** - * Flattens an array of MaybeSources into one Publisher, in a way that allows an Subscriber to receive all + * Flattens an array of MaybeSources into one Flowable, in a way that allows a Subscriber to receive all * successfully emitted items from each of the source MaybeSources without being interrupted by an error * notification from one of them. *

* This behaves like {@link #merge(Publisher)} except that if any of the merged MaybeSources notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that - * error notification until all of the merged Publishers have finished emitting items. + * error notification until all of the merged MaybeSources have finished emitting items. *

* *

- * Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only + * Even if multiple merged MaybeSources send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its Subscribers once. *

*
Backpressure:
@@ -933,7 +942,7 @@ public static Flowable mergeArray(MaybeSource... sources) { * @param sources * the Iterable of MaybeSources * @return a Flowable that emits items that are the result of flattening the items emitted by the - * Publishers in the Iterable + * MaybeSources in the Iterable * @see ReactiveX operators documentation: Merge */ @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -945,17 +954,17 @@ public static Flowable mergeArrayDelayError(MaybeSource... s /** - * Flattens an Iterable of MaybeSources into one Publisher, in a way that allows an Subscriber to receive all - * successfully emitted items from each of the source Publishers without being interrupted by an error + * Flattens an Iterable of MaybeSources into one Flowable, in a way that allows a Subscriber to receive all + * successfully emitted items from each of the source MaybeSources without being interrupted by an error * notification from one of them. *

- * This behaves like {@link #merge(Publisher)} except that if any of the merged Publishers notify of an + * This behaves like {@link #merge(Publisher)} except that if any of the merged MaybeSources notify of an * error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain from propagating that - * error notification until all of the merged Publishers have finished emitting items. + * error notification until all of the merged MaybeSources have finished emitting items. *

* *

- * Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only + * Even if multiple merged MaybeSources send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its Subscribers once. *

*
Backpressure:
@@ -966,9 +975,9 @@ public static Flowable mergeArrayDelayError(MaybeSource... s * * @param the common element base type * @param sources - * the Iterable of Publishers + * the Iterable of MaybeSources * @return a Flowable that emits items that are the result of flattening the items emitted by the - * Publishers in the Iterable + * MaybeSources in the Iterable * @see ReactiveX operators documentation: Merge */ @SuppressWarnings({ "unchecked", "rawtypes" }) @@ -980,7 +989,7 @@ public static Flowable mergeDelayError(Iterable @@ -1015,7 +1024,7 @@ public static Flowable mergeDelayError(Publisher @@ -1025,7 +1034,7 @@ public static Flowable mergeDelayError(Publisher * *

- * Even if both merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only + * Even if both merged MaybeSources send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its Subscribers once. *

*
Backpressure:
@@ -1052,18 +1061,18 @@ public static Flowable mergeDelayError(MaybeSource source1, } /** - * Flattens three MaybeSource into one Flowable, in a way that allows an Subscriber to receive all + * Flattens three MaybeSource into one Flowable, in a way that allows a Subscriber to receive all * successfully emitted items from all of the source MaybeSources without being interrupted by an error * notification from one of them. *

* This behaves like {@link #merge(MaybeSource, MaybeSource, MaybeSource)} except that if any of the merged - * Publishers notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain - * from propagating that error notification until all of the merged Publishers have finished emitting + * MaybeSources notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} will refrain + * from propagating that error notification until all of the merged MaybeSources have finished emitting * items. *

* *

- * Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only + * Even if multiple merged MaybeSources send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its Subscribers once. *

*
Backpressure:
@@ -1074,7 +1083,7 @@ public static Flowable mergeDelayError(MaybeSource source1, * * @param the common element base type * @param source1 - * a PublMaybeSourceisher to be merged + * a MaybeSource to be merged * @param source2 * a MaybeSource to be merged * @param source3 @@ -1095,18 +1104,18 @@ public static Flowable mergeDelayError(MaybeSource source1, /** - * Flattens four MaybeSources into one Flowable, in a way that allows an Subscriber to receive all - * successfully emitted items from all of the source Publishers without being interrupted by an error + * Flattens four MaybeSources into one Flowable, in a way that allows a Subscriber to receive all + * successfully emitted items from all of the source MaybeSources without being interrupted by an error * notification from one of them. *

* This behaves like {@link #merge(MaybeSource, MaybeSource, MaybeSource, MaybeSource)} except that if any of - * the merged Publishers notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} + * the merged MaybeSources notify of an error via {@link Subscriber#onError onError}, {@code mergeDelayError} * will refrain from propagating that error notification until all of the merged MaybeSources have finished * emitting items. *

* *

- * Even if multiple merged Publishers send {@code onError} notifications, {@code mergeDelayError} will only + * Even if multiple merged MaybeSources send {@code onError} notifications, {@code mergeDelayError} will only * invoke the {@code onError} method of its Subscribers once. *

*
Backpressure:
@@ -1141,7 +1150,7 @@ public static Flowable mergeDelayError( } /** - * Returns a Maybe that never sends any items or notifications to an {@link MaybeObserver}. + * Returns a Maybe that never sends any items or notifications to a {@link MaybeObserver}. *

* *

@@ -1153,7 +1162,7 @@ public static Flowable mergeDelayError( * * @param * the type of items (not) emitted by the Maybe - * @return a Maybe that never emits any items or sends any notifications to an {@link MaybeObserver} + * @return a Maybe that never emits any items or sends any notifications to a {@link MaybeObserver} * @see ReactiveX operators documentation: Never */ @SchedulerSupport(SchedulerSupport.NONE) @@ -1165,7 +1174,7 @@ public static Maybe never() { /** * Returns a Single that emits a Boolean value that indicates whether two MaybeSource sequences are the - * same by comparing the items emitted by each Publisher pairwise. + * same by comparing the items emitted by each MaybeSource pairwise. *

* *

@@ -1238,7 +1247,7 @@ public static Maybe timer(long delay, TimeUnit unit) { } /** - * Returns a Flowable that emits one item after a specified delay on a specified Scheduler. + * Returns a Maybe that emits one item after a specified delay on a specified Scheduler. *

* *

@@ -1292,7 +1301,7 @@ public static Maybe unsafeCreate(MaybeSource onSubscribe) { *
{@code using} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the element type of the generated Publisher + * @param the element type of the generated MaybeSource * @param the type of the resource associated with the output sequence * @param resourceSupplier * the factory function to create a resource object that depends on the Maybe @@ -1323,7 +1332,7 @@ public static Maybe using(Callable resourceSupplier, *
{@code using} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the element type of the generated Publisher + * @param the element type of the generated MaybeSource * @param the type of the resource associated with the output sequence * @param resourceSupplier * the factory function to create a resource object that depends on the Maybe @@ -2161,7 +2170,7 @@ public final Maybe delay(Publisher delayIndicator) { * until the other Publisher emits an element or completes normally. *

*

- *
+ *
Backpressure:
*
The {@code Publisher} source is consumed in an unbounded fashion (without applying backpressure). *
Scheduler:
*
This method does not operate by default on a particular {@link Scheduler}.
@@ -2496,8 +2505,8 @@ public final Maybe flatMap(Function * *
@@ -2507,7 +2516,7 @@ public final Maybe flatMap(Function the result value type * @param mapper - * a function that, when applied to the item emitted by the source Maybe, returns a ObservableSource + * a function that, when applied to the item emitted by the source Maybe, returns an ObservableSource * @return the Observable returned from {@code func} when applied to the item emitted by the source Maybe * @see ReactiveX operators documentation: FlatMap */ @@ -2993,6 +3002,302 @@ public final Maybe onExceptionResumeNext(final MaybeSource next) ObjectHelper.requireNonNull(next, "next is null"); return RxJavaPlugins.onAssembly(new MaybeOnErrorNext(this, Functions.justFunction(next), false)); } + /** + * Nulls out references to the upstream producer and downstream MaybeObserver if + * the sequence is terminated or downstream unsubscribes. + *
+ *
Scheduler:
+ *
{@code onTerminateDetach} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return a Maybe which out references to the upstream producer and downstream MaybeObserver if + * the sequence is terminated or downstream unsubscribes + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe onTerminateDetach() { + return RxJavaPlugins.onAssembly(new MaybeDetach(this)); + } + + /** + * Returns a Flowable that repeats the sequence of items emitted by the source Publisher indefinitely. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well. + * If this expectation is violated, the operator may throw an {@code IllegalStateException}.
+ *
Scheduler:
+ *
{@code repeat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return a Flowable that emits the items emitted by the source Publisher repeatedly and in sequence + * @see ReactiveX operators documentation: Repeat + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable repeat() { + return repeat(Long.MAX_VALUE); + } + + /** + * Returns a Flowable that repeats the sequence of items emitted by the source Publisher at most + * {@code count} times. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well. + * If this expectation is violated, the operator may throw an {@code IllegalStateException}.
+ *
Scheduler:
+ *
{@code repeat} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param times + * the number of times the source Publisher items are repeated, a count of 0 will yield an empty + * sequence + * @return a Flowable that repeats the sequence of items emitted by the source Publisher at most + * {@code count} times + * @throws IllegalArgumentException + * if {@code count} is less than zero + * @see ReactiveX operators documentation: Repeat + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable repeat(long times) { + return toFlowable().repeat(times); + } + + /** + * Returns a Flowable that repeats the sequence of items emitted by the source Publisher until + * the provided stop function returns true. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well. + * If this expectation is violated, the operator may throw an {@code IllegalStateException}.
+ *
Scheduler:
+ *
{@code repeatUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param stop + * a boolean supplier that is called when the current Flowable completes and unless it returns + * false, the current Flowable is resubscribed + * @return the new Flowable instance + * @throws NullPointerException + * if {@code stop} is null + * @see ReactiveX operators documentation: Repeat + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable repeatUntil(BooleanSupplier stop) { + return toFlowable().repeatUntil(stop); + } + + /** + * Returns a Flowable that emits the same values as the source Publisher with the exception of an + * {@code onComplete}. An {@code onComplete} notification from the source will result in the emission of + * a {@code void} item to the Publisher provided as an argument to the {@code notificationHandler} + * function. If that Publisher calls {@code onComplete} or {@code onError} then {@code repeatWhen} will + * call {@code onComplete} or {@code onError} on the child subscription. Otherwise, this Publisher will + * resubscribe to the source Publisher. + *

+ * + *

+ *
Backpressure:
+ *
The operator honors downstream backpressure and expects the source {@code Publisher} to honor backpressure as well. + * If this expectation is violated, the operator may throw an {@code IllegalStateException}.
+ *
Scheduler:
+ *
{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param handler + * receives a Publisher of notifications with which a user can complete or error, aborting the repeat. + * @return the source Publisher modified with repeat logic + * @see ReactiveX operators documentation: Repeat + */ + @BackpressureSupport(BackpressureKind.FULL) + @SchedulerSupport(SchedulerSupport.NONE) + public final Flowable repeatWhen(final Function, ? extends Publisher> handler) { + return toFlowable().repeatWhen(handler); + } + + + /** + * Returns a Maybe that mirrors the source Maybe, resubscribing to it if it calls {@code onError} + * (infinite retry count). + *

+ * + *

+ * If the source Publisher calls {@link Subscriber#onError}, this method will resubscribe to the source + * Publisher rather than propagating the {@code onError} call. + *

+ * Any and all items emitted by the source Publisher will be emitted by the resulting Publisher, even + * those emitted during failed subscriptions. For example, if a Publisher fails at first but emits + * {@code [1, 2]} then succeeds the second time and emits {@code [1, 2, 3, 4, 5]} then the complete sequence + * of emissions and notifications would be {@code [1, 2, 1, 2, 3, 4, 5, onComplete]}. + *

+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @return the nww Maybe instance + * @see ReactiveX operators documentation: Retry + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retry() { + return retry(Long.MAX_VALUE, Functions.alwaysTrue()); + } + + /** + * Returns a Maybe that mirrors the source Publisher, resubscribing to it if it calls {@code onError} + * and the predicate returns true for that specific exception and retry count. + *

+ * + *

+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param predicate + * the predicate that determines if a resubscription may happen in case of a specific exception + * and retry count + * @return the nww Maybe instance + * @see #retry() + * @see ReactiveX operators documentation: Retry + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retry(BiPredicate predicate) { + return toFlowable().retry(predicate).toMaybe(); + } + + /** + * Returns a Maybe that mirrors the source Publisher, resubscribing to it if it calls {@code onError} + * up to a specified number of retries. + *

+ * + *

+ * If the source Publisher calls {@link Subscriber#onError}, this method will resubscribe to the source + * Publisher for a maximum of {@code count} resubscriptions rather than propagating the + * {@code onError} call. + *

+ * Any and all items emitted by the source Publisher will be emitted by the resulting Publisher, even + * those emitted during failed subscriptions. For example, if a Publisher fails at first but emits + * {@code [1, 2]} then succeeds the second time and emits {@code [1, 2, 3, 4, 5]} then the complete sequence + * of emissions and notifications would be {@code [1, 2, 1, 2, 3, 4, 5, onComplete]}. + *

+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param count + * number of retry attempts before failing + * @return the source Publisher modified with retry logic + * @see ReactiveX operators documentation: Retry + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retry(long count) { + return retry(count, Functions.alwaysTrue()); + } + + /** + * Retries at most times or until the predicate returns false, whichever happens first. + * + *
+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param times the number of times to repeat + * @param predicate the predicate called with the failure Throwable and should return true to trigger a retry. + * @return the new Maybe instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retry(long times, Predicate predicate) { + return toFlowable().retry(times, predicate).toMaybe(); + } + + /** + * Retries the current Flowable if the predicate returns true. + *
+ *
Scheduler:
+ *
{@code retry} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param predicate the predicate that receives the failure Throwable and should return true to trigger a retry. + * @return the new Maybe instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retry(Predicate predicate) { + return retry(Long.MAX_VALUE, predicate); + } + + /** + * Retries until the given stop function returns true. + *
+ *
Scheduler:
+ *
{@code retryUntil} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param stop the function that should return true to stop retrying + * @return the new Maybe instance + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retryUntil(final BooleanSupplier stop) { + ObjectHelper.requireNonNull(stop, "stop is null"); + return retry(Long.MAX_VALUE, Functions.predicateReverseFor(stop)); + } + + /** + * Returns a Maybe that emits the same values as the source Maybe with the exception of an + * {@code onError}. An {@code onError} notification from the source will result in the emission of a + * {@link Throwable} item to the Publisher provided as an argument to the {@code notificationHandler} + * function. If that Publisher calls {@code onComplete} or {@code onError} then {@code retry} will call + * {@code onComplete} or {@code onError} on the child subscription. Otherwise, this Publisher will + * resubscribe to the source Publisher. + *

+ * + * + * Example: + * + * This retries 3 times, each time incrementing the number of seconds it waits. + * + *


+     *  Publisher.create((Subscriber s) -> {
+     *      System.out.println("subscribing");
+     *      s.onError(new RuntimeException("always fails"));
+     *  }).retryWhen(attempts -> {
+     *      return attempts.zipWith(Publisher.range(1, 3), (n, i) -> i).flatMap(i -> {
+     *          System.out.println("delay retry by " + i + " second(s)");
+     *          return Publisher.timer(i, TimeUnit.SECONDS);
+     *      });
+     *  }).blockingForEach(System.out::println);
+     * 
+ * + * Output is: + * + *
 {@code
+     * subscribing
+     * delay retry by 1 second(s)
+     * subscribing
+     * delay retry by 2 second(s)
+     * subscribing
+     * delay retry by 3 second(s)
+     * subscribing
+     * } 
+ *
+ *
Scheduler:
+ *
{@code retryWhen} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param handler + * receives a Publisher of notifications with which a user can complete or error, aborting the + * retry + * @return the new Maybe instance + * @see ReactiveX operators documentation: Retry + */ + @SchedulerSupport(SchedulerSupport.NONE) + public final Maybe retryWhen( + final Function, ? extends Publisher> handler) { + return toFlowable().retryWhen(handler).toMaybe(); + } /** * Subscribes to a Maybe and ignores {@code onSuccess} and {@code onComplete} emissions. diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 1329a144b6..9ce2790744 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -118,7 +118,7 @@ public static Observable ambArray(ObservableSource... source * Returns the default 'island' size or capacity-increment hint for unbounded buffers. *

Delegates to {@link Flowable#bufferSize} but is public for convenience. *

The value can be overridden via system parameter {@code rx2.buffer-size} - * before the Flowable class is loaded. + * before the {@link Flowable} class is loaded. * @return the default 'island' size or capacity-increment hint */ public static int bufferSize() { @@ -1320,7 +1320,7 @@ public static Observable defer(Callable * *

@@ -1331,7 +1331,7 @@ public static Observable defer(Callable * the type of the items (ostensibly) emitted by the ObservableSource * @return an Observable that emits no items to the {@link Observer} but immediately invokes the - * {@link Subscriber}'s {@link Subscriber#onComplete() onComplete} method + * {@link Observer}'s {@link Observer#onComplete() onComplete} method * @see ReactiveX operators documentation: Empty */ @SchedulerSupport(SchedulerSupport.NONE) @@ -1351,7 +1351,7 @@ public static Observable empty() { *
* * @param errorSupplier - * a Callable factory to return a Throwable for each individual Subscriber + * a Callable factory to return a Throwable for each individual Observer * @param * the type of the items (ostensibly) emitted by the ObservableSource * @return an Observable that invokes the {@link Observer}'s {@link Observer#onError onError} method when @@ -1640,7 +1640,7 @@ public static Observable fromPublisher(Publisher publisher) *
* * @param the generated value type - * @param generator the Consumer called whenever a particular downstream Subscriber has + * @param generator the Consumer called whenever a particular downstream Observer has * requested a value. The callback then should call {@code onNext}, {@code onError} or * {@code onComplete} to signal a value or a terminal event. Signalling multiple {@code onNext} * in a call will make the operator signal {@code IllegalStateException}. @@ -1661,10 +1661,10 @@ public static Observable generate(final Consumer> generator) { *
{@code generate} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the type of the per-Subscriber state + * @param the type of the per-Observer state * @param the generated value type - * @param initialState the Callable to generate the initial state for each Subscriber - * @param generator the Consumer called with the current state whenever a particular downstream Subscriber has + * @param initialState the Callable to generate the initial state for each Observer + * @param generator the Consumer called with the current state whenever a particular downstream Observer has * requested a value. The callback then should call {@code onNext}, {@code onError} or * {@code onComplete} to signal a value or a terminal event. Signalling multiple {@code onNext} * in a call will make the operator signal {@code IllegalStateException}. @@ -1684,10 +1684,10 @@ public static Observable generate(Callable initialState, final BiCo *
{@code generate} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the type of the per-Subscriber state + * @param the type of the per-Observer state * @param the generated value type - * @param initialState the Callable to generate the initial state for each Subscriber - * @param generator the Consumer called with the current state whenever a particular downstream Subscriber has + * @param initialState the Callable to generate the initial state for each Observer + * @param generator the Consumer called with the current state whenever a particular downstream Observer has * requested a value. The callback then should call {@code onNext}, {@code onError} or * {@code onComplete} to signal a value or a terminal event. Signalling multiple {@code onNext} * in a call will make the operator signal {@code IllegalStateException}. @@ -1712,10 +1712,10 @@ public static Observable generate( *
{@code generate} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the type of the per-Subscriber state + * @param the type of the per-Observer state * @param the generated value type - * @param initialState the Callable to generate the initial state for each Subscriber - * @param generator the Function called with the current state whenever a particular downstream Subscriber has + * @param initialState the Callable to generate the initial state for each Observer + * @param generator the Function called with the current state whenever a particular downstream Observer has * requested a value. The callback then should call {@code onNext}, {@code onError} or * {@code onComplete} to signal a value or a terminal event and should return a (new) state for * the next invocation. Signalling multiple {@code onNext} @@ -1735,10 +1735,10 @@ public static Observable generate(Callable initialState, BiFunction *
{@code generate} does not operate by default on a particular {@link Scheduler}.
*
* - * @param the type of the per-Subscriber state + * @param the type of the per-Observer state * @param the generated value type - * @param initialState the Callable to generate the initial state for each Subscriber - * @param generator the Function called with the current state whenever a particular downstream Subscriber has + * @param initialState the Callable to generate the initial state for each Observer + * @param generator the Function called with the current state whenever a particular downstream Observer has * requested a value. The callback then should call {@code onNext}, {@code onError} or * {@code onComplete} to signal a value or a terminal event and should return a (new) state for * the next invocation. Signalling multiple {@code onNext} diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 0e455d0c2c..dd417a6509 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -104,6 +104,8 @@ public static Single ambArray(final SingleSource... sources) * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided by * an Iterable sequence. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -120,19 +122,19 @@ public static Flowable concat(Iterable *
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
* @param the value type - * @param sources the Publisher of SingleSource instances - * @return the new Flowable instance + * @param sources the ObservableSource of SingleSource instances + * @return the new Observable instance * @since 2.0 */ @SchedulerSupport(SchedulerSupport.NONE) @SuppressWarnings({ "unchecked", "rawtypes" }) - public static Observable concat(Observable> sources) { + public static Observable concat(ObservableSource> sources) { return RxJavaPlugins.onAssembly(new ObservableConcatMap(sources, SingleInternalHelper.toObservable(), 2, ErrorMode.IMMEDIATE)); } @@ -140,6 +142,9 @@ public static Observable concat(Observable + *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and the sources {@code Publisher} is expected to honor it as well.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -158,6 +163,9 @@ public static Flowable concat(Publisher + *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and the sources {@code Publisher} is expected to honor it as well.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -180,6 +188,8 @@ public static Flowable concat(Publisher * *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -208,6 +218,8 @@ public static Flowable concat( *

* *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -240,6 +252,8 @@ public static Flowable concat( *

* *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concat} does not operate by default on a particular {@link Scheduler}.
*
@@ -274,6 +288,8 @@ public static Flowable concat( * Concatenate the single values, in a non-overlapping fashion, of the Single sources provided in * an array. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code concatArray} does not operate by default on a particular {@link Scheduler}.
*
@@ -319,7 +335,7 @@ public static Flowable concatArray(SingleSource... sources) *
{@code create} does not operate by default on a particular {@link Scheduler}.
*
* @param the value type - * @param source the emitter that is called when a SingleObserver subscribes to the returned {@code Flowable} + * @param source the emitter that is called when a SingleObserver subscribes to the returned {@code Single} * @return the new Single instance * @see SingleOnSubscribe * @see Cancellable @@ -390,9 +406,9 @@ public static Single error(final Throwable exception) { } /** - * Returns a {@link Single} that invokes passed function and emits its result for each new Observer that subscribes. + * Returns a {@link Single} that invokes passed function and emits its result for each new SingleObserver that subscribes. *

- * Allows you to defer execution of passed function until Observer subscribes to the {@link Single}. + * Allows you to defer execution of passed function until SingleObserver subscribes to the {@link Single}. * It makes passed function "lazy". * Result of the function invocation will be emitted by the {@link Single}. *

@@ -404,7 +420,7 @@ public static Single error(final Throwable exception) { * function which execution should be deferred, it will be invoked when SingleObserver will subscribe to the {@link Single}. * @param * the type of the item emitted by the {@link Single}. - * @return a {@link Single} whose {@link Observer}s' subscriptions trigger an invocation of the given function. + * @return a {@link Single} whose {@link SingleObserver}s' subscriptions trigger an invocation of the given function. */ @SchedulerSupport(SchedulerSupport.NONE) public static Single fromCallable(final Callable callable) { @@ -586,6 +602,8 @@ public static Single just(final T item) { * Merges an Iterable sequence of SingleSource instances into a single Flowable sequence, * running all SingleSources at once. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -604,6 +622,8 @@ public static Flowable merge(Iterable * Merges a Flowable sequence of SingleSource instances into a single Flowable sequence, * running all SingleSources at once. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -645,13 +665,15 @@ public static Single merge(SingleSource * *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by + * You can combine items emitted by multiple Singles so that they appear as a single Flowable, by * using the {@code merge} method. *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -676,13 +698,15 @@ public static Flowable merge( } /** - * Flattens three Singles into a single Observable, without any transformation. + * Flattens three Singles into a single Flowable, without any transformation. *

* *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using * the {@code merge} method. *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -711,13 +735,15 @@ public static Flowable merge( } /** - * Flattens four Singles into a single Observable, without any transformation. + * Flattens four Singles into a single Flowable, without any transformation. *

* *

- * You can combine items emitted by multiple Singles so that they appear as a single Observable, by using + * You can combine items emitted by multiple Singles so that they appear as a single Flowable, by using * the {@code merge} method. *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer.
*
Scheduler:
*
{@code merge} does not operate by default on a particular {@link Scheduler}.
*
@@ -1492,6 +1518,8 @@ public final Single cast(final Class clazz) { *

* *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code concatWith} does not operate by default on a particular {@link Scheduler}.
*
@@ -1804,6 +1832,9 @@ public final Single flatMap(Function * *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer + * and the {@code Publisher} returned by the mapper function is expected to honor it as well. *
Scheduler:
*
{@code flatMapPublisher} does not operate by default on a particular {@link Scheduler}.
*
@@ -1975,6 +2006,8 @@ public final Single contains(final Object value, final BiPredicate + *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code mergeWith} does not operate by default on a particular {@link Scheduler}.
*
@@ -2133,6 +2166,8 @@ public final Single onErrorResumeNext( /** * Repeatedly re-subscribes to the current Single and emits each success value. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code repeat} does not operate by default on a particular {@link Scheduler}.
*
@@ -2148,6 +2183,8 @@ public final Flowable repeat() { /** * Re-subscribes to the current Single at most the given number of times and emits each success value. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code repeat} does not operate by default on a particular {@link Scheduler}.
*
@@ -2166,6 +2203,9 @@ public final Flowable repeat(long times) { * the Publisher returned by the handler function signals a value in response to a * value signalled through the Flowable the handle receives. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. + * The {@code Publisher} returned by the handler function is expected to honor backpressure as well. *
Scheduler:
*
{@code repeatWhen} does not operate by default on a particular {@link Scheduler}.
*
@@ -2185,6 +2225,8 @@ public final Flowable repeatWhen(Function, ? extends /** * Re-subscribes to the current Single until the given BooleanSupplier returns true. *
+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code repeatUntil} does not operate by default on a particular {@link Scheduler}.
*
@@ -2658,6 +2700,8 @@ public final Completable toCompletable() { *

* *

+ *
Backpressure:
+ *
The returned {@code Flowable} honors the backpressure of the downstream consumer. *
Scheduler:
*
{@code toFlowable} does not operate by default on a particular {@link Scheduler}.
*
diff --git a/src/main/java/io/reactivex/internal/fuseable/FuseToMaybe.java b/src/main/java/io/reactivex/internal/fuseable/FuseToMaybe.java index e0e18a550c..5646e3ba90 100644 --- a/src/main/java/io/reactivex/internal/fuseable/FuseToMaybe.java +++ b/src/main/java/io/reactivex/internal/fuseable/FuseToMaybe.java @@ -16,7 +16,7 @@ import io.reactivex.Maybe; /** - * Interface indicating a operator implementation can be macro-fused back to Maybe in case + * Interface indicating an operator implementation can be macro-fused back to Maybe in case * the operator goes from Maybe to some other reactive type and then the sequence calls * for toMaybe again: *
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeDetach.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDetach.java
new file mode 100644
index 0000000000..a29d299f8b
--- /dev/null
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeDetach.java
@@ -0,0 +1,94 @@
+/**
+ * Copyright 2016 Netflix, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is
+ * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
+ * the License for the specific language governing permissions and limitations under the License.
+ */
+
+package io.reactivex.internal.operators.maybe;
+
+import io.reactivex.*;
+import io.reactivex.disposables.Disposable;
+import io.reactivex.internal.disposables.DisposableHelper;
+
+/**
+ * Breaks the references between the upstream and downstream when the Maybe terminates.
+ *
+ * @param  the value type
+ */
+public final class MaybeDetach extends AbstractMaybeWithUpstream {
+
+    public MaybeDetach(MaybeSource source) {
+        super(source);
+    }
+
+    @Override
+    protected void subscribeActual(MaybeObserver observer) {
+        source.subscribe(new DetachMaybeObserver(observer));
+    }
+
+    static final class DetachMaybeObserver implements MaybeObserver, Disposable {
+
+        MaybeObserver actual;
+
+        Disposable d;
+
+        public DetachMaybeObserver(MaybeObserver actual) {
+            this.actual = actual;
+        }
+
+        @Override
+        public void dispose() {
+            actual = null;
+            d.dispose();
+            d = DisposableHelper.DISPOSED;
+        }
+
+        @Override
+        public boolean isDisposed() {
+            return d.isDisposed();
+        }
+
+        @Override
+        public void onSubscribe(Disposable d) {
+            if (DisposableHelper.validate(this.d, d)) {
+                this.d = d;
+
+                actual.onSubscribe(this);
+            }
+        }
+
+        @Override
+        public void onSuccess(T value) {
+            d = DisposableHelper.DISPOSED;
+            MaybeObserver a = actual;
+            if (a != null) {
+                a.onSuccess(value);
+            }
+        }
+
+        @Override
+        public void onError(Throwable e) {
+            d = DisposableHelper.DISPOSED;
+            MaybeObserver a = actual;
+            if (a != null) {
+                a.onError(e);
+            }
+        }
+
+        @Override
+        public void onComplete() {
+            d = DisposableHelper.DISPOSED;
+            MaybeObserver a = actual;
+            if (a != null) {
+                a.onComplete();
+            }
+        }
+    }
+}
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmpty.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmpty.java
index f444e488ee..e7482051c9 100644
--- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmpty.java
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmpty.java
@@ -19,7 +19,7 @@
 
 /**
  * Signals true if the source Maybe signals onComplete, signals false if the source Maybe
- * signals onNext.
+ * signals onSuccess.
  * 
  * @param  the value type
  */
diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingle.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingle.java
index cd684401d2..f87be2f541 100644
--- a/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingle.java
+++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeIsEmptySingle.java
@@ -21,7 +21,7 @@
 
 /**
  * Signals true if the source Maybe signals onComplete, signals false if the source Maybe
- * signals onNext.
+ * signals onSuccess.
  * 
  * @param  the value type
  */
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java
index 35af39391e..299de69d1e 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableReplay.java
@@ -54,7 +54,7 @@ public Object call() {
      * @param  the value type of the ConnectableObservable
      * @param  the result value type
      * @param connectableFactory the factory that returns a ConnectableObservable for each individual subscriber
-     * @param selector the function that receives a Observable and should return another Observable that will be subscribed to
+     * @param selector the function that receives an Observable and should return another Observable that will be subscribed to
      * @return the new Observable instance
      */
     public static  Observable multicastSelector(
diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java
index 8c6adeab46..897b389c27 100644
--- a/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java
+++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableScalarXMap.java
@@ -110,7 +110,7 @@ public static  Observable scalarXMap(T value,
     }
 
     /**
-     * Maps a scalar value to a ObservableSource and subscribes to it.
+     * Maps a scalar value to an ObservableSource and subscribes to it.
      *
      * @param  the scalar value type
      * @param  the mapped Publisher's element type.
diff --git a/src/main/java/io/reactivex/processors/FlowableProcessor.java b/src/main/java/io/reactivex/processors/FlowableProcessor.java
index 33db662ef9..9e504b9060 100644
--- a/src/main/java/io/reactivex/processors/FlowableProcessor.java
+++ b/src/main/java/io/reactivex/processors/FlowableProcessor.java
@@ -18,7 +18,7 @@
 import io.reactivex.Flowable;
 
 /**
- * Represents a Subscriber and an Flowable (Publisher) at the same time, allowing
+ * Represents a Subscriber and a Flowable (Publisher) at the same time, allowing
  * multicasting events from a single source to multiple child Subscribers.
  * 

All methods except the onSubscribe, onNext, onError and onComplete are thread-safe. * Use {@link #toSerialized()} to make these methods thread-safe as well. diff --git a/src/main/java/io/reactivex/subjects/Subject.java b/src/main/java/io/reactivex/subjects/Subject.java index 30a6c70aeb..5ce375bede 100644 --- a/src/main/java/io/reactivex/subjects/Subject.java +++ b/src/main/java/io/reactivex/subjects/Subject.java @@ -16,7 +16,7 @@ import io.reactivex.*; /** - * Represents an Observer and a Observable at the same time, allowing + * Represents an Observer and an Observable at the same time, allowing * multicasting events from a single source to multiple child Subscribers. *

All methods except the onSubscribe, onNext, onError and onComplete are thread-safe. * Use {@link #toSerialized()} to make these methods thread-safe as well. diff --git a/src/test/java/io/reactivex/BaseTypeParser.java b/src/test/java/io/reactivex/BaseTypeParser.java new file mode 100644 index 0000000000..62c9d9e585 --- /dev/null +++ b/src/test/java/io/reactivex/BaseTypeParser.java @@ -0,0 +1,135 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.io.File; +import java.util.*; + +/** + * Parses the java file of a reactive base type to allow discovering Javadoc mistakes algorithmically. + */ +public class BaseTypeParser { + + public static class RxMethod { + public String signature; + + public String backpressureKind; + + public String schedulerKind; + + public String javadoc; + + public String backpressureDocumentation; + + public String schedulerDocumentation; + + public int javadocLine; + + public int methodLine; + + public int backpressureDocLine; + + public int schedulerDocLine; + } + + public static List parse(File f, String baseClassName) throws Exception { + List list = new ArrayList(); + + StringBuilder b = JavadocForAnnotations.readFile(f); + + int baseIndex = b.indexOf("public abstract class " + baseClassName); + + if (baseIndex < 0) { + throw new AssertionError("Wrong base class file: " + baseClassName); + } + + for (;;) { + RxMethod m = new RxMethod(); + + int javadocStart = b.indexOf("/**", baseIndex); + + if (javadocStart < 0) { + break; + } + + int javadocEnd = b.indexOf("*/", javadocStart + 2); + + m.javadoc = b.substring(javadocStart, javadocEnd + 2); + m.javadocLine = JavadocForAnnotations.lineNumber(b, javadocStart); + + int backpressureDoc = b.indexOf("

Backpressure:
", javadocStart); + if (backpressureDoc > 0 && backpressureDoc < javadocEnd) { + m.backpressureDocLine = JavadocForAnnotations.lineNumber(b, backpressureDoc); + int nextDD = b.indexOf("", backpressureDoc); + if (nextDD > 0 && nextDD < javadocEnd) { + m.backpressureDocumentation = b.substring(backpressureDoc, nextDD + 5); + } + } + + int schedulerDoc = b.indexOf("
Scheduler:
", javadocStart); + if (schedulerDoc > 0 && schedulerDoc < javadocEnd) { + m.schedulerDocLine = JavadocForAnnotations.lineNumber(b, schedulerDoc); + int nextDD = b.indexOf("", schedulerDoc); + if (nextDD > 0 && nextDD < javadocEnd) { + m.schedulerDocumentation = b.substring(schedulerDoc, nextDD + 5); + } + } + + int staticMethodDef = b.indexOf("public static ", javadocEnd + 2); + int instanceMethodDef = b.indexOf("public final ", javadocEnd + 2); + + int javadocStartNext = b.indexOf("/**", javadocEnd + 2); + if (javadocStartNext < 0) { + javadocStartNext = Integer.MAX_VALUE; + } + + int definitionStart = -1; + + if (staticMethodDef > 0 && staticMethodDef < javadocStartNext && staticMethodDef < instanceMethodDef) { + definitionStart = staticMethodDef; + } + if (instanceMethodDef > 0 && staticMethodDef < javadocStartNext && instanceMethodDef < staticMethodDef) { + definitionStart = instanceMethodDef; + } + + if (definitionStart > 0) { + int methodDefEnd = b.indexOf("{", definitionStart); + + m.signature = b.substring(definitionStart, methodDefEnd + 1); + + m.methodLine = JavadocForAnnotations.lineNumber(b, definitionStart); + + int backpressureSpec = b.indexOf("@BackpressureSupport(", javadocEnd); + if (backpressureSpec > 0 && backpressureSpec < definitionStart) { + int backpressureSpecEnd = b.indexOf(")", backpressureSpec + 21); + m.backpressureKind = b.substring(backpressureSpec + 21, backpressureSpecEnd); + } + + int schhedulerSpec = b.indexOf("@SchedulerSupport(", javadocEnd); + if (schhedulerSpec > 0 && schhedulerSpec < definitionStart) { + int schedulerSpecEnd = b.indexOf(")", schhedulerSpec + 18); + m.schedulerKind = b.substring(schhedulerSpec + 18, schedulerSpecEnd); + } + + list.add(m); + baseIndex = methodDefEnd; + } else { + baseIndex = javadocEnd + 2; + } + + } + + return list; + } +} diff --git a/src/test/java/io/reactivex/FixLicenseHeaders.java b/src/test/java/io/reactivex/FixLicenseHeaders.java new file mode 100644 index 0000000000..02d4eae29f --- /dev/null +++ b/src/test/java/io/reactivex/FixLicenseHeaders.java @@ -0,0 +1,132 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.io.*; +import java.util.*; + +import org.junit.Test; + +/** + * Adds license header to java files. + */ +public class FixLicenseHeaders { + + String[] header = { + "/**", + " * Copyright 2016 Netflix, Inc.", + " *", + " * Licensed under the Apache License, Version 2.0 (the \"License\"); you may not use this file except in", + " * compliance with the License. You may obtain a copy of the License at", + " *", + " * http://www.apache.org/licenses/LICENSE-2.0", + " *", + " * Unless required by applicable law or agreed to in writing, software distributed under the License is", + " * distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See", + " * the License for the specific language governing permissions and limitations under the License.", + " */", + "" + }; + + @Test + public void checkAndUpdateLicenses() throws Exception { + if (System.getenv("CI") != null) { + // no point in changing the files in CI + return; + } + File f = MaybeNo2Dot0Since.findSource("Flowable"); + if (f == null) { + return; + } + + Queue dirs = new ArrayDeque(); + + File parent = f.getParentFile(); + dirs.offer(parent); + dirs.offer(new File(parent.getAbsolutePath().replace('\\', '/').replace("src/main/java", "src/perf/java"))); + dirs.offer(new File(parent.getAbsolutePath().replace('\\', '/').replace("src/main/java", "src/test/java"))); + + StringBuilder fail = new StringBuilder(); + + while (!dirs.isEmpty()) { + f = dirs.poll(); + + File[] list = f.listFiles(); + if (list != null && list.length != 0) { + + for (File u : list) { + if (u.isDirectory()) { + dirs.offer(u); + } else { + if (u.getName().endsWith(".java")) { + + List lines = new ArrayList(); + BufferedReader in = new BufferedReader(new FileReader(u)); + try { + for (;;) { + String line = in.readLine(); + if (line == null) { + break; + } + + lines.add(line); + } + } finally { + in.close(); + } + + if (!lines.get(0).equals(header[0]) && !lines.get(1).equals(header[1])) { + fail.append("java.lang.RuntimeException: missing header added, refresh and re-run tests!\r\n") + .append(" at ") + ; + + String fn = u.toString().replace('\\', '/'); + + int idx = fn.indexOf("io/reactivex/"); + + fn = fn.substring(idx).replace('/', '.').replace(".java", ""); + + fail.append(fn).append(" (") + ; + + int jdx = fn.lastIndexOf('.'); + + fail.append(fn.substring(jdx + 1)); + + fail.append(".java:1)\r\n\r\n"); + + lines.addAll(0, Arrays.asList(header)); + + PrintWriter w = new PrintWriter(new FileWriter(u)); + + try { + for (String s : lines) { + w.println(s); + } + } finally { + w.close(); + } + } + } + } + } + } + } + + if (fail.length() != 0) { + System.out.println(fail); + throw new AssertionError(fail.toString()); + } + } +} diff --git a/src/test/java/io/reactivex/JavadocForAnnotations.java b/src/test/java/io/reactivex/JavadocForAnnotations.java index 9d3ee115da..b5e974db88 100644 --- a/src/test/java/io/reactivex/JavadocForAnnotations.java +++ b/src/test/java/io/reactivex/JavadocForAnnotations.java @@ -88,7 +88,7 @@ static final void scanFor(StringBuilder sourceCode, String annotation, String in if (k < 0 || k > idx) { // when printed on the console, IDEs will create a clickable link to help navigate to the offending point - e.append("java.lang.RuntimeException: missing ").append(inDoc).append("\r\n") + e.append("java.lang.RuntimeException: missing ").append(inDoc).append(" section\r\n") ; int lc = lineNumber(sourceCode, idx); @@ -219,7 +219,6 @@ public void checkObservableScheduler() throws Exception { } @Test - @Ignore("In the next PR these will be fixed") public void checkSingleBackpressure() throws Exception { checkSource(Single.class.getSimpleName(), false); } @@ -230,7 +229,6 @@ public void checkSingleScheduler() throws Exception { } @Test - @Ignore("In the next PR these will be fixed") public void checkCompletableBackpressure() throws Exception { checkSource(Completable.class.getSimpleName(), false); } @@ -241,7 +239,6 @@ public void checkCompletableScheduler() throws Exception { } @Test - @Ignore("In the next PR these will be fixed") public void checkMaybeBackpressure() throws Exception { checkSource(Maybe.class.getSimpleName(), false); } diff --git a/src/test/java/io/reactivex/JavadocWording.java b/src/test/java/io/reactivex/JavadocWording.java new file mode 100644 index 0000000000..7fe8291994 --- /dev/null +++ b/src/test/java/io/reactivex/JavadocWording.java @@ -0,0 +1,790 @@ +/** + * Copyright 2016 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import java.util.List; + +import static org.junit.Assert.*; +import org.junit.Test; + +import io.reactivex.BaseTypeParser.RxMethod; + +/** + * Check if the method wording is consistent with the target base type. + */ +public class JavadocWording { + + public static int lineNumber(CharSequence s, int index) { + int cnt = 1; + for (int i = 0; i < index; i++) { + if (s.charAt(i) == '\n') { + cnt++; + } + } + return cnt; + } + + @Test + public void maybeDocRefersToMaybeTypes() throws Exception { + List list = BaseTypeParser.parse(MaybeNo2Dot0Since.findSource("Maybe"), "Maybe"); + + assertFalse(list.isEmpty()); + + StringBuilder e = new StringBuilder(); + + for (RxMethod m : list) { + int jdx; + if (m.javadoc != null) { + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("onNext", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable") + && !m.signature.contains("Observable") + && !m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Maybe doc mentions onNext but no Flowable/Observable in signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Subscriber", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Subscriber but not using Flowable\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observer", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource") + && !m.signature.contains("Observable")) { + + if (idx < 5 || !m.javadoc.substring(idx - 5, idx + 8).equals("MaybeObserver")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Observer but not using Observable\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Publisher", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher")) { + if (idx == 0 || !m.javadoc.substring(idx - 1, idx + 9).equals("(Publisher")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Publisher but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Flowable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Flowable but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Single", jdx); + if (idx >= 0) { + if (!m.signature.contains("Single")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Single but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("SingleSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("SingleSource")) { + e.append("java.lang.RuntimeException: Maybe doc mentions SingleSource but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Observable")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Observable but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("ObservableSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Maybe doc mentions ObservableSource but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + aOrAn(e, m, "Maybe"); + missingClosingDD(e, m, "Maybe"); + backpressureMentionedWithoutAnnotation(e, m, "Maybe"); + } + } + + if (e.length() != 0) { + System.out.println(e); + + fail(e.toString()); + } + } + + @Test + public void flowableDocRefersToFlowableTypes() throws Exception { + List list = BaseTypeParser.parse(MaybeNo2Dot0Since.findSource("Flowable"), "Flowable"); + + assertFalse(list.isEmpty()); + + StringBuilder e = new StringBuilder(); + + for (RxMethod m : list) { + int jdx; + if (m.javadoc != null) { + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("onSuccess", jdx); + if (idx >= 0) { + if (!m.signature.contains("Maybe") + && !m.signature.contains("MaybeSource") + && !m.signature.contains("Single") + && !m.signature.contains("SingleSource")) { + e.append("java.lang.RuntimeException: Flowable doc mentions onSuccess\r\n at io.reactivex.") + .append("Flowable (Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observer", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource") + && !m.signature.contains("Observable")) { + e.append("java.lang.RuntimeException: Flowable doc mentions Observer but not using Flowable\r\n at io.reactivex.") + .append("Flowable (Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Observable")) { + e.append("java.lang.RuntimeException: Flowable doc mentions Observable but not in the signature\r\n at io.reactivex.") + .append("Flowable (Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("ObservableSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Flowable doc mentions ObservableSource but not in the signature\r\n at io.reactivex.") + .append("Flowable (Flowable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + aOrAn(e, m, "Flowable"); + missingClosingDD(e, m, "Flowable"); + backpressureMentionedWithoutAnnotation(e, m, "Flowable"); + } + } + + if (e.length() != 0) { + System.out.println(e); + + fail(e.toString()); + } + } + + @Test + public void observableDocRefersToObservableTypes() throws Exception { + List list = BaseTypeParser.parse(MaybeNo2Dot0Since.findSource("Observable"), "Observable"); + + assertFalse(list.isEmpty()); + + StringBuilder e = new StringBuilder(); + + for (RxMethod m : list) { + int jdx; + if (m.javadoc != null) { + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("onSuccess", jdx); + if (idx >= 0) { + if (!m.signature.contains("Maybe") + && !m.signature.contains("MaybeSource") + && !m.signature.contains("Single") + && !m.signature.contains("SingleSource")) { + e.append("java.lang.RuntimeException: Observable doc mentions onSuccess\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Flowable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Flowable")) { + if (idx < 6 || !m.javadoc.substring(idx - 6, idx + 8).equals("@link Flowable")) { + e.append("java.lang.RuntimeException: Observable doc mentions Flowable but not in the signature\r\n at io.reactivex.") + .append("Observable (Observable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Publisher", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher")) { + e.append("java.lang.RuntimeException: Observable doc mentions Publisher but not in the signature\r\n at io.reactivex.") + .append("Observable (Observable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Subscriber", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Observable doc mentions Subscriber but not using Flowable\r\n at io.reactivex.") + .append("Observable (Observable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + aOrAn(e, m, "Observable"); + missingClosingDD(e, m, "Observable"); + backpressureMentionedWithoutAnnotation(e, m, "Observable"); + } + } + + if (e.length() != 0) { + System.out.println(e); + + fail(e.toString()); + } + } + + @Test + public void singleDocRefersToSingleTypes() throws Exception { + List list = BaseTypeParser.parse(MaybeNo2Dot0Since.findSource("Single"), "Single"); + + assertFalse(list.isEmpty()); + + StringBuilder e = new StringBuilder(); + + for (RxMethod m : list) { + int jdx; + if (m.javadoc != null) { + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("onNext", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable") + && !m.signature.contains("Observable") + && !m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Single doc mentions onNext but no Flowable/Observable in signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Subscriber", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Single doc mentions Subscriber but not using Flowable\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observer", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource") + && !m.signature.contains("Observable")) { + + if (idx < 6 || !m.javadoc.substring(idx - 6, idx + 8).equals("SingleObserver")) { + e.append("java.lang.RuntimeException: Single doc mentions Observer but not using Observable\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Publisher", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher")) { + if (idx == 0 || !m.javadoc.substring(idx - 1, idx + 9).equals("(Publisher")) { + e.append("java.lang.RuntimeException: Single doc mentions Publisher but not in the signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Flowable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Single doc mentions Flowable but not in the signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Maybe", jdx); + if (idx >= 0) { + if (!m.signature.contains("Maybe")) { + e.append("java.lang.RuntimeException: Single doc mentions Maybe but not in the signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("MaybeSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("MaybeSource")) { + e.append("java.lang.RuntimeException: Single doc mentions SingleSource but not in the signature\r\n at io.reactivex.") + .append("Maybe (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Observable")) { + e.append("java.lang.RuntimeException: Single doc mentions Observable but not in the signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("ObservableSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Single doc mentions ObservableSource but not in the signature\r\n at io.reactivex.") + .append("Single (Single.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + + aOrAn(e, m, "Single"); + missingClosingDD(e, m, "Single"); + backpressureMentionedWithoutAnnotation(e, m, "Single"); + } + } + + if (e.length() != 0) { + System.out.println(e); + + fail(e.toString()); + } + } + + @Test + public void completableDocRefersToCompletableTypes() throws Exception { + List list = BaseTypeParser.parse(MaybeNo2Dot0Since.findSource("Completable"), "Completable"); + + assertFalse(list.isEmpty()); + + StringBuilder e = new StringBuilder(); + + for (RxMethod m : list) { + int jdx; + if (m.javadoc != null) { + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("onNext", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable") + && !m.signature.contains("Observable") + && !m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Completable doc mentions onNext but no Flowable/Observable in signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Subscriber", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher") + && !m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Completable doc mentions Subscriber but not using Flowable\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observer", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource") + && !m.signature.contains("Observable")) { + + if (idx < 11 || !m.javadoc.substring(idx - 11, idx + 8).equals("CompletableObserver")) { + e.append("java.lang.RuntimeException: Maybe doc mentions Observer but not using Observable\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Publisher", jdx); + if (idx >= 0) { + if (!m.signature.contains("Publisher")) { + if (idx == 0 || !m.javadoc.substring(idx - 1, idx + 9).equals("(Publisher")) { + e.append("java.lang.RuntimeException: Completable doc mentions Publisher but not in the signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + } + + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Flowable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Flowable")) { + e.append("java.lang.RuntimeException: Completable doc mentions Flowable but not in the signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Single", jdx); + if (idx >= 0) { + if (!m.signature.contains("Single")) { + e.append("java.lang.RuntimeException: Completable doc mentions Single but not in the signature\r\n at io.reactivex.") + .append("Completable (Maybe.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("SingleSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("SingleSource")) { + e.append("java.lang.RuntimeException: Completable doc mentions SingleSource but not in the signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("Observable", jdx); + if (idx >= 0) { + if (!m.signature.contains("Observable")) { + e.append("java.lang.RuntimeException: Completable doc mentions Observable but not in the signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + jdx = 0; + for (;;) { + int idx = m.javadoc.indexOf("ObservableSource", jdx); + if (idx >= 0) { + if (!m.signature.contains("ObservableSource")) { + e.append("java.lang.RuntimeException: Completable doc mentions ObservableSource but not in the signature\r\n at io.reactivex.") + .append("Completable (Completable.java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + } + jdx = idx + 6; + } else { + break; + } + } + aOrAn(e, m, "Completable"); + missingClosingDD(e, m, "Completable"); + backpressureMentionedWithoutAnnotation(e, m, "Completable"); + } + } + + if (e.length() != 0) { + System.out.println(e); + + fail(e.toString()); + } + } + + static void aOrAn(StringBuilder e, RxMethod m, String baseTypeName) { + aOrAn(e, m, " an", "Single", baseTypeName); + aOrAn(e, m, " an", "Maybe", baseTypeName); + aOrAn(e, m, " a", "Observer", baseTypeName); + aOrAn(e, m, " a", "Observable", baseTypeName); + aOrAn(e, m, " an", "Publisher", baseTypeName); + aOrAn(e, m, " an", "Subscriber", baseTypeName); + aOrAn(e, m, " an", "Flowable", baseTypeName); + + aOrAn(e, m, " a", "Observable", baseTypeName); + + } + + static void aOrAn(StringBuilder e, RxMethod m, String wrongPre, String word, String baseTypeName) { + int jdx = 0; + int idx; + for (;;) { + idx = m.javadoc.indexOf(wrongPre + " " + word, jdx); + if (idx >= 0) { + e.append("java.lang.RuntimeException: a/an typo ") + .append(word) + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + jdx = idx + 6; + } else { + break; + } + } + + for (;;) { + idx = m.javadoc.indexOf(wrongPre + " {@link " + word, jdx); + if (idx >= 0) { + e.append("java.lang.RuntimeException: a/an typo ") + .append(word) + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + jdx = idx + 6; + } else { + break; + } + } + + for (;;) { + idx = m.javadoc.indexOf(wrongPre + " {@linkplain " + word, jdx); + if (idx >= 0) { + e.append("java.lang.RuntimeException: a/an typo ") + .append(word) + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + jdx = idx + 6; + } else { + break; + } + } + + for (;;) { + idx = m.javadoc.indexOf(wrongPre + " {@code " + word, jdx); + if (idx >= 0) { + e.append("java.lang.RuntimeException: a/an typo ") + .append(word) + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.javadocLine + lineNumber(m.javadoc, idx) - 1).append(")\r\n\r\n"); + jdx = idx + 6; + } else { + break; + } + } + + } + + static void missingClosingDD(StringBuilder e, RxMethod m, String baseTypeName) { + int jdx = 0; + for (;;) { + int idx1 = m.javadoc.indexOf("
", jdx); + int idx2 = m.javadoc.indexOf("
", jdx); + + if (idx1 < 0 && idx2 < 0) { + break; + } + + int idx3 = m.javadoc.indexOf("
", idx1 + 4); + + if (idx1 > 0 && idx2 > 0 && (idx3 < 0 || (idx2 < idx3 && idx3 > 0))) { + jdx = idx2 + 5; + } else { + e.append("java.lang.RuntimeException: unbalanced
") + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.javadocLine + lineNumber(m.javadoc, idx1) - 1).append(")\r\n\r\n"); + break; + } + } + } + + static void backpressureMentionedWithoutAnnotation(StringBuilder e, RxMethod m, String baseTypeName) { + if (m.backpressureDocLine > 0 && m.backpressureKind == null) { + e.append("java.lang.RuntimeException: backpressure documented but not annotated ") + .append("\r\n at io.reactivex.") + .append(baseTypeName) + .append(" (") + .append(baseTypeName) + .append(".java:").append(m.backpressureDocLine).append(")\r\n\r\n"); + } + } +} diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeDelayErrorTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeDelayErrorTest.java index 207909b954..da1afdf896 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeDelayErrorTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeDelayErrorTest.java @@ -227,7 +227,7 @@ public void testMergeFlowableOfFlowables() { @Override public void subscribe(Subscriber> observer) { observer.onSubscribe(new BooleanSubscription()); - // simulate what would happen in an Flowable + // simulate what would happen in a Flowable observer.onNext(o1); observer.onNext(o2); observer.onComplete(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java index 62894472c9..3920e13f8f 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableMergeTest.java @@ -82,7 +82,7 @@ public void testMergeFlowableOfFlowables() { @Override public void subscribe(Subscriber> observer) { observer.onSubscribe(new BooleanSubscription()); - // simulate what would happen in an Flowable + // simulate what would happen in a Flowable observer.onNext(o1); observer.onNext(o2); observer.onComplete(); diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index fc686ecc1b..a875877c3c 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -16,6 +16,7 @@ import static org.junit.Assert.*; import java.io.IOException; +import java.lang.management.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; @@ -2873,4 +2874,127 @@ public Object apply(final Object[] o) throws Exception { } }).test().assertResult(5); } + + static long usedMemoryNow() { + MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); + + MemoryUsage heapMemoryUsage = memoryMXBean.getHeapMemoryUsage(); + + return heapMemoryUsage.getUsed(); + } + + @Test + public void onTerminateDetach() throws Exception { + System.gc(); + + Thread.sleep(150); + + long before = usedMemoryNow(); + + Maybe source = Flowable.just((Object)new Object[10000000]).toMaybe(); + + long middle = usedMemoryNow(); + + MaybeObserver observer = new MaybeObserver() { + @SuppressWarnings("unused") + Disposable u; + + @Override + public void onSubscribe(Disposable d) { + this.u = d; + } + + @Override + public void onSuccess(Object value) { + + } + + @Override + public void onError(Throwable e) { + + } + + @Override + public void onComplete() { + + } + + }; + source.onTerminateDetach().subscribe(observer); + + source = null; + + System.gc(); + + Thread.sleep(250); + + long after = usedMemoryNow(); + + String log = String.format("%.2f MB -> %.2f MB -> %.2f MB%n", + before / 1024.0 / 1024.0, + middle / 1024.0 / 1024.0, + after / 1024.0 / 1024.0); + + System.out.printf(log); + + if (middle < after * 3) { + fail("There seems to be a memory leak: " + log); + } + + assertNotNull(observer); // hold onto the reference to prevent premature GC + } + + @Test + public void repeat() { + Maybe.just(1).repeat().take(5).test().assertResult(1, 1, 1, 1, 1); + + Maybe.just(1).repeat(5).test().assertResult(1, 1, 1, 1, 1); + + Maybe.just(1).repeatUntil(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return false; + } + }).take(5).test().assertResult(1, 1, 1, 1, 1); + + Maybe.just(1).repeatWhen(new Function, Publisher>() { + @Override + public Publisher apply(Flowable v) throws Exception { + return v; + } + }).take(5).test().assertResult(1, 1, 1, 1, 1); + } + + @Test + public void retry() { + Maybe.just(1).retry().test().assertResult(1); + + Maybe.just(1).retry(5).test().assertResult(1); + + Maybe.just(1).retry(Functions.alwaysTrue()).test().assertResult(1); + + Maybe.just(1).retry(5, Functions.alwaysTrue()).test().assertResult(1); + + Maybe.just(1).retry(new BiPredicate() { + @Override + public boolean test(Integer a, Throwable e) throws Exception { + return true; + } + }).test().assertResult(1); + + Maybe.just(1).retryUntil(new BooleanSupplier() { + @Override + public boolean getAsBoolean() throws Exception { + return false; + } + }).test().assertResult(1); + + Maybe.just(1).retryWhen(new Function, Publisher>() { + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public Publisher apply(Flowable v) throws Exception { + return (Publisher)v; + } + }).test().assertResult(1); + } } diff --git a/src/test/java/io/reactivex/subscribers/SafeSubscriberTest.java b/src/test/java/io/reactivex/subscribers/SafeSubscriberTest.java index 6daec43835..350a819ca5 100644 --- a/src/test/java/io/reactivex/subscribers/SafeSubscriberTest.java +++ b/src/test/java/io/reactivex/subscribers/SafeSubscriberTest.java @@ -114,7 +114,7 @@ public void testOnErrorAfterOnCompleted() { } /** - * A Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens. + * An Observable that doesn't do the right thing on UnSubscribe/Error/etc in that it will keep sending events down the pipe regardless of what happens. */ private static class TestObservable implements Publisher {