From 7cc98560a5798196c856e6429953471d38a235af Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 22:29:24 +0100 Subject: [PATCH 1/3] 2.x: Add dematerialize(selector), deprecate old --- src/main/java/io/reactivex/Flowable.java | 73 +++++++++++++++++-- src/main/java/io/reactivex/Observable.java | 65 ++++++++++++++++- .../flowable/FlowableDematerialize.java | 51 +++++++++---- .../observable/ObservableDematerialize.java | 51 +++++++++---- .../io/reactivex/flowable/FlowableTests.java | 9 ++- .../flowable/FlowableDematerializeTest.java | 45 ++++++++++++ .../ObservableDematerializeTest.java | 45 ++++++++++++ .../reactivex/observable/ObservableTest.java | 7 +- 8 files changed, 305 insertions(+), 41 deletions(-) diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 2e9e6c20ba..a45063f506 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -27,7 +27,7 @@ import io.reactivex.internal.fuseable.*; import io.reactivex.internal.operators.flowable.*; import io.reactivex.internal.operators.mixed.*; -import io.reactivex.internal.operators.observable.ObservableFromPublisher; +import io.reactivex.internal.operators.observable.*; import io.reactivex.internal.schedulers.ImmediateThinScheduler; import io.reactivex.internal.subscribers.*; import io.reactivex.internal.util.*; @@ -8484,6 +8484,7 @@ public final Flowable delaySubscription(long delay, TimeUnit unit, Scheduler *

      * Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
      * .doOnCancel(() -> System.out.println("Cancelled!"));
+     * .dematerialize()
      * .test()
      * .assertResult(1);
      * 
@@ -8491,6 +8492,7 @@ public final Flowable delaySubscription(long delay, TimeUnit unit, Scheduler * with the same event. *

      * Flowable.just(createOnNext(1), createOnNext(2))
+     * .dematerialize()
      * .test()
      * .assertResult(1, 2);
      * 
@@ -8508,14 +8510,74 @@ public final Flowable delaySubscription(long delay, TimeUnit unit, Scheduler * @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects * emitted by the source Publisher * @see ReactiveX operators documentation: Dematerialize + * @see #dematerialize(Function) + * @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead. */ @CheckReturnValue - @BackpressureSupport(BackpressureKind.FULL) @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + @Deprecated + @SuppressWarnings({ "unchecked", "rawtypes" }) public final Flowable dematerialize() { - @SuppressWarnings("unchecked") - Flowable> m = (Flowable>)this; - return RxJavaPlugins.onAssembly(new FlowableDematerialize(m)); + return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, Functions.identity())); + } + + /** + * Returns a Flowable that reverses the effect of {@link #materialize materialize} by transforming the + * {@link Notification} objects extracted from the source items via a selector function + * into their respective {@code Subscriber} signal types. + *

+ * + *

+ * The intended use of the {@code selector} function is to perform a + * type-safe identity mapping (see example) on a source that is already of type + * {@code Notification}. The Java language doesn't allow + * limiting instance methods to a certain generic argument shape, therefore, + * a function is used to ensure the conversion remains type safe. + *

+ * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or + * {@link Notification#createOnComplete() onComplete} item, the + * returned Flowable cancels of the flow and terminates with that type of terminal event: + *


+     * Flowable.just(createOnNext(1), createOnComplete(), createOnNext(2))
+     * .doOnCancel(() -> System.out.println("Canceled!"));
+     * .dematerialize(notification -> notification)
+     * .test()
+     * .assertResult(1);
+     * 
+ * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated + * with the same event. + *

+     * Flowable.just(createOnNext(1), createOnNext(2))
+     * .dematerialize(notification -> notification)
+     * .test()
+     * .assertResult(1, 2);
+     * 
+ * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(Publisher)} + * with a {@link #never()} source. + *
+ *
Backpressure:
+ *
The operator doesn't interfere with backpressure which is determined by the source {@code Publisher}'s + * backpressure behavior.
+ *
Scheduler:
+ *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the output value type + * @param selector function that returns the upstream item and should return a Notification to signal + * the corresponding {@code Subscriber} event to the downstream. + * @return a Flowable that emits the items and notifications embedded in the {@link Notification} objects + * selected from the items emitted by the source Flowable + * @see ReactiveX operators documentation: Dematerialize + * @since 2.2.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @BackpressureSupport(BackpressureKind.PASS_THROUGH) + public final Flowable dematerialize(Function> selector) { + ObjectHelper.requireNonNull(selector, "selector is null"); + return RxJavaPlugins.onAssembly(new FlowableDematerialize(this, selector)); } /** @@ -11069,6 +11131,7 @@ public final Flowable map(Function mapper) { * @return a Flowable that emits items that are the result of materializing the items and notifications * of the source Publisher * @see ReactiveX operators documentation: Materialize + * @see #dematerialize(Function) */ @CheckReturnValue @BackpressureSupport(BackpressureKind.FULL) diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index c80e8be95d..db3530069a 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -7587,6 +7587,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule *

      * Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
      * .doOnDispose(() -> System.out.println("Disposed!"));
+     * .dematerialize()
      * .test()
      * .assertResult(1);
      * 
@@ -7594,6 +7595,7 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule * with the same event. *

      * Observable.just(createOnNext(1), createOnNext(2))
+     * .dematerialize()
      * .test()
      * .assertResult(1, 2);
      * 
@@ -7608,13 +7610,69 @@ public final Observable delaySubscription(long delay, TimeUnit unit, Schedule * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects * emitted by the source ObservableSource * @see ReactiveX operators documentation: Dematerialize + * @see #dematerialize(Function) + * @deprecated in 2.2.4; inherently type-unsafe as it overrides the output generic type. Use {@link #dematerialize(Function)} instead. */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) + @Deprecated + @SuppressWarnings({ "unchecked", "rawtypes" }) public final Observable dematerialize() { - @SuppressWarnings("unchecked") - Observable> m = (Observable>)this; - return RxJavaPlugins.onAssembly(new ObservableDematerialize(m)); + return RxJavaPlugins.onAssembly(new ObservableDematerialize(this, Functions.identity())); + } + + /** + * Returns an Observable that reverses the effect of {@link #materialize materialize} by transforming the + * {@link Notification} objects extracted from the source items via a selector function + * into their respective {@code Observer} signal types. + *

+ * + *

+ * The intended use of the {@code selector} function is to perform a + * type-safe identity mapping (see example) on a source that is already of type + * {@code Notification}. The Java language doesn't allow + * limiting instance methods to a certain generic argument shape, therefore, + * a function is used to ensure the conversion remains type safe. + *

+ * When the upstream signals an {@link Notification#createOnError(Throwable) onError} or + * {@link Notification#createOnComplete() onComplete} item, the + * returned Observable disposes of the flow and terminates with that type of terminal event: + *


+     * Observable.just(createOnNext(1), createOnComplete(), createOnNext(2))
+     * .doOnDispose(() -> System.out.println("Disposed!"));
+     * .dematerialize(notification -> notification)
+     * .test()
+     * .assertResult(1);
+     * 
+ * If the upstream signals {@code onError} or {@code onComplete} directly, the flow is terminated + * with the same event. + *

+     * Observable.just(createOnNext(1), createOnNext(2))
+     * .dematerialize(notification -> notification)
+     * .test()
+     * .assertResult(1, 2);
+     * 
+ * If this behavior is not desired, the completion can be suppressed by applying {@link #concatWith(ObservableSource)} + * with a {@link #never()} source. + *
+ *
Scheduler:
+ *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the output value type + * @param selector function that returns the upstream item and should return a Notification to signal + * the corresponding {@code Observer} event to the downstream. + * @return an Observable that emits the items and notifications embedded in the {@link Notification} objects + * selected from the items emitted by the source ObservableSource + * @see ReactiveX operators documentation: Dematerialize + * @since 2.2.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Observable dematerialize(Function> selector) { + ObjectHelper.requireNonNull(selector, "selector is null"); + return RxJavaPlugins.onAssembly(new ObservableDematerialize(this, selector)); } /** @@ -9620,6 +9678,7 @@ public final Observable map(Function mapper) { * @return an Observable that emits items that are the result of materializing the items and notifications * of the source ObservableSource * @see ReactiveX operators documentation: Materialize + * @see #dematerialize(Function) */ @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java index 930f5aaee6..5fe5211834 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableDematerialize.java @@ -16,29 +16,39 @@ import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.internal.subscriptions.SubscriptionHelper; import io.reactivex.plugins.RxJavaPlugins; -public final class FlowableDematerialize extends AbstractFlowableWithUpstream, T> { +public final class FlowableDematerialize extends AbstractFlowableWithUpstream { - public FlowableDematerialize(Flowable> source) { + final Function> selector; + + public FlowableDematerialize(Flowable source, Function> selector) { super(source); + this.selector = selector; } @Override - protected void subscribeActual(Subscriber s) { - source.subscribe(new DematerializeSubscriber(s)); + protected void subscribeActual(Subscriber subscriber) { + source.subscribe(new DematerializeSubscriber(subscriber, selector)); } - static final class DematerializeSubscriber implements FlowableSubscriber>, Subscription { - final Subscriber downstream; + static final class DematerializeSubscriber implements FlowableSubscriber, Subscription { + + final Subscriber downstream; + + final Function> selector; boolean done; Subscription upstream; - DematerializeSubscriber(Subscriber downstream) { + DematerializeSubscriber(Subscriber downstream, Function> selector) { this.downstream = downstream; + this.selector = selector; } @Override @@ -50,22 +60,35 @@ public void onSubscribe(Subscription s) { } @Override - public void onNext(Notification t) { + public void onNext(T item) { if (done) { - if (t.isOnError()) { - RxJavaPlugins.onError(t.getError()); + if (item instanceof Notification) { + Notification notification = (Notification)item; + if (notification.isOnError()) { + RxJavaPlugins.onError(notification.getError()); + } } return; } - if (t.isOnError()) { + + Notification notification; + + try { + notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); upstream.cancel(); - onError(t.getError()); + onError(ex); + return; } - else if (t.isOnComplete()) { + if (notification.isOnError()) { + upstream.cancel(); + onError(notification.getError()); + } else if (notification.isOnComplete()) { upstream.cancel(); onComplete(); } else { - downstream.onNext(t.getValue()); + downstream.onNext(notification.getValue()); } } diff --git a/src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java b/src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java index 9e79e8eba4..2f864152ee 100644 --- a/src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java +++ b/src/main/java/io/reactivex/internal/operators/observable/ObservableDematerialize.java @@ -15,29 +15,38 @@ import io.reactivex.*; import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; import io.reactivex.plugins.RxJavaPlugins; -public final class ObservableDematerialize extends AbstractObservableWithUpstream, T> { +public final class ObservableDematerialize extends AbstractObservableWithUpstream { - public ObservableDematerialize(ObservableSource> source) { + final Function> selector; + + public ObservableDematerialize(ObservableSource source, Function> selector) { super(source); + this.selector = selector; } @Override - public void subscribeActual(Observer t) { - source.subscribe(new DematerializeObserver(t)); + public void subscribeActual(Observer observer) { + source.subscribe(new DematerializeObserver(observer, selector)); } - static final class DematerializeObserver implements Observer>, Disposable { - final Observer downstream; + static final class DematerializeObserver implements Observer, Disposable { + final Observer downstream; + + final Function> selector; boolean done; Disposable upstream; - DematerializeObserver(Observer downstream) { + DematerializeObserver(Observer downstream, Function> selector) { this.downstream = downstream; + this.selector = selector; } @Override @@ -60,22 +69,36 @@ public boolean isDisposed() { } @Override - public void onNext(Notification t) { + public void onNext(T item) { if (done) { - if (t.isOnError()) { - RxJavaPlugins.onError(t.getError()); + if (item instanceof Notification) { + Notification notification = (Notification)item; + if (notification.isOnError()) { + RxJavaPlugins.onError(notification.getError()); + } } return; } - if (t.isOnError()) { + + Notification notification; + + try { + notification = ObjectHelper.requireNonNull(selector.apply(item), "The selector returned a null Notification"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + upstream.dispose(); + onError(ex); + return; + } + if (notification.isOnError()) { upstream.dispose(); - onError(t.getError()); + onError(notification.getError()); } - else if (t.isOnComplete()) { + else if (notification.isOnComplete()) { upstream.dispose(); onComplete(); } else { - downstream.onNext(t.getValue()); + downstream.onNext(notification.getValue()); } } diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 4a700b5aea..fa8026da6c 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -14,22 +14,24 @@ package io.reactivex.flowable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.*; -import io.reactivex.Observable; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; import io.reactivex.*; +import io.reactivex.Observable; import io.reactivex.disposables.Disposable; import io.reactivex.exceptions.*; import io.reactivex.flowables.ConnectableFlowable; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.processors.*; @@ -341,7 +343,8 @@ public void testOnSubscribeFails() { @Test public void testMaterializeDematerializeChaining() { Flowable obs = Flowable.just(1); - Flowable chained = obs.materialize().dematerialize(); + Flowable chained = obs.materialize() + .dematerialize(Functions.>identity()); Subscriber subscriber = TestHelper.mockSubscriber(); @@ -1076,7 +1079,7 @@ public void testErrorThrownIssue1685() { Flowable.error(new RuntimeException("oops")) .materialize() .delay(1, TimeUnit.SECONDS) - .dematerialize() + .dematerialize(Functions.>identity()) .subscribe(processor); processor.subscribe(); diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java index f83510f830..1f83e4e0fc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java @@ -24,12 +24,57 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.internal.subscriptions.BooleanSubscription; import io.reactivex.plugins.RxJavaPlugins; import io.reactivex.subscribers.TestSubscriber; +@SuppressWarnings("deprecation") public class FlowableDematerializeTest { + @Test + public void simpleSelector() { + Flowable> notifications = Flowable.just(1, 2).materialize(); + Flowable dematerialize = notifications.dematerialize(Functions.>identity()); + + Subscriber subscriber = TestHelper.mockSubscriber(); + + dematerialize.subscribe(subscriber); + + verify(subscriber, times(1)).onNext(1); + verify(subscriber, times(1)).onNext(2); + verify(subscriber, times(1)).onComplete(); + verify(subscriber, never()).onError(any(Throwable.class)); + } + + @Test + public void selectorCrash() { + Flowable.just(1, 2) + .materialize() + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void selectorNull() { + Flowable.just(1, 2) + .materialize() + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + @Test public void testDematerialize1() { Flowable> notifications = Flowable.just(1, 2).materialize(); diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java index a5017c68c4..5a934c12b0 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java @@ -24,11 +24,56 @@ import io.reactivex.disposables.Disposables; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; +@SuppressWarnings("deprecation") public class ObservableDematerializeTest { + @Test + public void simpleSelector() { + Observable> notifications = Observable.just(1, 2).materialize(); + Observable dematerialize = notifications.dematerialize(Functions.>identity()); + + Observer observer = TestHelper.mockObserver(); + + dematerialize.subscribe(observer); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onComplete(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void selectorCrash() { + Observable.just(1, 2) + .materialize() + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void selectorNull() { + Observable.just(1, 2) + .materialize() + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + return null; + } + }) + .test() + .assertFailure(NullPointerException.class); + } + @Test public void testDematerialize1() { Observable> notifications = Observable.just(1, 2).materialize(); diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 660c57d49a..ab915ffa53 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -14,6 +14,7 @@ package io.reactivex.observable; import static org.junit.Assert.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; import java.util.*; @@ -28,6 +29,7 @@ import io.reactivex.Observer; import io.reactivex.disposables.*; import io.reactivex.functions.*; +import io.reactivex.internal.functions.Functions; import io.reactivex.observables.ConnectableObservable; import io.reactivex.observers.*; import io.reactivex.schedulers.*; @@ -357,7 +359,8 @@ public void testOnSubscribeFails() { @Test public void testMaterializeDematerializeChaining() { Observable obs = Observable.just(1); - Observable chained = obs.materialize().dematerialize(); + Observable chained = obs.materialize() + .dematerialize(Functions.>identity()); Observer observer = TestHelper.mockObserver(); @@ -1096,7 +1099,7 @@ public void testErrorThrownIssue1685() { Observable.error(new RuntimeException("oops")) .materialize() .delay(1, TimeUnit.SECONDS) - .dematerialize() + .dematerialize(Functions.>identity()) .subscribe(subject); subject.subscribe(); From 1e5b65897d997a3311f653e5eea17c58b08f25c9 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 23:03:31 +0100 Subject: [PATCH 2/3] Restore full coverage --- .../flowable/FlowableDematerializeTest.java | 15 +++++++++++++++ .../observable/ObservableDematerializeTest.java | 15 +++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java index 1f83e4e0fc..8ce398b60c 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java @@ -221,4 +221,19 @@ protected void subscribeActual(Subscriber subscriber) { RxJavaPlugins.reset(); } } + + @Test + public void nonNotificationInstanceAfterDispose() { + new Flowable() { + @Override + protected void subscribeActual(Subscriber observer) { + observer.onSubscribe(new BooleanSubscription()); + observer.onNext(Notification.createOnComplete()); + observer.onNext(1); + } + } + .dematerialize() + .test() + .assertResult(); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java index 5a934c12b0..d815a70ea5 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableDematerializeTest.java @@ -220,4 +220,19 @@ protected void subscribeActual(Observer observer) { RxJavaPlugins.reset(); } } + + @Test + public void nonNotificationInstanceAfterDispose() { + new Observable() { + @Override + protected void subscribeActual(Observer observer) { + observer.onSubscribe(Disposables.empty()); + observer.onNext(Notification.createOnComplete()); + observer.onNext(1); + } + } + .dematerialize() + .test() + .assertResult(); + } } From 43a5a36f7c06a95bec6dc301e25145ce1babfe18 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Fri, 2 Nov 2018 00:00:17 +0100 Subject: [PATCH 3/3] Fix parameter naming --- .../operators/flowable/FlowableDematerializeTest.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java index 8ce398b60c..66fd932dcc 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableDematerializeTest.java @@ -226,10 +226,10 @@ protected void subscribeActual(Subscriber subscriber) { public void nonNotificationInstanceAfterDispose() { new Flowable() { @Override - protected void subscribeActual(Subscriber observer) { - observer.onSubscribe(new BooleanSubscription()); - observer.onNext(Notification.createOnComplete()); - observer.onNext(1); + protected void subscribeActual(Subscriber subscriber) { + subscriber.onSubscribe(new BooleanSubscription()); + subscriber.onNext(Notification.createOnComplete()); + subscriber.onNext(1); } } .dematerialize()