From db16b094d931ec0a6ef479f4f257d9883b3d7af1 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 10:30:34 +0100 Subject: [PATCH 1/4] 2.x: Add materialize() and dematerialize() --- src/main/java/io/reactivex/Completable.java | 22 ++++- src/main/java/io/reactivex/Maybe.java | 19 ++++ src/main/java/io/reactivex/Single.java | 46 ++++++++++ .../completable/CompletableMaterialize.java | 40 ++++++++ .../operators/maybe/MaybeMaterialize.java | 40 ++++++++ .../mixed/MaterializeSingleObserver.java | 71 +++++++++++++++ .../operators/single/SingleDematerialize.java | 91 +++++++++++++++++++ .../operators/single/SingleMaterialize.java | 40 ++++++++ .../CompletableMaterializeTest.java | 58 ++++++++++++ .../operators/maybe/MaybeMaterializeTest.java | 67 ++++++++++++++ .../single/SingleDematerializeTest.java | 29 ++++++ .../single/SingleMaterializeTest.java | 58 ++++++++++++ 12 files changed, 580 insertions(+), 1 deletion(-) create mode 100644 src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java create mode 100644 src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java create mode 100644 src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java create mode 100644 src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java create mode 100644 src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java create mode 100644 src/test/java/io/reactivex/internal/operators/single/SingleMaterializeTest.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 486562b483..cfd5da120a 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -26,7 +26,7 @@ import io.reactivex.internal.operators.completable.*; import io.reactivex.internal.operators.maybe.*; import io.reactivex.internal.operators.mixed.*; -import io.reactivex.internal.operators.single.SingleDelayWithCompletable; +import io.reactivex.internal.operators.single.*; import io.reactivex.internal.util.ExceptionHelper; import io.reactivex.observers.TestObserver; import io.reactivex.plugins.RxJavaPlugins; @@ -1782,6 +1782,26 @@ public final Completable lift(final CompletableOperator onLift) { return RxJavaPlugins.onAssembly(new CompletableLift(this, onLift)); } + /** + * Maps the signal types of this Completable into a {@link Notification} of the same kind + * and emits it as a single success value to downstream. + *

+ * + *

+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @param the intended target element type of the notification + * @return the new Single instance + * @since 2.2.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Single> materialize() { + return RxJavaPlugins.onAssembly(new CompletableMaterialize(this)); + } + /** * Returns a Completable which subscribes to this and the other Completable and completes * when both of them complete or one emits an error. diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 346d35e16a..3bbe0b0704 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -3377,6 +3377,25 @@ public final Maybe map(Function mapper) { return RxJavaPlugins.onAssembly(new MaybeMap(this, mapper)); } + /** + * Maps the signal types of this Maybe into a {@link Notification} of the same kind + * and emits it as a single success value to downstream. + *

+ * + *

+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Single instance + * @since 2.2.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Single> materialize() { + return RxJavaPlugins.onAssembly(new MaybeMaterialize(this)); + } + /** * Flattens this and another Maybe into a single Flowable, without any transformation. *

diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 8d4013be66..bc4604563d 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2302,6 +2302,33 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler sch return delaySubscription(Observable.timer(time, unit, scheduler)); } + /** + * Maps the {@link Notification} success value of this Single back into normal + * {@code onSuccess}, {@code onError} or {@code onComplete} signals as a + * {@link Maybe} source. + *

+ * Note that {@code this} should be of type {@code Single>} or + * the transformation will result in an {@code onError} signal of + * {@link ClassCastException}. Currently, the Java language doesn't allow specifying + * methods for certain type argument shapes only (unlike extension methods would), + * hence the forced casting in this operator. + *

+ *
Scheduler:
+ *
{@code delaySubscription} does by default subscribe to the current Single + * on the {@link Scheduler} you provided, after the delay.
+ *
+ * @param the type inside the Notification + * @return the new Maybe instance + * @since 2.2.4 - experimental + */ + @SuppressWarnings("unchecked") + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + @Experimental + public final Maybe dematerialize() { + return RxJavaPlugins.onAssembly(new SingleDematerialize((Single)this)); + } + /** * Calls the specified consumer with the success item after this item has been emitted to the downstream. *

@@ -2871,6 +2898,25 @@ public final Single map(Function mapper) { return RxJavaPlugins.onAssembly(new SingleMap(this, mapper)); } + /** + * Maps the signal types of this Single into a {@link Notification} of the same kind + * and emits it as a single success value to downstream. + *

+ * + *

+ *
Scheduler:
+ *
{@code materialize} does not operate by default on a particular {@link Scheduler}.
+ *
+ * @return the new Single instance + * @since 2.2.4 - experimental + */ + @Experimental + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final Single> materialize() { + return RxJavaPlugins.onAssembly(new SingleMaterialize(this)); + } + /** * Signals true if the current Single signals a success value that is Object-equals with the value * provided. diff --git a/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java b/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java new file mode 100644 index 0000000000..5eda7f6ae2 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/completable/CompletableMaterialize.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.internal.operators.mixed.MaterializeSingleObserver; + +/** + * Turn the signal types of a Completable source into a single Notification of + * equal kind. + * + * @param the element type of the source + * @since 2.2.4 - experimental + */ +@Experimental +public final class CompletableMaterialize extends Single> { + + final Completable source; + + public CompletableMaterialize(Completable source) { + this.source = source; + } + + @Override + protected void subscribeActual(SingleObserver> observer) { + source.subscribe(new MaterializeSingleObserver(observer)); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java new file mode 100644 index 0000000000..2b74829ba1 --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/maybe/MaybeMaterialize.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.internal.operators.mixed.MaterializeSingleObserver; + +/** + * Turn the signal types of a Maybe source into a single Notification of + * equal kind. + * + * @param the element type of the source + * @since 2.2.4 - experimental + */ +@Experimental +public final class MaybeMaterialize extends Single> { + + final Maybe source; + + public MaybeMaterialize(Maybe source) { + this.source = source; + } + + @Override + protected void subscribeActual(SingleObserver> observer) { + source.subscribe(new MaterializeSingleObserver(observer)); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java b/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java new file mode 100644 index 0000000000..ef8a87076d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/mixed/MaterializeSingleObserver.java @@ -0,0 +1,71 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.mixed; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * A consumer that implements the consumer types of Maybe, Single and Completable + * and turns their signals into Notifications for a SingleObserver. + * @param the element type of the source + * @since 2.2.4 - experimental + */ +@Experimental +public final class MaterializeSingleObserver +implements SingleObserver, MaybeObserver, CompletableObserver, Disposable { + + final SingleObserver> downstream; + + Disposable upstream; + + public MaterializeSingleObserver(SingleObserver> downstream) { + this.downstream = downstream; + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + this.upstream = d; + downstream.onSubscribe(this); + } + } + + @Override + public void onComplete() { + downstream.onSuccess(Notification.createOnComplete()); + } + + @Override + public void onSuccess(T t) { + downstream.onSuccess(Notification.createOnNext(t)); + } + + @Override + public void onError(Throwable e) { + downstream.onSuccess(Notification.createOnError(e)); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void dispose() { + upstream.dispose(); + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java new file mode 100644 index 0000000000..954e00724a --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java @@ -0,0 +1,91 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.disposables.Disposable; +import io.reactivex.internal.disposables.DisposableHelper; + +/** + * Maps the Notification success value of the source back to normal + * onXXX method call as a Maybe. + * @param the element type of the notification and result + * @since 2.2.4 - experimental + */ +@Experimental +public final class SingleDematerialize extends Maybe { + + final Single source; + + public SingleDematerialize(Single source) { + this.source = source; + } + + @Override + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DematerializeObserver(observer)); + } + + static final class DematerializeObserver implements SingleObserver, Disposable { + + final MaybeObserver downstream; + + Disposable upstream; + + DematerializeObserver(MaybeObserver downstream) { + this.downstream = downstream; + } + + @Override + public void dispose() { + upstream.dispose(); + } + + @Override + public boolean isDisposed() { + return upstream.isDisposed(); + } + + @Override + public void onSubscribe(Disposable d) { + if (DisposableHelper.validate(upstream, d)) { + upstream = d; + downstream.onSubscribe(this); + } + } + + @Override + public void onSuccess(Object t) { + if (t instanceof Notification) { + @SuppressWarnings("unchecked") + Notification notification = (Notification)t; + if (notification.isOnNext()) { + downstream.onSuccess(notification.getValue()); + } else if (notification.isOnComplete()) { + downstream.onComplete(); + } else { + downstream.onError(notification.getError()); + } + } else { + downstream.onError(new ClassCastException("io.reactivex.Notification expected but got " + t.getClass())); + } + } + + @Override + public void onError(Throwable e) { + downstream.onError(e); + } + } +} diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java b/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java new file mode 100644 index 0000000000..e22b64865d --- /dev/null +++ b/src/main/java/io/reactivex/internal/operators/single/SingleMaterialize.java @@ -0,0 +1,40 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import io.reactivex.*; +import io.reactivex.annotations.Experimental; +import io.reactivex.internal.operators.mixed.MaterializeSingleObserver; + +/** + * Turn the signal types of a Single source into a single Notification of + * equal kind. + * + * @param the element type of the source + * @since 2.2.4 - experimental + */ +@Experimental +public final class SingleMaterialize extends Single> { + + final Single source; + + public SingleMaterialize(Single source) { + this.source = source; + } + + @Override + protected void subscribeActual(SingleObserver> observer) { + source.subscribe(new MaterializeSingleObserver(observer)); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java new file mode 100644 index 0000000000..aec11e5a61 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/completable/CompletableMaterializeTest.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.completable; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subjects.CompletableSubject; + +public class CompletableMaterializeTest { + + @Test + @SuppressWarnings("unchecked") + public void error() { + TestException ex = new TestException(); + Completable.error(ex) + .materialize() + .test() + .assertResult(Notification.createOnError(ex)); + } + + @Test + @SuppressWarnings("unchecked") + public void empty() { + Completable.complete() + .materialize() + .test() + .assertResult(Notification.createOnComplete()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeCompletableToSingle(new Function>>() { + @Override + public SingleSource> apply(Completable v) throws Exception { + return v.materialize(); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(CompletableSubject.create().materialize()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java new file mode 100644 index 0000000000..f429ecddf2 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/maybe/MaybeMaterializeTest.java @@ -0,0 +1,67 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.maybe; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subjects.MaybeSubject; + +public class MaybeMaterializeTest { + + @Test + @SuppressWarnings("unchecked") + public void success() { + Maybe.just(1) + .materialize() + .test() + .assertResult(Notification.createOnNext(1)); + } + + @Test + @SuppressWarnings("unchecked") + public void error() { + TestException ex = new TestException(); + Maybe.error(ex) + .materialize() + .test() + .assertResult(Notification.createOnError(ex)); + } + + @Test + @SuppressWarnings("unchecked") + public void empty() { + Maybe.empty() + .materialize() + .test() + .assertResult(Notification.createOnComplete()); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeMaybeToSingle(new Function, SingleSource>>() { + @Override + public SingleSource> apply(Maybe v) throws Exception { + return v.materialize(); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(MaybeSubject.create().materialize()); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java new file mode 100644 index 0000000000..82f0e5caa9 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java @@ -0,0 +1,29 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.*; + +public class SingleDematerializeTest { + + @Test + public void success() { + Single.just(Notification.createOnNext(1)) + .dematerialize() + .test() + .assertResult(1); + } +} diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleMaterializeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleMaterializeTest.java new file mode 100644 index 0000000000..3a97155978 --- /dev/null +++ b/src/test/java/io/reactivex/internal/operators/single/SingleMaterializeTest.java @@ -0,0 +1,58 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.internal.operators.single; + +import org.junit.Test; + +import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subjects.SingleSubject; + +public class SingleMaterializeTest { + + @Test + @SuppressWarnings("unchecked") + public void success() { + Single.just(1) + .materialize() + .test() + .assertResult(Notification.createOnNext(1)); + } + + @Test + @SuppressWarnings("unchecked") + public void error() { + TestException ex = new TestException(); + Maybe.error(ex) + .materialize() + .test() + .assertResult(Notification.createOnError(ex)); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingle(new Function, SingleSource>>() { + @Override + public SingleSource> apply(Single v) throws Exception { + return v.materialize(); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create().materialize()); + } +} From 593dc74c37c13ee7867c72b3408a72e61501e2ea Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 15:16:27 +0100 Subject: [PATCH 2/4] Add remaining test cases --- src/main/java/io/reactivex/Single.java | 11 ++++ .../single/SingleDematerializeTest.java | 50 +++++++++++++++++++ 2 files changed, 61 insertions(+) diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index bc4604563d..9ae4bf1e93 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2312,11 +2312,22 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler sch * {@link ClassCastException}. Currently, the Java language doesn't allow specifying * methods for certain type argument shapes only (unlike extension methods would), * hence the forced casting in this operator. + *

+ * In addition, usually the inner value type (T2) has to be expressed again via + * a type argument on this method (see example below). *

*
Scheduler:
*
{@code delaySubscription} does by default subscribe to the current Single * on the {@link Scheduler} you provided, after the delay.
*
+ *

+ * Example: + *


+     * Single.just(Notification.createOnNext(1))
+     * .<Integer>dematerialize()
+     * .test()
+     * .assertResult(1);
+     * 
* @param the type inside the Notification * @return the new Maybe instance * @since 2.2.4 - experimental diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java index 82f0e5caa9..760f277a9d 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java @@ -16,6 +16,9 @@ import org.junit.Test; import io.reactivex.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.functions.Function; +import io.reactivex.subjects.SingleSubject; public class SingleDematerializeTest { @@ -26,4 +29,51 @@ public void success() { .test() .assertResult(1); } + + @Test + public void empty() { + Single.just(Notification.createOnComplete()) + .dematerialize() + .test() + .assertResult(); + } + + @Test + public void error() { + Single.error(new TestException()) + .dematerialize() + .test() + .assertFailure(TestException.class); + } + + @Test + public void errorNotification() { + Single.just(Notification.createOnError(new TestException())) + .dematerialize() + .test() + .assertFailure(TestException.class); + } + + @Test + public void doubleOnSubscribe() { + TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, MaybeSource>() { + @Override + public MaybeSource apply(Single v) throws Exception { + return v.dematerialize(); + } + }); + } + + @Test + public void dispose() { + TestHelper.checkDisposed(SingleSubject.create().dematerialize()); + } + + @Test + public void wrongType() { + Single.just(1) + .dematerialize() + .test() + .assertFailure(ClassCastException.class); + } } From b755a86263150e974a1d14686f173c973f15908a Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 15:24:43 +0100 Subject: [PATCH 3/4] Correct dematerialize javadoc --- src/main/java/io/reactivex/Single.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 9ae4bf1e93..f5f522a2cc 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2317,7 +2317,7 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler sch * a type argument on this method (see example below). *
*
Scheduler:
- *
{@code delaySubscription} does by default subscribe to the current Single + *
{@code dematerialize} does by default subscribe to the current Single * on the {@link Scheduler} you provided, after the delay.
*
*

From 1ffb12d437580398b3f1e4f6aea0f08275707d17 Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 1 Nov 2018 20:44:17 +0100 Subject: [PATCH 4/4] Use dematerialize selector fix some docs --- src/main/java/io/reactivex/Completable.java | 1 + src/main/java/io/reactivex/Maybe.java | 1 + src/main/java/io/reactivex/Single.java | 30 +++++----- .../operators/single/SingleDematerialize.java | 60 ++++++++++++------- .../single/SingleDematerializeTest.java | 54 +++++++++++++---- 5 files changed, 95 insertions(+), 51 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index cfd5da120a..948d8aecde 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -1794,6 +1794,7 @@ public final Completable lift(final CompletableOperator onLift) { * @param the intended target element type of the notification * @return the new Single instance * @since 2.2.4 - experimental + * @see Single#dematerialize(Function) */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 3bbe0b0704..b1d34260f1 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -3388,6 +3388,7 @@ public final Maybe map(Function mapper) { * * @return the new Single instance * @since 2.2.4 - experimental + * @see Single#dematerialize(Function) */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f5f522a2cc..168c9c216a 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -2307,37 +2307,36 @@ public final Single delaySubscription(long time, TimeUnit unit, Scheduler sch * {@code onSuccess}, {@code onError} or {@code onComplete} signals as a * {@link Maybe} source. *

- * Note that {@code this} should be of type {@code Single>} or - * the transformation will result in an {@code onError} signal of - * {@link ClassCastException}. Currently, the Java language doesn't allow specifying - * methods for certain type argument shapes only (unlike extension methods would), - * hence the forced casting in this operator. - *

- * In addition, usually the inner value type (T2) has to be expressed again via - * a type argument on this method (see example below). + * The intended use of the {@code selector} function is to perform a + * type-safe identity mapping (see example) on a source that is already of type + * {@code Notification}. The Java language doesn't allow + * limiting instance methods to a certain generic argument shape, therefore, + * a function is used to ensure the conversion remains type safe. *

*
Scheduler:
- *
{@code dematerialize} does by default subscribe to the current Single - * on the {@link Scheduler} you provided, after the delay.
+ *
{@code dematerialize} does not operate by default on a particular {@link Scheduler}.
*
*

* Example: *


      * Single.just(Notification.createOnNext(1))
-     * .<Integer>dematerialize()
+     * .dematerialize(notification -> notification)
      * .test()
      * .assertResult(1);
      * 
- * @param the type inside the Notification + * @param the result type + * @param selector the function called with the success item and should + * return a {@link Notification} instance. * @return the new Maybe instance * @since 2.2.4 - experimental + * @see #materialize() */ - @SuppressWarnings("unchecked") @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) @Experimental - public final Maybe dematerialize() { - return RxJavaPlugins.onAssembly(new SingleDematerialize((Single)this)); + public final Maybe dematerialize(Function> selector) { + ObjectHelper.requireNonNull(selector, "selector is null"); + return RxJavaPlugins.onAssembly(new SingleDematerialize(this, selector)); } /** @@ -2920,6 +2919,7 @@ public final Single map(Function mapper) { * * @return the new Single instance * @since 2.2.4 - experimental + * @see #dematerialize(Function) */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java index 954e00724a..2e402b05da 100644 --- a/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java +++ b/src/main/java/io/reactivex/internal/operators/single/SingleDematerialize.java @@ -16,36 +16,47 @@ import io.reactivex.*; import io.reactivex.annotations.Experimental; import io.reactivex.disposables.Disposable; +import io.reactivex.exceptions.Exceptions; +import io.reactivex.functions.Function; import io.reactivex.internal.disposables.DisposableHelper; +import io.reactivex.internal.functions.ObjectHelper; /** - * Maps the Notification success value of the source back to normal - * onXXX method call as a Maybe. - * @param the element type of the notification and result + * Maps the success value of the source to a Notification, then + * maps it back to the corresponding signal type. + * @param the element type of the source + * @param the element type of the Notification and result * @since 2.2.4 - experimental */ @Experimental -public final class SingleDematerialize extends Maybe { +public final class SingleDematerialize extends Maybe { - final Single source; + final Single source; - public SingleDematerialize(Single source) { + final Function> selector; + + public SingleDematerialize(Single source, Function> selector) { this.source = source; + this.selector = selector; } @Override - protected void subscribeActual(MaybeObserver observer) { - source.subscribe(new DematerializeObserver(observer)); + protected void subscribeActual(MaybeObserver observer) { + source.subscribe(new DematerializeObserver(observer, selector)); } - static final class DematerializeObserver implements SingleObserver, Disposable { + static final class DematerializeObserver implements SingleObserver, Disposable { + + final MaybeObserver downstream; - final MaybeObserver downstream; + final Function> selector; Disposable upstream; - DematerializeObserver(MaybeObserver downstream) { + DematerializeObserver(MaybeObserver downstream, + Function> selector) { this.downstream = downstream; + this.selector = selector; } @Override @@ -67,19 +78,22 @@ public void onSubscribe(Disposable d) { } @Override - public void onSuccess(Object t) { - if (t instanceof Notification) { - @SuppressWarnings("unchecked") - Notification notification = (Notification)t; - if (notification.isOnNext()) { - downstream.onSuccess(notification.getValue()); - } else if (notification.isOnComplete()) { - downstream.onComplete(); - } else { - downstream.onError(notification.getError()); - } + public void onSuccess(T t) { + Notification notification; + + try { + notification = ObjectHelper.requireNonNull(selector.apply(t), "The selector returned a null Notification"); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + downstream.onError(ex); + return; + } + if (notification.isOnNext()) { + downstream.onSuccess(notification.getValue()); + } else if (notification.isOnComplete()) { + downstream.onComplete(); } else { - downstream.onError(new ClassCastException("io.reactivex.Notification expected but got " + t.getClass())); + downstream.onError(notification.getError()); } } diff --git a/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java index 760f277a9d..abfbe2151a 100644 --- a/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java +++ b/src/test/java/io/reactivex/internal/operators/single/SingleDematerializeTest.java @@ -18,6 +18,7 @@ import io.reactivex.*; import io.reactivex.exceptions.TestException; import io.reactivex.functions.Function; +import io.reactivex.internal.functions.Functions; import io.reactivex.subjects.SingleSubject; public class SingleDematerializeTest { @@ -25,31 +26,31 @@ public class SingleDematerializeTest { @Test public void success() { Single.just(Notification.createOnNext(1)) - .dematerialize() + .dematerialize(Functions.>identity()) .test() .assertResult(1); } @Test public void empty() { - Single.just(Notification.createOnComplete()) - .dematerialize() + Single.just(Notification.createOnComplete()) + .dematerialize(Functions.>identity()) .test() .assertResult(); } @Test public void error() { - Single.error(new TestException()) - .dematerialize() + Single.>error(new TestException()) + .dematerialize(Functions.>identity()) .test() .assertFailure(TestException.class); } @Test public void errorNotification() { - Single.just(Notification.createOnError(new TestException())) - .dematerialize() + Single.just(Notification.createOnError(new TestException())) + .dematerialize(Functions.>identity()) .test() .assertFailure(TestException.class); } @@ -57,23 +58,50 @@ public void errorNotification() { @Test public void doubleOnSubscribe() { TestHelper.checkDoubleOnSubscribeSingleToMaybe(new Function, MaybeSource>() { + @SuppressWarnings({ "unchecked", "rawtypes" }) @Override public MaybeSource apply(Single v) throws Exception { - return v.dematerialize(); + return v.dematerialize((Function)Functions.identity()); } }); } @Test public void dispose() { - TestHelper.checkDisposed(SingleSubject.create().dematerialize()); + TestHelper.checkDisposed(SingleSubject.>create().dematerialize(Functions.>identity())); } @Test - public void wrongType() { - Single.just(1) - .dematerialize() + public void selectorCrash() { + Single.just(Notification.createOnNext(1)) + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + throw new TestException(); + } + }) + .test() + .assertFailure(TestException.class); + } + + @Test + public void selectorNull() { + Single.just(Notification.createOnNext(1)) + .dematerialize(Functions.justFunction((Notification)null)) + .test() + .assertFailure(NullPointerException.class); + } + + @Test + public void selectorDifferentType() { + Single.just(Notification.createOnNext(1)) + .dematerialize(new Function, Notification>() { + @Override + public Notification apply(Notification v) throws Exception { + return Notification.createOnNext("Value-" + 1); + } + }) .test() - .assertFailure(ClassCastException.class); + .assertResult("Value-1"); } }