From b0c8b42b23d2d4ced039ad113a60c3f6db71b5ca Mon Sep 17 00:00:00 2001 From: akarnokd Date: Wed, 20 May 2015 10:40:51 +0200 Subject: [PATCH] Deprecated onBackpressureBlock. --- src/main/java/rx/Observable.java | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/src/main/java/rx/Observable.java b/src/main/java/rx/Observable.java index 919548fd32..9b0479cc0c 100644 --- a/src/main/java/rx/Observable.java +++ b/src/main/java/rx/Observable.java @@ -5332,14 +5332,23 @@ public final Observable onBackpressureDrop() { *

* Note that if the upstream Observable does support backpressure, this operator ignores that capability * and doesn't propagate any backpressure requests from downstream. + *

+ * Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to + * deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by + * the subscribeOn. In order to avoid this, the operators have to be swapped in the chain: + * {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow + * this operator. * * @param maxQueueLength the maximum number of items the producer can emit without blocking * @return the source Observable modified to block {@code onNext} notifications on overflow * @see ReactiveX operators documentation: backpressure operators - * @Experimental The behavior of this can change at any time. + * @Experimental The behavior of this can change at any time. + * @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to + * deadlocks. It will be removed/unavailable starting from 1.1. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ @Experimental + @Deprecated public final Observable onBackpressureBlock(int maxQueueLength) { return lift(new OperatorOnBackpressureBlock(maxQueueLength)); } @@ -5355,13 +5364,22 @@ public final Observable onBackpressureBlock(int maxQueueLength) { *

* Note that if the upstream Observable does support backpressure, this operator ignores that capability * and doesn't propagate any backpressure requests from downstream. + *

+ * Warning! Using a chain like {@code source.onBackpressureBlock().subscribeOn(scheduler)} is prone to + * deadlocks because the consumption of the internal queue is scheduled behind a blocked emission by + * the subscribeOn. In order to avoid this, the operators have to be swapped in the chain: + * {@code source.subscribeOn(scheduler).onBackpressureBlock()} and in general, no subscribeOn operator should follow + * this operator. * * @return the source Observable modified to block {@code onNext} notifications on overflow * @see ReactiveX operators documentation: backpressure operators * @Experimental The behavior of this can change at any time. + * @deprecated The operator doesn't work properly with {@link #subscribeOn(Scheduler)} and is prone to + * deadlocks. It will be removed/unavailable starting from 1.1. * @since (if this graduates from Experimental/Beta to supported, replace this parenthetical with the release number) */ @Experimental + @Deprecated public final Observable onBackpressureBlock() { return onBackpressureBlock(rx.internal.util.RxRingBuffer.SIZE); }