From 1f0450b2487e376ffdb05a11165eef943f1aedf8 Mon Sep 17 00:00:00 2001 From: pkamath2 Date: Fri, 16 Nov 2018 14:29:48 +0800 Subject: [PATCH 1/2] Javadoc updates for RXJava Issue 6289 --- src/main/java/io/reactivex/Flowable.java | 66 ++++++++++++++++++++++++ 1 file changed, 66 insertions(+) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index da4f800670..e69533d1eb 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -10345,6 +10345,17 @@ public final Disposable forEachWhile(final Predicate onNext, final Co * is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those * {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may * discard their buffers by applying an operator like {@link #ignoreElements} to them. + *

+ * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10385,6 +10396,17 @@ public final Flowable> groupBy(Function + * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10428,6 +10450,17 @@ public final Flowable> groupBy(Function + * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10473,6 +10506,17 @@ public final Flowable> groupBy(Function + * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10521,6 +10565,17 @@ public final Flowable> groupBy(Function + * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10617,6 +10672,17 @@ public final Flowable> groupBy(Function + * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. + * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. + * If not, it may result in groups upstream not being consumed. + * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. + *

+ * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, + * we need to ensure that the maximum concurrency parameter is a value greater than + * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. + * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * *

*
Backpressure:
*
Both the returned and its inner {@code GroupedFlowable}s honor backpressure and the source {@code Publisher} From 00d7158fa67b28beedf17268068e404686782f01 Mon Sep 17 00:00:00 2001 From: pkamath2 Date: Fri, 16 Nov 2018 18:57:05 +0800 Subject: [PATCH 2/2] With review comments on issue 6289 --- src/main/java/io/reactivex/Flowable.java | 91 ++++++++++-------------- 1 file changed, 36 insertions(+), 55 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index e69533d1eb..c3c48e3c95 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -10346,15 +10346,12 @@ public final Disposable forEachWhile(final Predicate onNext, final Co * {@code GroupedPublisher}s that do not concern you. Instead, you can signal to them that they may * discard their buffers by applying an operator like {@link #ignoreElements} to them. *

- * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. * *

*
Backpressure:
@@ -10397,16 +10394,12 @@ public final Flowable> groupBy(Function - * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. - * + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. *

*
Backpressure:
*
Both the returned and its inner {@code Publisher}s honor backpressure and the source {@code Publisher} @@ -10451,15 +10444,12 @@ public final Flowable> groupBy(Function - * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. * *

*
Backpressure:
@@ -10507,15 +10497,12 @@ public final Flowable> groupBy(Function - * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. * *

*
Backpressure:
@@ -10566,15 +10553,12 @@ public final Flowable> groupBy(Function - * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. * *

*
Backpressure:
@@ -10673,15 +10657,12 @@ public final Flowable> groupBy(Function - * By default, a {@link GroupedFlowable} buffers only {@code Flowable.BUFFER_SIZE} items to emit. - * So, it is important that a subscription to consume these items begin as soon as the {@link GroupedFlowable}s are created. - * If not, it may result in groups upstream not being consumed. - * Typically {@code flatMap} or {@code concatMapEager} can be used to ensure all {@link GroupedFlowable} items are consumed. - *

- * Note that, while using {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} operators, - * we need to ensure that the maximum concurrency parameter is a value greater than - * or equal to the number of expected groups, or {@code Integer.MAX_VALUE} if the number of expected groups is unknown. - * Using the default maximum concurrency may also sometimes lead to starvation of unconsumed groups upstream. + * Note that the {@link GroupedFlowable}s should be subscribed to as soon as possible, otherwise, + * the unconsumed groups may starve other groups due to the internal backpressure + * coordination of the {@code groupBy} operator. Such hangs can be usually avoided by using + * {@link #flatMap(Function, int)} or {@link #concatMapEager(Function, int, int)} and overriding the default maximum concurrency + * value to be greater or equal to the expected number of groups, possibly using + * {@code Integer.MAX_VALUE} if the number of expected groups is unknown. * *

*
Backpressure: